Coverage for ibllib/pipes/misc.py: 59%
696 statements
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
1"""Miscellaneous pipeline utility functions."""
2import ctypes
3import hashlib
4import json
5import os
6import re
7import shutil
8import subprocess
9import sys
10import time
11import logging
12import warnings
13from functools import wraps
14from pathlib import Path
15from typing import Union, List, Callable, Any
16from inspect import signature
17import uuid
18import socket
19import traceback
20import tempfile
22import spikeglx
23from iblutil.io import hashfile, params
24from iblutil.util import range_str
25from one.alf.files import get_session_path
26from one.alf.spec import is_uuid_string, is_session_path, describe
27from one.api import ONE
29import ibllib.io.flags as flags
30import ibllib.io.raw_data_loaders as raw
31from ibllib.io.misc import delete_empty_folders
32import ibllib.io.session_params as sess_params
34log = logging.getLogger(__name__)
36DEVICE_FLAG_MAP = {'neuropixel': 'ephys',
37 'cameras': 'video',
38 'widefield': 'widefield',
39 'sync': 'sync'}
42def subjects_data_folder(folder: Path, rglob: bool = False) -> Path:
43 """Given a root_data_folder will try to find a 'Subjects' data folder.
45 If Subjects folder is passed will return it directly."""
46 if not isinstance(folder, Path): 1imfdage
47 folder = Path(folder) 1fdge
48 if rglob: 1imfdage
49 func = folder.rglob 1imfdage
50 else:
51 func = folder.glob
53 # Try to find Subjects folder one level
54 if folder.name.lower() != 'subjects': 1imfdage
55 # Try to find Subjects folder if folder.glob
56 spath = [x for x in func('*') if x.name.lower() == 'subjects'] 1imfdage
57 if not spath: 1imfdage
58 raise ValueError('No "Subjects" folder in children folders')
59 elif len(spath) > 1: 1imfdage
60 raise ValueError(f'Multiple "Subjects" folder in children folders: {spath}')
61 else:
62 folder = folder / spath[0] 1imfdage
64 return folder 1imfdage
67def cli_ask_default(prompt: str, default: str):
68 """
69 Prompt the user for input, display the default option and return user input or default
70 :param prompt: String to display to user
71 :param default: The default value to return if user doesn't enter anything
72 :return: User input or default
73 """
74 return input(f'{prompt} [default: {default}]: ') or default 1j
77def cli_ask_options(prompt: str, options: list, default_idx: int = 0) -> str:
78 parsed_options = [str(x) for x in options]
79 if default_idx is not None:
80 parsed_options[default_idx] = f"[{parsed_options[default_idx]}]"
81 options_str = " (" + " | ".join(parsed_options) + ")> "
82 ans = input(prompt + options_str) or str(options[default_idx])
83 if ans not in [str(x) for x in options]:
84 return cli_ask_options(prompt, options, default_idx=default_idx)
85 return ans
88def behavior_exists(session_path: str, include_devices=False) -> bool:
89 """
90 Returns True if the session has a task behaviour folder
91 :param session_path:
92 :return:
93 """
94 session_path = Path(session_path) 1zcfdage
95 if include_devices and session_path.joinpath("_devices").exists(): 1zcfdage
96 return True
97 if session_path.joinpath("raw_behavior_data").exists(): 1zcfdage
98 return True 1zcfdage
99 return any(session_path.glob('raw_task_data_*')) 1zca
102def check_transfer(src_session_path, dst_session_path):
103 """
104 Check all the files in the source directory match those in the destination directory. Function
105 will throw assertion errors/exceptions if number of files do not match, file names do not
106 match, or if file sizes do not match.
108 :param src_session_path: The source directory that was copied
109 :param dst_session_path: The copy target directory
110 """
111 src_files = sorted([x for x in Path(src_session_path).rglob('*') if x.is_file()]) 1y
112 dst_files = sorted([x for x in Path(dst_session_path).rglob('*') if x.is_file()]) 1y
113 assert len(src_files) == len(dst_files), 'Not all files transferred' 1y
114 for s, d in zip(src_files, dst_files): 1y
115 assert s.name == d.name, 'file name mismatch' 1y
116 assert s.stat().st_size == d.stat().st_size, 'file size mismatch' 1y
119def rename_session(session_path: str, new_subject=None, new_date=None, new_number=None,
120 ask: bool = False) -> Path:
121 """Rename a session. Prompts the user for the new subject name, data and number and then moves
122 session path to new session path.
124 :param session_path: A session path to rename
125 :type session_path: str
126 :param new_subject: A new subject name, if none provided, the user is prompted for one
127 :param new_date: A new session date, if none provided, the user is prompted for one
128 :param new_number: A new session number, if none provided, the user is prompted for one
129 :param ask: used to ensure prompt input from user, defaults to False
130 :type ask: bool
131 :return: The renamed session path
132 :rtype: Path
133 """
134 session_path = get_session_path(session_path) 1p
135 if session_path is None: 1p
136 raise ValueError('Session path not valid ALF session folder') 1p
137 mouse = session_path.parts[-3] 1p
138 date = session_path.parts[-2] 1p
139 sess = session_path.parts[-1] 1p
140 new_mouse = new_subject or mouse 1p
141 new_date = new_date or date 1p
142 new_sess = new_number or sess 1p
143 if ask: 1p
144 new_mouse = input(f"Please insert subject NAME [current value: {mouse}]> ") 1p
145 new_date = input(f"Please insert new session DATE [current value: {date}]> ") 1p
146 new_sess = input(f"Please insert new session NUMBER [current value: {sess}]> ") 1p
148 new_session_path = Path(*session_path.parts[:-3]).joinpath(new_mouse, new_date, 1p
149 new_sess.zfill(3))
150 assert is_session_path(new_session_path), 'invalid subject, date or number' 1p
152 if new_session_path.exists(): 1p
153 ans = input(f'Warning: session path {new_session_path} already exists.\nWould you like to '
154 f'move {new_session_path} to a backup directory? [y/N] ')
155 if (ans or 'n').lower() in ['n', 'no']:
156 print(f'Manual intervention required, data exists in the following directory: '
157 f'{session_path}')
158 return
159 if backup_session(new_session_path):
160 print(f'Backup was successful, removing directory {new_session_path}...')
161 shutil.rmtree(str(new_session_path), ignore_errors=True)
162 shutil.move(str(session_path), str(new_session_path)) 1p
163 print(session_path, "--> renamed to:") 1p
164 print(new_session_path) 1p
166 return new_session_path 1p
169def backup_session(folder_path, root=None, extra=''):
170 """
171 Used to move the contents of a session to a backup folder, likely before the folder is
172 removed.
174 Parameters
175 ----------
176 folder_path : str, pathlib.Path
177 The folder path to remove.
178 root : str, pathlib.Path
179 Copy folder tree relative to this. If None, copies from the session_path root.
180 extra : str, pathlib.Path
181 Extra folder parts to append to destination root path.
183 Returns
184 -------
185 pathlib.Path
186 The location of the backup data, if succeeded to copy.
187 """
188 if not root: 1q
189 if session_path := get_session_path(folder_path): 1q
190 root = session_path.parents[2] 1q
191 else:
192 root = folder_path.parent 1q
193 folder_path = Path(folder_path) 1q
194 bk_path = Path(tempfile.gettempdir(), 'backup_sessions', extra, folder_path.relative_to(root)) 1q
195 if folder_path.exists(): 1q
196 if not folder_path.is_dir(): 1q
197 log.error(f'The given folder path is not a directory: {folder_path}') 1q
198 return 1q
199 try: 1q
200 log.debug(f'Created path: {bk_path.parent}') 1q
201 bk_path = shutil.copytree(folder_path, bk_path) 1q
202 log.info(f'Copied contents from {folder_path} to {bk_path}') 1q
203 return bk_path 1q
204 except FileExistsError: 1q
205 log.error(f'A backup session for the given path already exists: {bk_path}, ' 1q
206 f'manual intervention is necessary.')
207 except shutil.Error as ex: 1q
208 log.error('Failed to copy files from %s to %s: %s', folder_path, bk_path, ex) 1q
209 else:
210 log.error('The given session path does not exist: %s', folder_path) 1q
213def copy_with_check(src, dst, **kwargs):
214 dst = Path(dst)
215 if dst.exists() and Path(src).stat().st_size == dst.stat().st_size:
216 return dst
217 elif dst.exists():
218 dst.unlink()
219 return shutil.copy2(src, dst, **kwargs)
222def transfer_session_folders(local_sessions: list, remote_subject_folder, subfolder_to_transfer):
223 """
224 Used to determine which local session folders should be transferred to which remote session folders, will prompt the user
225 when necessary.
227 Parameters
228 ----------
229 local_sessions : list
230 Required list of local session folder paths to sync to local server.
231 remote_subject_folder : str, pathlib.Path
232 The remote location of the subject folder (typically pulled from the params).
233 subfolder_to_transfer : str
234 Which subfolder to sync
236 Returns
237 -------
238 list of tuples
239 For each session, a tuple of (source, destination) of attempted file transfers.
240 list of bool
241 A boolean True/False for success/failure of the transfer.
242 """
243 transfer_list = [] # list of sessions to transfer 1cfdage
244 skip_list = "" # "list" of sessions to skip and the reason for the skip 1cfdage
245 # Iterate through all local sessions in the given list
246 for local_session in local_sessions: 1cfdage
247 # Set expected remote_session location and perform simple error state checks
248 remote_session = remote_subject_folder.joinpath(*local_session.parts[-3:]) 1cfdage
249 # Skip session if ...
250 if subfolder_to_transfer: 1cfdage
251 if not local_session.joinpath(subfolder_to_transfer).exists(): 1cfdage
252 msg = f"{local_session} - skipping session, no '{subfolder_to_transfer}' folder found locally"
253 log.warning(msg)
254 skip_list += msg + "\n"
255 continue
256 if not remote_session.parent.exists(): 1cfdage
257 msg = f"{local_session} - no matching remote session date folder found for the given local session" 1ca
258 log.info(msg) 1ca
259 skip_list += msg + "\n" 1ca
260 continue 1ca
261 if not behavior_exists(remote_session): 1cfdage
262 msg = f"{local_session} - skipping session, no behavior data found in remote folder {remote_session}" 1ca
263 log.warning(msg) 1ca
264 skip_list += msg + "\n" 1ca
265 continue 1ca
267 # Determine if there are multiple session numbers from the date path
268 local_sessions_for_date = get_session_numbers_from_date_path(local_session.parent) 1cfdage
269 remote_sessions_for_date = get_session_numbers_from_date_path(remote_session.parent) 1cfdage
270 remote_session_pick = None 1cfdage
271 if len(local_sessions_for_date) > 1 or len(remote_sessions_for_date) > 1: 1cfdage
272 # Format folder size output for end user to review
273 local_session_numbers_with_size = remote_session_numbers_with_size = "" 1ca
274 for lsfd in local_sessions_for_date: 1ca
275 size_in_gb = round(get_directory_size(local_session.parent / lsfd, in_gb=True), 2) 1ca
276 local_session_numbers_with_size += lsfd + " (" + str(size_in_gb) + " GB)\n" 1ca
277 for rsfd in remote_sessions_for_date: 1ca
278 size_in_gb = round(get_directory_size(remote_session.parent / rsfd, in_gb=True), 2) 1ca
279 remote_session_numbers_with_size += rsfd + " (" + str(size_in_gb) + " GB)\n" 1ca
280 log.info(f"\n\nThe following local session folder(s) were found on this acquisition PC:\n\n" 1ca
281 f"{''.join(local_session_numbers_with_size)}\nThe following remote session folder(s) were found on the "
282 f"server:\n\n{''.join(remote_session_numbers_with_size)}\n")
284 def _remote_session_picker(sessions_for_date): 1ca
285 resp = "s" 1ca
286 resp_invalid = True 1ca
287 while resp_invalid: # loop until valid user input 1ca
288 resp = input(f"\n\n--- USER INPUT NEEDED ---\nWhich REMOTE session number would you like to transfer your " 1ca
289 f"local session to? Options {range_str(map(int, sessions_for_date))} or "
290 f"[s]kip/[h]elp/[e]xit> ").strip().lower()
291 if resp == "h": 1ca
292 print("An example session filepath:\n")
293 describe("number") # Explain what a session number is
294 input("Press enter to continue")
295 elif resp == "s" or resp == "e": # exit loop 1ca
296 resp_invalid = False
297 elif len(resp) <= 3: 1ca
298 resp_invalid = False if [i for i in sessions_for_date if int(resp) == int(i)] else None 1ca
299 else:
300 print("Invalid response. Please try again.")
301 return resp 1ca
303 log.info(f"Evaluation for local session " 1ca
304 f"{local_session.parts[-3]}/{local_session.parts[-2]}/{local_session.parts[-1]}...")
305 user_response = _remote_session_picker(remote_sessions_for_date) 1ca
306 if user_response == "s": 1ca
307 msg = f"{local_session} - Local session skipped due to user input"
308 log.info(msg)
309 skip_list += msg + "\n"
310 continue
311 elif user_response == "e": 1ca
312 log.info("Exiting, no files transferred.")
313 return
314 else:
315 remote_session_pick = remote_session.parent / user_response.zfill(3) 1ca
317 # Append to the transfer_list
318 transfer_tuple = (local_session, remote_session_pick) if remote_session_pick else (local_session, remote_session) 1cfdage
319 transfer_list.append(transfer_tuple) 1cfdage
320 log.info(f"{transfer_tuple[0]}, {transfer_tuple[1]} - Added to the transfer list") 1cfdage
322 # Verify that the number of local transfer_list entries match the number of remote transfer_list entries
323 if len(transfer_list) != len(set(dst for _, dst in transfer_list)): 1cfdage
324 raise RuntimeError(
325 "An invalid combination of sessions were picked; the most likely cause of this error is multiple local "
326 "sessions being selected for a single remote session. Please rerun the script."
327 )
329 # Call rsync/rdiff function for every entry in the transfer list
330 success = [] 1cfdage
331 for src, dst in transfer_list: 1cfdage
332 if subfolder_to_transfer: 1cfdage
333 success.append(rsync_paths(src / subfolder_to_transfer, dst / subfolder_to_transfer)) 1cfdage
334 else:
335 success.append(rsync_paths(src, dst))
336 if not success[-1]: 1cfdage
337 log.error("File transfer failed, check log for reason.")
339 # Notification to user for any transfers were skipped
340 log.warning(f"Video transfers that were not completed:\n\n{skip_list}") if skip_list else log.info("No transfers skipped.") 1cfdage
341 return transfer_list, success 1cfdage
344def transfer_folder(src: Path, dst: Path, force: bool = False) -> None:
345 """functionality has been replaced by transfer_session_folders function"""
346 print(f"Attempting to copy:\n{src}\n--> {dst}")
347 if force:
348 print(f"Removing {dst}")
349 shutil.rmtree(dst, ignore_errors=True)
350 else:
351 try:
352 check_transfer(src, dst)
353 print("All files already copied, use force=True to re-copy")
354 return
355 except AssertionError:
356 pass
357 print(f"Copying all files:\n{src}\n--> {dst}")
358 # rsync_folder(src, dst, '**transfer_me.flag')
359 if sys.version_info.minor < 8:
360 # dirs_exist_ok kwarg not supported in < 3.8
361 shutil.rmtree(dst, ignore_errors=True)
362 shutil.copytree(src, dst, copy_function=copy_with_check)
363 else:
364 shutil.copytree(src, dst, dirs_exist_ok=True, copy_function=copy_with_check)
365 # If folder was created delete the src_flag_file
366 if check_transfer(src, dst) is None:
367 print("All files copied")
368 # rdiff-backup --compare /tmp/tmpw9o1zgn0 /tmp/tmp82gg36rm
369 # No changes found. Directory matches archive data.
372def load_params_dict(params_fname: str) -> dict:
373 params_fpath = Path(params.getfile(params_fname))
374 if not params_fpath.exists():
375 return None
376 with open(params_fpath, "r") as f:
377 out = json.load(f)
378 return out
381def load_videopc_params():
382 """(DEPRECATED) This will be removed in favour of iblrigv8 functions."""
383 warnings.warn('load_videopc_params will be removed in favour of iblrigv8', FutureWarning)
384 if not load_params_dict("videopc_params"):
385 create_videopc_params()
386 return load_params_dict("videopc_params")
389def load_ephyspc_params():
390 if not load_params_dict("ephyspc_params"):
391 create_ephyspc_params()
392 return load_params_dict("ephyspc_params")
395def create_basic_transfer_params(param_str='transfer_params', local_data_path=None,
396 remote_data_path=None, clobber=False, **kwargs):
397 """Create some basic parameters common to all acquisition rigs.
399 Namely prompt user for the local root data path and the remote (lab server) data path.
400 NB: All params stored in uppercase by convention.
402 Parameters
403 ----------
404 param_str : str
405 The name of the parameters to load/save.
406 local_data_path : str, pathlib.Path
407 The local root data path, stored with the DATA_FOLDER_PATH key. If None, user is prompted.
408 remote_data_path : str, pathlib.Path, bool
409 The local root data path, stored with the REMOTE_DATA_FOLDER_PATH key. If None, user is prompted.
410 If False, the REMOTE_DATA_PATH key is not updated or is set to False if clobber = True.
411 clobber : bool
412 If True, any parameters in existing parameter file not found as keyword args will be removed,
413 otherwise the user is prompted for these also.
414 **kwargs
415 Extra parameters to set. If value is None, the user is prompted.
417 Returns
418 -------
419 dict
420 The parameters written to disc.
422 Examples
423 --------
424 Set up basic transfer parameters for modality acquisition PC
426 >>> par = create_basic_transfer_params()
428 Set up basic transfer paramers without prompting the user
430 >>> par = create_basic_transfer_params(
431 ... local_data_path='/iblrig_data/Subjects',
432 ... remote_data_path='/mnt/iblserver.champalimaud.pt/ibldata/Subjects')
434 Prompt user for extra parameter using custom prompt (will call function with current default)
436 >>> from functools import partial
437 >>> par = create_basic_transfer_params(
438 ... custom_arg=partial(cli_ask_default, 'Please enter custom arg value'))
440 Set up with no remote path (NB: if not the first time, use clobber=True to save param key)
442 >>> par = create_basic_transfer_params(remote_data_path=False)
444 """
445 parameters = params.as_dict(params.read(param_str, {})) or {} 1jrsfdge
446 if local_data_path is None: 1jrsfdge
447 local_data_path = parameters.get('DATA_FOLDER_PATH') 1jrs
448 if not local_data_path or clobber: 1jrs
449 local_data_path = cli_ask_default("Where's your LOCAL 'Subjects' data folder?", local_data_path) 1j
450 parameters['DATA_FOLDER_PATH'] = local_data_path 1jrsfdge
452 if remote_data_path is None: 1jrsfdge
453 remote_data_path = parameters.get('REMOTE_DATA_FOLDER_PATH') 1jrs
454 if remote_data_path in (None, '') or clobber: 1jrs
455 remote_data_path = cli_ask_default("Where's your REMOTE 'Subjects' data folder?", remote_data_path) 1j
456 if remote_data_path is not False: 1jrsfdge
457 parameters['REMOTE_DATA_FOLDER_PATH'] = remote_data_path 1jrsfdge
458 elif 'REMOTE_DATA_FOLDER_PATH' not in parameters or clobber: 1j
459 parameters['REMOTE_DATA_FOLDER_PATH'] = False # Always assume no remote path 1j
461 # Deal with extraneous parameters
462 for k, v in kwargs.items(): 1jrsfdge
463 if callable(v): # expect function handle with default value as input 1j
464 n_pars = len(signature(v).parameters) 1j
465 parameters[k.upper()] = v(parameters.get(k.upper())) if n_pars > 0 else v() 1j
466 elif v is None: # generic prompt for key 1j
467 parameters[k.upper()] = cli_ask_default( 1j
468 f'Enter a value for parameter {k.upper()}', parameters.get(k.upper())
469 )
470 else: # assign value to parameter
471 parameters[k.upper()] = str(v) 1j
473 defined = list(map(str.upper, ('DATA_FOLDER_PATH', 'REMOTE_DATA_FOLDER_PATH', 'TRANSFER_LABEL', *kwargs.keys()))) 1jrsfdge
474 if clobber: 1jrsfdge
475 # Delete any parameters in parameter dict that were not passed as keyword args into function
476 parameters = {k: v for k, v in parameters.items() if k in defined} 1j
477 else:
478 # Prompt for any other parameters that weren't passed into function
479 for k in filter(lambda x: x not in defined, map(str.upper, parameters.keys())): 1jrsfdge
480 parameters[k] = cli_ask_default(f'Enter a value for parameter {k}', parameters.get(k)) 1j
482 if 'TRANSFER_LABEL' not in parameters: 1jrsfdge
483 parameters['TRANSFER_LABEL'] = f'{socket.gethostname()}_{uuid.getnode()}' 1j
485 # Write parameters
486 params.write(param_str, parameters) 1jrsfdge
487 return parameters 1jrsfdge
490def create_videopc_params(force=False, silent=False):
491 """(DEPRECATED) This will be removed in favour of iblrigv8 functions."""
492 url = 'https://github.com/int-brain-lab/iblrig/blob/videopc/docs/source/video.rst'
493 warnings.warn(f'create_videopc_params is deprecated, see {url}', FutureWarning)
494 if Path(params.getfile("videopc_params")).exists() and not force:
495 print(f"{params.getfile('videopc_params')} exists already, exiting...")
496 print(Path(params.getfile("videopc_params")).exists())
497 return
498 if silent:
499 data_folder_path = r"D:\iblrig_data\Subjects"
500 remote_data_folder_path = r"\\iblserver.champalimaud.pt\ibldata\Subjects"
501 body_cam_idx = 0
502 left_cam_idx = 1
503 right_cam_idx = 2
504 else:
505 data_folder_path = cli_ask_default(
506 r"Where's your LOCAL 'Subjects' data folder?", r"D:\iblrig_data\Subjects"
507 )
508 remote_data_folder_path = cli_ask_default(
509 r"Where's your REMOTE 'Subjects' data folder?",
510 r"\\iblserver.champalimaud.pt\ibldata\Subjects",
511 )
512 body_cam_idx = cli_ask_default("Please select the index of the BODY camera", "0")
513 left_cam_idx = cli_ask_default("Please select the index of the LEFT camera", "1")
514 right_cam_idx = cli_ask_default("Please select the index of the RIGHT camera", "2")
516 param_dict = {
517 "DATA_FOLDER_PATH": data_folder_path,
518 "REMOTE_DATA_FOLDER_PATH": remote_data_folder_path,
519 "BODY_CAM_IDX": body_cam_idx,
520 "LEFT_CAM_IDX": left_cam_idx,
521 "RIGHT_CAM_IDX": right_cam_idx,
522 }
523 params.write("videopc_params", param_dict)
524 print(f"Created {params.getfile('videopc_params')}")
525 print(param_dict)
526 return param_dict
529def create_ephyspc_params(force=False, silent=False):
530 if Path(params.getfile("ephyspc_params")).exists() and not force:
531 print(f"{params.getfile('ephyspc_params')} exists already, exiting...")
532 print(Path(params.getfile("ephyspc_params")).exists())
533 return
534 if silent:
535 data_folder_path = r"D:\iblrig_data\Subjects"
536 remote_data_folder_path = r"\\iblserver.champalimaud.pt\ibldata\Subjects"
537 probe_types = {"PROBE_TYPE_00": "3A", "PROBE_TYPE_01": "3B"}
538 else:
539 data_folder_path = cli_ask_default(
540 r"Where's your LOCAL 'Subjects' data folder?", r"D:\iblrig_data\Subjects"
541 )
542 remote_data_folder_path = cli_ask_default(
543 r"Where's your REMOTE 'Subjects' data folder?",
544 r"\\iblserver.champalimaud.pt\ibldata\Subjects",
545 )
546 n_probes = int(cli_ask_default("How many probes are you using?", '2'))
547 assert 100 > n_probes > 0, 'Please enter number between 1, 99 inclusive'
548 probe_types = {}
549 for i in range(n_probes):
550 probe_types[f'PROBE_TYPE_{i:02}'] = cli_ask_options(
551 f"What's the type of PROBE {i:02}?", ["3A", "3B"])
552 param_dict = {
553 "DATA_FOLDER_PATH": data_folder_path,
554 "REMOTE_DATA_FOLDER_PATH": remote_data_folder_path,
555 **probe_types
556 }
557 params.write("ephyspc_params", param_dict)
558 print(f"Created {params.getfile('ephyspc_params')}")
559 print(param_dict)
560 return param_dict
563def rdiff_install() -> bool:
564 """
565 For windows:
566 * if the rdiff-backup executable does not already exist on the system
567 * downloads rdiff-backup zip file
568 * copies the executable to the C:\tools folder
570 For linux/mac:
571 * runs a pip install rdiff-backup
573 Returns:
574 True when install is successful, False when an error is encountered
575 """
576 if os.name == "nt": 1Bi
577 # ensure tools folder exists
578 tools_folder = "C:\\tools\\"
579 os.mkdir(tools_folder) if not Path(tools_folder).exists() else None
581 rdiff_cmd_loc = tools_folder + "rdiff-backup.exe"
582 if not Path(rdiff_cmd_loc).exists():
583 import requests
584 import zipfile
585 from io import BytesIO
587 url = "https://github.com/rdiff-backup/rdiff-backup/releases/download/v2.0.5/rdiff-backup-2.0.5.win32exe.zip"
588 log.info("Downloading zip file for rdiff-backup.")
589 # Download the file by sending the request to the URL, ensure success by status code
590 if requests.get(url).status_code == 200:
591 log.info("Download complete for rdiff-backup zip file.")
592 # extracting the zip file contents
593 zipfile = zipfile.ZipFile(BytesIO(requests.get(url).content))
594 zipfile.extractall("C:\\Temp")
595 rdiff_folder_name = zipfile.namelist()[0] # attempting a bit of future-proofing
596 # move the executable to the C:\tools folder
597 shutil.copy("C:\\Temp\\" + rdiff_folder_name + "rdiff-backup.exe", rdiff_cmd_loc)
598 shutil.rmtree("C:\\Temp\\" + rdiff_folder_name) # cleanup temp folder
599 try: # attempt to call the rdiff command
600 subprocess.run([rdiff_cmd_loc, "--version"], check=True)
601 except (FileNotFoundError, subprocess.CalledProcessError) as e:
602 log.error("rdiff-backup installation did not complete.\n", e)
603 return False
604 return True
605 else:
606 log.error("Download request status code not 200, something did not go as expected.")
607 return False
608 else: # anything not Windows
609 try: # package should not be installed via the requirements.txt to accommodate windows 1Bi
610 subprocess.run(["pip", "install", "rdiff-backup"], check=True) 1Bi
611 except subprocess.CalledProcessError as e:
612 log.error("rdiff-backup pip install did not complete.\n", e)
613 return False
614 return True 1Bi
617def get_directory_size(dir_path: Path, in_gb=False) -> float:
618 """
619 Used to determine total size of all files in a given session_path, including all child directories
621 Args:
622 dir_path (Path): path we want to get the total size of
623 in_gb (bool): set to True for returned value to be in gigabytes
625 Returns:
626 float: sum of all files in the given directory path (in bytes by default, in GB if specified)
627 """
628 total = 0 1ca
629 with iter(os.scandir(dir_path)) as it: 1ca
630 for entry in it: 1ca
631 if entry.is_file(): 1ca
632 total += entry.stat().st_size 1ca
633 elif entry.is_dir(): 1ca
634 total += get_directory_size(entry.path) 1ca
635 if in_gb: 1ca
636 return total / 1024 / 1024 / 1024 # in GB 1ca
637 return total # in bytes 1ca
640def get_session_numbers_from_date_path(date_path: Path) -> list:
641 """
642 Retrieves session numbers when given a date path
644 Args:
645 date_path (Path): path to date, i.e. \\\\server\\some_lab\\Subjects\\Date"
647 Returns:
648 (list): Found sessions as a sorted list
649 """
650 contents = Path(date_path).glob('*') 1cfdage
651 folders = filter(lambda x: x.is_dir() and re.match(r'^\d{3}$', x.name), contents) 1cfdage
652 sessions_as_set = set(map(lambda x: x.name, folders)) 1cfdage
653 sessions_as_sorted_list = sorted(sessions_as_set) 1cfdage
654 return sessions_as_sorted_list 1cfdage
657def rsync_paths(src: Path, dst: Path) -> bool:
658 """
659 Used to run the rsync algorithm via a rdiff-backup command on the paths contained on the provided source and destination.
660 This function relies on the rdiff-backup package and is run from the command line, i.e. subprocess.run(). Full documentation
661 can be found here - https://rdiff-backup.net/docs/rdiff-backup.1.html
663 Parameters
664 ----------
665 src : Path
666 source path that contains data to be transferred
667 dst : Path
668 destination path that will receive the transferred data
670 Returns
671 -------
672 bool
673 True for success, False for failure
675 Raises
676 ------
677 FileNotFoundError, subprocess.CalledProcessError
678 """
679 # Set rdiff_cmd_loc based on OS type (assuming C:\tools is not in Windows PATH environ)
680 rdiff_cmd_loc = "C:\\tools\\rdiff-backup.exe" if os.name == "nt" else "rdiff-backup" 1tcimfdage
681 try: # Check if rdiff-backup command is available 1tcimfdage
682 subprocess.run([rdiff_cmd_loc, "--version"], check=True) 1tcimfdage
683 except (FileNotFoundError, subprocess.CalledProcessError) as e: 1i
684 if not rdiff_install(): # Attempt to install rdiff 1i
685 log.error("rdiff-backup command is unavailable, transfers can not continue.\n", e)
686 raise
688 log.info("Attempting to transfer data: " + str(src) + " -> " + str(dst)) 1tcimfdage
689 WindowsInhibitor().inhibit() if os.name == "nt" else None # prevent Windows from going to sleep 1tcimfdage
690 try: 1tcimfdage
691 rsync_command = [rdiff_cmd_loc, "--verbosity", str(0), 1tcimfdage
692 "--create-full-path", "--backup-mode", "--no-acls", "--no-eas",
693 "--no-file-statistics", "--exclude", "**transfer_me.flag",
694 str(src), str(dst)]
695 subprocess.run(rsync_command, check=True) 1tcimfdage
696 time.sleep(1) # give rdiff-backup a second to complete all logging operations 1tcimfdage
697 except (FileNotFoundError, subprocess.CalledProcessError) as e:
698 log.error("Transfer failed with code %i.\n", e.returncode)
699 if e.stderr:
700 log.error(e.stderr)
701 return False
702 log.info("Validating transfer completed...") 1tcimfdage
703 try: # Validate the transfers succeeded 1tcimfdage
704 rsync_validate = [rdiff_cmd_loc, "--verify", str(dst)] 1tcimfdage
705 subprocess.run(rsync_validate, check=True) 1tcimfdage
706 except (FileNotFoundError, subprocess.CalledProcessError) as e:
707 log.error(f"Validation for destination {dst} failed.\n", e)
708 return False
709 log.info("Cleaning up rdiff files...") 1tcimfdage
710 shutil.rmtree(dst / "rdiff-backup-data") 1tcimfdage
711 WindowsInhibitor().uninhibit() if os.name == 'nt' else None # allow Windows to go to sleep 1tcimfdage
712 return True 1tcimfdage
715def confirm_ephys_remote_folder(local_folder=False, remote_folder=False, force=False, iblscripts_folder=False,
716 session_path=None):
717 """
718 :param local_folder: The full path to the local Subjects folder
719 :param remote_folder: the full path to the remote Subjects folder
720 :param force:
721 :param iblscripts_folder:
722 :return:
723 """
724 # FIXME: session_path can be relative
725 pars = load_ephyspc_params()
726 if not iblscripts_folder:
727 import deploy
728 iblscripts_folder = Path(deploy.__file__).parent.parent
729 if not local_folder:
730 local_folder = pars["DATA_FOLDER_PATH"]
731 if not remote_folder:
732 remote_folder = pars["REMOTE_DATA_FOLDER_PATH"]
733 local_folder = Path(local_folder)
734 remote_folder = Path(remote_folder)
735 # Check for Subjects folder
736 local_folder = subjects_data_folder(local_folder, rglob=True)
737 remote_folder = subjects_data_folder(remote_folder, rglob=True)
739 log.info(f"local folder: {local_folder}")
740 log.info(f"remote folder: {remote_folder}")
741 if session_path is None:
742 src_session_paths = [x.parent for x in local_folder.rglob("transfer_me.flag")]
743 else:
744 src_session_paths = session_path if isinstance(session_path, list) else [session_path]
746 if not src_session_paths:
747 log.info("Nothing to transfer, exiting...")
748 return
749 for session_path in src_session_paths:
750 log.info(f"Found : {session_path}")
751 log.info(f"Found: {len(src_session_paths)} sessions to transfer, starting transferring now")
753 for session_path in src_session_paths:
754 log.info(f"Transferring session: {session_path}")
755 # Rename ephys files
756 # FIXME: if transfer has failed and wiring file is there renaming will fail!
757 rename_ephys_files(str(session_path))
758 # Move ephys files
759 move_ephys_files(str(session_path))
760 # Copy wiring files
761 copy_wiring_files(str(session_path), iblscripts_folder)
762 try:
763 create_alyx_probe_insertions(str(session_path))
764 except BaseException:
765 log.error(traceback.print_exc())
766 log.info("Probe creation failed, please create the probe insertions manually. Continuing transfer...")
767 msg = f"Transfer {session_path }to {remote_folder} with the same name?"
768 resp = input(msg + "\n[y]es/[r]ename/[s]kip/[e]xit\n ^\n> ") or "y"
769 resp = resp.lower()
770 log.info(resp)
771 if resp not in ["y", "r", "s", "e", "yes", "rename", "skip", "exit"]:
772 return confirm_ephys_remote_folder(
773 local_folder=local_folder,
774 remote_folder=remote_folder,
775 force=force,
776 iblscripts_folder=iblscripts_folder,
777 )
778 elif resp == "y" or resp == "yes":
779 pass
780 elif resp == "r" or resp == "rename":
781 session_path = rename_session(session_path)
782 if not session_path:
783 continue
784 elif resp == "s" or resp == "skip":
785 continue
786 elif resp == "e" or resp == "exit":
787 return
789 remote_session_path = remote_folder / Path(*session_path.parts[-3:])
790 if not behavior_exists(remote_session_path, include_devices=True):
791 log.error(f"No behavior folder found in {remote_session_path}: skipping session...")
792 return
793 # TODO: Check flagfiles on src.and dst + alf dir in session folder then remove
794 # Try catch? wher catch condition is force transfer maybe
795 transfer_folder(session_path / "raw_ephys_data", remote_session_path / "raw_ephys_data", force=force)
796 # if behavior extract_me.flag exists remove it, because of ephys flag
797 flag_file = session_path / "transfer_me.flag"
798 if flag_file.exists(): # this file only exists for the iblrig v7 and lower
799 flag_file.unlink()
800 if (remote_session_path / "extract_me.flag").exists():
801 (remote_session_path / "extract_me.flag").unlink()
802 # Create remote flags
803 create_ephys_transfer_done_flag(remote_session_path)
804 check_create_raw_session_flag(remote_session_path)
807def probe_labels_from_session_path(session_path: Union[str, Path]) -> List[str]:
808 """
809 Finds ephys probes according to the metadata spikeglx files. Only returns first subfolder
810 name under raw_ephys_data folder, ie. raw_ephys_data/probe00/copy_of_probe00 won't be returned
811 If there is a NP2.4 probe with several shanks, create several probes
812 :param session_path:
813 :return: list of strings
814 """
815 plabels = [] 1v
816 raw_ephys_folder = Path(session_path).joinpath('raw_ephys_data') 1v
817 for meta_file in raw_ephys_folder.rglob('*.ap.meta'): 1v
818 if meta_file.parents[1] != raw_ephys_folder: 1v
819 continue 1v
820 meta = spikeglx.read_meta_data(meta_file) 1v
821 nshanks = spikeglx._get_nshanks_from_meta(meta) 1v
822 if nshanks > 1: 1v
823 for i in range(nshanks): 1v
824 plabels.append(meta_file.parts[-2] + 'abcdefghij'[i]) 1v
825 else:
826 plabels.append(meta_file.parts[-2]) 1v
827 plabels.sort() 1v
828 return plabels 1v
831def create_alyx_probe_insertions(
832 session_path: str,
833 force: bool = False,
834 one: object = None,
835 model: str = None,
836 labels: list = None,
837):
838 if one is None: 1bu
839 one = ONE(cache_rest=None, mode='local')
840 eid = session_path if is_uuid_string(session_path) else one.path2eid(session_path) 1bu
841 if eid is None: 1bu
842 log.warning("Session not found on Alyx: please create session before creating insertions")
843 if model is None: 1bu
844 probe_model = spikeglx.get_neuropixel_version_from_folder(session_path)
845 pmodel = "3B2" if probe_model == "3B" else probe_model
846 else:
847 pmodel = model 1bu
848 labels = labels or probe_labels_from_session_path(session_path) 1bu
849 # create the qc fields in the json field
850 qc_dict = {} 1bu
851 qc_dict.update({"qc": "NOT_SET"}) 1bu
852 qc_dict.update({"extended_qc": {}}) 1bu
854 # create the dictionary
855 insertions = [] 1bu
856 for plabel in labels: 1bu
857 insdict = {"session": eid, "name": plabel, "model": pmodel, "json": qc_dict} 1bu
858 # search for the corresponding insertion in Alyx
859 alyx_insertion = one.alyx.get(f'/insertions?&session={eid}&name={plabel}', clobber=True) 1bu
860 # if it doesn't exist, create it
861 if len(alyx_insertion) == 0: 1bu
862 alyx_insertion = one.alyx.rest("insertions", "create", data=insdict) 1bu
863 else:
864 iid = alyx_insertion[0]["id"]
865 if force:
866 alyx_insertion = one.alyx.rest("insertions", "update", id=iid, data=insdict)
867 else:
868 alyx_insertion = alyx_insertion[0]
869 insertions.append(alyx_insertion) 1bu
870 return insertions 1bu
873def create_ephys_flags(session_folder: str):
874 """
875 Create flags for processing an ephys session. Should be called after move_ephys_files
876 :param session_folder: A path to an ephys session
877 :return:
878 """
879 session_path = Path(session_folder) 1A
880 flags.write_flag_file(session_path.joinpath("extract_ephys.flag")) 1A
881 flags.write_flag_file(session_path.joinpath("raw_ephys_qc.flag")) 1A
882 for probe_path in session_path.joinpath('raw_ephys_data').glob('probe*'): 1A
883 flags.write_flag_file(probe_path.joinpath("spike_sorting.flag")) 1A
886def create_ephys_transfer_done_flag(session_folder: str) -> None:
887 session_path = Path(session_folder) 1oC
888 flags.write_flag_file(session_path.joinpath("ephys_data_transferred.flag")) 1oC
891def create_video_transfer_done_flag(session_folder: str) -> None:
892 session_path = Path(session_folder) 1oDa
893 flags.write_flag_file(session_path.joinpath("video_data_transferred.flag")) 1oDa
896def create_transfer_done_flag(session_folder: str, flag_name: str) -> None:
897 session_path = Path(session_folder) 1de
898 flags.write_flag_file(session_path.joinpath(f"{flag_name}_data_transferred.flag")) 1de
901def check_create_raw_session_flag(session_folder: str) -> None:
902 session_path = Path(session_folder) 1olk
904 # if we have an experiment description file read in whether we expect video, ephys widefield etc, don't do it just based
905 # on the task protocol
906 experiment_description = sess_params.read_params(session_path) 1olk
908 def check_status(expected, flag): 1olk
909 if expected is not False and flag.exists(): 1lk
910 return True 1lk
911 if expected is False and not flag.exists(): 1lk
912 return True
913 else:
914 return False 1lk
916 if experiment_description is not None: 1olk
918 if any(session_path.joinpath('_devices').glob('*')): 1lk
919 return
921 # Find the devices in the experiment description file
922 devices = list() 1lk
923 for key in DEVICE_FLAG_MAP.keys(): 1lk
924 if experiment_description.get('devices', {}).get(key, None) is not None: 1lk
925 devices.append(key) 1lk
926 # In case of widefield the sync also needs to be in it's own folder
927 if 'widefield' in devices: 1lk
928 devices.append('sync') 1k
930 expected_flags = [session_path.joinpath(f'{DEVICE_FLAG_MAP[dev]}_data_transferred.flag') for dev in devices] 1lk
932 expected = [] 1lk
933 flag_files = [] 1lk
934 for dev, fl in zip(devices, expected_flags): 1lk
935 status = check_status(dev, fl) 1lk
936 if status: 1lk
937 flag_files.append(fl) 1lk
938 expected.append(status) 1lk
940 # In this case all the copying has completed
941 if all(expected): 1lk
942 # make raw session flag
943 flags.write_flag_file(session_path.joinpath("raw_session.flag")) 1lk
944 # and unlink individual copy flags
945 for fl in flag_files: 1lk
946 fl.unlink() 1lk
948 return 1lk
950 ephys = session_path.joinpath("ephys_data_transferred.flag") 1o
951 video = session_path.joinpath("video_data_transferred.flag") 1o
953 sett = raw.load_settings(session_path) 1o
954 if sett is None: 1o
955 log.error(f"No flag created for {session_path}")
956 return
958 is_biased = True if "biased" in sett["PYBPOD_PROTOCOL"] else False 1o
959 is_training = True if "training" in sett["PYBPOD_PROTOCOL"] else False 1o
960 is_habituation = True if "habituation" in sett["PYBPOD_PROTOCOL"] else False 1o
961 if video.exists() and (is_biased or is_training or is_habituation): 1o
962 flags.write_flag_file(session_path.joinpath("raw_session.flag")) 1o
963 video.unlink() 1o
964 if video.exists() and ephys.exists(): 1o
965 flags.write_flag_file(session_path.joinpath("raw_session.flag")) 1o
966 ephys.unlink() 1o
967 video.unlink() 1o
970def rename_ephys_files(session_folder: str) -> None:
971 """rename_ephys_files is system agnostic (3A, 3B1, 3B2).
972 Renames all ephys files to Alyx compatible filenames. Uses get_new_filename.
974 :param session_folder: Session folder path
975 :type session_folder: str
976 :return: None - Changes names of files on filesystem
977 :rtype: None
978 """
979 session_path = Path(session_folder) 1h
980 ap_files = session_path.rglob("*.ap.*") 1h
981 lf_files = session_path.rglob("*.lf.*") 1h
982 nidq_files = session_path.rglob("*.nidq.*") 1h
984 for apf in ap_files: 1h
985 new_filename = get_new_filename(apf.name) 1h
986 shutil.move(str(apf), str(apf.parent / new_filename)) 1h
988 for lff in lf_files: 1h
989 new_filename = get_new_filename(lff.name) 1h
990 shutil.move(str(lff), str(lff.parent / new_filename)) 1h
992 for nidqf in nidq_files: 1h
993 # Ignore wiring files: these are usually created after the file renaming however this
994 # function may be called a second time upon failed transfer.
995 if 'wiring' in nidqf.name: 1h
996 continue
997 new_filename = get_new_filename(nidqf.name) 1h
998 shutil.move(str(nidqf), str(nidqf.parent / new_filename)) 1h
1001def get_new_filename(filename: str) -> str:
1002 """get_new_filename is system agnostic (3A, 3B1, 3B2).
1003 Gets an alyx compatible filename from any spikeglx ephys file.
1005 :param filename: Name of an ephys file
1006 :return: New name for ephys file
1007 """
1008 root = "_spikeglx_ephysData" 1wh
1009 parts = filename.split('.') 1wh
1010 if len(parts) < 3: 1wh
1011 raise ValueError(fr'unrecognized filename "{filename}"') 1w
1012 pattern = r'.*(?P<gt>_g\d+_t\d+)' 1wh
1013 if not (match := re.match(pattern, parts[0])): 1wh
1014 raise ValueError(fr'unrecognized filename "{filename}"') 1w
1015 return '.'.join([root + match.group(1), *parts[1:]]) 1wh
1018def move_ephys_files(session_folder: str) -> None:
1019 """move_ephys_files is system agnostic (3A, 3B1, 3B2).
1020 Moves all properly named ephys files to appropriate locations for transfer.
1021 Use rename_ephys_files function before this one.
1023 :param session_folder: Session folder path
1024 :type session_folder: str
1025 :return: None - Moves files on filesystem
1026 :rtype: None
1027 """
1028 session_path = Path(session_folder) 1h
1029 raw_ephys_data_path = session_path / "raw_ephys_data" 1h
1031 imec_files = session_path.rglob("*.imec*") 1h
1032 for imf in imec_files: 1h
1033 # For 3B system probe0x == imecx
1034 probe_number = re.match(r'_spikeglx_ephysData_g\d_t\d.imec(\d+).*', imf.name) 1h
1035 if not probe_number: 1h
1036 # For 3A system imec files must be in a 'probexx' folder
1037 probe_label = re.search(r'probe\d+', str(imf)) 1h
1038 assert probe_label, f'Cannot assign probe number to file {imf}' 1h
1039 probe_label = probe_label.group() 1h
1040 else:
1041 probe_number, = probe_number.groups() 1h
1042 probe_label = f'probe{probe_number.zfill(2)}' 1h
1043 raw_ephys_data_path.joinpath(probe_label).mkdir(exist_ok=True) 1h
1044 shutil.move(imf, raw_ephys_data_path.joinpath(probe_label, imf.name)) 1h
1046 # NIDAq files (3B system only)
1047 nidq_files = session_path.rglob("*.nidq.*") 1h
1048 for nidqf in nidq_files: 1h
1049 shutil.move(str(nidqf), str(raw_ephys_data_path / nidqf.name)) 1h
1050 # Delete all empty folders recursively
1051 delete_empty_folders(raw_ephys_data_path, dry=False, recursive=True) 1h
1054def create_custom_ephys_wirings(iblscripts_folder: str):
1055 iblscripts_path = Path(iblscripts_folder)
1056 PARAMS = load_ephyspc_params()
1057 probe_set = set(v for k, v in PARAMS.items() if k.startswith('PROBE_TYPE'))
1059 params_path = iblscripts_path.parent / "iblscripts_params"
1060 params_path.mkdir(parents=True, exist_ok=True)
1061 wirings_path = iblscripts_path / "deploy" / "ephyspc" / "wirings"
1062 for k, v in PARAMS.items():
1063 if not k.startswith('PROBE_TYPE_'):
1064 continue
1065 probe_label = f'probe{k[-2:]}'
1066 if v not in ('3A', '3B'):
1067 raise ValueError(f'Unsupported probe type "{v}"')
1068 shutil.copy(
1069 wirings_path / f"{v}.wiring.json", params_path / f"{v}_{probe_label}.wiring.json"
1070 )
1071 print(f"Created {v}.wiring.json in {params_path} for {probe_label}")
1072 if "3B" in probe_set:
1073 shutil.copy(wirings_path / "nidq.wiring.json", params_path / "nidq.wiring.json")
1074 print(f"Created nidq.wiring.json in {params_path}")
1075 print(f"\nYou can now modify your wiring files from folder {params_path}")
1078def get_iblscripts_folder():
1079 return str(Path().cwd().parent.parent)
1082def copy_wiring_files(session_folder, iblscripts_folder):
1083 """Run after moving files to probe folders"""
1084 PARAMS = load_ephyspc_params()
1085 if PARAMS["PROBE_TYPE_00"] != PARAMS["PROBE_TYPE_01"]:
1086 print("Having different probe types is not supported")
1087 raise NotImplementedError()
1088 session_path = Path(session_folder)
1089 iblscripts_path = Path(iblscripts_folder)
1090 iblscripts_params_path = iblscripts_path.parent / "iblscripts_params"
1091 wirings_path = iblscripts_path / "deploy" / "ephyspc" / "wirings"
1092 termination = '.wiring.json'
1093 # Determine system
1094 ephys_system = PARAMS["PROBE_TYPE_00"]
1095 # Define where to get the files from (determine if custom wiring applies)
1096 src_wiring_path = iblscripts_params_path if iblscripts_params_path.exists() else wirings_path
1097 probe_wiring_file_path = src_wiring_path / f"{ephys_system}{termination}"
1099 if ephys_system == "3B":
1100 # Copy nidq file
1101 nidq_files = session_path.rglob("*.nidq.bin")
1102 for nidqf in nidq_files:
1103 nidq_wiring_name = ".".join(str(nidqf.name).split(".")[:-1]) + termination
1104 shutil.copy(
1105 str(src_wiring_path / f"nidq{termination}"),
1106 str(session_path / "raw_ephys_data" / nidq_wiring_name),
1107 )
1108 # If system is either (3A OR 3B) copy a wiring file for each ap.bin file
1109 for binf in session_path.rglob("*.ap.bin"):
1110 probe_label = re.search(r'probe\d+', str(binf))
1111 if probe_label:
1112 wiring_name = ".".join(str(binf.name).split(".")[:-2]) + termination
1113 dst_path = session_path / "raw_ephys_data" / probe_label.group() / wiring_name
1114 shutil.copy(probe_wiring_file_path, dst_path)
1117def multi_parts_flags_creation(root_paths: Union[list, str, Path]) -> List[Path]:
1118 """
1119 Creates the sequence files to run spike sorting in batches
1120 A sequence file is a json file with the following fields:
1121 sha1: a unique hash of the metafiles involved
1122 probe: a string with the probe name
1123 index: the index within the sequence
1124 nrecs: the length of the sequence
1125 files: a list of files
1126 :param root_paths:
1127 :return:
1128 """
1129 from one.alf import io as alfio 1n
1130 # "001/raw_ephys_data/probe00/_spikeglx_ephysData_g0_t0.imec0.ap.meta",
1131 if isinstance(root_paths, str) or isinstance(root_paths, Path): 1n
1132 root_paths = [root_paths] 1n
1133 recordings = {} 1n
1134 for root_path in root_paths: 1n
1135 for meta_file in root_path.rglob("*.ap.meta"): 1n
1136 # we want to make sure that the file is just under session_path/raw_ephys_data/{probe_label}
1137 session_path = alfio.files.get_session_path(meta_file) 1n
1138 raw_ephys_path = session_path.joinpath('raw_ephys_data') 1n
1139 if meta_file.parents[1] != raw_ephys_path: 1n
1140 log.warning(f"{meta_file} is not in a probe directory and will be skipped")
1141 continue
1142 # stack the meta-file in the probe label key of the recordings dictionary
1143 plabel = meta_file.parts[-2] 1n
1144 recordings[plabel] = recordings.get(plabel, []) + [meta_file] 1n
1145 # once we have all of the files
1146 for k in recordings: 1n
1147 nrecs = len(recordings[k]) 1n
1148 recordings[k].sort() 1n
1149 # the identifier of the overarching recording sequence is the hash of hashes of the files
1150 m = hashlib.sha1() 1n
1151 for i, meta_file in enumerate(recordings[k]): 1n
1152 hash = hashfile.sha1(meta_file) 1n
1153 m.update(hash.encode()) 1n
1154 # writes the sequence files
1155 for i, meta_file in enumerate(recordings[k]): 1n
1156 sequence_file = meta_file.parent.joinpath(meta_file.name.replace('ap.meta', 'sequence.json')) 1n
1157 with open(sequence_file, 'w+') as fid: 1n
1158 json.dump(dict(sha1=m.hexdigest(), probe=k, index=i, nrecs=len(recordings[k]), 1n
1159 files=list(map(str, recordings[k]))), fid)
1160 log.info(f"{k}: {i}/{nrecs} written sequence file {recordings}") 1n
1161 return recordings 1n
1164class WindowsInhibitor:
1165 """Prevent OS sleep/hibernate in windows; code from:
1166 https://github.com/h3llrais3r/Deluge-PreventSuspendPlus/blob/master/preventsuspendplus/core.py
1167 API documentation:
1168 https://msdn.microsoft.com/en-us/library/windows/desktop/aa373208(v=vs.85).aspx"""
1169 ES_CONTINUOUS = 0x80000000
1170 ES_SYSTEM_REQUIRED = 0x00000001
1172 @staticmethod
1173 def _set_thread_execution_state(state: int) -> None:
1174 result = ctypes.windll.kernel32.SetThreadExecutionState(state)
1175 if result == 0:
1176 log.error("Failed to set thread execution state.")
1178 @staticmethod
1179 def inhibit(quiet: bool = False):
1180 if quiet:
1181 log.debug("Preventing Windows from going to sleep")
1182 else:
1183 print("Preventing Windows from going to sleep")
1184 WindowsInhibitor._set_thread_execution_state(WindowsInhibitor.ES_CONTINUOUS | WindowsInhibitor.ES_SYSTEM_REQUIRED)
1186 @staticmethod
1187 def uninhibit(quiet: bool = False):
1188 if quiet:
1189 log.debug("Allowing Windows to go to sleep")
1190 else:
1191 print("Allowing Windows to go to sleep")
1192 WindowsInhibitor._set_thread_execution_state(WindowsInhibitor.ES_CONTINUOUS)
1195def sleepless(func: Callable[..., Any]) -> Callable[..., Any]:
1196 """
1197 Decorator to ensure that the system doesn't enter sleep or idle mode during a long-running task.
1199 This decorator wraps a function and sets the thread execution state to prevent
1200 the system from entering sleep or idle mode while the decorated function is
1201 running.
1203 Parameters
1204 ----------
1205 func : callable
1206 The function to decorate.
1208 Returns
1209 -------
1210 callable
1211 The decorated function.
1212 """
1214 @wraps(func) 1x
1215 def inner(*args, **kwargs) -> Any: 1x
1216 if os.name == 'nt': 1x
1217 WindowsInhibitor().inhibit(quiet=True)
1218 result = func(*args, **kwargs) 1x
1219 if os.name == 'nt': 1x
1220 WindowsInhibitor().uninhibit(quiet=True)
1221 return result 1x
1222 return inner 1x