Coverage for ibllib/pipes/video_tasks.py: 58%

226 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-10-11 11:13 +0100

1import logging 

2import subprocess 

3import cv2 

4import traceback 

5from pathlib import Path 

6 

7from ibllib.io import ffmpeg, raw_daq_loaders 

8from ibllib.pipes import base_tasks 

9from ibllib.io.video import get_video_meta 

10from ibllib.io.extractors import camera 

11from ibllib.qc.camera import run_all_qc as run_camera_qc 

12from ibllib.misc import check_nvidia_driver 

13from ibllib.io.video import label_from_path, assert_valid_label 

14 

15_logger = logging.getLogger('ibllib') 

16 

17 

class VideoRegisterRaw(base_tasks.VideoTask, base_tasks.RegisterRawDataTask):
    """
    Task to register raw video data.

    The list of files to register is built from the cameras listed in the session params file.
    """

    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        """
        Input/output file signature of the task.

        There are no input files; every output entry is flagged as optional (third tuple element False).
        """
        cams = self.cameras
        collection = self.device_collection
        # Per-camera files, grouped by file type (all cameras for one type, then the next type)
        outputs = [(f'_iblrig_{cam}Camera.timestamps*', collection, False) for cam in cams]
        outputs += [(f'_iblrig_{cam}Camera.GPIO.bin', collection, False) for cam in cams]
        outputs += [(f'_iblrig_{cam}Camera.frame_counter.bin', collection, False) for cam in cams]
        outputs += [(f'_iblrig_{cam}Camera.frameData.bin', collection, False) for cam in cams]
        # Single entry that is not camera specific
        outputs.append(('_iblrig_videoCodeFiles.raw*', collection, False))
        return {'input_files': [], 'output_files': outputs}

    def assert_expected_outputs(self, raise_error=True):
        """
        Check the output files once the task has completed.

        frameData replaces the timestamps file. Therefore if frameData is present, timestamps is
        optional and vice versa: at least one of the two has to be found among the output files.

        :param raise_error: if True, raise when the check fails; otherwise only log the task outputs
        :return: the (flag, files) pair obtained from `self.assert_expected(self.output_files)`
        :raises FileNotFoundError: if the check fails and `raise_error` is True
        """
        assert self.status == 0
        _logger.info('Checking output files')
        everything_is_fine, files = self.assert_expected(self.output_files)

        # Look for at least one frameData or timestamps file among the files found
        has_frame_times = False
        for file_name in map(str, files):
            if 'Camera.frameData' in file_name or 'Camera.timestamps' in file_name:
                has_frame_times = True
                break

        if everything_is_fine and has_frame_times:
            return everything_is_fine, files

        # Check failed: list what the task produced to help debugging
        for out in self.outputs:
            _logger.error(f"{out}")
        if raise_error:
            raise FileNotFoundError("Missing outputs after task completion")

        return everything_is_fine, files

56 

57 

class VideoCompress(base_tasks.VideoTask):
    """
    Task to compress raw video data from .avi to .mp4 format.
    """
    priority = 90
    job_size = 'large'

    @property
    def signature(self):
        """Input/output file signature: one required raw video in, one required mp4 out, per camera."""
        collection = self.device_collection
        raw_videos = [(f'_iblrig_{cam}Camera.raw.*', collection, True) for cam in self.cameras]
        mp4_videos = [(f'_iblrig_{cam}Camera.raw.mp4', collection, True) for cam in self.cameras]
        return {'input_files': raw_videos, 'output_files': mp4_videos}

    def _run(self):
        """
        Compress the session videos with ffmpeg.

        :return: the files returned by `ffmpeg.iblrig_video_compression`, or None when that list is empty
        """
        # TODO different compression parameters based on whether it is training or not based on number of cameras?
        # avi to mp4 compression
        # The two templates differ in the crf value (29 vs 17) and in the verbosity flag (-nostats vs -loglevel 0)
        bpod_command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 29 '
                        '-nostats -codec:a copy {file_out}')
        default_command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 17 '
                           '-loglevel 0 -codec:a copy {file_out}')
        command = bpod_command if self.sync == 'bpod' else default_command

        compressed = ffmpeg.iblrig_video_compression(self.session_path, command)

        if len(compressed) == 0:
            _logger.info('No compressed videos found')
            return None

        return compressed

90 

91 

class VideoConvert(base_tasks.VideoTask):
    """
    Task that converts compressed avi to mp4 format and renames video and camlog files.

    Specific to UCLA widefield implementation.
    """
    priority = 90
    job_size = 'small'

    @property
    def signature(self):
        """Input/output file signature: per camera avi (required) and camlog (optional) in, renamed mp4 and camlog out."""
        signature = {
            'input_files': [(f'{cam}_cam*.avi', self.device_collection, True) for cam in self.cameras] +
                           [(f'{cam}_cam*.camlog', self.device_collection, False) for cam in self.cameras],
            'output_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] +
                            [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, True) for cam in self.cameras]
        }

        return signature

    def _run(self):
        """
        Rename the camlog file and remux the avi video to mp4 for every camera.

        The original avi is deleted only when the ffmpeg call succeeds and the video meta data of the
        avi and of the mp4 agree. The renamed camlog file is added to the outputs even if the video
        conversion fails afterwards.

        :return: list of paths of the renamed camlog files and of the successfully converted mp4 files
        """
        output_files = []
        video_path = self.session_path.joinpath(self.device_collection)
        for cam in self.cameras:

            # rename and register the camlog files
            camlog_file = next(video_path.glob(f'{cam}_cam*.camlog'))
            new_camlog_file = video_path.joinpath(f'_iblrig_{cam}Camera.raw.camlog')
            camlog_file.replace(new_camlog_file)
            output_files.append(new_camlog_file)

            # convert the avi files to mp4
            avi_file = next(video_path.glob(f'{cam}_cam*.avi'))
            mp4_file = video_path.joinpath(f'_iblrig_{cam}Camera.raw.mp4')
            # Argument list without a shell: paths containing spaces or shell metacharacters are passed intact
            command2run = ['ffmpeg', '-i', str(avi_file), '-c:v', 'copy', '-c:a', 'copy', '-y', str(mp4_file)]

            try:
                process = subprocess.Popen(command2run, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            except OSError as ex:
                # e.g. ffmpeg executable not found: log and move on, as a failing shell command would have
                _logger.error(f'conversion to mp4 failed for {avi_file}: {ex}')
                continue
            _, error = process.communicate()
            if process.returncode != 0:
                error_str = error.decode('utf-8', errors='replace').strip()
                _logger.error(f'conversion to mp4 failed for {avi_file}: {error_str}')
                continue

            # check the video meta matched and remove the original file
            meta_avi = get_video_meta(avi_file)
            # 'size' is left out of the comparison (presumably the file size, which differs between containers)
            _ = meta_avi.pop('size')
            meta_mp4 = get_video_meta(mp4_file)
            mismatch = any(meta_avi[key] != meta_mp4[key] for key in meta_avi)

            if mismatch:
                _logger.error(f'avi and mp4 meta data do not match for {avi_file}')
                continue

            # if all checks out we can remove the original avi
            avi_file.unlink()
            output_files.append(mp4_file)

        return output_files

147 

148 

class VideoSyncQcCamlog(base_tasks.VideoTask):
    """
    Task to sync camera timestamps to main DAQ timestamps when camlog files are used.

    Specific to UCLA widefield implementation.
    """
    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        """Input/output file signature: videos, camlogs, sync and wheel files in, camera times out."""
        video_collection = self.device_collection
        sync_collection = self.sync_collection
        namespace = self.sync_namespace

        inputs = [(f'_iblrig_{cam}Camera.raw.mp4', video_collection, True) for cam in self.cameras]
        inputs += [(f'_iblrig_{cam}Camera.raw.camlog', video_collection, False) for cam in self.cameras]
        inputs += [
            (f'_{namespace}_sync.channels.npy', sync_collection, True),
            (f'_{namespace}_sync.polarities.npy', sync_collection, True),
            (f'_{namespace}_sync.times.npy', sync_collection, True),
            ('*.wiring.json', sync_collection, True),
            ('*wheel.position.npy', 'alf', False),
            ('*wheel.timestamps.npy', 'alf', False),
        ]
        outputs = [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]

        return {'input_files': inputs, 'output_files': outputs}

    def _run(self, qc=True, **kwargs):
        """
        Extract the camera timestamps and optionally run the camera QC.

        :param qc: if True, run the camera QC after the extraction
        :param kwargs: accepted but not used here
        :return: list of the files returned by the timestamp extraction
        """
        # One label per mp4 file found (recursively) in the video collection
        video_dir = self.session_path.joinpath(self.device_collection)
        labels = [label_from_path(mp4) for mp4 in video_dir.rglob('*.mp4')]

        # Video timestamps extraction
        _, files = camera.extract_all(
            self.session_path, sync_type=self.sync, sync_collection=self.sync_collection,
            save=True, labels=labels, camlog=True)
        output_files = list(files)

        # Video QC
        if qc:
            run_camera_qc(
                self.session_path, update=True, one=self.one, cameras=labels, camlog=True,
                sync_collection=self.sync_collection, sync_type=self.sync)

        return output_files

189 

190 

class VideoSyncQcBpod(base_tasks.VideoTask):
    """
    Task to sync camera timestamps to main DAQ timestamps.

    N.B Signatures only reflect new daq naming convention, non compatible with ephys when not running on server
    """
    priority = 40
    job_size = 'small'

    def __init__(self, *args, **kwargs):
        """
        :param args: forwarded to the parent constructor
        :param kwargs: forwarded to the parent constructor; the optional 'collection' and 'protocol'
         entries are also used to resolve the task collection and the task protocol
        """
        super().__init__(*args, **kwargs)
        # Task collection (this needs to be specified in the task kwargs)
        self.collection = self.get_task_collection(kwargs.get('collection'))
        # Task type (protocol)
        self.protocol = self.get_protocol(kwargs.get('protocol'), task_collection=self.collection)

    @property
    def signature(self):
        """Input/output file signature: videos, camera files, task data and wheel files in, camera times out."""
        video_collection = self.device_collection

        inputs = [(f'_iblrig_{cam}Camera.raw.mp4', video_collection, True) for cam in self.cameras]
        inputs += [(f'_iblrig_{cam}Camera.timestamps*', video_collection, False) for cam in self.cameras]
        inputs += [(f'_iblrig_{cam}Camera.GPIO.bin', video_collection, False) for cam in self.cameras]
        inputs += [(f'_iblrig_{cam}Camera.frame_counter.bin', video_collection, False) for cam in self.cameras]
        inputs += [(f'_iblrig_{cam}Camera.frameData.bin', video_collection, False) for cam in self.cameras]
        inputs += [
            ('_iblrig_taskData.raw.*', self.collection, True),
            ('_iblrig_taskSettings.raw.*', self.collection, True),
            ('*wheel.position.npy', 'alf', False),
            ('*wheel.timestamps.npy', 'alf', False),
        ]
        outputs = [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]

        return {'input_files': inputs, 'output_files': outputs}

    def _run(self, **kwargs):
        """
        Extract the camera timestamps and run the camera QC.

        :param kwargs: accepted but not used here
        :return: list of the files returned by the timestamp extraction
        """
        # One label per mp4 file found (recursively) in the video collection
        video_dir = self.session_path.joinpath(self.device_collection)
        labels = [label_from_path(mp4) for mp4 in video_dir.rglob('*.mp4')]

        # Video timestamps extraction
        _, files = camera.extract_all(
            self.session_path, sync_type=self.sync, sync_collection=self.sync_collection,
            save=True, labels=labels, task_collection=self.collection)
        output_files = list(files)

        # Video QC
        run_camera_qc(
            self.session_path, update=True, one=self.one, cameras=labels,
            sync_collection=self.sync_collection, sync_type=self.sync)

        return output_files

239 

240 

class VideoSyncQcNidq(base_tasks.VideoTask):
    """
    Task to sync camera timestamps to main DAQ timestamps.

    N.B Signatures only reflect new daq naming convention, non compatible with ephys when not running on server
    """
    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        """Input/output file signature: videos, camera files, sync, wheel and description files in, camera times out."""
        video_collection = self.device_collection
        sync_collection = self.sync_collection
        namespace = self.sync_namespace

        inputs = [(f'_iblrig_{cam}Camera.raw.mp4', video_collection, True) for cam in self.cameras]
        inputs += [(f'_iblrig_{cam}Camera.timestamps*', video_collection, False) for cam in self.cameras]
        inputs += [(f'_iblrig_{cam}Camera.GPIO.bin', video_collection, False) for cam in self.cameras]
        inputs += [(f'_iblrig_{cam}Camera.frame_counter.bin', video_collection, False) for cam in self.cameras]
        inputs += [(f'_iblrig_{cam}Camera.frameData.bin', video_collection, False) for cam in self.cameras]
        inputs += [
            (f'_{namespace}_sync.channels.npy', sync_collection, True),
            (f'_{namespace}_sync.polarities.npy', sync_collection, True),
            (f'_{namespace}_sync.times.npy', sync_collection, True),
            ('*.wiring.json', sync_collection, True),
            ('*wheel.position.npy', 'alf', False),
            ('*wheel.timestamps.npy', 'alf', False),
            ('*experiment.description*', '', False),
        ]
        outputs = [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]

        return {'input_files': inputs, 'output_files': outputs}

    def _run(self, **kwargs):
        """
        Extract the camera timestamps and run the camera QC.

        :param kwargs: accepted but not used here; the extraction arguments are built from scratch
        :return: list of the files returned by the timestamp extraction
        """
        # Labels of the mp4 files directly inside the video collection, restricted to the known cameras
        video_dir = self.session_path.joinpath(self.device_collection)
        found_labels = [label_from_path(mp4) for mp4 in video_dir.glob('*.mp4')]
        labels = [lab for lab in found_labels if lab in ('left', 'right', 'body', 'belly')]

        extract_kwargs = {}
        if self.sync_namespace == 'timeline':
            # Load sync from timeline file
            alf_path = self.session_path / self.sync_collection
            extract_kwargs['sync'], extract_kwargs['chmap'] = raw_daq_loaders.load_timeline_sync_and_chmap(alf_path)

        # Video timestamps extraction
        _, files = camera.extract_all(
            self.session_path, sync_type=self.sync, sync_collection=self.sync_collection,
            save=True, labels=labels, **extract_kwargs)
        output_files = list(files)

        # Video QC
        run_camera_qc(
            self.session_path, update=True, one=self.one, cameras=labels,
            sync_collection=self.sync_collection, sync_type=self.sync)

        return output_files

292 

293 

class DLC(base_tasks.VideoTask):
    """
    This task relies on a correctly installed dlc environment as per
    https://docs.google.com/document/d/1g0scP6_3EmaXCU4SsDNZWwDTaD9MG0es_grLA-d0gh0/edit#

    If your environment is set up otherwise, make sure that you set the respective attributes:
    t = DLC(session_path)
    t.dlcenv = Path('/path/to/your/dlcenv/bin/activate')
    t.scripts = Path('/path/to/your/iblscripts/deploy/serverpc/dlc')
    """
    gpu = 1
    cpu = 4
    io_charge = 100
    level = 2
    force = True
    job_size = 'large'

    # Default locations of the environment activation script and of the shell/python scripts
    dlcenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'dlcenv', 'bin', 'activate')
    scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'dlc')

    @property
    def signature(self):
        """Input/output file signature: one required mp4 in, dlc and motion energy files out, per camera."""
        signature = {
            'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras],
            'output_files': [(f'_ibl_{cam}Camera.dlc.pqt', 'alf', True) for cam in self.cameras] +
                            [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', True) for cam in self.cameras] +
                            [(f'{cam}ROIMotionEnergy.position.npy', 'alf', True) for cam in self.cameras]
        }

        return signature

    def _check_dlcenv(self):
        """
        Check that scripts are present, dlcenv can be activated and get iblvideo version

        :return: last line printed when importing iblvideo in the dlc environment (its version string)
        :raises AssertionError: if scripts or environment are missing, or if the environment check fails
        """
        # Explicit raises rather than assert statements so the checks survive `python -O`
        if len(list(self.scripts.rglob('run_dlc.*'))) != 2:
            raise AssertionError(f'Scripts run_dlc.sh and run_dlc.py do not exist in {self.scripts}')
        if len(list(self.scripts.rglob('run_motion.*'))) != 2:
            raise AssertionError(f'Scripts run_motion.sh and run_motion.py do not exist in {self.scripts}')
        if not self.dlcenv.exists():
            raise AssertionError(f"DLC environment does not exist in assumed location {self.dlcenv}")
        command2run = f"source {self.dlcenv}; python -c 'import iblvideo; print(iblvideo.__version__)'"
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash"
        )
        info, error = process.communicate()
        if process.returncode != 0:
            raise AssertionError(f"DLC environment check failed\n{error.decode('utf-8')}")
        # Only the last printed line is kept, anything printed before it is ignored
        version = info.decode("utf-8").strip().split('\n')[-1]
        return version

    @staticmethod
    def _video_intact(file_mp4):
        """Checks that the downloaded video can be opened and is not empty"""
        cap = cv2.VideoCapture(str(file_mp4))
        try:
            return cap.get(cv2.CAP_PROP_FRAME_COUNT) > 0
        finally:
            # Always release the capture handle, even if reading the frame count fails
            cap.release()

    @staticmethod
    def _run_bash_command(command2run):
        """Log and run a command string through bash; return (return code, raw stderr bytes)."""
        _logger.info(command2run)
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash",
        )
        _, error = process.communicate()
        return process.returncode, error

    @staticmethod
    def _log_subprocess_error(header, error):
        """Log the decoded stderr of a failed subprocess under the given header line."""
        error_str = error.decode("utf-8").strip()
        _logger.error(f'{header}\n\n'
                      f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                      f'{error_str}\n'
                      f'++++++++++++++++++++++++++++++++++++++++++++\n')

    def _run(self, cams=None, overwrite=False):
        """
        Run DLC, then motion energy, on the video of each valid camera.

        An error on one camera is logged and sets the status to -1, the remaining cameras still run.
        A camera without raw video sets the status to -3 (Incomplete).

        :param cams: camera labels to process; defaults to `self.cameras` when empty or None
        :param overwrite: if False and all expected outputs already exist, they are returned without
         recomputing; the value is also passed on to the run_dlc.sh script
        :return: list of output file paths, or None when the status is Incomplete and nothing was produced
        """
        # Check that the cams are valid for DLC, remove the ones that aren't
        candidate_cams = cams or self.cameras
        cams = []
        for cam in candidate_cams:
            try:
                cams.append(assert_valid_label(cam))
            except ValueError:
                _logger.warning(f'{cam} is not a valid video label, this video will be skipped')
        # Set up
        self.session_id = self.one.path2eid(self.session_path)
        actual_outputs = []

        # Loop through cams
        for cam in cams:
            # Catch exceptions so that following cameras can still run
            try:
                # If all results exist and overwrite is False, skip computation
                expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True)
                if overwrite is False and expected_outputs_present is True:
                    actual_outputs.extend(expected_outputs)
                    return actual_outputs

                # Default of None: a missing video must reach the Incomplete branch below instead of
                # raising StopIteration, which would be reported as a plain error (status -1)
                file_mp4 = next(
                    self.session_path.joinpath('raw_video_data').glob(f'_iblrig_{cam}Camera.raw*.mp4'), None)
                if file_mp4 is None or not file_mp4.exists():
                    # In this case we set the status to Incomplete.
                    _logger.error(f"No raw video file available for {cam}, skipping.")
                    self.status = -3
                    continue
                if not self._video_intact(file_mp4):
                    _logger.error(f"Corrupt raw video file {file_mp4}")
                    self.status = -1
                    continue
                # Check that dlc environment is ok, shell scripts exists, and get iblvideo version, GPU addressable
                self.version = self._check_dlcenv()
                _logger.info(f'iblvideo version {self.version}')
                check_nvidia_driver()

                _logger.info(f'Running DLC on {cam}Camera.')
                command2run = f"{self.scripts.joinpath('run_dlc.sh')} {str(self.dlcenv)} {file_mp4} {overwrite}"
                returncode, error = self._run_bash_command(command2run)
                if returncode != 0:
                    self._log_subprocess_error(f'DLC failed for {cam}Camera.', error)
                    self.status = -1
                    # We don't run motion energy, or add any files if dlc failed to run
                    continue
                dlc_result = next(self.session_path.joinpath('alf').glob(f'_ibl_{cam}Camera.dlc*.pqt'))
                actual_outputs.append(dlc_result)

                _logger.info(f'Computing motion energy for {cam}Camera')
                command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.dlcenv)} {file_mp4} {dlc_result}"
                returncode, error = self._run_bash_command(command2run)
                if returncode != 0:
                    self._log_subprocess_error(f'Motion energy failed for {cam}Camera.', error)
                    self.status = -1
                    continue
                actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                    f'{cam}Camera.ROIMotionEnergy*.npy')))
                actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                    f'{cam}ROIMotionEnergy.position*.npy')))
            except Exception:
                # Exception rather than BaseException: KeyboardInterrupt / SystemExit must stop the task
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        # If status is Incomplete, check that there is at least one output.
        # Otherwise make sure it gets set to Empty (outputs = None), and set status to -1 to make sure it doesn't slip
        if self.status == -3 and len(actual_outputs) == 0:
            actual_outputs = None
            self.status = -1
        return actual_outputs

451 self.status = -1 

452 return actual_outputs