Source code for pydgilib_extra.dgilib_calculations


"""This module holds the functions that do calculations on Interface Data."""

import warnings

from pydgilib_extra.dgilib_extra_config import NUM_PINS


[docs]class StreamingCalculation(object): def __init__(self): self.data = [] self.index = 0
[docs]class GPIOAugmentEdges(StreamingCalculation): """GPIO Augment Edges."""
[docs] def gpio_augment_edges(self, gpio_data, delay_time=0, switch_time=0): """GPIO Augment Edges (streaming). Augments the edges of the GPIO data by inserting an extra sample of the previous pin values at moment before a switch occurs (minus switch_time ). The switch time is measured to be around 0.3 ms. Also delays all time stamps by delay_time. The delay time seems to vary a lot between different projects and should be manually specified for the best accuracy. Can insert the last datapoint again at the time specified (has to be after last sample). :param gpio_data: InterfaceData object of GPIO data. :type gpio_data: InterfaceData :param delay_time: Switch time of GPIO pin. :type delay_time: float :param switch_time: Switch time of GPIO pin. :type switch_time: float :return: InterfaceData object of augmented GPIO data. :rtype: InterfaceData """ if not len(self.data): self.data = [True] * NUM_PINS pin_states = self.data # iterate over the list and insert items at the same time: i = 0 while i < len(gpio_data): #print(gpio_data.timestamps[i], gpio_data.values[i]) if gpio_data.values[i] != pin_states: # This inserts a time sample at time + switch time (so moves # the time stamp into the future) gpio_data.timestamps.insert( i, gpio_data.timestamps[i] - switch_time) # This inserts the last datapoint again at the time the next # switch actually arrived (without switch time) gpio_data.values.insert(i, pin_states) i += 1 pin_states = gpio_data.values[i] i += 1 self.data = pin_states # Delay all time stamps by delay_time gpio_data.timestamps = [ t + delay_time for t in gpio_data.timestamps] return gpio_data
[docs]def power_and_time_per_pulse( logger_data, pin, start_time=0.01, end_time=float("Inf"), stop_function=None, initialized=False, pulse_direction=True): """Calculate power and time per pulse. Takes the data and a pin and returns a list of power and time sums for each pulse of the specified pin. NOTE: This returns time durations in the sampling frame of the power timestamps. Hence the list of times this function returns will not exactly match with the difference between rise timestamp and fall timestamp of the of the GPIO pin. :param data: LoggerData object. Needs to have GPIO and Power data. :type data: LoggerData :param pin: Number of the GPIO pin to be used. :type pin: int :param start_time: First timestamp to consider (defaults to 0.01 to skip GPIO initialization). :type start_time: float :param end_time: Last timestamp to consider. :type end_time: float :param stop_function: Function to evaluate on `pin_values`. If it returns True the loop will stop. :type stop_function: function :param initialized: If False: Skip first occurrences of all pins high :type initialized: bool :param pulse_direction: If True: detect pulse as False -> True -> False, else detect pulse as True -> False -> True :type pulse_direction: bool :return: List of list of power and time sums. :rtype: tuple(list(float), list(float)) """ pin_value = False pulse_start_time = 0 pulse_end_time = 0 charges = [] times = [] power_index = 0 # Loop over all gpio samples for timestamp, pin_values in logger_data.gpio: # Skip all samples until initialization has finished if not initialized: if all(pin_values): continue else: initialized = True if stop_function is not None and stop_function(pin_values): break # Detect inside start and end time if timestamp > start_time and timestamp <= end_time: # Detect rising edge (if pulse_direction else falling edge) if not pin_value and (pin_values[pin] ^ (not pulse_direction)): pin_value = pulse_direction pulse_start_time = timestamp # Detect falling edge (if pulse_direction else rising edge) if pin_value and (pin_values[pin] ^ pulse_direction): pin_value = False pulse_end_time = timestamp # Get the index of the power sample corresponding to the # rising edge power_index = start_index = logger_data.power.get_index( pulse_start_time, power_index) # Get the index of the power sample corresponding to the # falling edge power_index = end_index = logger_data.power.get_index( pulse_end_time, power_index) # Make sure the start index is larger than 0 if start_index < 1: start_index = 1 warnings.warn( "Corrected a start_index of 0 in " + "power_and_time_per_pulse.") # Sum charges and append to charges array charges.append(sum(logger_data.power.values[i] * (logger_data.power.timestamps[i] - logger_data.power.timestamps[i-1]) for i in range(start_index, end_index))) times.append(logger_data.power.timestamps[end_index] - logger_data.power.timestamps[start_index]) # TODO: Check for off-by-one errors using a pytest testcases return charges, times
[docs]def rise_and_fall_times( logger_data, pin, start_time=0.01, end_time=float("Inf"), stop_function=None, initialized=False, pulse_direction=True): """rise_and_fall_times. [description] Arguments: logger_data {[type]} -- [description] pin {[type]} -- [description] Keyword Arguments: start_time {float} -- [description] (default: {0.01}) end_time {[type]} -- [description] (default: {float("Inf")}) stop_function {[type]} -- [description] (default: {None}) initialized {bool} -- [description] (default: {False}) pulse_direction {bool} -- [description] (default: {True}) Returns: [type] -- [description] """ pin_value = False rise_times = [] fall_times = [] # Loop over all gpio samples for timestamp, pin_values in logger_data.gpio: # Skip all samples until initialization has finished if not initialized: if all(pin_values): continue else: initialized = True if stop_function is not None and stop_function(pin_values): break # Detect inside start and end time if timestamp > start_time and timestamp <= end_time: # Detect rising edge (if pulse_direction else falling edge) if not pin_value and (pin_values[pin] ^ (not pulse_direction)): pin_value = pulse_direction rise_times.append(timestamp) # Detect falling edge (if pulse_direction else rising edge) if pin_value and (pin_values[pin] ^ pulse_direction): pin_value = False fall_times.append(timestamp) return rise_times, fall_times
[docs]def calculate_average(power_data, start_time=None, end_time=None, start_index=1): """Calculate average value of the power_data using the left Riemann sum. Arguments: power_data {InterfaceData} -- Instance of InterfaceData with power samples. Keyword Arguments: start_time {float} -- Timestamp of first sample to include (default: {None}) end_time {float} -- Timestamp to of last sample to include (default: {None}) start_index {int} -- Index to start search for start time at (default: {1}) Returns: [type] -- [description] """ if start_time is None: start_index = 1 else: start_index = power_data.get_index(start_time, start_index) if end_time is None: end_index = len(power_data) - 1 else: end_index = power_data.get_index(end_time, start_index) # Make sure the start index is larger than 0 if start_index < 1: start_index = 1 warnings.warn( "Corrected a start_index of 0 in calculate_average.") return (sum(power_data.values[i] * (power_data.timestamps[i] - power_data.timestamps[i - 1]) for i in range(start_index, end_index)) / (power_data.timestamps[end_index] - power_data.timestamps[start_index]))