Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

270

271

272

273

274

275

276

277

278

279

280

281

282

283

284

285

"""This module provides classes to store DGILib Logger Interface Data.""" 

 

from pydgilib.dgilib_config import ( 

INTERFACE_GPIO) 

from pydgilib_extra.dgilib_extra_config import (INTERFACES, INTERFACE_POWER) 

 

 

class InterfaceData(object): 

"""Class to store DGILib Logger Interface Data.""" 

 

__slots__ = ['timestamps', 'values'] 

 

def __init__(self, *args): 

"""Take tuple of timestamps and values.""" 

if (not args): 

self.timestamps = [] 

self.values = [] 

elif (len(args) == 1 and isinstance(args[0], InterfaceData)): 

self = args[0] 

elif (len(args) == 1 and valid_interface_data(args[0])): 

self.timestamps, self.values = args[0] 

elif (len(args) == 2 and isinstance(args[0], list) and 

isinstance(args[1], list)): 

self.timestamps, self.values = args 

elif (len(args) == 2 and isinstance(args[0], (int, float)) and 

isinstance(args[1], (int, float, list))): 

self.timestamps = [args[0]] 

self.values = [args[1]] 

else: 

raise ValueError( 

f"Samples passed to InterfaceData must be tuple([],[]) or " 

"timestamps, values or InterfaceData. Got {args}") 

 

def __iadd__(self, interface_data): 

"""Append new interface_data (in-place). 

 

Used to provide `interface_data += interface_data1` syntax 

""" 

if isinstance(interface_data, InterfaceData): 

self.timestamps.extend(interface_data.timestamps) 

self.values.extend(interface_data.values) 

else: 

assert valid_interface_data( 

interface_data), f"Samples passed to InterfaceData were not " \ 

"valid_interface_data. {interface_data}" 

if isinstance(interface_data[0], list): 

self.timestamps.extend(interface_data[0]) 

self.values.extend(interface_data[1]) 

else: 

self.timestamps.extend([interface_data[0]]) 

self.values.extend([interface_data[1]]) 

return self 

 

def __add__(self, interface_data): 

"""Append new interface_data (copy). 

 

Used to provide `interface_data2 = interface_data1 + interface_data` 

syntax 

""" 

data = InterfaceData() 

data += self 

data += interface_data 

return data 

 

def append(self, interface_data): 

"""Append interface_data.""" 

return self.extend(interface_data) 

 

def extend(self, interface_data): 

"""Append a list of interface_data.""" 

if isinstance(interface_data, InterfaceData): 

self += interface_data 

else: 

self += InterfaceData(interface_data) 

return self 

 

def __len__(self): 

"""Get the number of samples.""" 

if len(self.timestamps): 

return len(self.timestamps) 

else: 

return 0 

 

def __getitem__(self, index): 

"""Get item. 

 

Used to provide `timestamp, value = interface_data[5]` and 

`timestamp, value = interface_data[2:5]` syntax 

""" 

return (self.timestamps[index], self.values[index]) 

 

def __contains__(self, item): 

"""Contains. 

 

Used to provide `([1], [2]) in interface_data` syntax 

""" 

return all(any(item_timestamp == self_timestamp and 

item_value == self_value 

for self_timestamp, self_value in self) 

for item_timestamp, item_value in ( 

item if isinstance(item, InterfaceData) else 

InterfaceData(item))) 

 

def __str__(self): 

"""Print data. 

 

Used to provide `str(data)` syntax. 

""" 

return str(tuple(self)) 

 

def get_select_in_value(self, begin=0, end=None, start_time=None, 

end_time=None): 

""" 

get_select_in_value. 

 

Use to slice a items in the values when the values are iterables. 

 

Keyword Arguments: 

begin {int} -- Begin index of item in value (default: {0}) 

end {[type]} -- End index of item in value, if not supplied only 

the item the begin index will be returned (default: {None}) 

start_time {[type]} -- Start time of selection (default: {None}) 

end_time {[type]} -- End time of selection (default: {None}) 

 

Returns: 

[list] -- List of values that have timestamps between start_time 

and end_time 

""" 

 

if start_time is None and end_time is None: 

return [value[begin] for value in self.values] if end is None \ 

else [value[begin:end] for value in self.values] 

 

start_index = 0 

end_index = len(self.timestamps) - 1 

# Get the index of the first sample after the start_time 

if start_time is not None: 

start_index = self.get_index(start_time) 

if end_time is not None: 

end_index = self.get_index(end_time, start_index) 

 

return [value[begin] for value in self.values[start_index:end_index]] \ 

if end is None else \ 

[value[begin:end] for value in self.values[start_index:end_index]] 

 

def get_index(self, timestamp, start_index=0): 

"""Get the index of the first sample after the timestamp.""" 

index = start_index # Start at start_index (can speed up search) 

max_index = len(self.timestamps) - 1 

while index < max_index and self.timestamps[index] < timestamp: 

index += 1 

return index 

 

 

class LoggerData(dict): 

"""Class to store DGILib Logger Data.""" 

 

# __slots__ = [INTERFACE_GPIO, INTERFACE_POWER] 

 

def __init__(self, *args, **kwargs): 

"""Take list of interfaces for the data.""" 

# Call init function of dict 

super().__init__(self) 

# No args or kwargs were specified, populate args[0] with standard 

# interfaces 

if not args and not kwargs: 

args = [[INTERFACE_GPIO, INTERFACE_POWER]] 

# Args is list of interfaces, make data dict with interfaces as keys 

# and tuples of lists as values 

if args and isinstance(args[0], list): 

for interface in args[0]: 

self[interface] = InterfaceData() 

# Instantiate dict with arguments 

else: 

self.update(*args, **kwargs) 

 

for interface, interface_data in self.items(): 

if not isinstance(interface_data, InterfaceData): 

self[interface] = InterfaceData(interface_data) 

 

def __getattr__(self, attr): 

"""Get attribute. 

 

Used to provide `data.spi` syntax. 

""" 

return self[INTERFACES[attr]] 

 

def __setattr__(self, attr, value): 

"""Set attribute. 

 

Used to provide `data.spi = InterfaceData(([], []))` syntax. 

""" 

self[INTERFACES[attr]] = value 

 

def __iadd__(self, logger_data): 

"""Append new logger_data (in-place). 

 

Used to provide `logger_data1 += logger_data` syntax 

""" 

if not isinstance(logger_data, dict): 

raise ValueError( 

f"logger_data must be a dict or LoggerData. " 

"Got {type(logger_data)}") 

for interface, interface_data in logger_data.items(): 

self.extend(interface, interface_data) 

return self 

 

def __add__(self, logger_data): 

"""Append new logger_data (copy). 

 

Used to provide `logger_data2 = logger_data1 + logger_data` syntax 

""" 

data = LoggerData() 

data += self 

data += logger_data 

return data 

 

def __getstate__(self): 

return self 

 

def __setstate__(self, state): 

return LoggerData(state) 

 

# def __copy__(self): 

# return self 

 

def append(self, interface, interface_data): 

"""Append a sample to one of the interfaces.""" 

return self.extend(interface, interface_data) 

 

def extend(self, interface, interface_data): 

"""Append a list of samples to one of the interfaces.""" 

if interface in self.keys(): 

self[interface].extend(interface_data) 

elif not isinstance(interface_data, InterfaceData): 

self[interface] = InterfaceData(interface_data) 

else: 

self[interface] = interface_data 

return self 

 

def length(self, attr=None): 

"""Compute the number of samples for the interfaces. 

 

Return a dict of the number of samples for each interface. Or the 

length of the `attr` specified. 

""" 

if attr is None: 

return {key: len(interface_data) 

for key, interface_data in self.items()} 

elif attr in self.keys(): 

return len(self[attr]) 

elif attr in INTERFACES.keys(): 

return len(self[INTERFACES[attr]]) 

else: 

raise ValueError( 

f"attr must be a named or numbered interface. Got {attr}") 

 

def __str__(self): 

"""Print data. 

 

Used to provide `str(data)` syntax. 

""" 

output = "Interfaces:\n" 

for interface in self.keys(): 

if interface in INTERFACES.values(): 

for name, number in INTERFACES.items(): 

if interface == number: 

output += f"\t{number:4}: {name + ',':{' '}^{10}} samples: {len(self[interface]):7}\n" 

break 

else: 

output += f"\t{interface:4}: (unknown), samples: {len(self[interface]):7}\n" 

 

return output 

 

# # Load CSV 

 

# # Calculations 

 

 

def valid_interface_data(samples): 

"""Check if samples are valid InterfaceData.""" 

return (isinstance(samples, (tuple, list)) and 

len(samples) == 2 and 

all(isinstance(sample, (tuple, list, float, int)) for sample in samples) and 

(isinstance(samples[0], float) or len(samples[0]) == len(samples[1])))