Coverage for ibllib/pipes/video_tasks.py: 45%
438 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-07 14:26 +0100
« prev ^ index » next coverage.py v7.8.0, created at 2025-05-07 14:26 +0100
1import logging
2import subprocess
3import time
4import traceback
5from pathlib import Path
6from functools import partial
8import cv2
9import pandas as pd
10import numpy as np
12from ibllib.qc.dlc import DlcQC
13from ibllib.io import ffmpeg, raw_daq_loaders
14from ibllib.pipes import base_tasks
15from ibllib.io.video import get_video_meta
16from ibllib.io.extractors import camera
17from ibllib.io.extractors.base import run_extractor_classes
18from ibllib.io.extractors.ephys_fpga import get_sync_and_chn_map
19from ibllib.qc.camera import run_all_qc as run_camera_qc, CameraQC
20from ibllib.misc import check_nvidia_driver
21from ibllib.io.video import label_from_path, assert_valid_label
22from ibllib.plots.snapshot import ReportSnapshot
23from ibllib.plots.figures import dlc_qc_plot
24from brainbox.behavior.dlc import likelihood_threshold, get_licks, get_pupil_diameter, get_smooth_pupil_diameter
26_logger = logging.getLogger('ibllib')
class VideoRegisterRaw(base_tasks.VideoTask, base_tasks.RegisterRawDataTask):
    """
    Task to register raw video data.

    The list of files to register is built from the cameras held in ``self.cameras``; every
    dataset is looked up in ``self.device_collection`` and is individually optional.
    """

    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        """Return the input/output file signature of the task (no inputs, per-camera raw outputs)."""
        signature = {
            'input_files': [],
            'output_files':
                [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] +
                [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] +
                [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] +
                [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] +
                [('_iblrig_videoCodeFiles.raw*', self.device_collection, False)]
        }
        return signature

    def assert_expected_outputs(self, raise_error=True):
        """
        Check that the expected output files are present after the task has run.

        frameData replaces the timestamps file. Therefore if frameData is present, timestamps is
        optional and vice versa. At least one of the two must be among the files found.

        :param raise_error: bool, if True a FileNotFoundError is raised when the check fails
        :return: tuple (bool, files); the bool is True only if the generic signature check passed
         AND a frameData or timestamps file was found, files is the list returned by assert_expected
        :raises FileNotFoundError: when outputs are missing and raise_error is True
        """
        assert self.status == 0
        _logger.info('Checking output files')
        everything_is_fine, files = self.assert_expected(self.output_files)

        # Every dataset in the signature is optional, so additionally require one timing file
        required = any('Camera.frameData' in x or 'Camera.timestamps' in x for x in map(str, files))
        all_good = bool(everything_is_fine and required)
        if not all_good:
            # self.outputs may be None when the task produced nothing; do not crash while reporting
            for out in self.outputs or []:
                _logger.error(f'{out}')
            if raise_error:
                raise FileNotFoundError('Missing outputs after task completion')

        # Report the combined status so that callers using raise_error=False see the failure too
        return all_good, files
class VideoCompress(base_tasks.VideoTask):
    """
    Compression task turning the raw .avi videos of a session into .mp4 files.
    """
    priority = 90
    job_size = 'large'

    @property
    def signature(self):
        """Return the signature: any raw video file per camera in, one raw mp4 per camera out."""
        raw_inputs = [(f'_iblrig_{cam}Camera.raw.*', self.device_collection, True) for cam in self.cameras]
        mp4_outputs = [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras]
        return {'input_files': raw_inputs, 'output_files': mp4_outputs}

    def _run(self):
        """
        Compress the session videos with ffmpeg.

        :return: the list of compressed files, or None if the compression returned no file
        """
        # TODO different compression parameters based on whether it is training or not based on number of cameras?
        # avi to mp4 compression: only the quality factor and the verbosity flag depend on the sync type
        template = 'ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf %s -codec:a copy {file_out}'
        encoder_options = '29 -nostats' if self.sync == 'bpod' else '17 -loglevel 0'
        command = template % encoder_options

        compressed = ffmpeg.iblrig_video_compression(self.session_path, command)

        if len(compressed) > 0:
            return compressed
        _logger.info('No compressed videos found')
        return None
class VideoConvert(base_tasks.VideoTask):
    """
    Task that converts compressed avi to mp4 format and renames video and camlog files. Specific to UCLA widefield implementation
    """
    priority = 90
    job_size = 'small'

    @property
    def signature(self):
        """Return the signature: rig-named avi/camlog files in, IBL-named mp4/camlog files out."""
        signature = {
            'input_files': [(f'{cam}_cam*.avi', self.device_collection, True) for cam in self.cameras] +
                           [(f'{cam}_cam*.camlog', self.device_collection, False) for cam in self.cameras],
            'output_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] +
                            [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, True) for cam in self.cameras]
        }

        return signature

    def _run(self):
        """
        Rename the camlog file and remux the avi video into an mp4 container for each camera.

        The original avi is deleted only if ffmpeg succeeded and the video meta data of both files
        (ignoring the file size) are equal.

        :return: list of renamed camlog files and successfully converted mp4 files
        :raises FileNotFoundError: if the camlog or avi file of a camera cannot be found
        """
        output_files = []
        video_dir = self.session_path.joinpath(self.device_collection)
        for cam in self.cameras:

            # rename and register the camlog files
            camlog_file = next(video_dir.glob(f'{cam}_cam*.camlog'), None)
            if camlog_file is None:
                # Explicit error instead of a bare StopIteration leaking out of next()
                raise FileNotFoundError(f'No camlog file matching {cam}_cam*.camlog found in {video_dir}')
            new_camlog_file = video_dir.joinpath(f'_iblrig_{cam}Camera.raw.camlog')
            camlog_file.replace(new_camlog_file)
            output_files.append(new_camlog_file)

            # convert the avi files to mp4
            avi_file = next(video_dir.glob(f'{cam}_cam*.avi'), None)
            if avi_file is None:
                raise FileNotFoundError(f'No avi file matching {cam}_cam*.avi found in {video_dir}')
            mp4_file = video_dir.joinpath(f'_iblrig_{cam}Camera.raw.mp4')
            # Pass the arguments as a list (no shell) so that paths containing spaces or shell
            # metacharacters are handed to ffmpeg verbatim
            command2run = ['ffmpeg', '-i', str(avi_file), '-c:v', 'copy', '-c:a', 'copy', '-y', str(mp4_file)]

            process = subprocess.Popen(command2run, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            info, error = process.communicate()
            if process.returncode == 0:
                # check the video meta matched and remove the original file
                meta_avi = get_video_meta(avi_file)
                # the file size is expected to differ between containers, do not compare it
                meta_avi.pop('size', None)
                meta_mp4 = get_video_meta(mp4_file)
                # a key missing from the mp4 meta data counts as a mismatch
                match = all(key in meta_mp4 and not (meta_avi[key] != meta_mp4[key]) for key in meta_avi)

                # if all checks out we can remove the original avi
                if match:
                    avi_file.unlink()
                    output_files.append(mp4_file)
                else:
                    _logger.error(f'avi and mp4 meta data do not match for {avi_file}')
            else:
                error_str = error.decode('utf-8', errors='replace').strip()
                _logger.error(f'conversion to mp4 failed for {avi_file}: {error_str}')

        return output_files
class VideoSyncQcCamlog(base_tasks.VideoTask):
    """
    Camera timestamps synchronisation and QC for sessions recorded with camlog files.
    Specific to UCLA widefield implementation
    """
    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        """Return the signature: videos, camlogs, sync and wheel files in, camera times out."""
        video_inputs = [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras]
        camlog_inputs = [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, False) for cam in self.cameras]
        sync_inputs = [
            (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True),
            ('*.wiring.json', self.sync_collection, True),
            ('*wheel.position.npy', 'alf', False),
            ('*wheel.timestamps.npy', 'alf', False),
        ]
        camera_times = [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]
        return {
            'input_files': video_inputs + camlog_inputs + sync_inputs,
            'output_files': camera_times,
        }

    def extract_camera(self, save=True):
        """
        Run one camlog timestamps extractor per camera.

        :param save: bool, forwarded to the extractors
        :return: whatever run_extractor_classes returns (unpacked by callers as data, output files)
        """
        extractors = [partial(camera.CameraTimestampsCamlog, label) for label in self.cameras or []]
        sync, chmap = get_sync_and_chn_map(self.session_path, self.sync_collection)
        return run_extractor_classes(
            extractors,
            session_path=self.session_path,
            sync_type=self.sync,
            sync_collection=self.sync_collection,
            save=save,
            sync=sync,
            chmap=chmap,
        )

    def run_qc(self, camera_data=None, update=True):
        """
        Run the camera QC in camlog mode for all cameras.

        :param camera_data: extracted camera data; when None the extraction is run without saving
        :param update: bool, forwarded to the QC function
        :return: the object returned by the camera QC function
        """
        if camera_data is None:
            camera_data, _ = self.extract_camera(save=False)
        return run_camera_qc(
            self.session_path, self.cameras, one=self.one, camlog=True,
            sync_collection=self.sync_collection, sync_type=self.sync, update=update)

    def _run(self, update=True, **kwargs):
        """Extract and save the camera timestamps, run the QC and return the extracted files."""
        # Video timestamps extraction
        extracted, files = self.extract_camera(save=True)

        # Video QC
        self.run_qc(extracted, update=update)

        return files
class VideoSyncQcBpod(base_tasks.VideoTask):
    """
    Task to sync camera timestamps to main DAQ timestamps
    N.B Signatures only reflect new daq naming convention, non-compatible with ephys when not running on server
    """
    priority = 40
    job_size = 'small'

    def __init__(self, *args, **kwargs):
        """
        :param collection: optional keyword, the task collection (resolved via get_task_collection)
        :param protocol: optional keyword, the task protocol (resolved via get_protocol)
        """
        super().__init__(*args, **kwargs)
        # Task collection (this needs to be specified in the task kwargs)
        self.collection = self.get_task_collection(kwargs.get('collection', None))
        # Task type (protocol)
        self.protocol = self.get_protocol(kwargs.get('protocol', None), task_collection=self.collection)
        self.extractor = None

    @property
    def signature(self):
        """Return the signature: videos, raw camera files, task data and wheel in, camera times out."""
        signature = {
            'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] +
                           [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] +
                           [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] +
                           [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] +
                           [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] +
                           [('_iblrig_taskData.raw.*', self.collection, True),
                            ('_iblrig_taskSettings.raw.*', self.collection, True),
                            ('*wheel.position.npy', 'alf', False),
                            ('*wheel.timestamps.npy', 'alf', False)],
            'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]
        }

        return signature

    def extract_camera(self, save=True):
        """
        Extract the timestamps of the left camera video using the Bpod extractor.

        :param save: bool, forwarded to the extractor
        :return: the result of the extractor's extract method (unpacked by callers as data, output files)
        :raises NotImplementedError: if the cameras are anything other than ['left']
        :raises FileNotFoundError: if no mp4 file with a matching label is found in the device collection
        """
        # Validate first: nothing below makes sense for any other camera set
        if self.cameras != ['left']:
            raise NotImplementedError('Bpod Camera extraction currently only supports a left camera')

        self.extractor = camera.CameraTimestampsBpod(self.session_path)
        video_dir = self.session_path.joinpath(self.device_collection)
        # Parenthesised fallback: `x in self.cameras or []` binds as `(x in self.cameras) or []`
        labels = self.cameras or []
        mp4_file = next((x for x in video_dir.rglob('*.mp4') if label_from_path(x) in labels), None)
        if mp4_file is None:
            # Explicit error instead of a bare StopIteration leaking out of next()
            raise FileNotFoundError(f'No mp4 video found for cameras {labels} in {video_dir}')
        return self.extractor.extract(video_path=mp4_file, save=save, task_collection=self.collection)

    def run_qc(self, camera_data=None, update=True):
        """
        Run the camera QC for the left camera with the bpod sync type.

        :param camera_data: extracted camera data; when None the extraction is run without saving
        :param update: bool, forwarded to the QC run
        :return: the CameraQC object after it has been run
        :raises NotImplementedError: if the cameras are anything other than ['left']
        """
        if self.cameras != ['left']:
            raise NotImplementedError('Bpod camera currently only supports a left camera')
        if camera_data is None:
            camera_data, _ = self.extract_camera(save=False)
        qc = CameraQC(
            self.session_path, 'left', sync_type='bpod', sync_collection=self.collection, one=self.one,
            protocol=self.protocol)
        qc.run(update=update)
        return qc

    def _run(self, update=True, **kwargs):
        """Extract and save the camera timestamps, run the QC and return the extracted files."""
        # Video timestamps extraction
        data, output_files = self.extract_camera(save=True)

        # Video QC
        self.run_qc(data, update=update)

        return output_files
class VideoSyncQcNidq(base_tasks.VideoTask):
    """
    Camera timestamps synchronisation to the main DAQ timestamps, followed by camera QC.
    N.B Signatures only reflect new daq naming convention, non-compatible with ephys when not running on server
    """
    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        """Return the signature: videos, raw camera files, sync and wheel files in, camera times out."""
        per_camera_patterns = (
            ('_iblrig_{}Camera.raw.mp4', True),
            ('_iblrig_{}Camera.timestamps*', False),
            ('_iblrig_{}Camera.GPIO.bin', False),
            ('_iblrig_{}Camera.frame_counter.bin', False),
            ('_iblrig_{}Camera.frameData.bin', False),
        )
        # Grouped by file type first, then by camera
        camera_inputs = [
            (pattern.format(cam), self.device_collection, required)
            for pattern, required in per_camera_patterns
            for cam in self.cameras
        ]
        sync_inputs = [
            (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_*.wiring.json', self.sync_collection, False),
            (f'_{self.sync_namespace}_*.meta', self.sync_collection, True),
            ('*wheel.position.npy', 'alf', False),
            ('*wheel.timestamps.npy', 'alf', False),
            ('*experiment.description*', '', False),
        ]
        camera_times = [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]
        return {
            'input_files': camera_inputs + sync_inputs,
            'output_files': camera_times,
        }

    def extract_camera(self, save=True):
        """
        Run one FPGA timestamps extractor per camera.

        The sync and channel map are loaded with the timeline loader when the sync namespace is
        'timeline', and with get_sync_and_chn_map otherwise.

        :param save: bool, forwarded to the extractors
        :return: whatever run_extractor_classes returns (unpacked by callers as data, output files)
        """
        extractors = [partial(camera.CameraTimestampsFPGA, label) for label in self.cameras or []]
        if self.sync_namespace == 'timeline':
            # Load sync from timeline file
            sync, chmap = raw_daq_loaders.load_timeline_sync_and_chmap(self.session_path / self.sync_collection)
        else:
            sync, chmap = get_sync_and_chn_map(self.session_path, self.sync_collection)
        return run_extractor_classes(
            extractors,
            session_path=self.session_path,
            sync_type=self.sync,
            sync_collection=self.sync_collection,
            save=save,
            sync=sync,
            chmap=chmap,
        )

    def run_qc(self, camera_data=None, update=True):
        """
        Run the camera QC for all cameras.

        :param camera_data: extracted camera data; when None the extraction is run without saving
        :param update: bool, forwarded to the QC function
        :return: the object returned by the camera QC function
        """
        if camera_data is None:
            camera_data, _ = self.extract_camera(save=False)
        return run_camera_qc(
            self.session_path, self.cameras, one=self.one,
            sync_collection=self.sync_collection, sync_type=self.sync, update=update)

    def _run(self, update=True, **kwargs):
        """Extract and save the camera timestamps, run the QC and return the extracted files."""
        # Video timestamps extraction
        extracted, files = self.extract_camera(save=True)

        # Video QC
        self.run_qc(extracted, update=update)

        return files
class DLC(base_tasks.VideoTask):
    """
    This task relies on a correctly installed dlc environment as per
    https://docs.google.com/document/d/1g0scP6_3EmaXCU4SsDNZWwDTaD9MG0es_grLA-d0gh0/edit#

    If your environment is set up otherwise, make sure that you set the respective attributes:
    t = DLC(session_path)
    t.dlcenv = Path('/path/to/your/dlcenv/bin/activate')
    t.scripts = Path('/path/to/your/iblscripts/deploy/serverpc/dlc')
    """
    gpu = 1
    cpu = 4
    io_charge = 100
    level = 2
    force = True
    job_size = 'large'

    # Default locations of the activate script of the DLC environment and of the DLC shell scripts
    dlcenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'dlcenv', 'bin', 'activate')
    scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'dlc')

    @property
    def signature(self):
        """Return the signature: raw mp4 per camera in, DLC traces and motion energy files out."""
        signature = {
            'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras],
            'output_files': [(f'_ibl_{cam}Camera.dlc.pqt', 'alf', True) for cam in self.cameras] +
                            [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', True) for cam in self.cameras] +
                            [(f'{cam}ROIMotionEnergy.position.npy', 'alf', True) for cam in self.cameras]
        }

        return signature

    def _check_dlcenv(self):
        """
        Check that scripts are present, dlcenv can be activated and get iblvideo version

        :return: str, last line printed when importing iblvideo in the environment and printing its version
        :raises AssertionError: if scripts or environment are missing, or the environment check command fails
        """
        assert len(list(self.scripts.rglob('run_dlc.*'))) == 2, \
            f'Scripts run_dlc.sh and run_dlc.py do not exist in {self.scripts}'
        assert len(list(self.scripts.rglob('run_motion.*'))) == 2, \
            f'Scripts run_motion.sh and run_motion.py do not exist in {self.scripts}'
        assert self.dlcenv.exists(), f'DLC environment does not exist in assumed location {self.dlcenv}'
        command2run = f"source {self.dlcenv}; python -c 'import iblvideo; print(iblvideo.__version__)'"
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable='/bin/bash'
        )
        info, error = process.communicate()
        if process.returncode != 0:
            raise AssertionError(f"DLC environment check failed\n{error.decode('utf-8')}")
        # Only the last printed line is kept as the version
        version = info.decode('utf-8').strip().split('\n')[-1]
        return version

    @staticmethod
    def _video_intact(file_mp4):
        """Checks that the downloaded video can be opened and is not empty"""
        cap = cv2.VideoCapture(str(file_mp4))
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        intact = frame_count > 0
        cap.release()
        return intact

    def _run(self, cams=None, overwrite=False):
        """
        Run DLC and then the motion energy computation on each valid camera video.

        :param cams: optional list of camera labels, defaults to self.cameras; invalid labels are skipped
        :param overwrite: bool, if False and all expected outputs exist, they are returned without computation
        :return: list of output files, or None if no output was produced and a raw video was missing
        """
        # Check that the cams are valid for DLC, remove the ones that aren't
        candidate_cams = cams or self.cameras
        cams = []
        for cam in candidate_cams:
            try:
                cams.append(assert_valid_label(cam))
            except ValueError:
                _logger.warning(f'{cam} is not a valid video label, this video will be skipped')
        # Set up
        self.session_id = self.one.path2eid(self.session_path)
        actual_outputs = []

        # Loop through cams
        for cam in cams:
            # Catch exceptions so that following cameras can still run
            try:
                # If all results exist and overwrite is False, skip computation
                expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True)
                if overwrite is False and expected_outputs_present is True:
                    actual_outputs.extend(expected_outputs)
                    return actual_outputs

                # Default of None: a bare next() would raise StopIteration and the missing-video
                # branch below (status Incomplete) could never be reached
                file_mp4 = next(
                    self.session_path.joinpath('raw_video_data').glob(f'_iblrig_{cam}Camera.raw*.mp4'), None)
                if file_mp4 is None or not file_mp4.exists():
                    # In this case we set the status to Incomplete.
                    _logger.error(f'No raw video file available for {cam}, skipping.')
                    self.status = -3
                    continue
                if not self._video_intact(file_mp4):
                    _logger.error(f'Corrupt raw video file {file_mp4}')
                    self.status = -1
                    continue
                # Check that dlc environment is ok, shell scripts exists, and get iblvideo version, GPU addressable
                self.version = self._check_dlcenv()
                _logger.info(f'iblvideo version {self.version}')
                check_nvidia_driver()

                _logger.info(f'Running DLC on {cam}Camera.')
                command2run = f"{self.scripts.joinpath('run_dlc.sh')} {str(self.dlcenv)} {file_mp4} {overwrite}"
                _logger.info(command2run)
                process = subprocess.Popen(
                    command2run,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    executable='/bin/bash',
                )
                info, error = process.communicate()
                if process.returncode != 0:
                    error_str = error.decode('utf-8').strip()
                    _logger.error(f'DLC failed for {cam}Camera.\n\n'
                                  f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                                  f'{error_str}\n'
                                  f'++++++++++++++++++++++++++++++++++++++++++++\n')
                    self.status = -1
                    # We don't run motion energy, or add any files if dlc failed to run
                    continue
                dlc_result = next(self.session_path.joinpath('alf').glob(f'_ibl_{cam}Camera.dlc*.pqt'))
                actual_outputs.append(dlc_result)

                _logger.info(f'Computing motion energy for {cam}Camera')
                command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.dlcenv)} {file_mp4} {dlc_result}"
                _logger.info(command2run)
                process = subprocess.Popen(
                    command2run,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    executable='/bin/bash',
                )
                info, error = process.communicate()
                if process.returncode != 0:
                    error_str = error.decode('utf-8').strip()
                    _logger.error(f'Motion energy failed for {cam}Camera.\n\n'
                                  f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                                  f'{error_str}\n'
                                  f'++++++++++++++++++++++++++++++++++++++++++++\n')
                    self.status = -1
                    continue
                actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                    f'{cam}Camera.ROIMotionEnergy*.npy')))
                actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                    f'{cam}ROIMotionEnergy.position*.npy')))
            except Exception:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        # If status is Incomplete, check that there is at least one output.
        # Otherwise make sure it gets set to Empty (outputs = None), and set status to -1 to make sure it doesn't slip
        if self.status == -3 and len(actual_outputs) == 0:
            actual_outputs = None
            self.status = -1
        return actual_outputs
class EphysPostDLC(base_tasks.VideoTask):
    """
    The post_dlc task takes dlc traces as input and computes useful quantities, as well as qc.
    """
    io_charge = 90
    level = 3
    force = True

    def __init__(self, *args, **kwargs):
        """
        :param trials_collection: optional keyword, collection of the trials and wheel datasets (default 'alf')
        """
        super().__init__(*args, **kwargs)
        self.trials_collection = kwargs.get('trials_collection', 'alf')

    @property
    def signature(self):
        """Return the signature: DLC traces, camera times and plot inputs in, features and lick times out."""
        return {
            'input_files': [(f'_ibl_{cam}Camera.dlc.pqt', 'alf', True) for cam in self.cameras] +
                           [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras] +
            # the following are required for the DLC plot only
            # they are not strictly required, some plots just might be skipped
            # In particular the raw videos don't need to be downloaded as they can be streamed
                           [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] +
                           [(f'{cam}ROIMotionEnergy.position.npy', 'alf', False) for cam in self.cameras] +
                           [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', False) for cam in self.cameras] +
            # The trials table is used in the DLC QC, however this is not an essential dataset
                           [('_ibl_trials.table.pqt', self.trials_collection, False),
                            ('_ibl_wheel.position.npy', self.trials_collection, False),
                            ('_ibl_wheel.timestamps.npy', self.trials_collection, False)],
            'output_files': [(f'_ibl_{cam}Camera.features.pqt', 'alf', True) for cam in self.cameras] +
                            [('licks.times.npy', 'alf', True)]
        }

    def _run(self, overwrite=True, run_qc=True, plot_qc=True):
        """
        Run the PostDLC task. Returns a list of file locations for the output files in signature. The created plot
        (dlc_qc_plot.png) is not returned, but saved in session_path/snapshot and registered through ReportSnapshot.

        :param overwrite: bool, whether to recompute existing output files (default is True).
            Note that the dlc_qc_plot will be (re-)computed even if overwrite = False
        :param run_qc: bool, whether to run the DLC QC (default is True)
        :param plot_qc: bool, whether to create the dlc_qc_plot (default is True)
        :return: list of output files (the existing ones if they are present and overwrite is False)
        """
        # Check if output files exist locally
        exist, output_files = self.assert_expected(self.output_files, silent=True)
        if exist and not overwrite:
            _logger.warning('EphysPostDLC outputs exist and overwrite=False, skipping computations of outputs.')
        else:
            if exist and overwrite:
                _logger.warning('EphysPostDLC outputs exist and overwrite=True, overwriting existing outputs.')
            # Find all available DLC files
            dlc_files = list(Path(self.session_path).joinpath('alf').rglob('_ibl_*Camera.dlc.*'))
            for dlc_file in dlc_files:
                _logger.debug(dlc_file)
            output_files = []
            combined_licks = []

            for dlc_file in dlc_files:
                # Catch unforeseen exceptions and move on to next cam
                try:
                    cam = label_from_path(dlc_file)
                    # load dlc trace and camera times
                    dlc = pd.read_parquet(dlc_file)
                    dlc_thresh = likelihood_threshold(dlc, 0.9)
                    # try to load respective camera times
                    # `times` is tri-state: True (usable), 'short' (fewer times than frames), False (none)
                    try:
                        dlc_t = np.load(next(Path(self.session_path).joinpath('alf').rglob(f'_ibl_{cam}Camera.times.*npy')))
                        times = True
                        if dlc_t.shape[0] == 0:
                            _logger.error(f'camera.times empty for {cam} camera. '
                                          f'Computations using camera.times will be skipped')
                            self.status = -1
                            times = False
                        elif dlc_t.shape[0] < len(dlc_thresh):
                            _logger.error(f'Camera times shorter than DLC traces for {cam} camera. '
                                          f'Computations using camera.times will be skipped')
                            self.status = -1
                            times = 'short'
                    except StopIteration:
                        self.status = -1
                        times = False
                        _logger.error(f'No camera.times for {cam} camera. '
                                      f'Computations using camera.times will be skipped')
                    # These features are only computed from left and right cam
                    if cam in ('left', 'right'):
                        features = pd.DataFrame()
                        # If camera times are available, get the lick time stamps for combined array
                        if times is True:
                            _logger.info(f'Computing lick times for {cam} camera.')
                            combined_licks.append(get_licks(dlc_thresh, dlc_t))
                        elif times is False:
                            _logger.warning(f'Skipping lick times for {cam} camera as no camera.times available')
                        elif times == 'short':
                            _logger.warning(f'Skipping lick times for {cam} camera as camera.times are too short')
                        # Compute pupil diameter, raw and smoothed
                        _logger.info(f'Computing raw pupil diameter for {cam} camera.')
                        features['pupilDiameter_raw'] = get_pupil_diameter(dlc_thresh)
                        try:
                            _logger.info(f'Computing smooth pupil diameter for {cam} camera.')
                            features['pupilDiameter_smooth'] = get_smooth_pupil_diameter(features['pupilDiameter_raw'],
                                                                                         cam)
                        except Exception:
                            _logger.error(f'Computing smooth pupil diameter for {cam} camera failed, saving all NaNs.')
                            _logger.error(traceback.format_exc())
                            features['pupilDiameter_smooth'] = np.nan
                        # Save to parquet
                        features_file = Path(self.session_path).joinpath('alf', f'_ibl_{cam}Camera.features.pqt')
                        features.to_parquet(features_file)
                        output_files.append(features_file)

                    # For all cams, compute DLC QC if times available
                    if run_qc is True and times in [True, 'short']:
                        # Setting download_data to False because at this point the data should be there
                        qc = DlcQC(self.session_path, side=cam, one=self.one, download_data=False)
                        qc.run(update=True)
                    else:
                        if times is False:
                            _logger.warning(f'Skipping QC for {cam} camera as no camera.times available')
                        if not run_qc:
                            _logger.warning(f'Skipping QC for {cam} camera as run_qc=False')

                except Exception:
                    _logger.error(traceback.format_exc())
                    self.status = -1
                    continue

            # Combined lick times
            if len(combined_licks) > 0:
                lick_times_file = Path(self.session_path).joinpath('alf', 'licks.times.npy')
                # np.sort keeps the data in a NumPy array instead of sorting element-wise in Python
                np.save(lick_times_file, np.sort(np.concatenate(combined_licks)))
                output_files.append(lick_times_file)
            else:
                _logger.warning('No lick times computed for this session.')

        if plot_qc:
            _logger.info('Creating DLC QC plot')
            try:
                session_id = self.one.path2eid(self.session_path)
                fig_path = self.session_path.joinpath('snapshot', 'dlc_qc_plot.png')
                # exist_ok makes a prior existence check unnecessary
                fig_path.parent.mkdir(parents=True, exist_ok=True)
                fig = dlc_qc_plot(self.session_path, one=self.one, cameras=self.cameras, device_collection=self.device_collection,
                                  trials_collection=self.trials_collection)
                fig.savefig(fig_path)
                fig.clf()
                snp = ReportSnapshot(self.session_path, session_id, one=self.one)
                snp.outputs = [fig_path]
                snp.register_images(widths=['orig'],
                                    function=str(dlc_qc_plot.__module__) + '.' + str(dlc_qc_plot.__name__))
            except Exception:
                # A failing plot must not discard the computed outputs; flag the task as errored instead
                _logger.error('Could not create and/or upload DLC QC Plot')
                _logger.error(traceback.format_exc())
                self.status = -1

        return output_files
class LightningPose(base_tasks.VideoTask):
    """
    Task running the Lightning Pose script and then the motion energy script on the raw videos of a session.

    Both scripts are shell scripts located in ``scripts`` and are called with the activate script
    of the environment given by ``lpenv``.
    """
    # TODO: make one task per cam?
    # TODO: separate pose and motion energy
    gpu = 1
    io_charge = 100
    level = 2
    force = True
    job_size = 'large'
    env = 'litpose'

    # Default locations of the activate script of the environment and of the shell scripts
    lpenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'litpose', 'bin', 'activate')
    scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'litpose')

    @property
    def signature(self):
        """Return the signature: raw mp4 per camera in, pose traces and motion energy files out."""
        signature = {
            'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras],
            'output_files': [(f'_ibl_{cam}Camera.lightningPose.pqt', 'alf', True) for cam in self.cameras] +
                            [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', True) for cam in self.cameras] +
                            [(f'{cam}ROIMotionEnergy.position.npy', 'alf', True) for cam in self.cameras]
        }

        return signature

    @staticmethod
    def _video_intact(file_mp4):
        """Checks that the downloaded video can be opened and is not empty"""
        cap = cv2.VideoCapture(str(file_mp4))
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        intact = frame_count > 0
        cap.release()
        return intact

    def _check_env(self):
        """
        Check that scripts are present, env can be activated and get iblvideo version

        :return: str, last line printed when importing iblvideo in the environment and printing its version
        :raises AssertionError: if scripts or environment are missing, or the environment check command fails
        """
        assert len(list(self.scripts.rglob('run_litpose.*'))) == 2, \
            f'Scripts run_litpose.sh and run_litpose.py do not exist in {self.scripts}'
        assert self.lpenv.exists(), f"environment does not exist in assumed location {self.lpenv}"
        command2run = f"source {self.lpenv}; python -c 'import iblvideo; print(iblvideo.__version__)'"
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash"
        )
        info, error = process.communicate()
        if process.returncode != 0:
            raise AssertionError(f"environment check failed\n{error.decode('utf-8')}")
        # Only the last printed line is kept as the version
        version = info.decode("utf-8").strip().split('\n')[-1]
        return version

    def _run(self, overwrite=True, **kwargs):
        """
        Run pose estimation and then motion energy on every raw video found for the session cameras.

        :param overwrite: bool, if False and all expected outputs exist, they are returned without computation
        :return: list of output files, or None (with status set to -1) if no output was produced
        """
        # Gather video files
        self.session_path = Path(self.session_path)
        candidate_files = (
            self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.mp4') for cam in self.cameras
        )
        mp4_files = [mp4_file for mp4_file in candidate_files if mp4_file.exists()]

        labels = [label_from_path(x) for x in mp4_files]
        _logger.info(f'Running on {labels} videos')

        # Check the environment
        self.version = self._check_env()
        _logger.info(f'iblvideo version {self.version}')

        # If all results exist and overwrite is False, skip computation
        expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True)
        if overwrite is False and expected_outputs_present is True:
            actual_outputs = expected_outputs
            return actual_outputs

        # Else, loop over videos
        actual_outputs = []
        for label, mp4_file in zip(labels, mp4_files):
            # Catch exceptions so that the other cams can still run but set status to Errored
            try:
                # Check that the GPU is (still) accessible
                check_nvidia_driver()
                # Check that the video can be loaded
                if not self._video_intact(mp4_file):
                    _logger.error(f"Corrupt raw video file {mp4_file}")
                    self.status = -1
                    continue

                # ---------------------------
                # Run pose estimation
                # ---------------------------
                t0 = time.time()
                _logger.info(f'Running Lightning Pose on {label}Camera.')
                command2run = f"{self.scripts.joinpath('run_litpose.sh')} {str(self.lpenv)} {mp4_file} {overwrite}"
                _logger.info(command2run)
                process = subprocess.Popen(
                    command2run,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    executable="/bin/bash",
                )
                info, error = process.communicate()
                if process.returncode != 0:
                    error_str = error.decode("utf-8").strip()
                    _logger.error(
                        f'Lightning pose failed for {label}Camera.\n\n'
                        f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                        f'{error_str}\n'
                        f'++++++++++++++++++++++++++++++++++++++++++++\n'
                    )
                    self.status = -1
                    # We don't run motion energy, or add any files if LP failed to run
                    continue
                else:
                    _logger.info(f'{label} camera took {(time.time() - t0)} seconds')
                    result = next(self.session_path.joinpath('alf').glob(f'_ibl_{label}Camera.lightningPose*.pqt'))
                    actual_outputs.append(result)

                # ---------------------------
                # Run motion energy
                # ---------------------------
                t1 = time.time()
                _logger.info(f'Computing motion energy for {label}Camera')
                command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.lpenv)} {mp4_file} {result}"
                _logger.info(command2run)
                process = subprocess.Popen(
                    command2run,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    executable='/bin/bash',
                )
                info, error = process.communicate()
                if process.returncode != 0:
                    error_str = error.decode('utf-8').strip()
                    _logger.error(
                        f'Motion energy failed for {label}Camera.\n\n'
                        f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                        f'{error_str}\n'
                        f'++++++++++++++++++++++++++++++++++++++++++++\n'
                    )
                    self.status = -1
                    continue
                else:
                    _logger.info(f'{label} camera took {(time.time() - t1)} seconds')
                    actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                        f'{label}Camera.ROIMotionEnergy*.npy')))
                    actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                        f'{label}ROIMotionEnergy.position*.npy')))

            except Exception:
                # Exception rather than BaseException: KeyboardInterrupt and SystemExit must still
                # be able to stop the task instead of being logged and skipped per camera
                _logger.error(traceback.format_exc())
                self.status = -1
                continue

        # catch here if there are no raw videos present
        if len(actual_outputs) == 0:
            _logger.info('Did not find any videos for this session')
            actual_outputs = None
            self.status = -1

        return actual_outputs