"""This module provides classes to store DGILib Logger Interface Data."""
from pydgilib.dgilib_config import (
INTERFACE_GPIO)
from pydgilib_extra.dgilib_extra_config import (INTERFACES, INTERFACE_POWER)
[docs]class InterfaceData(object):
"""Class to store DGILib Logger Interface Data."""
__slots__ = ['timestamps', 'values']
def __init__(self, *args):
"""Take tuple of timestamps and values."""
if (not args):
self.timestamps = []
self.values = []
elif (len(args) == 1 and isinstance(args[0], InterfaceData)):
self = args[0]
elif (len(args) == 1 and valid_interface_data(args[0])):
self.timestamps, self.values = args[0]
elif (len(args) == 2 and isinstance(args[0], list) and
isinstance(args[1], list)):
self.timestamps, self.values = args
elif (len(args) == 2 and isinstance(args[0], (int, float)) and
isinstance(args[1], (int, float, list))):
self.timestamps = [args[0]]
self.values = [args[1]]
else:
raise ValueError(
f"Samples passed to InterfaceData must be tuple([],[]) or "
"timestamps, values or InterfaceData. Got {args}")
[docs] def __iadd__(self, interface_data):
"""Append new interface_data (in-place).
Used to provide `interface_data += interface_data1` syntax
"""
if isinstance(interface_data, InterfaceData):
self.timestamps.extend(interface_data.timestamps)
self.values.extend(interface_data.values)
else:
assert valid_interface_data(
interface_data), f"Samples passed to InterfaceData were not " \
"valid_interface_data. {interface_data}"
if isinstance(interface_data[0], list):
self.timestamps.extend(interface_data[0])
self.values.extend(interface_data[1])
else:
self.timestamps.extend([interface_data[0]])
self.values.extend([interface_data[1]])
return self
[docs] def __add__(self, interface_data):
"""Append new interface_data (copy).
Used to provide `interface_data2 = interface_data1 + interface_data`
syntax
"""
data = InterfaceData()
data += self
data += interface_data
return data
[docs] def append(self, interface_data):
"""Append interface_data."""
return self.extend(interface_data)
[docs] def extend(self, interface_data):
"""Append a list of interface_data."""
if isinstance(interface_data, InterfaceData):
self += interface_data
else:
self += InterfaceData(interface_data)
return self
[docs] def __len__(self):
"""Get the number of samples."""
if len(self.timestamps):
return len(self.timestamps)
else:
return 0
[docs] def __getitem__(self, index):
"""Get item.
Used to provide `timestamp, value = interface_data[5]` and
`timestamp, value = interface_data[2:5]` syntax
"""
return (self.timestamps[index], self.values[index])
[docs] def __contains__(self, item):
"""Contains.
Used to provide `([1], [2]) in interface_data` syntax
"""
return all(any(item_timestamp == self_timestamp and
item_value == self_value
for self_timestamp, self_value in self)
for item_timestamp, item_value in (
item if isinstance(item, InterfaceData) else
InterfaceData(item)))
[docs] def __str__(self):
"""Print data.
Used to provide `str(data)` syntax.
"""
return str(tuple(self))
[docs] def get_select_in_value(self, begin=0, end=None, start_time=None,
end_time=None):
"""
get_select_in_value.
Use to slice a items in the values when the values are iterables.
Keyword Arguments:
begin {int} -- Begin index of item in value (default: {0})
end {[type]} -- End index of item in value, if not supplied only
the item the begin index will be returned (default: {None})
start_time {[type]} -- Start time of selection (default: {None})
end_time {[type]} -- End time of selection (default: {None})
Returns:
[list] -- List of values that have timestamps between start_time
and end_time
"""
if start_time is None and end_time is None:
return [value[begin] for value in self.values] if end is None \
else [value[begin:end] for value in self.values]
start_index = 0
end_index = len(self.timestamps) - 1
# Get the index of the first sample after the start_time
if start_time is not None:
start_index = self.get_index(start_time)
if end_time is not None:
end_index = self.get_index(end_time, start_index)
return [value[begin] for value in self.values[start_index:end_index]] \
if end is None else \
[value[begin:end] for value in self.values[start_index:end_index]]
[docs] def get_index(self, timestamp, start_index=0):
"""Get the index of the first sample after the timestamp."""
index = start_index # Start at start_index (can speed up search)
max_index = len(self.timestamps) - 1
while index < max_index and self.timestamps[index] < timestamp:
index += 1
return index
[docs]class LoggerData(dict):
"""Class to store DGILib Logger Data."""
# __slots__ = [INTERFACE_GPIO, INTERFACE_POWER]
def __init__(self, *args, **kwargs):
"""Take list of interfaces for the data."""
# Call init function of dict
super().__init__(self)
# No args or kwargs were specified, populate args[0] with standard
# interfaces
if not args and not kwargs:
args = [[INTERFACE_GPIO, INTERFACE_POWER]]
# Args is list of interfaces, make data dict with interfaces as keys
# and tuples of lists as values
if args and isinstance(args[0], list):
for interface in args[0]:
self[interface] = InterfaceData()
# Instantiate dict with arguments
else:
self.update(*args, **kwargs)
for interface, interface_data in self.items():
if not isinstance(interface_data, InterfaceData):
self[interface] = InterfaceData(interface_data)
[docs] def __getattr__(self, attr):
"""Get attribute.
Used to provide `data.spi` syntax.
"""
return self[INTERFACES[attr]]
[docs] def __setattr__(self, attr, value):
"""Set attribute.
Used to provide `data.spi = InterfaceData(([], []))` syntax.
"""
self[INTERFACES[attr]] = value
[docs] def __iadd__(self, logger_data):
"""Append new logger_data (in-place).
Used to provide `logger_data1 += logger_data` syntax
"""
if not isinstance(logger_data, dict):
raise ValueError(
f"logger_data must be a dict or LoggerData. "
"Got {type(logger_data)}")
for interface, interface_data in logger_data.items():
self.extend(interface, interface_data)
return self
[docs] def __add__(self, logger_data):
"""Append new logger_data (copy).
Used to provide `logger_data2 = logger_data1 + logger_data` syntax
"""
data = LoggerData()
data += self
data += logger_data
return data
def __getstate__(self):
return self
def __setstate__(self, state):
return LoggerData(state)
# def __copy__(self):
# return self
[docs] def append(self, interface, interface_data):
"""Append a sample to one of the interfaces."""
return self.extend(interface, interface_data)
[docs] def extend(self, interface, interface_data):
"""Append a list of samples to one of the interfaces."""
if interface in self.keys():
self[interface].extend(interface_data)
elif not isinstance(interface_data, InterfaceData):
self[interface] = InterfaceData(interface_data)
else:
self[interface] = interface_data
return self
[docs] def length(self, attr=None):
"""Compute the number of samples for the interfaces.
Return a dict of the number of samples for each interface. Or the
length of the `attr` specified.
"""
if attr is None:
return {key: len(interface_data)
for key, interface_data in self.items()}
elif attr in self.keys():
return len(self[attr])
elif attr in INTERFACES.keys():
return len(self[INTERFACES[attr]])
else:
raise ValueError(
f"attr must be a named or numbered interface. Got {attr}")
[docs] def __str__(self):
"""Print data.
Used to provide `str(data)` syntax.
"""
output = "Interfaces:\n"
for interface in self.keys():
if interface in INTERFACES.values():
for name, number in INTERFACES.items():
if interface == number:
output += f"\t{number:4}: {name + ',':{' '}^{10}} samples: {len(self[interface]):7}\n"
break
else:
output += f"\t{interface:4}: (unknown), samples: {len(self[interface]):7}\n"
return output
# # Load CSV
# # Calculations
[docs]def valid_interface_data(samples):
"""Check if samples are valid InterfaceData."""
return (isinstance(samples, (tuple, list)) and
len(samples) == 2 and
all(isinstance(sample, (tuple, list, float, int)) for sample in samples) and
(isinstance(samples[0], float) or len(samples[0]) == len(samples[1])))