Coverage for ibllib/pipes/video_tasks.py: 45%

396 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-08 17:16 +0100

1import logging 

2import subprocess 

3import time 

4import traceback 

5from pathlib import Path 

6 

7import cv2 

8import pandas as pd 

9import numpy as np 

10 

11from ibllib.qc.dlc import DlcQC 

12from ibllib.io import ffmpeg, raw_daq_loaders 

13from ibllib.pipes import base_tasks 

14from ibllib.io.video import get_video_meta 

15from ibllib.io.extractors import camera 

16from ibllib.qc.camera import run_all_qc as run_camera_qc 

17from ibllib.misc import check_nvidia_driver 

18from ibllib.io.video import label_from_path, assert_valid_label 

19from ibllib.plots.snapshot import ReportSnapshot 

20from ibllib.plots.figures import dlc_qc_plot 

21from brainbox.behavior.dlc import likelihood_threshold, get_licks, get_pupil_diameter, get_smooth_pupil_diameter 

22 

23_logger = logging.getLogger('ibllib') 

24 

25 

class VideoRegisterRaw(base_tasks.VideoTask, base_tasks.RegisterRawDataTask):
    """
    Task to register raw video data.

    The list of files to register is built from the cameras of this task (`self.cameras`).
    """

    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        """
        Dataset specification of the task.

        There are no input files. The output files are, for each camera, the timestamps, GPIO,
        frame_counter and frameData files, followed by the video code files. All entries carry
        False as their third element (presumably the 'required' flag - confirm in base_tasks).
        """
        per_camera_patterns = (
            '_iblrig_{cam}Camera.timestamps*',
            '_iblrig_{cam}Camera.GPIO.bin',
            '_iblrig_{cam}Camera.frame_counter.bin',
            '_iblrig_{cam}Camera.frameData.bin',
        )
        # Same ordering as a concatenation of one list per pattern: pattern first, camera second
        outputs = [
            (pattern.format(cam=cam), self.device_collection, False)
            for pattern in per_camera_patterns
            for cam in self.cameras
        ]
        outputs.append(('_iblrig_videoCodeFiles.raw*', self.device_collection, False))
        return {'input_files': [], 'output_files': outputs}

    def assert_expected_outputs(self, raise_error=True):
        """
        Check the output files once the task has completed.

        frameData replaces the timestamps file. Therefore if frameData is present, timestamps is
        optional and vice versa: at least one of the two must be among the files found.

        :param raise_error: bool, whether to raise when the check fails (default is True)
        :return: tuple (everything_is_fine, files) as returned by self.assert_expected
        :raises FileNotFoundError: if the check fails and raise_error is True
        """
        assert self.status == 0
        _logger.info('Checking output files')
        everything_is_fine, files = self.assert_expected(self.output_files)

        has_timing_file = any(
            'Camera.frameData' in name or 'Camera.timestamps' in name for name in map(str, files)
        )
        if everything_is_fine and has_timing_file:
            return everything_is_fine, files

        # Something is missing: list the task outputs to help debugging
        for out in self.outputs:
            _logger.error(f'{out}')
        if raise_error:
            raise FileNotFoundError('Missing outputs after task completion')

        return everything_is_fine, files

64 

65 

class VideoCompress(base_tasks.VideoTask):
    """
    Task to compress raw video data from .avi to .mp4 format.
    """
    priority = 90
    job_size = 'large'

    @property
    def signature(self):
        """Dataset specification: one raw video in and one mp4 out per camera."""
        inputs = [(f'_iblrig_{cam}Camera.raw.*', self.device_collection, True) for cam in self.cameras]
        outputs = [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras]
        return {'input_files': inputs, 'output_files': outputs}

    def _run(self):
        """
        Compress the session videos with ffmpeg.

        :return: the compressed files, or None if there are none
        """
        # TODO different compression parameters based on whether it is training or not based on number of cameras?
        # avi to mp4 compression; the ffmpeg template depends on the sync type
        if self.sync != 'bpod':
            command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 17 '
                       '-loglevel 0 -codec:a copy {file_out}')
        else:
            command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 29 '
                       '-nostats -codec:a copy {file_out}')

        compressed = ffmpeg.iblrig_video_compression(self.session_path, command)

        if len(compressed) == 0:
            _logger.info('No compressed videos found')
            return None

        return compressed

98 

99 

class VideoConvert(base_tasks.VideoTask):
    """
    Task that converts compressed avi to mp4 format and renames video and camlog files.

    Specific to UCLA widefield implementation.
    """
    priority = 90
    job_size = 'small'

    @property
    def signature(self):
        """Dataset specification: `{cam}_cam*` avi/camlog files in, `_iblrig_{cam}Camera.raw.*` files out."""
        signature = {
            'input_files': [(f'{cam}_cam*.avi', self.device_collection, True) for cam in self.cameras] +
                           [(f'{cam}_cam*.camlog', self.device_collection, False) for cam in self.cameras],
            'output_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] +
                            [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, True) for cam in self.cameras]
        }

        return signature

    def _run(self):
        """
        Rename the camlog file and convert the avi video to mp4 for each camera.

        The original avi is deleted only if ffmpeg succeeded and the video meta data (all keys
        except 'size') are identical for the avi and the mp4. Conversion failures are logged and
        the corresponding mp4 is left out of the returned list.

        :return: list of renamed camlog files and successfully converted mp4 files
        :raises StopIteration: if no camlog or avi file is found for a camera
        """
        output_files = []
        device_path = self.session_path.joinpath(self.device_collection)
        for cam in self.cameras:

            # rename and register the camlog files
            camlog_file = next(device_path.glob(f'{cam}_cam*.camlog'))
            new_camlog_file = device_path.joinpath(f'_iblrig_{cam}Camera.raw.camlog')
            camlog_file.replace(new_camlog_file)
            output_files.append(new_camlog_file)

            # convert the avi files to mp4
            avi_file = next(device_path.glob(f'{cam}_cam*.avi'))
            mp4_file = device_path.joinpath(f'_iblrig_{cam}Camera.raw.mp4')
            # Arguments are passed as a list, without a shell, so that paths containing spaces or
            # shell metacharacters reach ffmpeg unaltered
            command2run = ['ffmpeg', '-i', str(avi_file), '-c:v', 'copy', '-c:a', 'copy', '-y', str(mp4_file)]

            try:
                process = subprocess.Popen(command2run, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            except OSError as ex:
                # e.g. ffmpeg not installed: with a shell this was a non-zero return code, keep it non-fatal
                _logger.error(f'conversion to mp4 failed for {avi_file}: {ex}')
                continue
            _, error = process.communicate()
            if process.returncode != 0:
                _logger.error(f'conversion to mp4 failed for {avi_file}: {error}')
                continue

            # check the video meta matched and remove the original file
            meta_avi = get_video_meta(avi_file)
            meta_avi.pop('size')  # the file size is expected to differ between containers
            meta_mp4 = get_video_meta(mp4_file)
            match = not any(meta_avi[key] != meta_mp4[key] for key in meta_avi)

            # if all checks out we can remove the original avi
            if match:
                avi_file.unlink()
                output_files.append(mp4_file)
            else:
                _logger.error(f'avi and mp4 meta data do not match for {avi_file}')

        return output_files

155 

156 

class VideoSyncQcCamlog(base_tasks.VideoTask):
    """
    Task to sync camera timestamps to main DAQ timestamps when camlog files are used.

    Specific to UCLA widefield implementation.
    """
    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        """Dataset specification: videos, camlog, sync and wheel files in; camera times out."""
        video_inputs = [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras]
        camlog_inputs = [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, False) for cam in self.cameras]
        shared_inputs = [
            (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True),
            ('*.wiring.json', self.sync_collection, True),
            ('*wheel.position.npy', 'alf', False),
            ('*wheel.timestamps.npy', 'alf', False),
        ]
        return {
            'input_files': video_inputs + camlog_inputs + shared_inputs,
            'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras],
        }

    def _run(self, qc=True, **kwargs):
        """
        Extract the video timestamps and optionally run the camera QC.

        :param qc: bool, whether to run the camera QC after the extraction (default is True)
        :param kwargs: accepted but not used
        :return: list of files returned by the timestamps extraction
        """
        # The labels are taken from every mp4 found (recursively) in the device collection
        video_dir = self.session_path.joinpath(self.device_collection)
        labels = [label_from_path(mp4) for mp4 in video_dir.rglob('*.mp4')]

        # Video timestamps extraction
        _, files = camera.extract_all(
            self.session_path, sync_type=self.sync, sync_collection=self.sync_collection,
            save=True, labels=labels, camlog=True)
        output_files = [*files]

        # Video QC
        if qc:
            run_camera_qc(self.session_path, update=True, one=self.one, cameras=labels, camlog=True,
                          sync_collection=self.sync_collection, sync_type=self.sync)

        return output_files

197 

198 

class VideoSyncQcBpod(base_tasks.VideoTask):
    """
    Task to sync camera timestamps to main DAQ timestamps
    N.B Signatures only reflect new daq naming convention, non-compatible with ephys when not running on server
    """
    priority = 40
    job_size = 'small'

    def __init__(self, *args, **kwargs):
        """
        :param kwargs: may contain 'collection' (task collection) and 'protocol' (task type);
         both default to None and are resolved by get_task_collection / get_protocol
        """
        super().__init__(*args, **kwargs)
        # Task collection (this needs to be specified in the task kwargs)
        requested_collection = kwargs.get('collection', None)
        self.collection = self.get_task_collection(requested_collection)
        # Task type (protocol)
        requested_protocol = kwargs.get('protocol', None)
        self.protocol = self.get_protocol(requested_protocol, task_collection=self.collection)

    @property
    def signature(self):
        """Dataset specification: raw camera files, task data and wheel files in; camera times out."""
        camera_patterns = (
            ('_iblrig_{cam}Camera.raw.mp4', True),
            ('_iblrig_{cam}Camera.timestamps*', False),
            ('_iblrig_{cam}Camera.GPIO.bin', False),
            ('_iblrig_{cam}Camera.frame_counter.bin', False),
            ('_iblrig_{cam}Camera.frameData.bin', False),
        )
        # Pattern first, camera second: same order as concatenating one list per pattern
        inputs = [
            (pattern.format(cam=cam), self.device_collection, required)
            for pattern, required in camera_patterns
            for cam in self.cameras
        ]
        inputs += [
            ('_iblrig_taskData.raw.*', self.collection, True),
            ('_iblrig_taskSettings.raw.*', self.collection, True),
            ('*wheel.position.npy', 'alf', False),
            ('*wheel.timestamps.npy', 'alf', False),
        ]
        outputs = [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]
        return {'input_files': inputs, 'output_files': outputs}

    def _run(self, **kwargs):
        """
        Extract the video timestamps and run the camera QC.

        :param kwargs: accepted but not used
        :return: list of files returned by the timestamps extraction
        """
        # The labels are taken from every mp4 found (recursively) in the device collection
        video_dir = self.session_path.joinpath(self.device_collection)
        labels = [label_from_path(mp4) for mp4 in video_dir.rglob('*.mp4')]

        # Video timestamps extraction
        _, files = camera.extract_all(
            self.session_path, sync_type=self.sync, sync_collection=self.sync_collection,
            save=True, labels=labels, task_collection=self.collection)
        output_files = [*files]

        # Video QC
        run_camera_qc(self.session_path, update=True, one=self.one, cameras=labels,
                      sync_collection=self.sync_collection, sync_type=self.sync)

        return output_files

247 

248 

class VideoSyncQcNidq(base_tasks.VideoTask):
    """
    Task to sync camera timestamps to main DAQ timestamps
    N.B Signatures only reflect new daq naming convention, non-compatible with ephys when not running on server
    """
    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        """Dataset specification: raw camera files, sync, wheel and description files in; camera times out."""
        camera_patterns = (
            ('_iblrig_{cam}Camera.raw.mp4', True),
            ('_iblrig_{cam}Camera.timestamps*', False),
            ('_iblrig_{cam}Camera.GPIO.bin', False),
            ('_iblrig_{cam}Camera.frame_counter.bin', False),
            ('_iblrig_{cam}Camera.frameData.bin', False),
        )
        # Pattern first, camera second: same order as concatenating one list per pattern
        inputs = [
            (pattern.format(cam=cam), self.device_collection, required)
            for pattern, required in camera_patterns
            for cam in self.cameras
        ]
        inputs += [
            (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True),
            ('*.wiring.json', self.sync_collection, True),
            ('*wheel.position.npy', 'alf', False),
            ('*wheel.timestamps.npy', 'alf', False),
            ('*experiment.description*', '', False),
        ]
        outputs = [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]
        return {'input_files': inputs, 'output_files': outputs}

    def _run(self, **kwargs):
        """
        Extract the video timestamps and run the camera QC.

        Only the 'left', 'right', 'body' and 'belly' videos found directly in the device
        collection are processed.

        :param kwargs: accepted but not used; the extraction keyword arguments are built here
        :return: list of files returned by the timestamps extraction
        """
        mp4_files = self.session_path.joinpath(self.device_collection).glob('*.mp4')
        labels = [lab for lab in map(label_from_path, mp4_files) if lab in ('left', 'right', 'body', 'belly')]

        extract_kwargs = {}
        if self.sync_namespace == 'timeline':
            # Load sync from timeline file
            sync_path = self.session_path / self.sync_collection
            sync, chmap = raw_daq_loaders.load_timeline_sync_and_chmap(sync_path)
            extract_kwargs['sync'] = sync
            extract_kwargs['chmap'] = chmap

        # Video timestamps extraction
        _, files = camera.extract_all(
            self.session_path, sync_type=self.sync, sync_collection=self.sync_collection,
            save=True, labels=labels, **extract_kwargs)
        output_files = [*files]

        # Video QC
        run_camera_qc(self.session_path, update=True, one=self.one, cameras=labels,
                      sync_collection=self.sync_collection, sync_type=self.sync)

        return output_files

300 

301 

class DLC(base_tasks.VideoTask):
    """
    Run DLC and the ROI motion energy computation on the raw videos of a session.

    This task relies on a correctly installed dlc environment as per
    https://docs.google.com/document/d/1g0scP6_3EmaXCU4SsDNZWwDTaD9MG0es_grLA-d0gh0/edit#

    If your environment is set up otherwise, make sure that you set the respective attributes:
        t = DLC(session_path)
        t.dlcenv = Path('/path/to/your/dlcenv/bin/activate')
        t.scripts = Path('/path/to/your/iblscripts/deploy/serverpc/dlc')
    """
    gpu = 1
    cpu = 4
    io_charge = 100
    level = 2
    force = True
    job_size = 'large'

    # Default locations of the environment activation script and of the shell/python run scripts
    dlcenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'dlcenv', 'bin', 'activate')
    scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'dlc')

    @property
    def signature(self):
        """Dataset specification: one raw mp4 in; dlc traces and ROI motion energy files out, per camera."""
        signature = {
            'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras],
            'output_files': [(f'_ibl_{cam}Camera.dlc.pqt', 'alf', True) for cam in self.cameras] +
                            [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', True) for cam in self.cameras] +
                            [(f'{cam}ROIMotionEnergy.position.npy', 'alf', True) for cam in self.cameras]
        }

        return signature

    def _check_dlcenv(self):
        """
        Check that scripts are present, dlcenv can be activated and get iblvideo version.

        :return: str, last line printed when importing iblvideo inside the environment
        :raises AssertionError: if a script or the environment is missing, or if the check command fails
        """
        assert len(list(self.scripts.rglob('run_dlc.*'))) == 2, \
            f'Scripts run_dlc.sh and run_dlc.py do not exist in {self.scripts}'
        assert len(list(self.scripts.rglob('run_motion.*'))) == 2, \
            f'Scripts run_motion.sh and run_motion.py do not exist in {self.scripts}'
        assert self.dlcenv.exists(), f'DLC environment does not exist in assumed location {self.dlcenv}'
        command2run = f"source {self.dlcenv}; python -c 'import iblvideo; print(iblvideo.__version__)'"
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable='/bin/bash'
        )
        info, error = process.communicate()
        if process.returncode != 0:
            raise AssertionError(f"DLC environment check failed\n{error.decode('utf-8')}")
        # Only the last printed line is the version, anything before is noise from the activation
        version = info.decode('utf-8').strip().split('\n')[-1]
        return version

    @staticmethod
    def _video_intact(file_mp4):
        """
        Checks that the downloaded video can be opened and is not empty.

        :param file_mp4: path to the video file
        :return: bool, True if the frame count reported by OpenCV is greater than zero
        """
        cap = cv2.VideoCapture(str(file_mp4))
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        intact = True if frame_count > 0 else False
        cap.release()
        return intact

    def _run(self, cams=None, overwrite=False):
        """
        Run DLC, then the motion energy computation, on each valid camera.

        Failures of one camera are logged and reflected in self.status, the remaining cameras
        are still processed. A missing raw video sets the status to Incomplete (-3); a corrupt
        video, a failing script or any unexpected exception sets it to -1.

        :param cams: list of camera labels to process, defaults to self.cameras
        :param overwrite: bool, forwarded to run_dlc.sh; if False and all expected outputs
         already exist, they are returned without any computation (default is False)
        :return: list of output files, or None if no video was available and nothing was produced
        """
        # Check that the cams are valid for DLC, remove the ones that aren't
        candidate_cams = cams or self.cameras
        cams = []
        for cam in candidate_cams:
            try:
                cams.append(assert_valid_label(cam))
            except ValueError:
                _logger.warning(f'{cam} is not a valid video label, this video will be skipped')
        # Set up
        self.session_id = self.one.path2eid(self.session_path)
        actual_outputs = []

        # Loop through cams
        for cam in cams:
            # Catch exceptions so that following cameras can still run
            try:
                # If all results exist and overwrite is False, skip computation
                expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True)
                if overwrite is False and expected_outputs_present is True:
                    actual_outputs.extend(expected_outputs)
                    return actual_outputs

                # A default is given to next() so that a missing video yields None; without it
                # StopIteration is raised and the Incomplete branch below can never be reached.
                # NOTE(review): the folder is hard-coded while the signature uses self.device_collection
                file_mp4 = next(
                    self.session_path.joinpath('raw_video_data').glob(f'_iblrig_{cam}Camera.raw*.mp4'), None)
                if file_mp4 is None:
                    # In this case we set the status to Incomplete.
                    _logger.error(f'No raw video file available for {cam}, skipping.')
                    self.status = -3
                    continue
                if not self._video_intact(file_mp4):
                    _logger.error(f'Corrupt raw video file {file_mp4}')
                    self.status = -1
                    continue
                # Check that dlc environment is ok, shell scripts exists, and get iblvideo version, GPU addressable
                self.version = self._check_dlcenv()
                _logger.info(f'iblvideo version {self.version}')
                check_nvidia_driver()

                _logger.info(f'Running DLC on {cam}Camera.')
                command2run = f"{self.scripts.joinpath('run_dlc.sh')} {str(self.dlcenv)} {file_mp4} {overwrite}"
                _logger.info(command2run)
                process = subprocess.Popen(
                    command2run,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    executable='/bin/bash',
                )
                info, error = process.communicate()
                if process.returncode != 0:
                    error_str = error.decode('utf-8').strip()
                    _logger.error(f'DLC failed for {cam}Camera.\n\n'
                                  f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                                  f'{error_str}\n'
                                  f'++++++++++++++++++++++++++++++++++++++++++++\n')
                    self.status = -1
                    # We don't run motion energy, or add any files if dlc failed to run
                    continue
                dlc_result = next(self.session_path.joinpath('alf').glob(f'_ibl_{cam}Camera.dlc*.pqt'))
                actual_outputs.append(dlc_result)

                _logger.info(f'Computing motion energy for {cam}Camera')
                command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.dlcenv)} {file_mp4} {dlc_result}"
                _logger.info(command2run)
                process = subprocess.Popen(
                    command2run,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    executable='/bin/bash',
                )
                info, error = process.communicate()
                if process.returncode != 0:
                    error_str = error.decode('utf-8').strip()
                    _logger.error(f'Motion energy failed for {cam}Camera.\n\n'
                                  f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                                  f'{error_str}\n'
                                  f'++++++++++++++++++++++++++++++++++++++++++++\n')
                    self.status = -1
                    continue
                actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                    f'{cam}Camera.ROIMotionEnergy*.npy')))
                actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                    f'{cam}ROIMotionEnergy.position*.npy')))
            except Exception:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        # If status is Incomplete, check that there is at least one output.
        # Otherwise make sure it gets set to Empty (outputs = None), and set status to -1 to make sure it doesn't slip
        if self.status == -3 and len(actual_outputs) == 0:
            actual_outputs = None
            self.status = -1
        return actual_outputs

461 

462 

class EphysPostDLC(base_tasks.VideoTask):
    """
    The post_dlc task takes dlc traces as input and computes useful quantities, as well as qc.
    """
    io_charge = 90
    level = 3
    force = True

    def __init__(self, *args, **kwargs):
        """
        :param kwargs: may contain 'trials_collection', the collection of the trials table (default is 'alf')
        """
        super().__init__(*args, **kwargs)
        self.trials_collection = kwargs.get('trials_collection', 'alf')

    @property
    def signature(self):
        """Dataset specification: dlc traces, camera times and plot inputs in; features and lick times out."""
        inputs = [(f'_ibl_{cam}Camera.dlc.pqt', 'alf', True) for cam in self.cameras]
        inputs += [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]
        # the following are required for the DLC plot only
        # they are not strictly required, some plots just might be skipped
        # In particular the raw videos don't need to be downloaded as they can be streamed
        inputs += [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras]
        inputs += [(f'{cam}ROIMotionEnergy.position.npy', 'alf', False) for cam in self.cameras]
        # The trials table is used in the DLC QC, however this is not an essential dataset
        inputs += [('_ibl_trials.table.pqt', self.trials_collection, False)]

        outputs = [(f'_ibl_{cam}Camera.features.pqt', 'alf', True) for cam in self.cameras]
        outputs += [('licks.times.npy', 'alf', True)]
        return {'input_files': inputs, 'output_files': outputs}

    def _run(self, overwrite=True, run_qc=True, plot_qc=True):
        """
        Run the PostDLC task. Returns a list of file locations for the output files in signature. The created plot
        (dlc_qc_plot.png) is not returned, but saved in session_path/snapshot and registered through
        ReportSnapshot.register_images.

        :param overwrite: bool, whether to recompute existing output files (default is True).
         Note that the dlc_qc_plot will be (re-)computed even if overwrite = False
        :param run_qc: bool, whether to run the DLC QC (default is True)
        :param plot_qc: bool, whether to create the dlc_qc_plot (default is True)
        :return: list of output files
        """
        # Check if output files exist locally
        exist, output_files = self.assert_expected(self.signature['output_files'], silent=True)
        if exist and not overwrite:
            _logger.warning('EphysPostDLC outputs exist and overwrite=False, skipping computations of outputs.')
        else:
            # In this branch existing outputs imply overwrite=True
            if exist:
                _logger.warning('EphysPostDLC outputs exist and overwrite=True, overwriting existing outputs.')
            # Find all available DLC files
            alf_path = Path(self.session_path).joinpath('alf')
            dlc_files = list(alf_path.rglob('_ibl_*Camera.dlc.*'))
            for dlc_file in dlc_files:
                _logger.debug(dlc_file)
            output_files = []
            combined_licks = []

            for dlc_file in dlc_files:
                # Catch unforeseen exceptions and move on to next cam
                try:
                    cam = label_from_path(dlc_file)
                    # load dlc trace and camera times
                    dlc = pd.read_parquet(dlc_file)
                    dlc_thresh = likelihood_threshold(dlc, 0.9)
                    # try to load respective camera times
                    # `times` ends up True (usable), False (missing or empty) or 'short' (fewer times than traces)
                    try:
                        times_file = next(alf_path.rglob(f'_ibl_{cam}Camera.times.*npy'))
                        dlc_t = np.load(times_file)
                        n_times = dlc_t.shape[0]
                        if n_times == 0:
                            _logger.error(f'camera.times empty for {cam} camera. '
                                          f'Computations using camera.times will be skipped')
                            self.status = -1
                            times = False
                        elif n_times < len(dlc_thresh):
                            _logger.error(f'Camera times shorter than DLC traces for {cam} camera. '
                                          f'Computations using camera.times will be skipped')
                            self.status = -1
                            times = 'short'
                        else:
                            times = True
                    except StopIteration:
                        self.status = -1
                        times = False
                        _logger.error(f'No camera.times for {cam} camera. '
                                      f'Computations using camera.times will be skipped')
                    # These features are only computed from left and right cam
                    if cam in ('left', 'right'):
                        features = pd.DataFrame()
                        # If camera times are available, get the lick time stamps for combined array
                        if times is True:
                            _logger.info(f'Computing lick times for {cam} camera.')
                            combined_licks.append(get_licks(dlc_thresh, dlc_t))
                        elif times is False:
                            _logger.warning(f'Skipping lick times for {cam} camera as no camera.times available')
                        elif times == 'short':
                            _logger.warning(f'Skipping lick times for {cam} camera as camera.times are too short')
                        # Compute pupil diameter, raw and smoothed
                        _logger.info(f'Computing raw pupil diameter for {cam} camera.')
                        features['pupilDiameter_raw'] = get_pupil_diameter(dlc_thresh)
                        try:
                            _logger.info(f'Computing smooth pupil diameter for {cam} camera.')
                            smooth = get_smooth_pupil_diameter(features['pupilDiameter_raw'], cam)
                            features['pupilDiameter_smooth'] = smooth
                        except Exception:
                            _logger.error(f'Computing smooth pupil diameter for {cam} camera failed, saving all NaNs.')
                            _logger.error(traceback.format_exc())
                            features['pupilDiameter_smooth'] = np.nan
                        # Save to parquet
                        features_file = alf_path.joinpath(f'_ibl_{cam}Camera.features.pqt')
                        features.to_parquet(features_file)
                        output_files.append(features_file)

                    # For all cams, compute DLC QC if times available
                    if run_qc is True and times in (True, 'short'):
                        # Setting download_data to False because at this point the data should be there
                        DlcQC(self.session_path, side=cam, one=self.one, download_data=False).run(update=True)
                    else:
                        if times is False:
                            _logger.warning(f'Skipping QC for {cam} camera as no camera.times available')
                        if not run_qc:
                            _logger.warning(f'Skipping QC for {cam} camera as run_qc=False')

                except Exception:
                    _logger.error(traceback.format_exc())
                    self.status = -1
                    continue

            # Combined lick times
            if len(combined_licks) > 0:
                lick_times_file = alf_path.joinpath('licks.times.npy')
                np.save(lick_times_file, sorted(np.concatenate(combined_licks)))
                output_files.append(lick_times_file)
            else:
                _logger.warning('No lick times computed for this session.')

        if plot_qc:
            _logger.info('Creating DLC QC plot')
            try:
                session_id = self.one.path2eid(self.session_path)
                fig_path = self.session_path.joinpath('snapshot', 'dlc_qc_plot.png')
                snapshot_dir = fig_path.parent
                if not snapshot_dir.exists():
                    snapshot_dir.mkdir(parents=True, exist_ok=True)
                fig = dlc_qc_plot(self.session_path, one=self.one, cameras=self.cameras,
                                  device_collection=self.device_collection,
                                  trials_collection=self.trials_collection)
                fig.savefig(fig_path)
                fig.clf()
                snp = ReportSnapshot(self.session_path, session_id, one=self.one)
                snp.outputs = [fig_path]
                plot_function = str(dlc_qc_plot.__module__) + '.' + str(dlc_qc_plot.__name__)
                snp.register_images(widths=['orig'], function=plot_function)
            except Exception:
                _logger.error('Could not create and/or upload DLC QC Plot')
                _logger.error(traceback.format_exc())
                self.status = -1

        return output_files

614 

615 

class LightningPose(base_tasks.VideoTask):
    """
    Run Lightning Pose on the raw videos of a session.

    The task calls run_litpose.sh from `scripts` with the environment activation script `env`;
    set these two attributes if your installation differs from the default locations.
    """
    # TODO: make one task per cam?
    gpu = 1
    io_charge = 100
    level = 2
    force = True
    job_size = 'large'

    # Default locations of the environment activation script and of the shell/python run scripts
    env = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'litpose', 'bin', 'activate')
    scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'litpose')

    @property
    def signature(self):
        """Dataset specification: one raw mp4 in and one lightningPose table out per camera."""
        signature = {
            'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras],
            'output_files': [(f'_ibl_{cam}Camera.lightningPose.pqt', 'alf', True) for cam in self.cameras]
        }

        return signature

    @staticmethod
    def _video_intact(file_mp4):
        """
        Checks that the downloaded video can be opened and is not empty.

        :param file_mp4: path to the video file
        :return: bool, True if the frame count reported by OpenCV is greater than zero
        """
        cap = cv2.VideoCapture(str(file_mp4))
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        intact = True if frame_count > 0 else False
        cap.release()
        return intact

    def _check_env(self):
        """
        Check that scripts are present, env can be activated and get iblvideo version.

        :return: str, last line printed when importing iblvideo inside the environment
        :raises AssertionError: if a script or the environment is missing, or if the check command fails
        """
        assert len(list(self.scripts.rglob('run_litpose.*'))) == 2, \
            f'Scripts run_litpose.sh and run_litpose.py do not exist in {self.scripts}'
        assert self.env.exists(), f"environment does not exist in assumed location {self.env}"
        command2run = f"source {self.env}; python -c 'import iblvideo; print(iblvideo.__version__)'"
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash"
        )
        info, error = process.communicate()
        if process.returncode != 0:
            raise AssertionError(f"environment check failed\n{error.decode('utf-8')}")
        # Only the last printed line is the version, anything before is noise from the activation
        version = info.decode("utf-8").strip().split('\n')[-1]
        return version

    def _run(self, overwrite=True, **kwargs):
        """
        Run Lightning Pose on every camera whose raw mp4 exists in the device collection.

        Failures of one camera are logged and set self.status to -1 (Errored), the remaining
        cameras are still processed.

        :param overwrite: bool, forwarded to run_litpose.sh; if False and all expected outputs
         already exist, they are returned without any computation (default is True)
        :param kwargs: accepted but not used
        :return: list of output files
        :raises AssertionError: if the environment check fails
        """
        # Gather video files
        self.session_path = Path(self.session_path)
        candidates = (
            self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.mp4')
            for cam in self.cameras
        )
        mp4_files = [mp4 for mp4 in candidates if mp4.exists()]

        labels = [label_from_path(x) for x in mp4_files]
        _logger.info(f'Running on {labels} videos')

        # Check the environment
        self.version = self._check_env()
        _logger.info(f'iblvideo version {self.version}')

        # If all results exist and overwrite is False, skip computation
        expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True)
        if overwrite is False and expected_outputs_present is True:
            actual_outputs = expected_outputs
            return actual_outputs

        # Else, loop over videos
        actual_outputs = []
        for label, mp4_file in zip(labels, mp4_files):
            # Catch exceptions so that the other cams can still run but set status to Errored
            try:
                # Check that the GPU is (still) accessible
                check_nvidia_driver()
                # Check that the video can be loaded
                if not self._video_intact(mp4_file):
                    _logger.error(f"Corrupt raw video file {mp4_file}")
                    self.status = -1
                    continue
                t0 = time.time()
                _logger.info(f'Running Lightning Pose on {label}Camera.')
                command2run = f"{self.scripts.joinpath('run_litpose.sh')} {str(self.env)} {mp4_file} {overwrite}"
                _logger.info(command2run)
                process = subprocess.Popen(
                    command2run,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    executable="/bin/bash",
                )
                info, error = process.communicate()
                if process.returncode != 0:
                    error_str = error.decode("utf-8").strip()
                    _logger.error(f'Lightning pose failed for {label}Camera.\n\n'
                                  f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                                  f'{error_str}\n'
                                  f'++++++++++++++++++++++++++++++++++++++++++++\n')
                    self.status = -1
                    continue
                else:
                    _logger.info(f'{label} camera took {(time.time() - t0)} seconds')
                    result = next(self.session_path.joinpath('alf').glob(f'_ibl_{label}Camera.lightningPose*.pqt'))
                    actual_outputs.append(result)

            # Exception rather than BaseException: KeyboardInterrupt and SystemExit must stop the
            # task instead of being logged and followed by the next camera
            except Exception:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue

        return actual_outputs