Coverage for ibllib/pipes/video_tasks.py: 45%

438 statements  

coverage.py v7.7.0, created at 2025-03-17 09:55 +0000

1import logging 

2import subprocess 

3import time 

4import traceback 

5from pathlib import Path 

6from functools import partial 

7 

8import cv2 

9import pandas as pd 

10import numpy as np 

11 

12from ibllib.qc.dlc import DlcQC 

13from ibllib.io import ffmpeg, raw_daq_loaders 

14from ibllib.pipes import base_tasks 

15from ibllib.io.video import get_video_meta 

16from ibllib.io.extractors import camera 

17from ibllib.io.extractors.base import run_extractor_classes 

18from ibllib.io.extractors.ephys_fpga import get_sync_and_chn_map 

19from ibllib.qc.camera import run_all_qc as run_camera_qc, CameraQC 

20from ibllib.misc import check_nvidia_driver 

21from ibllib.io.video import label_from_path, assert_valid_label 

22from ibllib.plots.snapshot import ReportSnapshot 

23from ibllib.plots.figures import dlc_qc_plot 

24from brainbox.behavior.dlc import likelihood_threshold, get_licks, get_pupil_diameter, get_smooth_pupil_diameter 

25 

26_logger = logging.getLogger('ibllib') 

27 

28 

29class VideoRegisterRaw(base_tasks.VideoTask, base_tasks.RegisterRawDataTask): 

30 """ 

31 Task to register raw video data. Builds up the list of files to register from the list of cameras given in the session params file.

32 """ 

33 

34 priority = 100 

35 job_size = 'small' 

36 

37 @property 

38 def signature(self): 

39 signature = { 1b

40 'input_files': [], 

41 'output_files': 

42 [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] + 

43 [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] + 

44 [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] + 

45 [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] + 

46 [('_iblrig_videoCodeFiles.raw*', self.device_collection, False)] 

47 } 

48 return signature 1b
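# Editorial note (illustrative, not in the original source): each signature tuple is read here as
# (filename pattern, collection, required). For a 'left' camera the first output entry expands to
# ('_iblrig_leftCamera.timestamps*', self.device_collection, False), i.e. an optional dataset
# expected in the raw video device collection.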

49 

50 def assert_expected_outputs(self, raise_error=True): 

51 """ 

52 frameData replaces the timestamps file. Therefore if frameData is present, timestamps is 

53 optional and vice versa. 

54 """ 

55 assert self.status == 0 

56 _logger.info('Checking output files') 

57 everything_is_fine, files = self.assert_expected(self.output_files) 

58 

59 required = any('Camera.frameData' in x or 'Camera.timestamps' in x for x in map(str, files)) 

60 if not (everything_is_fine and required): 

61 for out in self.outputs: 

62 _logger.error(f'{out}') 

63 if raise_error: 

64 raise FileNotFoundError('Missing outputs after task completion') 

65 

66 return everything_is_fine, files 

67 

68 

69class VideoCompress(base_tasks.VideoTask): 

70 """ 

71 Task to compress raw video data from .avi to .mp4 format. 

72 """ 

73 priority = 90 

74 job_size = 'large' 

75 

76 @property 

77 def signature(self): 

78 signature = { 1abhi

79 'input_files': [(f'_iblrig_{cam}Camera.raw.*', self.device_collection, True) for cam in self.cameras], 

80 'output_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] 

81 } 

82 return signature 1abhi

83 

84 def _run(self): 

85 # TODO: different compression parameters depending on whether it is a training session or not (e.g. inferred from the number of cameras)?

86 # avi to mp4 compression 

87 if self.sync == 'bpod': 1abhi

88 command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 29 ' 1bh

89 '-nostats -codec:a copy {file_out}') 

90 else: 

91 command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 17 ' 1ai

92 '-loglevel 0 -codec:a copy {file_out}') 

93 

94 output_files = ffmpeg.iblrig_video_compression(self.session_path, command) 1abhi

95 

96 if len(output_files) == 0: 1abhi

97 _logger.info('No compressed videos found') 

98 return 

99 

100 return output_files 1abhi
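# Illustrative sketch (added for clarity; the helper's behaviour is assumed, not verified):
# ffmpeg.iblrig_video_compression is expected to fill the {file_in}/{file_out} placeholders of the
# command template for each raw video it finds, roughly along these lines:
#
#     file_in = session_path / 'raw_video_data' / '_iblrig_leftCamera.raw.avi'   # assumed example name
#     file_out = file_in.with_suffix('.mp4')
#     subprocess.run(command.format(file_in=file_in, file_out=file_out), shell=True, check=True)
#
# Note that the bpod branch uses CRF 29 (stronger compression, lower quality) while the nidq branch uses CRF 17.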

101 

102 

103class VideoConvert(base_tasks.VideoTask): 

104 """ 

105 Task that converts compressed avi to mp4 format and renames video and camlog files. Specific to UCLA widefield implementation 

106 """ 

107 priority = 90 

108 job_size = 'small' 

109 

110 @property 

111 def signature(self): 

112 signature = { 1c

113 'input_files': [(f'{cam}_cam*.avi', self.device_collection, True) for cam in self.cameras] + 

114 [(f'{cam}_cam*.camlog', self.device_collection, False) for cam in self.cameras], 

115 'output_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + 

116 [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, True) for cam in self.cameras] 

117 } 

118 

119 return signature 1c

120 

121 def _run(self): 

122 output_files = [] 1c

123 for cam in self.cameras: 1c

124 

125 # rename and register the camlog files 

126 camlog_file = next(self.session_path.joinpath(self.device_collection).glob(f'{cam}_cam*.camlog')) 1c

127 new_camlog_file = self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.camlog') 1c

128 camlog_file.replace(new_camlog_file) 1c

129 output_files.append(new_camlog_file) 1c

130 

131 # convert the avi files to mp4 

132 avi_file = next(self.session_path.joinpath(self.device_collection).glob(f'{cam}_cam*.avi')) 1c

133 mp4_file = self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.mp4') 1c

134 command2run = f'ffmpeg -i {str(avi_file)} -c:v copy -c:a copy -y {str(mp4_file)}' 1c

135 

136 process = subprocess.Popen(command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1c

137 info, error = process.communicate() 1c

138 if process.returncode == 0: 1c

139 # check the video meta matched and remove the original file 

140 meta_avi = get_video_meta(avi_file) 1c

141 _ = meta_avi.pop('size') 1c

142 meta_mp4 = get_video_meta(mp4_file) 1c

143 match = True 1c

144 for key in meta_avi.keys(): 1c

145 if meta_avi[key] != meta_mp4[key]: 1c

146 match = False 

147 

148 # if all checks out we can remove the original avi 

149 if match: 1c

150 avi_file.unlink() 1c

151 output_files.append(mp4_file) 1c

152 else: 

153 _logger.error(f'avi and mp4 meta data do not match for {avi_file}') 

154 else: 

155 _logger.error(f'conversion to mp4 failed for {avi_file}: {error}') 

156 

157 return output_files 1c

158 

159 

160class VideoSyncQcCamlog(base_tasks.VideoTask): 

161 """ 

162 Task to sync camera timestamps to main DAQ timestamps when camlog files are used. Specific to UCLA widefield implementation 

163 """ 

164 priority = 40 

165 job_size = 'small' 

166 

167 @property 

168 def signature(self): 

169 signature = { 1f

170 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + 

171 [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, False) for cam in self.cameras] + 

172 [(f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True), 

173 (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True), 

174 (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True), 

175 ('*.wiring.json', self.sync_collection, True), 

176 ('*wheel.position.npy', 'alf', False), 

177 ('*wheel.timestamps.npy', 'alf', False)], 

178 'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras] 

179 } 

180 

181 return signature 1f

182 

183 def extract_camera(self, save=True): 

184 extractor = [partial(camera.CameraTimestampsCamlog, label) for label in self.cameras or []] 1f

185 kwargs = {'sync_type': self.sync, 'sync_collection': self.sync_collection, 'save': save} 1f

186 kwargs['sync'], kwargs['chmap'] = get_sync_and_chn_map(self.session_path, self.sync_collection) 1f

187 return run_extractor_classes(extractor, session_path=self.session_path, **kwargs) 1f

188 

189 def run_qc(self, camera_data=None, update=True): 

190 if camera_data is None: 1f

191 camera_data, _ = self.extract_camera(save=False) 

192 qc = run_camera_qc( 1f

193 self.session_path, self.cameras, one=self.one, camlog=True, sync_collection=self.sync_collection, sync_type=self.sync, 

194 update=update) 

195 return qc 1f

196 

197 def _run(self, update=True, **kwargs): 

198 # Video timestamps extraction 

199 data, output_files = self.extract_camera(save=True) 1f

200 

201 # Video QC 

202 self.run_qc(data, update=update) 1f

203 

204 return output_files 1f

205 

206 

207class VideoSyncQcBpod(base_tasks.VideoTask): 

208 """ 

209 Task to sync camera timestamps to main DAQ timestamps 

210 N.B. Signatures only reflect the new DAQ naming convention; incompatible with ephys when not running on the server.

211 """ 

212 priority = 40 

213 job_size = 'small' 

214 

215 def __init__(self, *args, **kwargs): 

216 super().__init__(*args, **kwargs) 1bjklmd

217 # Task collection (this needs to be specified in the task kwargs) 

218 self.collection = self.get_task_collection(kwargs.get('collection', None)) 1bjklmd

219 # Task type (protocol) 

220 self.protocol = self.get_protocol(kwargs.get('protocol', None), task_collection=self.collection) 1bjklmd

221 self.extractor = None 1bjklmd

222 

223 @property 

224 def signature(self): 

225 signature = { 1bd

226 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + 

227 [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] + 

228 [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] + 

229 [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] + 

230 [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] + 

231 [('_iblrig_taskData.raw.*', self.collection, True), 

232 ('_iblrig_taskSettings.raw.*', self.collection, True), 

233 ('*wheel.position.npy', 'alf', False), 

234 ('*wheel.timestamps.npy', 'alf', False)], 

235 'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras] 

236 } 

237 

238 return signature 1bd

239 

240 def extract_camera(self, save=True): 

241 mp4_files = filter(lambda x: label_from_path(x) in self.cameras or [], 1bd

242 self.session_path.joinpath(self.device_collection).rglob('*.mp4')) 

243 if self.cameras != ['left']: 1bd

244 raise NotImplementedError('Bpod Camera extraction currently only supports a left camera') 

245 

246 self.extractor = camera.CameraTimestampsBpod(self.session_path) 1bd

247 return self.extractor.extract(video_path=next(mp4_files), save=save, task_collection=self.collection) 1bd

248 

249 def run_qc(self, camera_data=None, update=True): 

250 if self.cameras != ['left']: 1bd

251 raise NotImplementedError('Bpod camera currently only supports a left camera') 

252 if camera_data is None: 1bd

253 camera_data, _ = self.extract_camera(save=False) 

254 qc = CameraQC( 1bd

255 self.session_path, 'left', sync_type='bpod', sync_collection=self.collection, one=self.one) 

256 qc.run(update=update) 1bd

257 return qc 1bd

258 

259 def _run(self, update=True, **kwargs): 

260 # Video timestamps extraction 

261 data, output_files = self.extract_camera(save=True) 1bd

262 

263 # Video QC 

264 self.run_qc(data, update=update) 1bd

265 

266 return output_files 1bd

267 

268 

269class VideoSyncQcNidq(base_tasks.VideoTask): 

270 """ 

271 Task to sync camera timestamps to main DAQ timestamps 

272 N.B. Signatures only reflect the new DAQ naming convention; incompatible with ephys when not running on the server.

273 """ 

274 priority = 40 

275 job_size = 'small' 

276 

277 @property 

278 def signature(self): 

279 signature = { 1e

280 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + 

281 [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] + 

282 [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] + 

283 [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] + 

284 [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] + 

285 [(f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True), 

286 (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True), 

287 (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True), 

288 (f'_{self.sync_namespace}_*.wiring.json', self.sync_collection, False), 

289 (f'_{self.sync_namespace}_*.meta', self.sync_collection, True), 

290 ('*wheel.position.npy', 'alf', False), 

291 ('*wheel.timestamps.npy', 'alf', False), 

292 ('*experiment.description*', '', False)], 

293 'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras] 

294 } 

295 

296 return signature 1e

297 

298 def extract_camera(self, save=True): 

299 extractor = [partial(camera.CameraTimestampsFPGA, label) for label in self.cameras or []] 1e

300 kwargs = {'sync_type': self.sync, 'sync_collection': self.sync_collection, 'save': save} 1e

301 if self.sync_namespace == 'timeline': 1e

302 # Load sync from timeline file 

303 alf_path = self.session_path / self.sync_collection 

304 kwargs['sync'], kwargs['chmap'] = raw_daq_loaders.load_timeline_sync_and_chmap(alf_path) 

305 else: 

306 kwargs['sync'], kwargs['chmap'] = get_sync_and_chn_map(self.session_path, self.sync_collection) 1e

307 return run_extractor_classes(extractor, session_path=self.session_path, **kwargs) 1e
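# Editorial note (assumption about the extractor API, added for clarity): run_extractor_classes is
# expected to instantiate each partial(camera.CameraTimestampsFPGA, label) for this session, run its
# extraction against the provided sync and channel map, and return the extracted camera times together
# with the list of files written when save=True.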

308 

309 def run_qc(self, camera_data=None, update=True): 

310 if camera_data is None: 1e

311 camera_data, _ = self.extract_camera(save=False) 

312 qc = run_camera_qc( 1e

313 self.session_path, self.cameras, one=self.one, sync_collection=self.sync_collection, sync_type=self.sync, 

314 update=update) 

315 return qc 1e

316 

317 def _run(self, update=True, **kwargs): 

318 # Video timestamps extraction 

319 data, output_files = self.extract_camera(save=True) 1e

320 

321 # Video QC 

322 self.run_qc(data, update=update) 1e

323 

324 return output_files 1e

325 

326 

327class DLC(base_tasks.VideoTask): 

328 """ 

329 This task relies on a correctly installed dlc environment as per 

330 https://docs.google.com/document/d/1g0scP6_3EmaXCU4SsDNZWwDTaD9MG0es_grLA-d0gh0/edit# 

331 

332 If your environment is set up otherwise, make sure that you set the respective attributes: 

333 t = DLC(session_path)

334 t.dlcenv = Path('/path/to/your/dlcenv/bin/activate') 

335 t.scripts = Path('/path/to/your/iblscripts/deploy/serverpc/dlc') 

336 """ 

337 gpu = 1 

338 cpu = 4 

339 io_charge = 100 

340 level = 2 

341 force = True 

342 job_size = 'large' 

343 

344 dlcenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'dlcenv', 'bin', 'activate') 

345 scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'dlc') 

346 

347 @property 

348 def signature(self): 

349 signature = { 

350 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras], 

351 'output_files': [(f'_ibl_{cam}Camera.dlc.pqt', 'alf', True) for cam in self.cameras] + 

352 [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', True) for cam in self.cameras] + 

353 [(f'{cam}ROIMotionEnergy.position.npy', 'alf', True) for cam in self.cameras] 

354 } 

355 

356 return signature 

357 

358 def _check_dlcenv(self): 

359 """Check that scripts are present, dlcenv can be activated and get iblvideo version""" 

360 assert len(list(self.scripts.rglob('run_dlc.*'))) == 2, \ 

361 f'Scripts run_dlc.sh and run_dlc.py do not exist in {self.scripts}' 

362 assert len(list(self.scripts.rglob('run_motion.*'))) == 2, \ 

363 f'Scripts run_motion.sh and run_motion.py do not exist in {self.scripts}' 

364 assert self.dlcenv.exists(), f'DLC environment does not exist in assumed location {self.dlcenv}' 

365 command2run = f"source {self.dlcenv}; python -c 'import iblvideo; print(iblvideo.__version__)'" 

366 process = subprocess.Popen( 

367 command2run, 

368 shell=True, 

369 stdout=subprocess.PIPE, 

370 stderr=subprocess.PIPE, 

371 executable='/bin/bash' 

372 ) 

373 info, error = process.communicate() 

374 if process.returncode != 0: 

375 raise AssertionError(f"DLC environment check failed\n{error.decode('utf-8')}") 

376 version = info.decode('utf-8').strip().split('\n')[-1] 

377 return version 

378 

379 @staticmethod 

380 def _video_intact(file_mp4): 

381 """Checks that the downloaded video can be opened and is not empty""" 

382 cap = cv2.VideoCapture(str(file_mp4)) 

383 frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) 

384 intact = True if frame_count > 0 else False 

385 cap.release() 

386 return intact 

387 

388 def _run(self, cams=None, overwrite=False): 

389 # Check that the cams are valid for DLC, remove the ones that aren't 

390 candidate_cams = cams or self.cameras 

391 cams = [] 

392 for cam in candidate_cams: 

393 try: 

394 cams.append(assert_valid_label(cam)) 

395 except ValueError: 

396 _logger.warning(f'{cam} is not a valid video label, this video will be skipped') 

397 # Set up 

398 self.session_id = self.one.path2eid(self.session_path) 

399 actual_outputs = [] 

400 

401 # Loop through cams 

402 for cam in cams: 

403 # Catch exceptions so that following cameras can still run 

404 try: 

405 # If all results exist and overwrite is False, skip computation 

406 expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True) 

407 if overwrite is False and expected_outputs_present is True: 

408 actual_outputs.extend(expected_outputs) 

409 return actual_outputs 

410 else: 

411 file_mp4 = next(self.session_path.joinpath('raw_video_data').glob(f'_iblrig_{cam}Camera.raw*.mp4')) 

412 if not file_mp4.exists(): 

413 # In this case we set the status to Incomplete. 

414 _logger.error(f'No raw video file available for {cam}, skipping.') 

415 self.status = -3 

416 continue 

417 if not self._video_intact(file_mp4): 

418 _logger.error(f'Corrupt raw video file {file_mp4}') 

419 self.status = -1 

420 continue 

421 # Check that the DLC environment is OK, the shell scripts exist, get the iblvideo version, and that the GPU is addressable

422 self.version = self._check_dlcenv() 

423 _logger.info(f'iblvideo version {self.version}') 

424 check_nvidia_driver() 

425 

426 _logger.info(f'Running DLC on {cam}Camera.') 

427 command2run = f"{self.scripts.joinpath('run_dlc.sh')} {str(self.dlcenv)} {file_mp4} {overwrite}" 

428 _logger.info(command2run) 

429 process = subprocess.Popen( 

430 command2run, 

431 shell=True, 

432 stdout=subprocess.PIPE, 

433 stderr=subprocess.PIPE, 

434 executable='/bin/bash', 

435 ) 

436 info, error = process.communicate() 

437 # info_str = info.decode("utf-8").strip() 

438 # _logger.info(info_str) 

439 if process.returncode != 0: 

440 error_str = error.decode('utf-8').strip() 

441 _logger.error(f'DLC failed for {cam}Camera.\n\n' 

442 f'++++++++ Output of subprocess for debugging ++++++++\n\n' 

443 f'{error_str}\n' 

444 f'++++++++++++++++++++++++++++++++++++++++++++\n') 

445 self.status = -1 

446 # We don't run motion energy or add any files if DLC failed to run

447 continue 

448 dlc_result = next(self.session_path.joinpath('alf').glob(f'_ibl_{cam}Camera.dlc*.pqt')) 

449 actual_outputs.append(dlc_result) 

450 

451 _logger.info(f'Computing motion energy for {cam}Camera') 

452 command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.dlcenv)} {file_mp4} {dlc_result}" 

453 _logger.info(command2run) 

454 process = subprocess.Popen( 

455 command2run, 

456 shell=True, 

457 stdout=subprocess.PIPE, 

458 stderr=subprocess.PIPE, 

459 executable='/bin/bash', 

460 ) 

461 info, error = process.communicate() 

462 # info_str = info.decode('utf-8').strip() 

463 # _logger.info(info_str) 

464 if process.returncode != 0: 

465 error_str = error.decode('utf-8').strip() 

466 _logger.error(f'Motion energy failed for {cam}Camera.\n\n' 

467 f'++++++++ Output of subprocess for debugging ++++++++\n\n' 

468 f'{error_str}\n' 

469 f'++++++++++++++++++++++++++++++++++++++++++++\n') 

470 self.status = -1 

471 continue 

472 actual_outputs.append(next(self.session_path.joinpath('alf').glob( 

473 f'{cam}Camera.ROIMotionEnergy*.npy'))) 

474 actual_outputs.append(next(self.session_path.joinpath('alf').glob( 

475 f'{cam}ROIMotionEnergy.position*.npy'))) 

476 except Exception: 

477 _logger.error(traceback.format_exc()) 

478 self.status = -1 

479 continue 

480 # If status is Incomplete, check that there is at least one output. 

481 # Otherwise make sure it gets set to Empty (outputs = None), and set status to -1 to make sure it doesn't slip

482 if self.status == -3 and len(actual_outputs) == 0: 

483 actual_outputs = None 

484 self.status = -1 

485 return actual_outputs 

486 

487 

488class EphysPostDLC(base_tasks.VideoTask): 

489 """ 

490 The post-DLC task takes DLC traces as input and computes useful quantities, as well as QC.

491 """ 

492 io_charge = 90 

493 level = 3 

494 force = True 

495 

496 def __init__(self, *args, **kwargs): 

497 super().__init__(*args, **kwargs) 1anopqjrs

498 self.trials_collection = kwargs.get('trials_collection', 'alf') 1anopqjrs

499 

500 @property 

501 def signature(self): 

502 return { 

503 'input_files': [(f'_ibl_{cam}Camera.dlc.pqt', 'alf', True) for cam in self.cameras] + 

504 [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras] + 

505 # the following are required for the DLC plot only 

506 # they are not strictly required, some plots just might be skipped 

507 # In particular the raw videos don't need to be downloaded as they can be streamed 

508 [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] + 

509 [(f'{cam}ROIMotionEnergy.position.npy', 'alf', False) for cam in self.cameras] + 

510 [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', False) for cam in self.cameras] + 

511 # The trials table is used in the DLC QC, however this is not an essential dataset 

512 [('_ibl_trials.table.pqt', self.trials_collection, False), 

513 ('_ibl_wheel.position.npy', self.trials_collection, False), 

514 ('_ibl_wheel.timestamps.npy', self.trials_collection, False)], 

515 'output_files': [(f'_ibl_{cam}Camera.features.pqt', 'alf', True) for cam in self.cameras] + 

516 [('licks.times.npy', 'alf', True)] 

517 } 
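# Illustrative sketch (not part of the original module): the outputs declared above can be read back
# with pandas/numpy, e.g. for the left camera:
#
#     features = pd.read_parquet(session_path / 'alf' / '_ibl_leftCamera.features.pqt')
#     licks = np.load(session_path / 'alf' / 'licks.times.npy')
#
# 'features' holds the pupilDiameter_raw and pupilDiameter_smooth columns computed in _run below.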

518 

519 def _run(self, overwrite=True, run_qc=True, plot_qc=True): 

520 """ 

521 Run the PostDLC task. Returns a list of file locations for the output files in signature. The created plot 

522 (dlc_qc_plot.png) is not returned, but saved in session_path/snapshot and uploaded to Alyx as a note.

523 

524 :param overwrite: bool, whether to recompute existing output files (default is True).

525 Note that the dlc_qc_plot will be (re-)computed even if overwrite = False 

526 :param run_qc: bool, whether to run the DLC QC (default is True) 

527 :param plot_qc: bool, whether to create the dlc_qc_plot (default is True)

528 

529 """ 

530 # Check if output files exist locally 

531 exist, output_files = self.assert_expected(self.output_files, silent=True) 

532 if exist and not overwrite: 

533 _logger.warning('EphysPostDLC outputs exist and overwrite=False, skipping computations of outputs.') 

534 else: 

535 if exist and overwrite: 

536 _logger.warning('EphysPostDLC outputs exist and overwrite=True, overwriting existing outputs.') 

537 # Find all available DLC files 

538 dlc_files = list(Path(self.session_path).joinpath('alf').rglob('_ibl_*Camera.dlc.*')) 

539 for dlc_file in dlc_files: 

540 _logger.debug(dlc_file) 

541 output_files = [] 

542 combined_licks = [] 

543 

544 for dlc_file in dlc_files: 

545 # Catch unforeseen exceptions and move on to next cam 

546 try: 

547 cam = label_from_path(dlc_file) 

548 # load dlc trace and camera times 

549 dlc = pd.read_parquet(dlc_file) 

550 dlc_thresh = likelihood_threshold(dlc, 0.9) 

551 # try to load respective camera times 

552 try: 

553 dlc_t = np.load(next(Path(self.session_path).joinpath('alf').rglob(f'_ibl_{cam}Camera.times.*npy'))) 

554 times = True 

555 if dlc_t.shape[0] == 0: 

556 _logger.error(f'camera.times empty for {cam} camera. ' 

557 f'Computations using camera.times will be skipped') 

558 self.status = -1 

559 times = False 

560 elif dlc_t.shape[0] < len(dlc_thresh): 

561 _logger.error(f'Camera times shorter than DLC traces for {cam} camera. ' 

562 f'Computations using camera.times will be skipped') 

563 self.status = -1 

564 times = 'short' 

565 except StopIteration: 

566 self.status = -1 

567 times = False 

568 _logger.error(f'No camera.times for {cam} camera. ' 

569 f'Computations using camera.times will be skipped') 

570 # These features are only computed from left and right cam 

571 if cam in ('left', 'right'): 

572 features = pd.DataFrame() 

573 # If camera times are available, get the lick time stamps for combined array 

574 if times is True: 

575 _logger.info(f'Computing lick times for {cam} camera.') 

576 combined_licks.append(get_licks(dlc_thresh, dlc_t)) 

577 elif times is False: 

578 _logger.warning(f'Skipping lick times for {cam} camera as no camera.times available') 

579 elif times == 'short': 

580 _logger.warning(f'Skipping lick times for {cam} camera as camera.times are too short') 

581 # Compute pupil diameter, raw and smoothed 

582 _logger.info(f'Computing raw pupil diameter for {cam} camera.') 

583 features['pupilDiameter_raw'] = get_pupil_diameter(dlc_thresh) 

584 try: 

585 _logger.info(f'Computing smooth pupil diameter for {cam} camera.') 

586 features['pupilDiameter_smooth'] = get_smooth_pupil_diameter(features['pupilDiameter_raw'], 

587 cam) 

588 except Exception: 

589 _logger.error(f'Computing smooth pupil diameter for {cam} camera failed, saving all NaNs.') 

590 _logger.error(traceback.format_exc()) 

591 features['pupilDiameter_smooth'] = np.nan 

592 # Save to parquet 

593 features_file = Path(self.session_path).joinpath('alf', f'_ibl_{cam}Camera.features.pqt') 

594 features.to_parquet(features_file) 

595 output_files.append(features_file) 

596 

597 # For all cams, compute DLC QC if times available 

598 if run_qc is True and times in [True, 'short']: 

599 # Setting download_data to False because at this point the data should be there 

600 qc = DlcQC(self.session_path, side=cam, one=self.one, download_data=False) 

601 qc.run(update=True) 

602 else: 

603 if times is False: 

604 _logger.warning(f'Skipping QC for {cam} camera as no camera.times available') 

605 if not run_qc: 

606 _logger.warning(f'Skipping QC for {cam} camera as run_qc=False') 

607 

608 except Exception: 

609 _logger.error(traceback.format_exc()) 

610 self.status = -1 

611 continue 

612 

613 # Combined lick times 

614 if len(combined_licks) > 0: 

615 lick_times_file = Path(self.session_path).joinpath('alf', 'licks.times.npy') 

616 np.save(lick_times_file, sorted(np.concatenate(combined_licks))) 

617 output_files.append(lick_times_file) 

618 else: 

619 _logger.warning('No lick times computed for this session.') 

620 

621 if plot_qc: 

622 _logger.info('Creating DLC QC plot') 

623 try: 

624 session_id = self.one.path2eid(self.session_path) 

625 fig_path = self.session_path.joinpath('snapshot', 'dlc_qc_plot.png') 

626 if not fig_path.parent.exists(): 

627 fig_path.parent.mkdir(parents=True, exist_ok=True) 

628 fig = dlc_qc_plot(self.session_path, one=self.one, cameras=self.cameras, device_collection=self.device_collection, 

629 trials_collection=self.trials_collection) 

630 fig.savefig(fig_path) 

631 fig.clf() 

632 snp = ReportSnapshot(self.session_path, session_id, one=self.one) 

633 snp.outputs = [fig_path] 

634 snp.register_images(widths=['orig'], 

635 function=str(dlc_qc_plot.__module__) + '.' + str(dlc_qc_plot.__name__)) 

636 except Exception: 

637 _logger.error('Could not create and/or upload DLC QC Plot') 

638 _logger.error(traceback.format_exc()) 

639 self.status = -1 

640 

641 return output_files 
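# Illustrative usage sketch (the exact Task/VideoTask constructor arguments are assumptions, not
# verified against the current ibllib API):
#
#     task = EphysPostDLC(session_path, one=one, cameras=['left', 'right', 'body'])
#     task.run(run_qc=True, plot_qc=False)
#     print(task.outputs, task.status)
#
# Task.run is expected to forward its keyword arguments to _run and to record the returned file list
# on task.outputs.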

642 

643 

644class LightningPose(base_tasks.VideoTask): 

645 # TODO: make one task per cam? 

646 # TODO: separate pose and motion energy 

647 gpu = 1 

648 io_charge = 100 

649 level = 2 

650 force = True 

651 job_size = 'large' 

652 env = 'litpose' 

653 

654 lpenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'litpose', 'bin', 'activate') 

655 scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'litpose') 

656 

657 @property 

658 def signature(self): 

659 signature = { 1g

660 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras], 

661 'output_files': [(f'_ibl_{cam}Camera.lightningPose.pqt', 'alf', True) for cam in self.cameras] + 

662 [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', True) for cam in self.cameras] + 

663 [(f'{cam}ROIMotionEnergy.position.npy', 'alf', True) for cam in self.cameras] 

664 } 

665 

666 return signature 1g

667 

668 @staticmethod 

669 def _video_intact(file_mp4): 

670 """Checks that the downloaded video can be opened and is not empty""" 

671 cap = cv2.VideoCapture(str(file_mp4)) 

672 frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT) 

673 intact = True if frame_count > 0 else False 

674 cap.release() 

675 return intact 

676 

677 def _check_env(self): 

678 """Check that scripts are present, env can be activated and get iblvideo version""" 

679 assert len(list(self.scripts.rglob('run_litpose.*'))) == 2, \ 

680 f'Scripts run_litpose.sh and run_litpose.py do not exist in {self.scripts}' 

681 assert self.lpenv.exists(), f"environment does not exist in assumed location {self.lpenv}" 

682 command2run = f"source {self.lpenv}; python -c 'import iblvideo; print(iblvideo.__version__)'" 

683 process = subprocess.Popen( 

684 command2run, 

685 shell=True, 

686 stdout=subprocess.PIPE, 

687 stderr=subprocess.PIPE, 

688 executable="/bin/bash" 

689 ) 

690 info, error = process.communicate() 

691 if process.returncode != 0: 

692 raise AssertionError(f"environment check failed\n{error.decode('utf-8')}") 

693 version = info.decode("utf-8").strip().split('\n')[-1] 

694 return version 

695 

696 def _run(self, overwrite=True, **kwargs): 

697 

698 # Gather video files 

699 self.session_path = Path(self.session_path) 1g

700 mp4_files = [ 1g

701 self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.mp4') for cam in self.cameras 

702 if self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.mp4').exists() 

703 ] 

704 

705 labels = [label_from_path(x) for x in mp4_files] 1g

706 _logger.info(f'Running on {labels} videos') 1g

707 

708 # Check the environment 

709 self.version = self._check_env() 1g

710 _logger.info(f'iblvideo version {self.version}') 1g

711 

712 # If all results exist and overwrite is False, skip computation 

713 expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True) 1g

714 if overwrite is False and expected_outputs_present is True: 1g

715 actual_outputs = expected_outputs 1g

716 return actual_outputs 1g

717 

718 # Else, loop over videos 

719 actual_outputs = [] 

720 for label, mp4_file in zip(labels, mp4_files): 

721 # Catch exceptions so that the other cams can still run but set status to Errored 

722 try: 

723 # Check that the GPU is (still) accessible 

724 check_nvidia_driver() 

725 # Check that the video can be loaded 

726 if not self._video_intact(mp4_file): 

727 _logger.error(f"Corrupt raw video file {mp4_file}") 

728 self.status = -1 

729 continue 

730 

731 # --------------------------- 

732 # Run pose estimation 

733 # --------------------------- 

734 t0 = time.time() 

735 _logger.info(f'Running Lightning Pose on {label}Camera.') 

736 command2run = f"{self.scripts.joinpath('run_litpose.sh')} {str(self.lpenv)} {mp4_file} {overwrite}" 

737 _logger.info(command2run) 

738 process = subprocess.Popen( 

739 command2run, 

740 shell=True, 

741 stdout=subprocess.PIPE, 

742 stderr=subprocess.PIPE, 

743 executable="/bin/bash", 

744 ) 

745 info, error = process.communicate() 

746 if process.returncode != 0: 

747 error_str = error.decode("utf-8").strip() 

748 _logger.error( 

749 f'Lightning pose failed for {label}Camera.\n\n' 

750 f'++++++++ Output of subprocess for debugging ++++++++\n\n' 

751 f'{error_str}\n' 

752 f'++++++++++++++++++++++++++++++++++++++++++++\n' 

753 ) 

754 self.status = -1 

755 # We don't run motion energy, or add any files if LP failed to run 

756 continue 

757 else: 

758 _logger.info(f'{label} camera took {(time.time() - t0)} seconds') 

759 result = next(self.session_path.joinpath('alf').glob(f'_ibl_{label}Camera.lightningPose*.pqt')) 

760 actual_outputs.append(result) 

761 

762 # --------------------------- 

763 # Run motion energy 

764 # --------------------------- 

765 t1 = time.time() 

766 _logger.info(f'Computing motion energy for {label}Camera') 

767 command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.lpenv)} {mp4_file} {result}" 

768 _logger.info(command2run) 

769 process = subprocess.Popen( 

770 command2run, 

771 shell=True, 

772 stdout=subprocess.PIPE, 

773 stderr=subprocess.PIPE, 

774 executable='/bin/bash', 

775 ) 

776 info, error = process.communicate() 

777 if process.returncode != 0: 

778 error_str = error.decode('utf-8').strip() 

779 _logger.error( 

780 f'Motion energy failed for {label}Camera.\n\n' 

781 f'++++++++ Output of subprocess for debugging ++++++++\n\n' 

782 f'{error_str}\n' 

783 f'++++++++++++++++++++++++++++++++++++++++++++\n' 

784 ) 

785 self.status = -1 

786 continue 

787 else: 

788 _logger.info(f'{label} camera took {(time.time() - t1)} seconds') 

789 actual_outputs.append(next(self.session_path.joinpath('alf').glob( 

790 f'{label}Camera.ROIMotionEnergy*.npy'))) 

791 actual_outputs.append(next(self.session_path.joinpath('alf').glob( 

792 f'{label}ROIMotionEnergy.position*.npy'))) 

793 

794 except BaseException: 

795 _logger.error(traceback.format_exc()) 

796 self.status = -1 

797 continue 

798 

799 # catch here if there are no raw videos present 

800 if len(actual_outputs) == 0: 

801 _logger.info('Did not find any videos for this session') 

802 actual_outputs = None 

803 self.status = -1 

804 

805 return actual_outputs