Coverage for ibllib/io/flags.py: 38%
133 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-17 15:25 +0000
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-17 15:25 +0000
1#!/usr/bin/env python
2# -*- coding:utf-8 -*-
3# @Author: Niccolò Bonacchi
4# @Date: Tuesday, January 22nd 2019, 12:32:14 pm
5from pathlib import Path
6import logging
# module-level logger, named after this module
logger_ = logging.getLogger(__name__)

# file names of the flag (``*.flag``) and error (``*.error``) files listed by this module
FLAG_FILE_NAMES = [
    'transfer_me.flag', 'extract_me.flag', 'register_me.flag', 'flatiron.flag',
    'extract_me.error', 'register_me.error', 'create_me.flag', 'compress_video.flag',
    'compress_audio.flag', 'extract_ephys.flag',
]
def read_flag_file(fname):
    """
    Flag files are ``*.flag`` files within a session folder used to schedule some jobs.
    Each non-blank line of a flag file names one file targeted by the job.

    :param fname: full file path of the flag file
    :type fname: str or pathlib.Path
    :return: True if the flag file holds no file name (i.e. process everything),
     otherwise the list of unique file names in order of first appearance
    :rtype: bool or list
    """
    # the flag file may contain specific file names for a targeted extraction
    with open(fname) as fid:
        lines = fid.read().splitlines()
    # drop blank lines and duplicates; dict.fromkeys keeps the first-seen order so the
    # result is deterministic (going through a set would shuffle the names between runs)
    save = list(dict.fromkeys(filter(None, lines)))
    # if empty, extract everything by default
    if not save:
        return True
    return save
def excise_flag_file(fname, removed_files=None):
    """
    Remove one or several specific files if they figure within the flag file.
    If no file is left, deletes the flag.

    :param fname: full file path of the flag file
    :type fname: str or pathlib.Path
    :param removed_files: file name or list of file names to take out of the flag file;
     nothing is done when None or empty
    :type removed_files: str or list, optional
    :return: None
    """
    if not removed_files:
        return
    file_names = read_flag_file(fname)
    # read_flag_file returns True (not an empty list) for a flag without file names.
    # Such a flag targets everything, so a specific file can't be removed from it.
    # Calling len() on that boolean would raise a TypeError, hence the explicit test.
    if file_names is True or not file_names:
        return
    if isinstance(removed_files, str):
        removed_files = [removed_files]
    to_remove = set(removed_files)
    # keep the surviving names in the order they were read
    new_file_names = [name for name in file_names if name not in to_remove]
    # if the resulting file has no files in it, delete
    if not new_file_names:
        Path(fname).unlink()
    else:
        write_flag_file(fname, file_list=new_file_names, clobber=True)
def write_flag_file(fname, file_list: list = None, clobber=False):
    """
    Flag files are ``*.flag`` files within a session folder used to schedule some jobs.
    Each line references a file to extract or register.

    Without ``clobber``:

    - a flag that already lists files gets ``file_list`` merged into its content
    - a flag that exists but is empty (0 bytes) is left empty, whatever ``file_list`` is
    - a flag that lists files is emptied when no ``file_list`` is given

    :param fname: full file path of the flag file
    :type fname: str or pathlib.Path
    :param file_list: None, a single relative path or a list of relative paths to write
     in the file. A boolean is treated as None.
    :type file_list: list
    :param clobber: (False) overwrites the flag file if any
    :type clobber: bool, optional
    :return: None
    """
    fname = Path(fname)
    exists = fname.exists()
    has_files = exists and fname.stat().st_size != 0
    # normalise the input: a single name becomes a one-element list, a boolean means no list
    if isinstance(file_list, str) and file_list:
        file_list = [file_list]
    if isinstance(file_list, bool):
        file_list = None
    if not clobber:
        if has_files and file_list:
            file_list_flag = read_flag_file(fname)
            # read_flag_file returns True when the flag only holds blank lines: test for
            # it explicitly, as calling len() on that boolean would raise a TypeError
            if file_list_flag is True or not file_list_flag:
                file_list = [''] + list(file_list)
            else:
                # union of old and new names, sorted so the written file is deterministic
                file_list = sorted(set(file_list) | set(file_list_flag))
        elif exists and not has_files:
            # an existing empty flag is kept empty
            file_list = []
    # the file is always rewritten from scratch, merged content included
    with open(fname, 'w') as fid:
        if file_list:
            fid.write('\n'.join(file_list))
def create_register_flags(root_data_folder, force=False, file_list=None):
    """
    Write a ``register_me.flag`` in the parent folder of every ``raw_behavior_data``
    folder found under the root folder.

    :param root_data_folder: folder searched recursively for ``raw_behavior_data`` folders
    :type root_data_folder: str or pathlib.Path
    :param force: (False) if False, sessions holding a ``flatiron.flag``,
     ``extract_me.error`` or ``register_me.error`` file are skipped; if True none is
     skipped and an existing ``flatiron.flag`` is deleted first
    :type force: bool, optional
    :param file_list: passed on to write_flag_file
    :type file_list: list, optional
    :return: None
    """
    blocking_names = ('flatiron.flag', 'extract_me.error', 'register_me.error')
    for behavior_dir in Path(root_data_folder).glob('**/raw_behavior_data'):
        session = behavior_dir.parent
        if not force:
            # any of these files marks the session as not to be flagged
            if any(session.joinpath(name).is_file() for name in blocking_names):
                continue
        else:
            flatiron = session.joinpath('flatiron.flag')
            if flatiron.is_file():
                flatiron.unlink()
        target = session.joinpath('register_me.flag')
        write_flag_file(target, file_list)
        logger_.info(f'created flag: {target}')
def create_extract_flags(root_data_folder, force=False, file_list=None):
    """
    Write an ``extract_me.flag`` in the parent folder of every ``raw_behavior_data``
    folder found under the root folder.

    :param root_data_folder: folder searched recursively for ``raw_behavior_data`` folders
    :type root_data_folder: str or pathlib.Path
    :param force: (False) if False, sessions holding a ``flatiron.flag``,
     ``extract_me.error`` or ``register_me.error`` file are skipped; if True none is
     skipped and existing ``flatiron.flag`` and ``register_me.flag`` files are deleted first
    :type force: bool, optional
    :param file_list: passed on to write_flag_file
    :type file_list: list, optional
    :return: None
    """
    blocking_names = ('flatiron.flag', 'extract_me.error', 'register_me.error')
    removed_on_force = ('flatiron.flag', 'register_me.flag')
    for behavior_dir in Path(root_data_folder).glob('**/raw_behavior_data'):
        session = behavior_dir.parent
        if force:
            # forcing an extraction removes the flags of the later stages
            for name in removed_on_force:
                stale = session.joinpath(name)
                if stale.is_file():
                    stale.unlink()
        elif any(session.joinpath(name).is_file() for name in blocking_names):
            continue
        target = session.joinpath('extract_me.flag')
        write_flag_file(target, file_list)
        logger_.info(f'created flag: {target}')
def create_transfer_flags(root_data_folder, force=False, file_list=None):
    """
    Create ``transfer_me.flag`` files through create_other_flags.

    :param root_data_folder: root folder handed to create_other_flags
    :type root_data_folder: str or pathlib.Path
    :param force: forwarded to create_other_flags
    :type force: bool, optional
    :param file_list: forwarded to create_other_flags
    :type file_list: list, optional
    :return: None
    """
    # forward the caller's arguments instead of hard-coded defaults
    create_other_flags(root_data_folder, 'transfer_me.flag', force=force, file_list=file_list)
def create_create_flags(root_data_folder, force=False, file_list=None):
    """
    Create ``create_me.flag`` files through create_other_flags.

    :param root_data_folder: root folder handed to create_other_flags
    :type root_data_folder: str or pathlib.Path
    :param force: forwarded to create_other_flags
    :type force: bool, optional
    :param file_list: forwarded to create_other_flags
    :type file_list: list, optional
    :return: None
    """
    # forward the caller's arguments instead of hard-coded defaults
    create_other_flags(root_data_folder, 'create_me.flag', force=force, file_list=file_list)
def create_other_flags(root_data_folder, name, force=False, file_list=None):
    """
    Write a flag file called ``name`` in the parent folder of every ``raw_behavior_data``
    folder found under the root folder.

    :param root_data_folder: folder searched recursively for ``raw_behavior_data`` folders
    :type root_data_folder: str or pathlib.Path
    :param name: file name of the flag to write, e.g. ``transfer_me.flag``
    :type name: str
    :param force: (False) accepted for signature parity with the other ``create_*_flags``
     functions; no session is ever skipped here, so it has no effect
    :type force: bool, optional
    :param file_list: passed on to write_flag_file
    :type file_list: list, optional
    :return: None
    """
    for behavior_dir in Path(root_data_folder).glob('**/raw_behavior_data'):
        flag_file = behavior_dir.parent.joinpath(name)
        # honour the requested file list rather than silently dropping it
        write_flag_file(flag_file, file_list)
        logger_.info('created flag: %s', flag_file)
def create_compress_video_flags(root_data_folder, flag_name='compress_video.flag', clobber=False):
    """
    For every ``raw_video_data`` folder found under the root folder, write a flag file in
    its parent folder listing the ``*.avi`` files it contains, as paths relative to that
    parent folder. No flag is written for a folder without any ``*.avi`` file.

    :param root_data_folder: folder searched recursively for ``raw_video_data`` folders
    :type root_data_folder: str or pathlib.Path
    :param flag_name: ('compress_video.flag') file name of the flag to write
    :type flag_name: str, optional
    :param clobber: (False) passed on to write_flag_file
    :type clobber: bool, optional
    :return: None
    """
    # only create flags for raw_video_data folders:
    for video_path in Path(root_data_folder).glob('**/raw_video_data'):
        ses_path = video_path.parent
        flag_file = ses_path.joinpath(flag_name)
        vfiles = [str(vfile.relative_to(ses_path)) for vfile in video_path.rglob('*.avi')]
        if not vfiles:
            continue
        for vfile in vfiles:
            logger_.info('%s added to %s', vfile, flag_file)
        # write all the videos of the session in one go: writing them one by one with
        # clobber=True would overwrite the flag each time and only keep the last video
        write_flag_file(flag_file, file_list=vfiles, clobber=clobber)
def create_audio_flags(root_data_folder, flag_name):
    """
    For every ``raw_behavior_data`` folder found under the root folder, add each
    ``*.wav`` file it contains to a flag file located in its parent folder. File names
    are written as paths relative to that parent folder.

    :param root_data_folder: folder searched recursively for ``raw_behavior_data`` folders
    :type root_data_folder: str or pathlib.Path
    :param flag_name: either ``audio_ephys.flag`` or ``audio_training.flag``
    :type flag_name: str
    :raises ValueError: if flag_name is not one of the two accepted names
    :return: None
    """
    # audio flags could be audio_ephys.flag, audio_training.flag
    accepted_names = ('audio_ephys.flag', 'audio_training.flag')
    if flag_name not in accepted_names:
        raise ValueError('Flag name should be audio_ephys.flag or audio_training.flag')
    for behavior_dir in Path(root_data_folder).glob('**/raw_behavior_data'):
        session = behavior_dir.parent
        target = session.joinpath(flag_name)
        for wav in behavior_dir.rglob('*.wav'):
            relative_name = str(wav.relative_to(session))
            logger_.info(f'{relative_name} added to {target}')
            write_flag_file(target, file_list=relative_name)
def create_dlc_flags(root_path, dry=False, clobber=False, force=False):
    """
    For every ``_iblrig_leftCamera.raw*.mp4`` file found under the root path, write a
    ``dlc_training.flag`` listing that video two folder levels above it.

    :param root_path: folder searched recursively for the video files
    :type root_path: str or pathlib.Path
    :param dry: (False) if True, the flag path is only logged and nothing is written
    :type dry: bool, optional
    :param clobber: (False) passed on to write_flag_file
    :type clobber: bool, optional
    :param force: (False) if True, flags are created even when the
     ``alf/_ibl_<label>.dlc.npy`` file of the video already exists
    :type force: bool, optional
    :return: None
    """
    for video in Path(root_path).rglob('_iblrig_leftCamera.raw*.mp4'):
        session = video.parents[1]
        # last underscore-separated token of the file name before its first dot
        label = video.stem.partition('.')[0].rsplit('_', 1)[-1]
        # skip flag creation if there is a file named _ibl_*Camera.dlc.npy
        already_done = (session / 'alf' / f'_ibl_{label}.dlc.npy').exists()
        if already_done and not force:
            continue
        flag_file = session / 'dlc_training.flag'
        if not dry:
            relative_name = str(video.relative_to(session))
            write_flag_file(flag_file, file_list=[relative_name], clobber=clobber)
        logger_.info(str(flag_file))
def create_flags(root_data_folder: str or Path, flags: list,
                 force: bool = False, file_list: list = None) -> None:
    """
    Create one kind of flag files for all the sessions found under the root folder.

    Only the first match is handled, in this order of priority: ``'create'``,
    ``'transfer'``, ``'extract'``, ``'register'``. Nothing is done if none of them is
    in ``flags``.

    :param root_data_folder: root folder handed to the matching ``create_*_flags`` function
    :type root_data_folder: str or pathlib.Path
    :param flags: flag kinds requested
    :type flags: list
    :param force: (False) forwarded to the matching ``create_*_flags`` function
    :type force: bool, optional
    :param file_list: forwarded to the matching ``create_*_flags`` function
    :type file_list: list, optional
    :return: None
    """
    # each create_*_flags function walks the whole root folder by itself, so it is called
    # once: looping over the sessions here would redo the complete job for every session
    if 'create' in flags:
        create_create_flags(root_data_folder, force=force, file_list=file_list)
    elif 'transfer' in flags:
        create_transfer_flags(root_data_folder, force=force, file_list=file_list)
    elif 'extract' in flags:
        create_extract_flags(root_data_folder, force=force, file_list=file_list)
    elif 'register' in flags:
        create_register_flags(root_data_folder, force=force, file_list=file_list)
def delete_flags(root_data_folder):
    """
    Delete every ``*.flag`` file, then every ``*.error`` file, found recursively under
    the root folder. Each deletion is logged.

    :param root_data_folder: folder searched recursively
    :type root_data_folder: str or pathlib.Path
    :return: None
    """
    root = Path(root_data_folder)
    for pattern in ('*.flag', '*.error'):
        for found in root.rglob(pattern):
            found.unlink()
            logger_.info(f'deleted flag: {found}')