"""This module wraps the logging functionality for DGILibExtra."""
from os import getcwd
from time import time
from pydgilib_extra.dgilib_data import LoggerData
from pydgilib_extra.dgilib_extra_config import (
LOGGER_CSV, LOGGER_OBJECT, LOGGER_PLOT, FILE_NAME_BASE, POLLING, POWER)
from pydgilib_extra.dgilib_plot import DGILibPlot
class DGILibLogger(object):
    """Wraps the logging functionality for DGILibExtra.

    Interfaces:
        - GPIO mode
        - Power mode

    Depending on the enabled loggers, data read from the interfaces of the
    wrapped ``dgilib_extra`` object is written to csv (`LOGGER_CSV`), merged
    into ``dgilib_extra.data`` (`LOGGER_OBJECT`) and/or plotted
    (`LOGGER_PLOT`).
    """

    def __init__(self, *args, **kwargs):
        """Instantiate DGILibLogger object.

        Parameters
        ----------
        dgilib_extra : object
            Object whose interfaces are logged. May also be passed as the
            first positional argument (default: `None`).
        loggers : list
            Enabled loggers (default: ``dgilib_extra.default_loggers``). The
            list is copied, it is never modified in place.
        file_name_base : str
            Optional base of the log file names (default: `FILE_NAME_BASE`).
            Specifying it enables `LOGGER_CSV`.
        log_folder : str
            Folder where log files will be saved (default: current working
            directory). Specifying it enables `LOGGER_CSV`.
        fig, ax
            Specifying either of them enables `LOGGER_PLOT`. They are not
            read here; all arguments are forwarded to :class:`DGILibPlot`.
        """
        self.dgilib_extra = kwargs.get(
            "dgilib_extra", args[0] if args else None)

        # Get enabled loggers. Work on a copy: the loggers are appended to
        # below and neither the caller's list nor the shared
        # `default_loggers` of dgilib_extra should change as a side effect.
        self.loggers = list(
            kwargs.get("loggers", self.dgilib_extra.default_loggers))

        # Enable the csv logger if file_name_base or log_folder has been
        # specified.
        if LOGGER_CSV not in self.loggers and ("file_name_base" in kwargs or
                                               "log_folder" in kwargs):
            self.loggers.append(LOGGER_CSV)

        # file_name_base - merely the optional base of the filename
        #     (Preferably leave standard).
        # log_folder - where log files will be saved
        self.file_name_base = kwargs.get("file_name_base", FILE_NAME_BASE)
        self.log_folder = kwargs.get("log_folder", getcwd())

        # Enable the plot logger if figure has been specified.
        if (LOGGER_PLOT not in self.loggers and
                ("fig" in kwargs or "ax" in kwargs)):
            self.loggers.append(LOGGER_PLOT)

        # Create the plot object if LOGGER_PLOT is enabled and expose its
        # helper methods directly on the logger.
        if LOGGER_PLOT in self.loggers:
            self.plotobj = DGILibPlot(self.dgilib_extra, *args, **kwargs)
            self.refresh_plot = self.plotobj.refresh_plot
            self.plot_still_exists = self.plotobj.plot_still_exists
            self.keep_plot_alive = self.plotobj.keep_plot_alive

            # Force logging in object if logging in plot (the plot is
            # updated from `dgilib_extra.data`, see `update_callback`).
            if LOGGER_OBJECT not in self.loggers:
                self.loggers.append(LOGGER_OBJECT)

    def start(self):
        """Call to start logging."""
        # Prepare one csv writer per interface if LOGGER_CSV is enabled
        if LOGGER_CSV in self.loggers:
            for interface in self.dgilib_extra.interfaces.values():
                interface.init_csv_writer(self.log_folder)

        # Start the data polling
        self.start_polling()

        # Create data structure self.data if LOGGER_OBJECT is enabled
        if LOGGER_OBJECT in self.loggers:
            self.dgilib_extra.empty_data()

    def update_callback(self, return_data=False):
        """Call to get new data.

        Reads every interface once and hands any data that arrived to the
        enabled loggers.

        Parameters
        ----------
        return_data : bool
            If `True` the data read during this call is also collected and
            returned (default: `False`).

        Returns
        -------
        LoggerData or None
            The data read during this call if `return_data` is `True`,
            otherwise `None`.
        """
        if return_data:
            logger_data = LoggerData()

        # Get data
        for interface_id, interface in self.dgilib_extra.interfaces.items():
            # Read from the interface
            interface_data = interface.read()
            # Nothing to do for this interface if no data has arrived
            if not interface_data:
                continue
            if LOGGER_CSV in self.loggers:
                interface.csv_write_rows(interface_data)
            # Merge data into self.data if LOGGER_OBJECT is enabled
            if LOGGER_OBJECT in self.loggers:
                self.dgilib_extra.data[interface_id] += interface_data
            # Update the plot if LOGGER_PLOT is enabled
            if LOGGER_PLOT in self.loggers:
                self.plotobj.update_plot(self.dgilib_extra.data)
            if return_data:
                logger_data[interface_id] += interface_data

        # Return the data
        if return_data:
            return logger_data

    def stop(self, return_data=False):
        """Call to stop logging.

        Parameters
        ----------
        return_data : bool
            Only used when `LOGGER_OBJECT` is not enabled: if `True` the
            data of the final read is returned (default: `False`).

        Returns
        -------
        LoggerData or None
            ``dgilib_extra.data`` if `LOGGER_OBJECT` is enabled, else the
            result of the final read if `return_data` is `True`, else
            `None`.
        """
        # Stop the data polling
        self.stop_polling()

        # Get last data from buffer
        if LOGGER_OBJECT in self.loggers:
            self.update_callback()
        else:
            data = self.update_callback(return_data)

        # Close file handle
        if LOGGER_CSV in self.loggers:
            for interface in self.dgilib_extra.interfaces.values():
                interface.close_csv_writer()

        if LOGGER_OBJECT in self.loggers:
            return self.dgilib_extra.data
        elif return_data:
            return data

    def log(self, duration=10, stop_function=None, min_duration=0.2):
        """Run the logger for the specified amount of time.

        Parameters
        ----------
        duration : int
            Amount of time to log data (default: `10`).
        stop_function : callable
            Function that will be evaluated on the
            collected data. If it returns `True` the logging will be
            stopped even if the duration has not been reached (default:
            `None`).
        min_duration : float
            Amount of time that has to pass before `stop_function` is
            evaluated for the first time (default: `0.2`).

        Returns
        -------
        LoggerData
            Returns the logged data as a :class:`LoggerData` object if
            `LOGGER_OBJECT` was passed to the logger.
        """
        self.start()

        if LOGGER_PLOT in self.loggers:
            # So that the plot has xmax (being time) as big as duration now
            if isinstance(self.plotobj, DGILibPlot):
                self.plotobj.xmax = duration

        cur_time = time()
        end_time = cur_time + duration
        min_time = cur_time + min_duration

        if stop_function is None:
            # Plain timed logging
            while time() < end_time:
                self.update_callback()
        elif LOGGER_OBJECT in self.loggers:
            # The stop function is evaluated on all data collected so far
            while cur_time < end_time:
                self.update_callback()
                if (cur_time > min_time) and \
                        stop_function(self.dgilib_extra.data):
                    break
                cur_time = time()
        else:
            # No data object is kept: the stop function only gets the data
            # returned by the current read. Because of the short-circuit
            # `and`, no read happens before `min_time` has passed.
            while cur_time < end_time:
                if (cur_time > min_time) and \
                        stop_function(self.update_callback(True)):
                    break
                cur_time = time()

        self.stop()

        if LOGGER_OBJECT in self.loggers:
            return self.dgilib_extra.data

    def which_polling(self, interface_ids=None):
        """which_polling

        Determines on which polling types need to be started or stopped based
        on the interface ids and instantiated interfaces in dgilib extra.

        Parameters
        ----------
        interface_ids : list
            List of interface ids (default: all enabled interfaces)

        Returns
        -------
        tuple(bool, bool)
            Tuple ``(polling, power)`` of bool that are `True` when that
            interface type should be started or stopped
        """
        if interface_ids is None:
            interface_ids = self.dgilib_extra.enabled_interfaces

        polling = power = False
        for interface_id in interface_ids:
            polling_type = \
                self.dgilib_extra.interfaces[interface_id].polling_type
            polling |= POLLING == polling_type
            power |= POWER == polling_type

        return polling, power

    def start_polling(self, interface_ids=None):
        """start_polling

        Starts polling on the specified interfaces. By default polling will be
        started for all enabled interfaces.

        Parameters
        ----------
        interface_ids : list(int)
            List of interface ids (default: all enabled interfaces)
        """
        polling, power = self.which_polling(interface_ids)
        if polling:
            self.dgilib_extra.start_polling()
        if power:
            self.dgilib_extra.auxiliary_power_start()

    def stop_polling(self, interface_ids=None):
        """stop_polling

        Stops polling on the specified interfaces. By default polling will be
        stopped for all enabled interfaces.

        Parameters
        ----------
        interface_ids : list(int)
            List of interface ids (default: all enabled interfaces)
        """
        polling, power = self.which_polling(interface_ids)
        if polling:
            self.dgilib_extra.stop_polling()
        if power:
            self.dgilib_extra.auxiliary_power_stop()