Coverage for ibllib/pipes/misc.py: 82%

129 statements  

« prev     ^ index     » next       coverage.py v7.7.0, created at 2025-03-17 15:25 +0000

1"""Miscellaneous pipeline utility functions.""" 

2import ctypes 

3import os 

4import re 

5import shutil 

6import logging 

7from functools import wraps 

8from pathlib import Path 

9from typing import Union, List, Callable, Any 

10 

11import spikeglx 

12from one.alf.spec import is_uuid_string 

13from one.api import ONE 

14 

15from ibllib.io.misc import delete_empty_folders 

16 

# Module-level logger, named after this module
log = logging.getLogger(__name__)

# NOTE(review): presumably maps an acquisition device name to the name used for
# its flag files - confirm against the callers, none are visible in this module.
DEVICE_FLAG_MAP = {
    'neuropixel': 'ephys',
    'cameras': 'video',
    'widefield': 'widefield',
    'sync': 'sync',
}

23 

24 

def probe_labels_from_session_path(session_path: Union[str, Path]) -> List[str]:
    """
    List the ephys probe labels of a session from its spikeglx metadata files.

    A label is the name of the folder holding a `*.ap.meta` file. Only folders located
    directly under raw_ephys_data are considered, ie. raw_ephys_data/probe00/copy_of_probe00
    is ignored. When the metadata reports more than one shank, one label per shank is
    returned, made of the folder name followed by a letter (probe00a, probe00b, ...).

    :param session_path: path of the session folder
    :return: sorted list of probe label strings
    """
    ephys_root = Path(session_path).joinpath('raw_ephys_data')
    shank_letters = 'abcdefghij'
    labels = []
    for meta_path in ephys_root.rglob('*.ap.meta'):
        # keep only the meta files sitting exactly one folder below raw_ephys_data
        if meta_path.parents[1] == ephys_root:
            probe_name = meta_path.parts[-2]
            n_shanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_path))
            if n_shanks > 1:
                labels.extend(probe_name + shank_letters[k] for k in range(n_shanks))
            else:
                labels.append(probe_name)
    return sorted(labels)

47 

48 

def create_alyx_probe_insertions(
    session_path: str,
    force: bool = False,
    one: object = None,
    model: str = None,
    labels: list = None,
):
    """
    Get or create the Alyx probe insertion records of a session.

    For each probe label, Alyx is queried for an insertion with that name in the session.
    A missing insertion is created; an existing one is returned as is, or updated when
    `force` is True.

    :param session_path: session path, or session UUID string
    :param force: if True, update the insertions that already exist on Alyx
    :param one: ONE instance; if None, one is instantiated with cache_rest=None, mode='local'
    :param model: probe model; if None, obtained from spikeglx.get_neuropixel_version_from_folder
     with "3B" mapped to "3B2"
    :param labels: list of probe labels; if None or empty, obtained from
     probe_labels_from_session_path
    :return: list of insertion records as returned by Alyx, one per label
    """
    one = ONE(cache_rest=None, mode='local') if one is None else one
    if is_uuid_string(session_path):
        eid = session_path
    else:
        eid = one.path2eid(session_path)
    if eid is None:
        # only a warning: the queries below are still attempted
        log.warning("Session not found on Alyx: please create session before creating insertions")
    if model is not None:
        pmodel = model
    else:
        detected_model = spikeglx.get_neuropixel_version_from_folder(session_path)
        pmodel = "3B2" if detected_model == "3B" else detected_model
    if not labels:
        labels = probe_labels_from_session_path(session_path)
    # default QC fields stored in the json field of each insertion
    qc_dict = {"qc": "NOT_SET", "extended_qc": {}}

    insertions = []
    for plabel in labels:
        payload = {"session": eid, "name": plabel, "model": pmodel, "json": qc_dict}
        # look for the corresponding insertion in Alyx
        matches = one.alyx.get(f'/insertions?&session={str(eid)}&name={plabel}', clobber=True)
        if len(matches) == 0:
            # not registered yet: create it
            record = one.alyx.rest("insertions", "create", data=payload)
        else:
            iid = matches[0]["id"]
            record = one.alyx.rest("insertions", "update", id=iid, data=payload) if force else matches[0]
        insertions.append(record)
    return insertions

89 

90 

def rename_ephys_files(session_folder: str) -> None:
    """Rename all ephys files of a session to Alyx compatible filenames.

    System agnostic (3A, 3B1, 3B2). New names are given by get_new_filename and each file
    stays in its parent folder. AP files are processed first, then LF files, then NIDQ files.

    :param session_folder: Session folder path
    :type session_folder: str
    :return: None - Changes names of files on filesystem
    :rtype: None
    """
    session_path = Path(session_folder)
    nidq_pattern = "*.nidq.*"
    for pattern in ("*.ap.*", "*.lf.*", nidq_pattern):
        for ephys_file in session_path.rglob(pattern):
            # Ignore NIDQ wiring files: these are usually created after the file renaming
            # however this function may be called a second time upon failed transfer.
            if pattern == nidq_pattern and 'wiring' in ephys_file.name:
                continue
            target = ephys_file.parent / get_new_filename(ephys_file.name)
            shutil.move(str(ephys_file), str(target))

120 

121 

def get_new_filename(filename: str) -> str:
    """Return the Alyx compatible name of a spikeglx ephys file.

    System agnostic (3A, 3B1, 3B2). Everything before the first dot is replaced by
    "_spikeglx_ephysData" followed by the gate/trigger tag (e.g. "_g0_t0") found in it;
    the remaining dot-separated parts are kept unchanged.

    :param filename: Name of an ephys file
    :return: New name for ephys file
    :raises ValueError: if the name has fewer than three dot-separated parts, or if its
     first part contains no "_g<digits>_t<digits>" tag
    """
    stem, *suffixes = filename.split('.')
    if len(suffixes) < 2:
        raise ValueError(f'unrecognized filename "{filename}"')
    found = re.match(r'.*(?P<gt>_g\d+_t\d+)', stem)
    if not found:
        raise ValueError(f'unrecognized filename "{filename}"')
    new_stem = "_spikeglx_ephysData" + found.group('gt')
    return '.'.join([new_stem, *suffixes])

137 

138 

def move_ephys_files(session_folder: str) -> None:
    """move_ephys_files is system agnostic (3A, 3B1, 3B2).
    Moves all properly named ephys files to appropriate locations for transfer.
    Use rename_ephys_files function before this one.

    Files matching `*.imec*` are moved to raw_ephys_data/probeXX, where the probe label
    comes from the imec number in the file name (3B) or, failing that, from a `probeXX`
    component of the file path (3A). Files matching `*.nidq.*` are moved to raw_ephys_data.
    Empty folders left under raw_ephys_data are then deleted.

    :param session_folder: Session folder path
    :type session_folder: str
    :return: None - Moves files on filesystem
    :rtype: None
    :raises AssertionError: if no probe label can be determined for an imec file
    """
    session_path = Path(session_folder)
    raw_ephys_data_path = session_path / "raw_ephys_data"

    # Snapshot the matches before moving anything: files are moved within the very tree
    # being walked, so a lazy iterator could be affected by our own changes.
    imec_files = list(session_path.rglob("*.imec*"))
    for imf in imec_files:
        # For 3B system probe0x == imecx
        # Gate and trigger indices may have several digits, as in get_new_filename
        probe_number = re.match(r'_spikeglx_ephysData_g\d+_t\d+\.imec(\d+).*', imf.name)
        if not probe_number:
            # For 3A system imec files must be in a 'probexx' folder
            probe_label = re.search(r'probe\d+', str(imf))
            assert probe_label, f'Cannot assign probe number to file {imf}'
            probe_label = probe_label.group()
        else:
            probe_number, = probe_number.groups()
            probe_label = f'probe{probe_number.zfill(2)}'
        raw_ephys_data_path.joinpath(probe_label).mkdir(exist_ok=True)
        shutil.move(str(imf), str(raw_ephys_data_path.joinpath(probe_label, imf.name)))

    # NIDAq files (3B system only)
    nidq_files = list(session_path.rglob("*.nidq.*"))
    for nidqf in nidq_files:
        shutil.move(str(nidqf), str(raw_ephys_data_path / nidqf.name))
    # Delete all empty folders recursively
    delete_empty_folders(raw_ephys_data_path, dry=False, recursive=True)

173 

174 

def get_iblscripts_folder():
    """Return, as a string, the folder two levels above the current working directory."""
    working_dir = Path.cwd()
    return str(working_dir.parent.parent)

177 

178 

class WindowsInhibitor:
    """Prevent OS sleep/hibernate in windows; code from:
    https://github.com/h3llrais3r/Deluge-PreventSuspendPlus/blob/master/preventsuspendplus/core.py
    API documentation:
    https://msdn.microsoft.com/en-us/library/windows/desktop/aa373208(v=vs.85).aspx

    All methods are static and rely on `ctypes.windll`.
    """
    # Flag values passed to kernel32 SetThreadExecutionState
    ES_CONTINUOUS = 0x80000000
    ES_SYSTEM_REQUIRED = 0x00000001

    @staticmethod
    def _set_thread_execution_state(state: int) -> None:
        """Call kernel32 SetThreadExecutionState with `state`; log an error if it returns 0."""
        if ctypes.windll.kernel32.SetThreadExecutionState(state) == 0:
            log.error("Failed to set thread execution state.")

    @staticmethod
    def inhibit(quiet: bool = False):
        """Set the ES_CONTINUOUS and ES_SYSTEM_REQUIRED flags.

        :param quiet: if True the message is logged at debug level, otherwise it is printed
        """
        if not quiet:
            print("Preventing Windows from going to sleep")
        else:
            log.debug("Preventing Windows from going to sleep")
        flags = WindowsInhibitor.ES_CONTINUOUS | WindowsInhibitor.ES_SYSTEM_REQUIRED
        WindowsInhibitor._set_thread_execution_state(flags)

    @staticmethod
    def uninhibit(quiet: bool = False):
        """Set the ES_CONTINUOUS flag alone.

        :param quiet: if True the message is logged at debug level, otherwise it is printed
        """
        if not quiet:
            print("Allowing Windows to go to sleep")
        else:
            log.debug("Allowing Windows to go to sleep")
        flags = WindowsInhibitor.ES_CONTINUOUS
        WindowsInhibitor._set_thread_execution_state(flags)

208 

209 

def sleepless(func: Callable[..., Any]) -> Callable[..., Any]:
    """
    Decorator to ensure that the system doesn't enter sleep or idle mode during a long-running task.

    This decorator wraps a function and sets the thread execution state to prevent
    the system from entering sleep or idle mode while the decorated function is
    running. It only has an effect on Windows (`os.name == 'nt'`); on other systems
    the function is simply called.

    Parameters
    ----------
    func : callable
        The function to decorate.

    Returns
    -------
    callable
        The decorated function.

    Notes
    -----
    The execution state is restored even when the decorated function raises; the
    exception is then propagated unchanged.
    """

    @wraps(func)
    def inner(*args, **kwargs) -> Any:
        if os.name != 'nt':
            return func(*args, **kwargs)
        WindowsInhibitor().inhibit(quiet=True)
        try:
            return func(*args, **kwargs)
        finally:
            # always undo the inhibition, otherwise a failing task would leave
            # the sleep prevention in place
            WindowsInhibitor().uninhibit(quiet=True)
    return inner

237 return inner 1f