Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

"""This module provides a base interface class.""" 

 

from os import (path, getcwd) 

import csv 

import warnings 

 

from pydgilib_extra.dgilib_data import InterfaceData 

from pydgilib_extra.dgilib_extra_exceptions import InterfaceNotAvailableError 

from pydgilib_extra.dgilib_extra_config import POLLING 

 

 

class DGILibInterface(object):
    """Provide a base interface class.

    The configuration hooks (``get_config``/``set_config``) and the data
    hooks (``read``/``write``) are placeholders in this class: they do
    nothing and return ``None``. The class also offers helpers to log
    interface data to a ``.csv`` file and to read such a file back.
    """

    # Identifier looked up in ``dgilib_extra.available_interfaces`` and
    # ``dgilib_extra.enabled_interfaces`` by ``enable()``/``disable()``.
    # NOTE(review): -1 looks like a "not set" placeholder -- confirm.
    interface_id = -1
    # Used as the second part of the log file name.
    name = "interface_name"
    # Header row written by ``init_csv_writer`` and checked by
    # ``csv_read_file``.
    csv_header = ["timestamp", "value"]
    # First part of the log file name; can be overridden per instance with
    # the ``file_name_base`` keyword argument of ``__init__``.
    file_name_base = "log"
    # Not read anywhere in this class; value comes from
    # ``pydgilib_extra.dgilib_extra_config``.
    polling_type = POLLING

    @staticmethod
    def _csv_reader_map(row):
        """Convert one csv row into a ``(float, float)`` tuple.

        Only the first two columns of ``row`` are used.
        """
        return (float(row[0]), float(row[1]))

    def __init__(self, *args, **kwargs):
        """Instantiate DGILibInterface object.

        Parameters
        ----------
        *args
            If the ``dgilib_extra`` keyword is absent, the first positional
            argument (if any) is used in its place. All positional
            arguments are forwarded to ``set_config``.
        **kwargs
            Recognised keys are ``dgilib_extra``, ``verbose`` (default 0)
            and ``file_name_base``. All keyword arguments are forwarded to
            ``set_config``.
        """
        self.file_handle = None
        self.csv_writer = None

        # Argument parsing
        self.dgilib_extra = kwargs.get(
            "dgilib_extra", args[0] if args else None)
        self.verbose = kwargs.get("verbose", 0)
        if "file_name_base" in kwargs:
            self.file_name_base = kwargs["file_name_base"]

        # Set interface configuration (only when a dgilib_extra object is
        # available)
        if self.dgilib_extra is not None:
            self.set_config(*args, **kwargs)

    def get_config(self):
        """Get configuration options.

        Returns
        -------
        None
            This base implementation has no configuration to return.
        """
        return None

    def set_config(self, *args, **kwargs):
        """Set configuration options.

        This base implementation accepts any arguments and does nothing.
        """
        pass

    def enable(self):
        """Enable the interface.

        The interface is enabled through ``dgilib_extra`` and recorded in
        ``dgilib_extra.enabled_interfaces``, unless it is already listed
        there.

        Raises
        ------
        InterfaceNotAvailableError
            If ``interface_id`` is not in
            ``dgilib_extra.available_interfaces``.
        """
        if self.interface_id not in self.dgilib_extra.available_interfaces:
            raise InterfaceNotAvailableError(
                f"Interface {self.interface_id} not available. Available "
                f"interfaces: {self.dgilib_extra.available_interfaces}")
        if self.interface_id not in self.dgilib_extra.enabled_interfaces:
            self.dgilib_extra.interface_enable(self.interface_id)
            self.dgilib_extra.enabled_interfaces.append(self.interface_id)

    def disable(self):
        """Disable the interface.

        Does nothing when the interface is not listed in
        ``dgilib_extra.enabled_interfaces``.
        """
        if self.interface_id in self.dgilib_extra.enabled_interfaces:
            self.dgilib_extra.interface_disable(self.interface_id)
            self.dgilib_extra.enabled_interfaces.remove(self.interface_id)

    def read(self, *args, **kwargs):
        """Read data from the interface.

        Returns
        -------
        None
            This base implementation reads nothing. The original docstring
            names ``InterfaceData`` as the return type.
        """
        return None

    def write(self, *args, **kwargs):
        """Write data to the interface (currently unimplemented)."""
        pass

    def init_csv_writer(self, log_folder=None, newline='', mode='w'):
        """Open the log file and create a csv writer for it.

        The file is ``<file_name_base>_<name>.csv`` inside ``log_folder``.
        The header row (``csv_header``) is written on every call,
        whatever ``mode`` is.

        Parameters
        ----------
        log_folder
            Folder to create the file in. ``None`` (the default) means the
            current working directory at the time of the call.
        newline
            Passed to ``open``.
        mode
            File mode passed to ``open``.
        """
        # Resolve the default here rather than in the signature: a default
        # of ``getcwd()`` would be evaluated only once, when the class is
        # defined, and would ignore later changes of the working directory.
        if log_folder is None:
            log_folder = getcwd()

        # Open file handle
        file_path = path.join(
            log_folder, (self.file_name_base + '_' + self.name + ".csv"))
        self.file_handle = open(file_path, mode, newline=newline)
        # Create csv.writer
        self.csv_writer = csv.writer(self.file_handle)
        # Write header to file
        self.csv_writer.writerow(self.csv_header)

    def close_csv_writer(self):
        """Close the log file handle, if one was opened."""
        # ``file_handle`` is None until ``init_csv_writer`` has been called
        if self.file_handle is not None:
            self.file_handle.close()

    def csv_write_rows(self, interface_data):
        """Write ``interface_data`` to the log file using ``writerows``.

        ``init_csv_writer`` must have been called first.
        """
        self.csv_writer.writerows(interface_data)

    def csv_read_file(self, file_path=None, newline='', mode='r'):
        """Read a csv log file into an ``InterfaceData`` object.

        Parameters
        ----------
        file_path
            File to read. ``None`` (the default) means
            ``<file_name_base>_<name>.csv`` in the current working
            directory.
        newline
            Passed to ``open``.
        mode
            File mode passed to ``open``.

        Returns
        -------
        InterfaceData
            The rows after the header, each converted with
            ``_csv_reader_map`` and added with ``+=``.

        Warns
        -----
        UserWarning
            If the first row of the file differs from ``csv_header`` (this
            includes an empty file, for which the header is ``None``).
        """
        if file_path is None:
            file_path = path.join(
                getcwd(), (self.file_name_base + '_' + self.name + ".csv"))

        with open(file_path, mode, newline=newline) as csv_file:
            interface_data = InterfaceData()
            reader = csv.reader(csv_file)
            # Use a default so that an empty file results in the warning
            # below and an empty result instead of a bare StopIteration
            header = next(reader, None)
            if header != self.csv_header:
                warnings.warn(
                    f"Header of .csv file did not match expected value. Got "
                    f"{header}, expected {self.csv_header}, file: "
                    f"{file_path}.")
            for row in reader:
                interface_data += self._csv_reader_map(row)
        return interface_data