Coverage for ibllib/pipes/sync_tasks.py: 95%

82 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-08 17:16 +0100

1import logging 

2 

3from ibllib.pipes import base_tasks 

4from ibllib.io.extractors.ephys_fpga import extract_sync 

5from iblutil.util import Bunch 

6 

7import spikeglx 

8 

# Module-level logger in the 'ibllib' namespace. None of the tasks visible in this file log
# through it directly (NOTE(review): confirm it is unused elsewhere before removing it).
_logger = logging.getLogger('ibllib')

10 

11 

class SyncRegisterRaw(base_tasks.RegisterRawDataTask):
    """Register the raw output of a DAQ device.

    The dataset object is expected to be ``_*_DAQdata``, where the namespace names the DAQ model or
    acquisition software, e.g. 'mcc', 'ni' or 'ni-usb-6211'. At minimum a raw data dataset of the
    form ``_*_DAQdata.raw*`` must exist, e.g. '_mcc_DAQdata.raw.pqt'. The following attribute
    datasets are optional:

    - ``_*_DAQdata.timestamps.npy``: for timeline, the timestamps are stored apart from the samples.
    - ``_*_DAQdata.meta.json``: for timeline, all acquisition metadata (e.g. sample rate, channel
      names) is stored in a separate file.
    - ``_*_DAQdata.wiring.json``: for SpikeGLX, the channel map is stored in this file.
    - ``_timeline_softwareEvents.log.htsv``: UDP messages and other software events in DAQ time.
    """
    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        """dict: No input files. Outputs are the raw DAQ dataset (required, extension taken from
        ``self.sync_ext``) followed by the optional attribute datasets, all in
        ``self.sync_collection``."""
        collection = self.sync_collection
        raw_data = (f'*DAQdata.raw.{self.sync_ext}', collection, True)
        optional_patterns = (
            '*DAQdata.timestamps.npy',
            '*DAQdata.meta.json',
            '*DAQdata.wiring.json',
            '*softwareEvents.log.htsv',
        )
        attributes = [(pattern, collection, False) for pattern in optional_patterns]
        return {'input_files': [], 'output_files': [raw_data, *attributes]}

39 

40 

class SyncMtscomp(base_tasks.DynamicTask):
    """Rename, compress and register raw DAQ data in .bin format collected using NIDAQ.

    Files found in ``self.sync_collection`` are renamed to the ``_<sync_namespace>_DAQdata``
    convention. An uncompressed ``.bin`` file is compressed via ``spikeglx.Reader.compress_file``
    and the original ``.bin`` is then deleted. Each step is skipped when it has already been done
    (file already compressed / already renamed), so the task can be re-run on a partially
    processed session.
    """

    priority = 90
    cpu = 2
    io_charge = 30  # this job reads the raw DAQ .bin file
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*.*bin', self.sync_collection, True),
                            ('*.meta', self.sync_collection, True),
                            ('*.wiring.json', self.sync_collection, True)],
            'output_files': [(f'_{self.sync_namespace}_DAQdata.raw.cbin', self.sync_collection, True),
                             (f'_{self.sync_namespace}_DAQdata.raw.ch', self.sync_collection, True),
                             (f'_{self.sync_namespace}_DAQdata.raw.meta', self.sync_collection, True),
                             (f'_{self.sync_namespace}_DAQdata.wiring.json', self.sync_collection, True)]
        }
        return signature

    def _daq_path(self, file, tag):
        """Return the ``_<sync_namespace>_DAQdata.<tag><suffix>`` path in the same folder as `file`."""
        return file.parent.joinpath(f'_{self.sync_namespace}_DAQdata.{tag}{file.suffix}')

    def _rename_to_daq(self, file, tag):
        """Rename `file` to the DAQdata convention unless its stem already contains 'DAQdata.<tag>'.

        Returns the path of the file after the (possible) rename.
        """
        if f'DAQdata.{tag}' in file.stem:
            return file
        new_file = self._daq_path(file, tag)
        file.replace(new_file)
        return new_file

    def _run(self):
        """Rename and compress the raw DAQ files found in the sync collection.

        Returns
        -------
        list or None
            The wiring file (if present) followed by the .cbin, .ch and .meta files.
            If no .bin/.cbin file is present, the list holds the wiring file and any .ch/.meta
            files found instead. Returns None when nothing at all was found.

        Raises
        ------
        ValueError
            If the reader reports an mtscomp-compressed file that does not have a .cbin suffix.
        """
        sync_path = self.session_path.joinpath(self.sync_collection)
        out_files = []

        # Detect the wiring file and rename it (if it hasn't already been renamed)
        wiring_file = next(sync_path.glob('*.wiring.json'), None)
        if wiring_file is not None:
            out_files.append(self._rename_to_daq(wiring_file, 'wiring'))

        # Search for .bin / .cbin files in the sync_collection folder
        bin_file = next(sync_path.glob('*.*bin'), None)

        # Without a .bin / .cbin file, still rename and register any .ch and .meta files found
        if bin_file is None:
            for ext in ('ch', 'meta'):
                ext_file = next(sync_path.glob(f'*.{ext}'), None)
                if ext_file is not None:
                    out_files.append(self._rename_to_daq(ext_file, 'raw'))
            return out_files or None

        # Compress the .bin file (only if it hasn't already been compressed). The reader is closed
        # in `finally` so the file handle is released even if compression fails.
        sr = spikeglx.Reader(bin_file)
        try:
            already_compressed = sr.is_mtscomp
            cbin_file = bin_file if already_compressed else sr.compress_file()
        finally:
            sr.close()

        if already_compressed:
            # explicit check rather than `assert`, which is stripped under `python -O`
            if cbin_file.suffix != '.cbin':
                raise ValueError(f'{cbin_file} is reported as mtscomp-compressed but is not a .cbin file')
        else:
            bin_file.unlink()

        # Rename the compressed file and its .meta / .ch companions (only if not already renamed).
        # The companions share the .cbin stem, so they are located from it rather than globbed.
        if 'DAQdata.raw' in cbin_file.stem:
            new_bin_file = cbin_file
        else:
            new_bin_file = self._daq_path(cbin_file, 'raw')
            cbin_file.replace(new_bin_file)
            for suffix in ('.meta', '.ch'):
                cbin_file.with_suffix(suffix).replace(new_bin_file.with_suffix(suffix))

        out_files.extend([new_bin_file, new_bin_file.with_suffix('.ch'), new_bin_file.with_suffix('.meta')])
        return out_files

129 

130 

class SyncPulses(base_tasks.DynamicTask):
    """Extract sync pulses from the NIDAQ .bin / .cbin file.

    N.B. Only the sync collection is processed. This is not equivalent to EphysPulses, which
    extracts sync pulses for each probe.

    TODO: generalise to other DAQs and file formats, and to 3A probes.
    """

    priority = 90
    cpu = 2
    io_charge = 30  # this job reads the raw DAQ .bin / .cbin file
    job_size = 'small'

    @property
    def signature(self):
        """dict: Inputs are the renamed raw DAQ files. Outputs are the
        ``_<sync_namespace>_sync.*.npy`` arrays. Everything lives in ``self.sync_collection``."""
        namespace, collection = self.sync_namespace, self.sync_collection
        inputs = [(f'_{namespace}_DAQdata.{name}', collection, True)
                  for name in ('raw.*bin', 'raw.ch', 'raw.meta', 'wiring.json')]
        outputs = [(f'_{namespace}_sync.{attribute}.npy', collection, True)
                   for attribute in ('times', 'polarities', 'channels')]
        return {'input_files': inputs, 'output_files': outputs}

    def _run(self, overwrite=False):
        """Extract the sync arrays from the first .bin/.cbin file in the sync collection.

        Returns whatever `extract_sync` reports as outputs, or an empty list when no
        .bin/.cbin file is present.
        """
        sync_dir = self.session_path.joinpath(self.sync_collection)
        bin_file = next(sync_dir.glob('*.*bin'), None)
        if bin_file is None:
            return []

        # TODO this is a hack, once we refactor the sync tasks should make generic extract_sync
        # that doesn't rely on output of glob_ephys_files
        ephys_files = [Bunch(nidq=bin_file, label='')]

        _, outputs = extract_sync(self.session_path, ephys_files=ephys_files, overwrite=overwrite,
                                  namespace=self.sync_namespace)
        return outputs