Coverage for ibllib/pipes/video_tasks.py: 45%

438 statements  

coverage.py v7.8.0, created at 2025-05-07 14:26 +0100

1import logging 

2import subprocess 

3import time 

4import traceback 

5from pathlib import Path 

6from functools import partial 

7 

8import cv2 

9import pandas as pd 

10import numpy as np 

11 

12from ibllib.qc.dlc import DlcQC 

13from ibllib.io import ffmpeg, raw_daq_loaders 

14from ibllib.pipes import base_tasks 

15from ibllib.io.video import get_video_meta 

16from ibllib.io.extractors import camera 

17from ibllib.io.extractors.base import run_extractor_classes 

18from ibllib.io.extractors.ephys_fpga import get_sync_and_chn_map 

19from ibllib.qc.camera import run_all_qc as run_camera_qc, CameraQC 

20from ibllib.misc import check_nvidia_driver 

21from ibllib.io.video import label_from_path, assert_valid_label 

22from ibllib.plots.snapshot import ReportSnapshot 

23from ibllib.plots.figures import dlc_qc_plot 

24from brainbox.behavior.dlc import likelihood_threshold, get_licks, get_pupil_diameter, get_smooth_pupil_diameter 

25 

26_logger = logging.getLogger('ibllib') 

27 

28 

29class VideoRegisterRaw(base_tasks.VideoTask, base_tasks.RegisterRawDataTask): 

30 """ 

31 Task to register raw video data. Builds up the list of files to register from the list of cameras given in the session params file.

32 """ 

33 

34 priority = 100 

35 job_size = 'small' 

36 

37 @property 

38 def signature(self): 

39 signature = { 1b

40 'input_files': [], 

41 'output_files': 

42 [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] + 

43 [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] + 

44 [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] + 

45 [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] + 

46 [('_iblrig_videoCodeFiles.raw*', self.device_collection, False)] 

47 } 
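# Note: each entry above is a (filename pattern, collection, required) tuple; entries marked False are
# optional datasets (see assert_expected_outputs below for how the optional timestamps/frameData pair is handled).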

48 return signature 1b

49 

50 def assert_expected_outputs(self, raise_error=True): 

51 """ 

52 frameData replaces the timestamps file. Therefore if frameData is present, timestamps is 

53 optional and vice versa. 

54 """ 

55 assert self.status == 0 

56 _logger.info('Checking output files') 

57 everything_is_fine, files = self.assert_expected(self.output_files) 

58 

59 required = any('Camera.frameData' in x or 'Camera.timestamps' in x for x in map(str, files)) 

60 if not (everything_is_fine and required): 

61 for out in self.outputs: 

62 _logger.error(f'{out}') 

63 if raise_error: 

64 raise FileNotFoundError('Missing outputs after task completion') 

65 

66 return everything_is_fine, files 

67 

68 

69class VideoCompress(base_tasks.VideoTask): 

70 """ 

71 Task to compress raw video data from .avi to .mp4 format. 

72 """ 

73 priority = 90 

74 job_size = 'large' 

75 

76 @property 

77 def signature(self): 

78 signature = { 1abhi

79 'input_files': [(f'_iblrig_{cam}Camera.raw.*', self.device_collection, True) for cam in self.cameras], 

80 'output_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] 

81 } 

82 return signature 1abhi

83 

84 def _run(self): 

85 # TODO: different compression parameters depending on whether it is a training session or not (e.g. based on the number of cameras)?

86 # avi to mp4 compression 

87 if self.sync == 'bpod': 1abhi

88 command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 29 ' 1bh

89 '-nostats -codec:a copy {file_out}') 

90 else: 

91 command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 17 ' 1ai

92 '-loglevel 0 -codec:a copy {file_out}') 
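# Illustrative expansion (filenames are examples only): for a bpod session the template above becomes
#   ffmpeg -i raw_video_data/_iblrig_leftCamera.raw.avi -y -nostdin -codec:v libx264 -preset slow -crf 29 \
#          -nostats -codec:a copy raw_video_data/_iblrig_leftCamera.raw.mp4
# with {file_in}/{file_out} filled in per camera by ffmpeg.iblrig_video_compression below.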

93 

94 output_files = ffmpeg.iblrig_video_compression(self.session_path, command) 1abhi

95 

96 if len(output_files) == 0: 1abhi

97 _logger.info('No compressed videos found') 

98 return 

99 

100 return output_files 1abhi

101 

102 

103class VideoConvert(base_tasks.VideoTask): 

104 """ 

105 Task that converts compressed avi to mp4 format and renames video and camlog files. Specific to UCLA widefield implementation 

106 """ 

107 priority = 90 

108 job_size = 'small' 

109 

110 @property 

111 def signature(self): 

112 signature = { 1c

113 'input_files': [(f'{cam}_cam*.avi', self.device_collection, True) for cam in self.cameras] + 

114 [(f'{cam}_cam*.camlog', self.device_collection, False) for cam in self.cameras], 

115 'output_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + 

116 [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, True) for cam in self.cameras] 

117 } 

118 

119 return signature 1c

120 

121 def _run(self): 

122 output_files = [] 1c

123 for cam in self.cameras: 1c

124 

125 # rename and register the camlog files 

126 camlog_file = next(self.session_path.joinpath(self.device_collection).glob(f'{cam}_cam*.camlog')) 1c

127 new_camlog_file = self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.camlog') 1c

128 camlog_file.replace(new_camlog_file) 1c

129 output_files.append(new_camlog_file) 1c

130 

131 # convert the avi files to mp4 

132 avi_file = next(self.session_path.joinpath(self.device_collection).glob(f'{cam}_cam*.avi')) 1c

133 mp4_file = self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.mp4') 1c

134 command2run = f'ffmpeg -i {str(avi_file)} -c:v copy -c:a copy -y {str(mp4_file)}' 1c
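# -c:v copy -c:a copy remuxes the existing streams into an mp4 container without re-encoding,
# so the conversion is lossless and fast; the metadata comparison below guards against truncated output.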

135 

136 process = subprocess.Popen(command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1c

137 info, error = process.communicate() 1c

138 if process.returncode == 0: 1c

139 # check that the video metadata matches and, if so, remove the original file

140 meta_avi = get_video_meta(avi_file) 1c

141 _ = meta_avi.pop('size') 1c

142 meta_mp4 = get_video_meta(mp4_file) 1c

143 match = True 1c

144 for key in meta_avi.keys(): 1c

145 if meta_avi[key] != meta_mp4[key]: 1c

146 match = False 

147 

148 # if everything checks out we can remove the original avi

149 if match: 1c

150 avi_file.unlink() 1c

151 output_files.append(mp4_file) 1c

152 else: 

153 _logger.error(f'avi and mp4 meta data do not match for {avi_file}') 

154 else: 

155 _logger.error(f'conversion to mp4 failed for {avi_file}: {error}') 

156 

157 return output_files 1c

158 

159 

160class VideoSyncQcCamlog(base_tasks.VideoTask): 

161 """ 

162 Task to sync camera timestamps to the main DAQ timestamps when camlog files are used. Specific to the UCLA widefield implementation.

163 """ 

164 priority = 40 

165 job_size = 'small' 

166 

167 @property 

168 def signature(self): 

169 signature = { 1f

170 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + 

171 [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, False) for cam in self.cameras] + 

172 [(f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True), 

173 (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True), 

174 (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True), 

175 ('*.wiring.json', self.sync_collection, True), 

176 ('*wheel.position.npy', 'alf', False), 

177 ('*wheel.timestamps.npy', 'alf', False)], 

178 'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras] 

179 } 

180 

181 return signature 1f

182 

183 def extract_camera(self, save=True): 

184 extractor = [partial(camera.CameraTimestampsCamlog, label) for label in self.cameras or []] 1f

185 kwargs = {'sync_type': self.sync, 'sync_collection': self.sync_collection, 'save': save} 1f

186 kwargs['sync'], kwargs['chmap'] = get_sync_and_chn_map(self.session_path, self.sync_collection) 1f

187 return run_extractor_classes(extractor, session_path=self.session_path, **kwargs) 1f

188 

189 def run_qc(self, camera_data=None, update=True): 

190 if camera_data is None: 1f

191 camera_data, _ = self.extract_camera(save=False) 

192 qc = run_camera_qc( 1f

193 self.session_path, self.cameras, one=self.one, camlog=True, sync_collection=self.sync_collection, sync_type=self.sync, 

194 update=update) 

195 return qc 1f

196 

197 def _run(self, update=True, **kwargs): 

198 # Video timestamps extraction 

199 data, output_files = self.extract_camera(save=True) 1f

200 

201 # Video QC 

202 self.run_qc(data, update=update) 1f

203 

204 return output_files 1f

205 

206 

207class VideoSyncQcBpod(base_tasks.VideoTask): 

208 """ 

209 Task to sync camera timestamps to main DAQ timestamps 

210 N.B. Signatures only reflect the new DAQ naming convention; not compatible with ephys when not running on the server.

211 """ 

212 priority = 40 

213 job_size = 'small' 

214 

215 def __init__(self, *args, **kwargs): 

216 super().__init__(*args, **kwargs) 1bjklmd

217 # Task collection (this needs to be specified in the task kwargs) 

218 self.collection = self.get_task_collection(kwargs.get('collection', None)) 1bjklmd

219 # Task type (protocol) 

220 self.protocol = self.get_protocol(kwargs.get('protocol', None), task_collection=self.collection) 1bjklmd

221 self.extractor = None 1bjklmd

222 

223 @property 

224 def signature(self): 

225 signature = { 1bd

226 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + 

227 [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] + 

228 [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] + 

229 [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] + 

230 [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] + 

231 [('_iblrig_taskData.raw.*', self.collection, True), 

232 ('_iblrig_taskSettings.raw.*', self.collection, True), 

233 ('*wheel.position.npy', 'alf', False), 

234 ('*wheel.timestamps.npy', 'alf', False)], 

235 'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras] 

236 } 

237 

238 return signature 1bd

239 

240 def extract_camera(self, save=True): 

241 mp4_files = filter(lambda x: label_from_path(x) in (self.cameras or []), 1bd

242 self.session_path.joinpath(self.device_collection).rglob('*.mp4')) 

243 if self.cameras != ['left']: 1bd

244 raise NotImplementedError('Bpod Camera extraction currently only supports a left camera') 

245 

246 self.extractor = camera.CameraTimestampsBpod(self.session_path) 1bd

247 return self.extractor.extract(video_path=next(mp4_files), save=save, task_collection=self.collection) 1bd

248 

249 def run_qc(self, camera_data=None, update=True): 

250 if self.cameras != ['left']: 1bd

251 raise NotImplementedError('Bpod camera currently only supports a left camera') 

252 if camera_data is None: 1bd

253 camera_data, _ = self.extract_camera(save=False) 

254 qc = CameraQC( 1bd

255 self.session_path, 'left', sync_type='bpod', sync_collection=self.collection, one=self.one, 

256 protocol=self.protocol) 

257 qc.run(update=update) 1bd

258 return qc 1bd

259 

260 def _run(self, update=True, **kwargs): 

261 # Video timestamps extraction 

262 data, output_files = self.extract_camera(save=True) 1bd

263 

264 # Video QC 

265 self.run_qc(data, update=update) 1bd

266 

267 return output_files 1bd

268 

269 

270class VideoSyncQcNidq(base_tasks.VideoTask): 

271 """ 

272 Task to sync camera timestamps to main DAQ timestamps 

273 N.B. Signatures only reflect the new DAQ naming convention; not compatible with ephys when not running on the server.

274 """ 

275 priority = 40 

276 job_size = 'small' 

277 

278 @property 

279 def signature(self): 

280 signature = { 1e

281 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + 

282 [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] + 

283 [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] + 

284 [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] + 

285 [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] + 

286 [(f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True), 

287 (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True), 

288 (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True), 

289 (f'_{self.sync_namespace}_*.wiring.json', self.sync_collection, False), 

290 (f'_{self.sync_namespace}_*.meta', self.sync_collection, True), 

291 ('*wheel.position.npy', 'alf', False), 

292 ('*wheel.timestamps.npy', 'alf', False), 

293 ('*experiment.description*', '', False)], 

294 'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras] 

295 } 

296 

297 return signature 1e

298 

299 def extract_camera(self, save=True): 

300 extractor = [partial(camera.CameraTimestampsFPGA, label) for label in self.cameras or []] 1e

301 kwargs = {'sync_type': self.sync, 'sync_collection': self.sync_collection, 'save': save} 1e

302 if self.sync_namespace == 'timeline': 1e

303 # Load sync from timeline file 

304 alf_path = self.session_path / self.sync_collection 

305 kwargs['sync'], kwargs['chmap'] = raw_daq_loaders.load_timeline_sync_and_chmap(alf_path) 

306 else: 

307 kwargs['sync'], kwargs['chmap'] = get_sync_and_chn_map(self.session_path, self.sync_collection) 1e

308 return run_extractor_classes(extractor, session_path=self.session_path, **kwargs) 1e

309 

310 def run_qc(self, camera_data=None, update=True): 

311 if camera_data is None: 1e

312 camera_data, _ = self.extract_camera(save=False) 

313 qc = run_camera_qc( 1e

314 self.session_path, self.cameras, one=self.one, sync_collection=self.sync_collection, sync_type=self.sync, 

315 update=update) 

316 return qc 1e

317 

318 def _run(self, update=True, **kwargs): 

319 # Video timestamps extraction 

320 data, output_files = self.extract_camera(save=True) 1e

321 

322 # Video QC 

323 self.run_qc(data, update=update) 1e

324 

325 return output_files 1e

326 

327 

328class DLC(base_tasks.VideoTask): 

329 """ 

330 This task relies on a correctly installed dlc environment as per 

331 https://docs.google.com/document/d/1g0scP6_3EmaXCU4SsDNZWwDTaD9MG0es_grLA-d0gh0/edit# 

332 

333 If your environment is set up otherwise, make sure that you set the respective attributes: 

334 t = DLC(session_path)

335 t.dlcenv = Path('/path/to/your/dlcenv/bin/activate') 

336 t.scripts = Path('/path/to/your/iblscripts/deploy/serverpc/dlc') 

337 """ 

338 gpu = 1 

339 cpu = 4 

340 io_charge = 100 

341 level = 2 

342 force = True 

343 job_size = 'large' 

344 

345 dlcenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'dlcenv', 'bin', 'activate') 

346 scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'dlc') 

347 

348 @property 

349 def signature(self): 

350 signature = { 

351 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras], 

352 'output_files': [(f'_ibl_{cam}Camera.dlc.pqt', 'alf', True) for cam in self.cameras] + 

353 [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', True) for cam in self.cameras] + 

354 [(f'{cam}ROIMotionEnergy.position.npy', 'alf', True) for cam in self.cameras] 

355 } 

356 

357 return signature 

358 

359 def _check_dlcenv(self): 

360 """Check that scripts are present, dlcenv can be activated and get iblvideo version""" 

361 assert len(list(self.scripts.rglob('run_dlc.*'))) == 2, \ 

362 f'Scripts run_dlc.sh and run_dlc.py do not exist in {self.scripts}' 

363 assert len(list(self.scripts.rglob('run_motion.*'))) == 2, \ 

364 f'Scripts run_motion.sh and run_motion.py do not exist in {self.scripts}' 

365 assert self.dlcenv.exists(), f'DLC environment does not exist in assumed location {self.dlcenv}' 

366 command2run = f"source {self.dlcenv}; python -c 'import iblvideo; print(iblvideo.__version__)'" 
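# Run inside a bash shell so that `source` works; the iblvideo version printed by the activated
# environment is read back from stdout below.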

367 process = subprocess.Popen( 

368 command2run, 

369 shell=True, 

370 stdout=subprocess.PIPE, 

371 stderr=subprocess.PIPE, 

372 executable='/bin/bash' 

373 ) 

374 info, error = process.communicate() 

375 if process.returncode != 0: 

376 raise AssertionError(f"DLC environment check failed\n{error.decode('utf-8')}") 

377 version = info.decode('utf-8').strip().split('\n')[-1] 

378 return version 

379 

380 @staticmethod 

381 def _video_intact(file_mp4): 

382 """Checks that the downloaded video can be opened and is not empty""" 

383 cap = cv2.VideoCapture(str(file_mp4)) 

384 frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) 
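# A frame count of zero (or a file that cannot be opened) indicates an empty or corrupt download;
# CAP_PROP_FRAME_COUNT is typically read from the container metadata, so this check is cheap.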

385 intact = frame_count > 0

386 cap.release() 

387 return intact 

388 

389 def _run(self, cams=None, overwrite=False): 

390 # Check that the cams are valid for DLC, remove the ones that aren't 

391 candidate_cams = cams or self.cameras 

392 cams = [] 

393 for cam in candidate_cams: 

394 try: 

395 cams.append(assert_valid_label(cam)) 

396 except ValueError: 

397 _logger.warning(f'{cam} is not a valid video label, this video will be skipped') 

398 # Set up 

399 self.session_id = self.one.path2eid(self.session_path) 

400 actual_outputs = [] 

401 

402 # Loop through cams 

403 for cam in cams: 

404 # Catch exceptions so that following cameras can still run 

405 try: 

406 # If all results exist and overwrite is False, skip computation 

407 expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True) 

408 if overwrite is False and expected_outputs_present is True: 

409 actual_outputs.extend(expected_outputs) 

410 return actual_outputs 

411 else: 

412 file_mp4 = next(self.session_path.joinpath('raw_video_data').glob(f'_iblrig_{cam}Camera.raw*.mp4'), None)

413 if file_mp4 is None or not file_mp4.exists():

414 # In this case we set the status to Incomplete. 

415 _logger.error(f'No raw video file available for {cam}, skipping.') 

416 self.status = -3 

417 continue 

418 if not self._video_intact(file_mp4): 

419 _logger.error(f'Corrupt raw video file {file_mp4}') 

420 self.status = -1 

421 continue 

422 # Check that the DLC environment is ok, the shell scripts exist and the GPU is addressable, and get the iblvideo version

423 self.version = self._check_dlcenv() 

424 _logger.info(f'iblvideo version {self.version}') 

425 check_nvidia_driver() 

426 

427 _logger.info(f'Running DLC on {cam}Camera.') 

428 command2run = f"{self.scripts.joinpath('run_dlc.sh')} {str(self.dlcenv)} {file_mp4} {overwrite}" 
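# run_dlc.sh (checked for in _check_dlcenv) is expected to activate dlcenv and run DLC on the video;
# the overwrite flag is forwarded so the script can decide whether to reuse existing results.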

429 _logger.info(command2run) 

430 process = subprocess.Popen( 

431 command2run, 

432 shell=True, 

433 stdout=subprocess.PIPE, 

434 stderr=subprocess.PIPE, 

435 executable='/bin/bash', 

436 ) 

437 info, error = process.communicate() 

438 # info_str = info.decode("utf-8").strip() 

439 # _logger.info(info_str) 

440 if process.returncode != 0: 

441 error_str = error.decode('utf-8').strip() 

442 _logger.error(f'DLC failed for {cam}Camera.\n\n' 

443 f'++++++++ Output of subprocess for debugging ++++++++\n\n' 

444 f'{error_str}\n' 

445 f'++++++++++++++++++++++++++++++++++++++++++++\n') 

446 self.status = -1 

447 # We don't run motion energy or add any files if DLC failed to run

448 continue 

449 dlc_result = next(self.session_path.joinpath('alf').glob(f'_ibl_{cam}Camera.dlc*.pqt')) 

450 actual_outputs.append(dlc_result) 

451 

452 _logger.info(f'Computing motion energy for {cam}Camera') 

453 command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.dlcenv)} {file_mp4} {dlc_result}" 

454 _logger.info(command2run) 

455 process = subprocess.Popen( 

456 command2run, 

457 shell=True, 

458 stdout=subprocess.PIPE, 

459 stderr=subprocess.PIPE, 

460 executable='/bin/bash', 

461 ) 

462 info, error = process.communicate() 

463 # info_str = info.decode('utf-8').strip() 

464 # _logger.info(info_str) 

465 if process.returncode != 0: 

466 error_str = error.decode('utf-8').strip() 

467 _logger.error(f'Motion energy failed for {cam}Camera.\n\n' 

468 f'++++++++ Output of subprocess for debugging ++++++++\n\n' 

469 f'{error_str}\n' 

470 f'++++++++++++++++++++++++++++++++++++++++++++\n') 

471 self.status = -1 

472 continue 

473 actual_outputs.append(next(self.session_path.joinpath('alf').glob( 

474 f'{cam}Camera.ROIMotionEnergy*.npy'))) 

475 actual_outputs.append(next(self.session_path.joinpath('alf').glob( 

476 f'{cam}ROIMotionEnergy.position*.npy'))) 

477 except Exception: 

478 _logger.error(traceback.format_exc()) 

479 self.status = -1 

480 continue 

481 # If status is Incomplete, check that there is at least one output. 

482 # Otherwise make sure it gets set to Empty (outputs = None), and set status to -1 to make sure it doesn't slip through

483 if self.status == -3 and len(actual_outputs) == 0: 

484 actual_outputs = None 

485 self.status = -1 

486 return actual_outputs 

487 

488 

489class EphysPostDLC(base_tasks.VideoTask): 

490 """ 

491 The post-DLC task takes DLC traces as input and computes derived quantities (lick times, pupil diameter), as well as running the DLC QC.

492 """ 

493 io_charge = 90 

494 level = 3 

495 force = True 

496 

497 def __init__(self, *args, **kwargs): 

498 super().__init__(*args, **kwargs) 1anopqjrs

499 self.trials_collection = kwargs.get('trials_collection', 'alf') 1anopqjrs

500 

501 @property 

502 def signature(self): 

503 return { 

504 'input_files': [(f'_ibl_{cam}Camera.dlc.pqt', 'alf', True) for cam in self.cameras] + 

505 [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras] + 

506 # the following are required for the DLC plot only 

507 # they are not strictly required, some plots just might be skipped 

508 # In particular the raw videos don't need to be downloaded as they can be streamed 

509 [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + 

510 [(f'{cam}ROIMotionEnergy.position.npy', 'alf', False) for cam in self.cameras] + 

511 [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', False) for cam in self.cameras] + 

512 # The trials table is used in the DLC QC, however this is not an essential dataset 

513 [('_ibl_trials.table.pqt', self.trials_collection, False), 

514 ('_ibl_wheel.position.npy', self.trials_collection, False), 

515 ('_ibl_wheel.timestamps.npy', self.trials_collection, False)], 

516 'output_files': [(f'_ibl_{cam}Camera.features.pqt', 'alf', True) for cam in self.cameras] + 

517 [('licks.times.npy', 'alf', True)] 

518 } 

519 

520 def _run(self, overwrite=True, run_qc=True, plot_qc=True): 

521 """ 

522 Run the PostDLC task. Returns a list of file locations for the output files in signature. The created plot 

523 (dlc_qc_plot.png) is not returned, but saved in session_path/snapshot and uploaded to Alyx as a note.

524 

525 :param overwrite: bool, whether to recompute existing output files (default is True).

526 Note that the dlc_qc_plot will be (re-)computed even if overwrite = False 

527 :param run_qc: bool, whether to run the DLC QC (default is True) 

528 :param plot_qc: bool, whether to create the dlc_qc_plot (default is True)

529 
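Example (illustrative only; constructor arguments depend on the pipeline configuration):

>>> task = EphysPostDLC(session_path, one=one)
>>> outputs = task._run(overwrite=True, run_qc=True, plot_qc=False)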

530 """ 

531 # Check if output files exist locally 

532 exist, output_files = self.assert_expected(self.output_files, silent=True) 

533 if exist and not overwrite: 

534 _logger.warning('EphysPostDLC outputs exist and overwrite=False, skipping computations of outputs.') 

535 else: 

536 if exist and overwrite: 

537 _logger.warning('EphysPostDLC outputs exist and overwrite=True, overwriting existing outputs.') 

538 # Find all available DLC files 

539 dlc_files = list(Path(self.session_path).joinpath('alf').rglob('_ibl_*Camera.dlc.*')) 

540 for dlc_file in dlc_files: 

541 _logger.debug(dlc_file) 

542 output_files = [] 

543 combined_licks = [] 

544 

545 for dlc_file in dlc_files: 

546 # Catch unforeseen exceptions and move on to next cam 

547 try: 

548 cam = label_from_path(dlc_file) 

549 # load dlc trace and camera times 

550 dlc = pd.read_parquet(dlc_file) 

551 dlc_thresh = likelihood_threshold(dlc, 0.9) 
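# likelihood_threshold masks (sets to NaN) tracked points whose DLC likelihood falls below 0.9,
# so that low-confidence tracking does not contaminate the derived features below.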

552 # try to load respective camera times 

553 try: 

554 dlc_t = np.load(next(Path(self.session_path).joinpath('alf').rglob(f'_ibl_{cam}Camera.times.*npy'))) 

555 times = True 

556 if dlc_t.shape[0] == 0: 

557 _logger.error(f'camera.times empty for {cam} camera. ' 

558 f'Computations using camera.times will be skipped') 

559 self.status = -1 

560 times = False 

561 elif dlc_t.shape[0] < len(dlc_thresh): 

562 _logger.error(f'Camera times shorter than DLC traces for {cam} camera. ' 

563 f'Computations using camera.times will be skipped') 

564 self.status = -1 

565 times = 'short' 

566 except StopIteration: 

567 self.status = -1 

568 times = False 

569 _logger.error(f'No camera.times for {cam} camera. ' 

570 f'Computations using camera.times will be skipped') 
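# At this point `times` is True if usable camera.times were loaded, 'short' if they are shorter
# than the DLC trace, and False if they are missing or empty.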

571 # These features are only computed from the left and right cams

572 if cam in ('left', 'right'): 

573 features = pd.DataFrame() 

574 # If camera times are available, get the lick timestamps for the combined array

575 if times is True: 

576 _logger.info(f'Computing lick times for {cam} camera.') 

577 combined_licks.append(get_licks(dlc_thresh, dlc_t)) 

578 elif times is False: 

579 _logger.warning(f'Skipping lick times for {cam} camera as no camera.times available') 

580 elif times == 'short': 

581 _logger.warning(f'Skipping lick times for {cam} camera as camera.times are too short') 

582 # Compute pupil diameter, raw and smoothed 

583 _logger.info(f'Computing raw pupil diameter for {cam} camera.') 

584 features['pupilDiameter_raw'] = get_pupil_diameter(dlc_thresh) 

585 try: 

586 _logger.info(f'Computing smooth pupil diameter for {cam} camera.') 

587 features['pupilDiameter_smooth'] = get_smooth_pupil_diameter(features['pupilDiameter_raw'], 

588 cam) 

589 except Exception: 

590 _logger.error(f'Computing smooth pupil diameter for {cam} camera failed, saving all NaNs.') 

591 _logger.error(traceback.format_exc()) 

592 features['pupilDiameter_smooth'] = np.nan 

593 # Save to parquet 

594 features_file = Path(self.session_path).joinpath('alf', f'_ibl_{cam}Camera.features.pqt') 

595 features.to_parquet(features_file) 

596 output_files.append(features_file) 

597 

598 # For all cams, compute DLC QC if times available 

599 if run_qc is True and times in [True, 'short']: 

600 # Setting download_data to False because at this point the data should be there 

601 qc = DlcQC(self.session_path, side=cam, one=self.one, download_data=False) 

602 qc.run(update=True) 

603 else: 

604 if times is False: 

605 _logger.warning(f'Skipping QC for {cam} camera as no camera.times available') 

606 if not run_qc: 

607 _logger.warning(f'Skipping QC for {cam} camera as run_qc=False') 

608 

609 except Exception: 

610 _logger.error(traceback.format_exc()) 

611 self.status = -1 

612 continue 

613 

614 # Combined lick times 

615 if len(combined_licks) > 0: 

616 lick_times_file = Path(self.session_path).joinpath('alf', 'licks.times.npy') 

617 np.save(lick_times_file, sorted(np.concatenate(combined_licks))) 

618 output_files.append(lick_times_file) 

619 else: 

620 _logger.warning('No lick times computed for this session.') 

621 

622 if plot_qc: 

623 _logger.info('Creating DLC QC plot') 

624 try: 

625 session_id = self.one.path2eid(self.session_path) 

626 fig_path = self.session_path.joinpath('snapshot', 'dlc_qc_plot.png') 

627 if not fig_path.parent.exists(): 

628 fig_path.parent.mkdir(parents=True, exist_ok=True) 

629 fig = dlc_qc_plot(self.session_path, one=self.one, cameras=self.cameras, device_collection=self.device_collection, 

630 trials_collection=self.trials_collection) 

631 fig.savefig(fig_path) 

632 fig.clf() 
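# Register the figure to Alyx as a session note via ReportSnapshot (cf. the docstring above:
# the plot is uploaded rather than returned).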

633 snp = ReportSnapshot(self.session_path, session_id, one=self.one) 

634 snp.outputs = [fig_path] 

635 snp.register_images(widths=['orig'], 

636 function=str(dlc_qc_plot.__module__) + '.' + str(dlc_qc_plot.__name__)) 

637 except Exception: 

638 _logger.error('Could not create and/or upload DLC QC Plot') 

639 _logger.error(traceback.format_exc()) 

640 self.status = -1 

641 

642 return output_files 

643 

644 

645class LightningPose(base_tasks.VideoTask): 

646 # TODO: make one task per cam? 

647 # TODO: separate pose and motion energy 

648 gpu = 1 

649 io_charge = 100 

650 level = 2 

651 force = True 

652 job_size = 'large' 

653 env = 'litpose' 

654 

655 lpenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'litpose', 'bin', 'activate') 

656 scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'litpose') 

657 

658 @property 

659 def signature(self): 

660 signature = { 1g

661 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras], 

662 'output_files': [(f'_ibl_{cam}Camera.lightningPose.pqt', 'alf', True) for cam in self.cameras] + 

663 [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', True) for cam in self.cameras] + 

664 [(f'{cam}ROIMotionEnergy.position.npy', 'alf', True) for cam in self.cameras] 

665 } 

666 

667 return signature 1g

668 

669 @staticmethod 

670 def _video_intact(file_mp4): 

671 """Checks that the downloaded video can be opened and is not empty""" 

672 cap = cv2.VideoCapture(str(file_mp4)) 

673 frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) 

674 intact = frame_count > 0

675 cap.release() 

676 return intact 

677 

678 def _check_env(self): 

679 """Check that scripts are present, env can be activated and get iblvideo version""" 

680 assert len(list(self.scripts.rglob('run_litpose.*'))) == 2, \ 

681 f'Scripts run_litpose.sh and run_litpose.py do not exist in {self.scripts}' 

682 assert self.lpenv.exists(), f"environment does not exist in assumed location {self.lpenv}" 

683 command2run = f"source {self.lpenv}; python -c 'import iblvideo; print(iblvideo.__version__)'" 

684 process = subprocess.Popen( 

685 command2run, 

686 shell=True, 

687 stdout=subprocess.PIPE, 

688 stderr=subprocess.PIPE, 

689 executable="/bin/bash" 

690 ) 

691 info, error = process.communicate() 

692 if process.returncode != 0: 

693 raise AssertionError(f"environment check failed\n{error.decode('utf-8')}") 

694 version = info.decode("utf-8").strip().split('\n')[-1] 

695 return version 

696 

697 def _run(self, overwrite=True, **kwargs): 

698 

699 # Gather video files 

700 self.session_path = Path(self.session_path) 1g

701 mp4_files = [ 1g

702 self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.mp4') for cam in self.cameras 

703 if self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.mp4').exists() 

704 ] 

705 

706 labels = [label_from_path(x) for x in mp4_files] 1g

707 _logger.info(f'Running on {labels} videos') 1g

708 

709 # Check the environment 

710 self.version = self._check_env() 1g

711 _logger.info(f'iblvideo version {self.version}') 1g

712 

713 # If all results exist and overwrite is False, skip computation 

714 expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True) 1g

715 if overwrite is False and expected_outputs_present is True: 1g

716 actual_outputs = expected_outputs 1g

717 return actual_outputs 1g

718 

719 # Else, loop over videos 

720 actual_outputs = [] 

721 for label, mp4_file in zip(labels, mp4_files): 

722 # Catch exceptions so that the other cams can still run but set status to Errored 

723 try: 

724 # Check that the GPU is (still) accessible 

725 check_nvidia_driver() 

726 # Check that the video can be loaded 

727 if not self._video_intact(mp4_file): 

728 _logger.error(f"Corrupt raw video file {mp4_file}") 

729 self.status = -1 

730 continue 

731 

732 # --------------------------- 

733 # Run pose estimation 

734 # --------------------------- 

735 t0 = time.time() 

736 _logger.info(f'Running Lightning Pose on {label}Camera.') 

737 command2run = f"{self.scripts.joinpath('run_litpose.sh')} {str(self.lpenv)} {mp4_file} {overwrite}" 
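# run_litpose.sh (checked for in _check_env) is expected to activate the litpose environment and
# run Lightning Pose on the video; this mirrors the DLC task above.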

738 _logger.info(command2run) 

739 process = subprocess.Popen( 

740 command2run, 

741 shell=True, 

742 stdout=subprocess.PIPE, 

743 stderr=subprocess.PIPE, 

744 executable="/bin/bash", 

745 ) 

746 info, error = process.communicate() 

747 if process.returncode != 0: 

748 error_str = error.decode("utf-8").strip() 

749 _logger.error( 

750 f'Lightning pose failed for {label}Camera.\n\n' 

751 f'++++++++ Output of subprocess for debugging ++++++++\n\n' 

752 f'{error_str}\n' 

753 f'++++++++++++++++++++++++++++++++++++++++++++\n' 

754 ) 

755 self.status = -1 

756 # We don't run motion energy or add any files if LP failed to run

757 continue 

758 else: 

759 _logger.info(f'{label} camera took {(time.time() - t0)} seconds') 

760 result = next(self.session_path.joinpath('alf').glob(f'_ibl_{label}Camera.lightningPose*.pqt')) 

761 actual_outputs.append(result) 

762 

763 # --------------------------- 

764 # Run motion energy 

765 # --------------------------- 

766 t1 = time.time() 

767 _logger.info(f'Computing motion energy for {label}Camera') 

768 command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.lpenv)} {mp4_file} {result}" 

769 _logger.info(command2run) 

770 process = subprocess.Popen( 

771 command2run, 

772 shell=True, 

773 stdout=subprocess.PIPE, 

774 stderr=subprocess.PIPE, 

775 executable='/bin/bash', 

776 ) 

777 info, error = process.communicate() 

778 if process.returncode != 0: 

779 error_str = error.decode('utf-8').strip() 

780 _logger.error( 

781 f'Motion energy failed for {label}Camera.\n\n' 

782 f'++++++++ Output of subprocess for debugging ++++++++\n\n' 

783 f'{error_str}\n' 

784 f'++++++++++++++++++++++++++++++++++++++++++++\n' 

785 ) 

786 self.status = -1 

787 continue 

788 else: 

789 _logger.info(f'{label} camera took {(time.time() - t1)} seconds') 

790 actual_outputs.append(next(self.session_path.joinpath('alf').glob( 

791 f'{label}Camera.ROIMotionEnergy*.npy'))) 

792 actual_outputs.append(next(self.session_path.joinpath('alf').glob( 

793 f'{label}ROIMotionEnergy.position*.npy'))) 

794 

795 except BaseException: 

796 _logger.error(traceback.format_exc()) 

797 self.status = -1 

798 continue 

799 

800 # catch here if there are no raw videos present 

801 if len(actual_outputs) == 0: 

802 _logger.info('Did not find any videos for this session') 

803 actual_outputs = None 

804 self.status = -1 

805 

806 return actual_outputs