Coverage for ibllib/io/flags.py: 38%

133 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-08 17:16 +0100

1#!/usr/bin/env python 

2# -*- coding:utf-8 -*- 

3# @Author: Niccolò Bonacchi 

4# @Date: Tuesday, January 22nd 2019, 12:32:14 pm 

5from pathlib import Path 

6import logging 

7 

8logger_ = logging.getLogger(__name__) 

9 

10FLAG_FILE_NAMES = [ 

11 'transfer_me.flag', 'extract_me.flag', 'register_me.flag', 'flatiron.flag', 

12 'extract_me.error', 'register_me.error', 'create_me.flag', 'compress_video.flag', 

13 'compress_audio.flag', 'extract_ephys.flag', 

14] 

15 

16 

17def read_flag_file(fname): 

18 """ 

19 Flag files are ``*.flag`` files within a session folder used to schedule some jobs 

20 If they are empty, should return True 

21 

22 :param fname: full file path of the flag file 

23 :type fname: str or pahlib.Path 

24 :return: None 

25 """ 

26 # the flag file may contains specific file names for a targeted extraction 

27 with open(fname) as fid: 1aqcrdef

28 save = list(set(list(filter(None, fid.read().splitlines())))) 1aqcrdef

29 # if empty, extract everything by default 

30 if len(save) == 0: 1aqcrdef

31 save = True 1aqrde

32 return save 1aqcrdef

33 

34 

35def excise_flag_file(fname, removed_files=None): 

36 """ 

37 Remove one or several specific files if they figure within the file 

38 If no file is left, deletes the flag. 

39 

40 :param fname: full file path of the flag file 

41 :type fname: str or pahlib.Path 

42 :return: None 

43 """ 

44 if not removed_files: 1a

45 return 

46 file_names = read_flag_file(fname) 1a

47 # if the file is empty, can't remove a specific file and return 

48 if len(file_names) == 0: 1a

49 return 

50 if isinstance(removed_files, str): 1a

51 removed_files = [removed_files] 1a

52 new_file_names = list(set(file_names).difference(set(removed_files))) 1a

53 # if the resulting file has no files in it, delete 

54 if len(new_file_names) == 0: 1a

55 Path(fname).unlink() 

56 else: 

57 write_flag_file(fname, file_list=new_file_names, clobber=True) 1a

58 

59 

60def write_flag_file(fname, file_list: list = None, clobber=False): 

61 """ 

62 Flag files are ``*.flag`` files within a session folder used to schedule some jobs 

63 Each line references to a file to extract or register 

64 

65 :param fname: full file path of the flag file 

66 :type fname: str or pathlib.Path 

67 :param file_list: None or list of relative paths to write in the file 

68 :type file_list: list 

69 :param clobber: (False) overwrites the flag file if any 

70 :type clobber: bool, optional 

71 :return: None 

72 """ 

73 exists = Path(fname).exists() 1bakglmnohcidepfj

74 if exists: 1bakglmnohcidepfj

75 has_files = Path(fname).stat().st_size != 0 1ag

76 else: 

77 has_files = False 1bkglmnohcidepfj

78 if isinstance(file_list, str) and file_list: 1bakglmnohcidepfj

79 file_list = [file_list] 1a

80 if isinstance(file_list, bool): 1bakglmnohcidepfj

81 file_list = None 1a

82 if clobber: 1bakglmnohcidepfj

83 mode = 'w+' 1a

84 elif exists and has_files and file_list: 1bakglmnohcidepfj

85 mode = 'w+' 1a

86 file_list_flag = read_flag_file(fname) 1a

87 # if the file is empty, can't remove a specific file and return 

88 if len(file_list_flag) == 0: 1a

89 file_list = [''] + file_list 

90 else: 

91 file_list = list(set(file_list + file_list_flag)) 1a

92 else: 

93 mode = 'w+' 1bakglmnohcidepfj

94 if exists and not has_files: 1bakglmnohcidepfj

95 file_list = [] 1ag

96 with open(fname, mode) as fid: 1bakglmnohcidepfj

97 if file_list: 1bakglmnohcidepfj

98 fid.write('\n'.join(file_list)) 1ahcifj

99 

100 

101def create_register_flags(root_data_folder, force=False, file_list=None): 

102 ses_path = Path(root_data_folder).glob('**/raw_behavior_data') 

103 for p in ses_path: 

104 flag_file = Path(p).parent.joinpath('register_me.flag') 

105 if p.parent.joinpath('flatiron.flag').is_file() and not force: 

106 continue 

107 if p.parent.joinpath('extract_me.error').is_file() and not force: 

108 continue 

109 if p.parent.joinpath('register_me.error').is_file() and not force: 

110 continue 

111 if force and p.parent.joinpath('flatiron.flag').is_file(): 

112 p.parent.joinpath('flatiron.flag').unlink() 

113 write_flag_file(flag_file, file_list) 

114 logger_.info('created flag: ' + str(flag_file)) 

115 

116 

117def create_extract_flags(root_data_folder, force=False, file_list=None): 

118 # first part is to create extraction flags 

119 ses_path = Path(root_data_folder).glob('**/raw_behavior_data') 

120 for p in ses_path: 

121 flag_file = Path(p).parent.joinpath('extract_me.flag') 

122 if p.parent.joinpath('flatiron.flag').is_file() and not force: 

123 continue 

124 if p.parent.joinpath('extract_me.error').is_file() and not force: 

125 continue 

126 if p.parent.joinpath('register_me.error').is_file() and not force: 

127 continue 

128 if force and p.parent.joinpath('flatiron.flag').is_file(): 

129 p.parent.joinpath('flatiron.flag').unlink() 

130 if force and p.parent.joinpath('register_me.flag').is_file(): 

131 p.parent.joinpath('register_me.flag').unlink() 

132 write_flag_file(flag_file, file_list) 

133 logger_.info('created flag: ' + str(flag_file)) 

134 

135 

136def create_transfer_flags(root_data_folder, force=False, file_list=None): 

137 create_other_flags(root_data_folder, 'transfer_me.flag', force=False, file_list=None) 

138 

139 

140def create_create_flags(root_data_folder, force=False, file_list=None): 

141 create_other_flags(root_data_folder, 'create_me.flag', force=False, file_list=None) 

142 

143 

144def create_other_flags(root_data_folder, name, force=False, file_list=None): 

145 ses_path = Path(root_data_folder).glob('**/raw_behavior_data') 

146 for p in ses_path: 

147 flag_file = Path(p).parent.joinpath(name) 

148 write_flag_file(flag_file) 

149 logger_.info('created flag: ' + str(flag_file)) 

150 

151 

152def create_compress_video_flags(root_data_folder, flag_name='compress_video.flag', clobber=False): 

153 # only create flags for raw_video_data folders: 

154 video_paths = Path(root_data_folder).glob('**/raw_video_data') 

155 for video_path in video_paths: 

156 ses_path = video_path.parent 

157 flag_file = ses_path.joinpath(flag_name) 

158 vfiles = video_path.rglob('*.avi') 

159 for vfile in vfiles: 

160 logger_.info(str(vfile.relative_to(ses_path)) + ' added to ' + str(flag_file)) 

161 write_flag_file(flag_file, file_list=str(vfile.relative_to(ses_path)), clobber=clobber) 

162 

163 

164def create_audio_flags(root_data_folder, flag_name): 

165 # audio flags could be audio_ephys.flag, audio_training.flag 

166 if flag_name not in ['audio_ephys.flag', 'audio_training.flag']: 

167 raise ValueError('Flag name should be audio_ephys.flag or audio_training.flag') 

168 audio_paths = Path(root_data_folder).glob('**/raw_behavior_data') 

169 for audio_path in audio_paths: 

170 ses_path = audio_path.parent 

171 flag_file = ses_path.joinpath(flag_name) 

172 afiles = audio_path.rglob('*.wav') 

173 for afile in afiles: 

174 logger_.info(str(afile.relative_to(ses_path)) + ' added to ' + str(flag_file)) 

175 write_flag_file(flag_file, file_list=str(afile.relative_to(ses_path))) 

176 

177 

178def create_dlc_flags(root_path, dry=False, clobber=False, force=False): 

179 # look for all mp4 raw video files 

180 root_path = Path(root_path) 

181 for file_mp4 in root_path.rglob('_iblrig_leftCamera.raw*.mp4'): 

182 ses_path = file_mp4.parents[1] 

183 file_label = file_mp4.stem.split('.')[0].split('_')[-1] 

184 # skip flag creation if there is a file named _ibl_*Camera.dlc.npy 

185 if (ses_path / 'alf' / f'_ibl_{file_label}.dlc.npy').exists() and not force: 

186 continue 

187 if not dry: 

188 write_flag_file(ses_path / 'dlc_training.flag', 

189 file_list=[str(file_mp4.relative_to(ses_path))], 

190 clobber=clobber) 

191 logger_.info(str(ses_path / 'dlc_training.flag')) 

192 

193 

194def create_flags(root_data_folder: str or Path, flags: list, 

195 force: bool = False, file_list: list = None) -> None: 

196 ses_path = Path(root_data_folder).glob('**/raw_behavior_data') 

197 for p in ses_path: 

198 if 'create' in flags: 

199 create_create_flags(root_data_folder, force=force, file_list=file_list) 

200 elif 'transfer' in flags: 

201 create_transfer_flags(root_data_folder, force=force, file_list=file_list) 

202 elif 'extract' in flags: 

203 create_extract_flags(root_data_folder, force=force, file_list=file_list) 

204 elif 'register' in flags: 

205 create_register_flags(root_data_folder, force=force, file_list=file_list) 

206 

207 

208def delete_flags(root_data_folder): 

209 for f in Path(root_data_folder).rglob('*.flag'): 

210 f.unlink() 

211 logger_.info('deleted flag: ' + str(f)) 

212 for f in Path(root_data_folder).rglob('*.error'): 

213 f.unlink() 

214 logger_.info('deleted flag: ' + str(f))