Coverage for ibllib/pipes/misc.py: 82%
129 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-17 15:25 +0000
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-17 15:25 +0000
1"""Miscellaneous pipeline utility functions."""
2import ctypes
3import os
4import re
5import shutil
6import logging
7from functools import wraps
8from pathlib import Path
9from typing import Union, List, Callable, Any
11import spikeglx
12from one.alf.spec import is_uuid_string
13from one.api import ONE
15from ibllib.io.misc import delete_empty_folders
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Lookup table from a device name to a flag name.
# NOTE(review): the consumers of this mapping are not visible in this file - confirm usage.
DEVICE_FLAG_MAP = dict(
    neuropixel='ephys',
    cameras='video',
    widefield='widefield',
    sync='sync',
)
def probe_labels_from_session_path(session_path: Union[str, Path]) -> List[str]:
    """
    List the ephys probe labels of a session from its spikeglx metadata files.

    Every `*.ap.meta` file located exactly one folder below `raw_ephys_data` contributes the
    name of its parent folder as a label; deeper files (e.g.
    raw_ephys_data/probe00/copy_of_probe00) are ignored. When the metadata reports more than
    one shank, one label per shank is produced by appending a letter (a, b, c, ...) to the
    folder name.

    :param session_path: session folder containing the `raw_ephys_data` folder
    :return: sorted list of probe label strings
    """
    ephys_root = Path(session_path).joinpath('raw_ephys_data')
    shank_letters = 'abcdefghij'
    labels = []
    for meta_path in ephys_root.rglob('*.ap.meta'):
        # only consider files sitting in a direct sub-folder of raw_ephys_data
        if meta_path.parents[1] == ephys_root:
            probe_name = meta_path.parts[-2]
            n_shanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_path))
            if n_shanks > 1:
                # multi-shank probe: one label per shank
                labels.extend(probe_name + shank_letters[i] for i in range(n_shanks))
            else:
                labels.append(probe_name)
    return sorted(labels)
def create_alyx_probe_insertions(
    session_path: str,
    force: bool = False,
    one: object = None,
    model: str = None,
    labels: list = None,
):
    """
    Create (or fetch / update) one Alyx probe insertion per probe label of a session.

    If the session cannot be resolved to an eid a warning is logged and execution continues
    with `eid` set to None.

    :param session_path: session path, or a session UUID string which is then used as the eid
    :param force: if True, an insertion that already exists on Alyx is updated with the new
     fields; if False the existing record is returned untouched
    :param one: ONE instance; when None, `ONE(cache_rest=None, mode='local')` is instantiated
    :param model: probe model name; when None it is read from the session folder with
     `spikeglx.get_neuropixel_version_from_folder`, "3B" being reported as "3B2"
    :param labels: probe labels; when empty or None they are obtained from
     `probe_labels_from_session_path`
    :return: list with one insertion record per label, as returned by the Alyx client
    """
    one = ONE(cache_rest=None, mode='local') if one is None else one
    if is_uuid_string(session_path):
        eid = session_path
    else:
        eid = one.path2eid(session_path)
    if eid is None:
        log.warning("Session not found on Alyx: please create session before creating insertions")
    if model is not None:
        pmodel = model
    else:
        detected_model = spikeglx.get_neuropixel_version_from_folder(session_path)
        pmodel = "3B2" if detected_model == "3B" else detected_model
    labels = labels or probe_labels_from_session_path(session_path)
    # default qc fields stored in the insertion json field (same dict shared by all payloads)
    qc_dict = {"qc": "NOT_SET", "extended_qc": {}}

    insertions = []
    for plabel in labels:
        payload = {"session": eid, "name": plabel, "model": pmodel, "json": qc_dict}
        # look for an insertion with this name already registered for the session
        matches = one.alyx.get(f'/insertions?&session={str(eid)}&name={plabel}', clobber=True)
        if len(matches) == 0:
            # nothing on Alyx yet: create it
            record = one.alyx.rest("insertions", "create", data=payload)
        else:
            existing = matches[0]
            iid = existing["id"]
            # overwrite the existing record only on explicit request
            record = one.alyx.rest("insertions", "update", id=iid, data=payload) if force else existing
        insertions.append(record)
    return insertions
def rename_ephys_files(session_folder: str) -> None:
    """rename_ephys_files is system agnostic (3A, 3B1, 3B2).
    Renames every ap, lf and nidq ephys file found under the session folder to its Alyx
    compatible filename, as computed by get_new_filename. Files stay in their folder.

    :param session_folder: Session folder path
    :type session_folder: str
    :return: None - Changes names of files on filesystem
    :rtype: None
    """
    root = Path(session_folder)
    # (glob pattern, whether wiring files must be left alone)
    # Wiring files are usually created after the renaming, however this function may be
    # called a second time upon failed transfer, hence they are skipped for nidq files.
    renaming_plan = (("*.ap.*", False), ("*.lf.*", False), ("*.nidq.*", True))
    for glob_pattern, skip_wiring in renaming_plan:
        for src in root.rglob(glob_pattern):
            if skip_wiring and 'wiring' in src.name:
                continue
            dst = src.parent / get_new_filename(src.name)
            shutil.move(str(src), str(dst))
def get_new_filename(filename: str) -> str:
    """get_new_filename is system agnostic (3A, 3B1, 3B2).
    Builds an alyx compatible filename from a spikeglx ephys filename: everything before the
    first dot is replaced by `_spikeglx_ephysData` followed by the `_g<N>_t<N>` tag found in
    it, all remaining dot-separated parts are kept as they are.

    :param filename: Name of an ephys file
    :return: New name for ephys file
    :raises ValueError: if the name has fewer than three dot-separated parts, or if its first
     part holds no `_g<N>_t<N>` tag
    """
    stem, *extensions = filename.split('.')
    if len(extensions) < 2:
        raise ValueError(fr'unrecognized filename "{filename}"')
    found = re.match(r'.*(?P<gt>_g\d+_t\d+)', stem)
    if found is None:
        raise ValueError(fr'unrecognized filename "{filename}"')
    new_stem = "_spikeglx_ephysData" + found.group('gt')
    return '.'.join([new_stem, *extensions])
def move_ephys_files(session_folder: str) -> None:
    """move_ephys_files is system agnostic (3A, 3B1, 3B2).
    Moves all properly named ephys files to appropriate locations for transfer.
    Use rename_ephys_files function before this one.

    Files matching `*.imec*` are moved to `raw_ephys_data/probeXX`. The probe label is taken
    from the imec number in the file name when there is one (imecN -> probe0N), otherwise from
    the first `probe<digits>` occurrence in the file path. Files matching `*.nidq.*` are moved
    to `raw_ephys_data`. Finally the empty folders of `raw_ephys_data` are deleted.

    :param session_folder: Session folder path
    :type session_folder: str
    :return: None - Moves files on filesystem
    :rtype: None
    :raises AssertionError: if no probe label can be worked out for an imec file
    """
    session_path = Path(session_folder)
    raw_ephys_data_path = session_path / "raw_ephys_data"
    # Gate and trigger indices may span several digits, as in get_new_filename; the dot
    # before `imec` is matched literally.
    imec_name_pattern = re.compile(r'_spikeglx_ephysData_g\d+_t\d+\.imec(\d+).*')

    # Snapshot the listing: files are moved within the tree that is being walked
    for imf in list(session_path.rglob("*.imec*")):
        numbered = imec_name_pattern.match(imf.name)
        if numbered:
            # For 3B system probe0x == imecx
            probe_label = f'probe{numbered.group(1).zfill(2)}'
        else:
            # For 3A system imec files must be in a 'probexx' folder
            in_path = re.search(r'probe\d+', str(imf))
            if not in_path:
                # explicit raise (same exception type as before) so that the check also
                # holds when python runs with assertions disabled
                raise AssertionError(f'Cannot assign probe number to file {imf}')
            probe_label = in_path.group()
        probe_folder = raw_ephys_data_path.joinpath(probe_label)
        # parents=True: raw_ephys_data itself may not exist yet
        probe_folder.mkdir(parents=True, exist_ok=True)
        shutil.move(str(imf), str(probe_folder / imf.name))

    # NIDAq files (3B system only)
    for nidqf in list(session_path.rglob("*.nidq.*")):
        raw_ephys_data_path.mkdir(parents=True, exist_ok=True)
        shutil.move(str(nidqf), str(raw_ephys_data_path / nidqf.name))
    # Delete all empty folders recursively
    delete_empty_folders(raw_ephys_data_path, dry=False, recursive=True)
def get_iblscripts_folder():
    """Return, as a string, the folder two levels above the current working directory."""
    two_levels_up = Path.cwd().parent.parent
    return str(two_levels_up)
class WindowsInhibitor:
    """Prevent OS sleep/hibernate in windows; code from:
    https://github.com/h3llrais3r/Deluge-PreventSuspendPlus/blob/master/preventsuspendplus/core.py
    API documentation:
    https://msdn.microsoft.com/en-us/library/windows/desktop/aa373208(v=vs.85).aspx"""
    # Flags handed to kernel32.SetThreadExecutionState
    ES_CONTINUOUS = 0x80000000
    ES_SYSTEM_REQUIRED = 0x00000001

    @staticmethod
    def _set_thread_execution_state(state: int) -> None:
        """Call kernel32.SetThreadExecutionState with `state`; log an error if it returns 0."""
        if ctypes.windll.kernel32.SetThreadExecutionState(state) == 0:
            log.error("Failed to set thread execution state.")

    @staticmethod
    def inhibit(quiet: bool = False):
        """Set the ES_CONTINUOUS and ES_SYSTEM_REQUIRED flags.

        :param quiet: if True the message goes to the debug log instead of being printed
        """
        message = "Preventing Windows from going to sleep"
        if not quiet:
            print(message)
        else:
            log.debug(message)
        flags = WindowsInhibitor.ES_CONTINUOUS | WindowsInhibitor.ES_SYSTEM_REQUIRED
        WindowsInhibitor._set_thread_execution_state(flags)

    @staticmethod
    def uninhibit(quiet: bool = False):
        """Set the ES_CONTINUOUS flag only, i.e. without ES_SYSTEM_REQUIRED.

        :param quiet: if True the message goes to the debug log instead of being printed
        """
        message = "Allowing Windows to go to sleep"
        if not quiet:
            print(message)
        else:
            log.debug(message)
        WindowsInhibitor._set_thread_execution_state(WindowsInhibitor.ES_CONTINUOUS)
def sleepless(func: Callable[..., Any]) -> Callable[..., Any]:
    """
    Decorator to ensure that the system doesn't enter sleep or idle mode during a long-running task.

    This decorator wraps a function and sets the thread execution state to prevent
    the system from entering sleep or idle mode while the decorated function is
    running. On Windows (`os.name == 'nt'`) the state is set before the call and
    reset after it, including when the decorated function raises; on other
    systems the function is simply called.

    Parameters
    ----------
    func : callable
        The function to decorate.

    Returns
    -------
    callable
        The decorated function.
    """

    @wraps(func)
    def inner(*args, **kwargs) -> Any:
        on_windows = os.name == 'nt'
        if on_windows:
            WindowsInhibitor.inhibit(quiet=True)
        try:
            return func(*args, **kwargs)
        finally:
            # always give the sleep setting back, even if func raised
            if on_windows:
                WindowsInhibitor.uninhibit(quiet=True)
    return inner