Coverage for ibllib/pipes/video_tasks.py: 45%
438 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-17 09:55 +0000
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-17 09:55 +0000
1import logging
2import subprocess
3import time
4import traceback
5from pathlib import Path
6from functools import partial
8import cv2
9import pandas as pd
10import numpy as np
12from ibllib.qc.dlc import DlcQC
13from ibllib.io import ffmpeg, raw_daq_loaders
14from ibllib.pipes import base_tasks
15from ibllib.io.video import get_video_meta
16from ibllib.io.extractors import camera
17from ibllib.io.extractors.base import run_extractor_classes
18from ibllib.io.extractors.ephys_fpga import get_sync_and_chn_map
19from ibllib.qc.camera import run_all_qc as run_camera_qc, CameraQC
20from ibllib.misc import check_nvidia_driver
21from ibllib.io.video import label_from_path, assert_valid_label
22from ibllib.plots.snapshot import ReportSnapshot
23from ibllib.plots.figures import dlc_qc_plot
24from brainbox.behavior.dlc import likelihood_threshold, get_licks, get_pupil_diameter, get_smooth_pupil_diameter
# Module-level logger used by the tasks in this file; it is the package-wide 'ibllib' logger.
_logger = logging.getLogger(name='ibllib')
class VideoRegisterRaw(base_tasks.VideoTask, base_tasks.RegisterRawDataTask):
    """
    Task to register raw video data. Builds up list of files to register from list of cameras given in session params file
    """

    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        """dict: the task signature; there are no inputs and every per-camera raw dataset is declared optional."""
        expected = []
        expected += [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras]
        expected += [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras]
        expected += [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras]
        expected += [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras]
        expected += [('_iblrig_videoCodeFiles.raw*', self.device_collection, False)]
        return {'input_files': [], 'output_files': expected}

    def assert_expected_outputs(self, raise_error=True):
        """
        Check the registered outputs, accepting either a frameData or a timestamps file.

        frameData replaces the timestamps file, so all signature datasets are optional and this
        method additionally requires that at least one file of either kind was found.

        :param raise_error: bool, if True raise when the check fails, otherwise only log the outputs
        :return: tuple of (result of the signature check, list of files found)
        :raises FileNotFoundError: when the check fails and raise_error is True
        """
        assert self.status == 0
        _logger.info('Checking output files')
        all_present, found = self.assert_expected(self.output_files)

        # At least one timing file (either flavour) must be among the files found
        has_timing_file = False
        for file_name in map(str, found):
            if 'Camera.frameData' in file_name or 'Camera.timestamps' in file_name:
                has_timing_file = True
                break

        if all_present and has_timing_file:
            return all_present, found

        # Something is missing: log what the task produced before (optionally) raising
        for out in self.outputs:
            _logger.error(f'{out}')
        if raise_error:
            raise FileNotFoundError('Missing outputs after task completion')

        return all_present, found
class VideoCompress(base_tasks.VideoTask):
    """
    Task to compress raw video data from .avi to .mp4 format.
    """
    priority = 90
    job_size = 'large'

    @property
    def signature(self):
        """dict: the task signature; one required raw video in and one required mp4 out per camera."""
        raw_videos = [(f'_iblrig_{cam}Camera.raw.*', self.device_collection, True) for cam in self.cameras]
        compressed = [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras]
        return {'input_files': raw_videos, 'output_files': compressed}

    def _run(self):
        """
        Compress the session videos with ffmpeg.

        The command template differs between bpod-synced sessions (crf 29) and all others (crf 17).

        :return: the files returned by the compression routine, or None if it returned none
        """
        # TODO different compression parameters based on whether it is training or not based on number of cameras?
        # avi to mp4 compression
        if self.sync != 'bpod':
            command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 17 '
                       '-loglevel 0 -codec:a copy {file_out}')
        else:
            command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 29 '
                       '-nostats -codec:a copy {file_out}')

        compressed = ffmpeg.iblrig_video_compression(self.session_path, command)

        if len(compressed) > 0:
            return compressed

        _logger.info('No compressed videos found')
        return None
class VideoConvert(base_tasks.VideoTask):
    """
    Task that converts compressed avi to mp4 format and renames video and camlog files. Specific to UCLA widefield implementation
    """
    priority = 90
    job_size = 'small'

    @property
    def signature(self):
        """dict: the task signature; avi (required) and camlog (optional) in, mp4 and camlog (required) out."""
        signature = {
            'input_files': [(f'{cam}_cam*.avi', self.device_collection, True) for cam in self.cameras] +
                           [(f'{cam}_cam*.camlog', self.device_collection, False) for cam in self.cameras],
            'output_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] +
                            [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, True) for cam in self.cameras]
        }

        return signature

    def _run(self):
        """
        Rename the camlog file and convert the avi file to mp4 for each camera.

        The video and audio streams are copied by ffmpeg (no re-encoding). The original avi is only
        removed once the metadata of the new mp4 matches that of the avi (file size excluded).

        :return: list of renamed camlog files and successfully converted mp4 files
        """
        output_files = []
        video_path = self.session_path.joinpath(self.device_collection)
        for cam in self.cameras:

            # rename and register the camlog files
            camlog_file = next(video_path.glob(f'{cam}_cam*.camlog'))
            new_camlog_file = video_path.joinpath(f'_iblrig_{cam}Camera.raw.camlog')
            camlog_file.replace(new_camlog_file)
            output_files.append(new_camlog_file)

            # convert the avi files to mp4
            avi_file = next(video_path.glob(f'{cam}_cam*.avi'))
            mp4_file = video_path.joinpath(f'_iblrig_{cam}Camera.raw.mp4')
            # Pass an argument list and no shell so that paths containing spaces or shell
            # metacharacters reach ffmpeg verbatim
            command2run = ['ffmpeg', '-i', str(avi_file), '-c:v', 'copy', '-c:a', 'copy', '-y', str(mp4_file)]

            process = subprocess.Popen(command2run, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            _, error = process.communicate()
            if process.returncode != 0:
                # stderr is captured as bytes; decode it so that the log is readable
                error_str = error.decode('utf-8', errors='replace').strip()
                _logger.error(f'conversion to mp4 failed for {avi_file}: {error_str}')
                continue

            # check the video meta matched and remove the original file
            meta_avi = get_video_meta(avi_file)
            _ = meta_avi.pop('size')  # the file size is expected to differ between containers
            meta_mp4 = get_video_meta(mp4_file)
            match = not any(meta_avi[key] != meta_mp4[key] for key in meta_avi.keys())

            # if all checks out we can remove the original avi
            if match:
                avi_file.unlink()
                output_files.append(mp4_file)
            else:
                _logger.error(f'avi and mp4 meta data do not match for {avi_file}')

        return output_files
class VideoSyncQcCamlog(base_tasks.VideoTask):
    """
    Task to sync camera timestamps to main DAQ timestamps when camlog files are used. Specific to UCLA widefield implementation
    """
    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        """dict: the task signature; videos, camlogs, sync and wheel data in, one camera times file out per camera."""
        videos = [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras]
        camlogs = [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, False) for cam in self.cameras]
        sync_and_wheel = [
            (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True),
            ('*.wiring.json', self.sync_collection, True),
            ('*wheel.position.npy', 'alf', False),
            ('*wheel.timestamps.npy', 'alf', False),
        ]
        camera_times = [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]
        return {'input_files': videos + camlogs + sync_and_wheel, 'output_files': camera_times}

    def extract_camera(self, save=True):
        """
        Run the camlog timestamps extractor for every camera.

        :param save: bool, forwarded to the extractors
        :return: whatever `run_extractor_classes` returns (unpacked by the callers as data, output files)
        """
        extractors = [partial(camera.CameraTimestampsCamlog, label) for label in self.cameras or []]
        extractor_kwargs = dict(sync_type=self.sync, sync_collection=self.sync_collection, save=save)
        sync, chmap = get_sync_and_chn_map(self.session_path, self.sync_collection)
        extractor_kwargs['sync'] = sync
        extractor_kwargs['chmap'] = chmap
        return run_extractor_classes(extractors, session_path=self.session_path, **extractor_kwargs)

    def run_qc(self, camera_data=None, update=True):
        """
        Run the camera QC for all cameras of this task.

        :param camera_data: extracted camera data; when None the extraction is run first (without saving)
        :param update: bool, forwarded to the QC routine
        :return: the result of `run_camera_qc`
        """
        if camera_data is None:
            camera_data, _ = self.extract_camera(save=False)
        return run_camera_qc(
            self.session_path,
            self.cameras,
            one=self.one,
            camlog=True,
            sync_collection=self.sync_collection,
            sync_type=self.sync,
            update=update,
        )

    def _run(self, update=True, **kwargs):
        """
        Extract and save the camera timestamps, then run the video QC.

        :param update: bool, forwarded to `run_qc`
        :return: the output files of the extraction
        """
        # Video timestamps extraction
        camera_data, extracted_files = self.extract_camera(save=True)

        # Video QC
        self.run_qc(camera_data, update=update)

        return extracted_files
class VideoSyncQcBpod(base_tasks.VideoTask):
    """
    Task to sync camera timestamps to main DAQ timestamps
    N.B Signatures only reflect new daq naming convention, non-compatible with ephys when not running on server
    """
    priority = 40
    job_size = 'small'

    def __init__(self, *args, **kwargs):
        """
        Accepts the arguments of the parent task; the optional keyword arguments `collection` and
        `protocol` are resolved through `get_task_collection` and `get_protocol` respectively.
        """
        super().__init__(*args, **kwargs)
        # Task collection (this needs to be specified in the task kwargs)
        self.collection = self.get_task_collection(kwargs.get('collection', None))
        # Task type (protocol)
        self.protocol = self.get_protocol(kwargs.get('protocol', None), task_collection=self.collection)
        self.extractor = None

    @property
    def signature(self):
        """dict: the task signature; video, raw camera, task and wheel data in, one camera times file out per camera."""
        signature = {
            'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] +
                           [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] +
                           [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] +
                           [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] +
                           [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] +
                           [('_iblrig_taskData.raw.*', self.collection, True),
                            ('_iblrig_taskSettings.raw.*', self.collection, True),
                            ('*wheel.position.npy', 'alf', False),
                            ('*wheel.timestamps.npy', 'alf', False)],
            'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]
        }

        return signature

    def extract_camera(self, save=True):
        """
        Extract the camera timestamps of the left camera using the Bpod extractor.

        :param save: bool, forwarded to the extractor
        :return: the result of the extractor's `extract` method (unpacked by the callers as data, output files)
        :raises NotImplementedError: when the task cameras are anything but ['left']
        """
        # Validate the cameras before touching the file system. The previous filter expression
        # `label in self.cameras or []` applied the `or []` to the membership result, so it never
        # guarded against `self.cameras` being None; after this check that guard is not needed.
        if self.cameras != ['left']:
            raise NotImplementedError('Bpod Camera extraction currently only supports a left camera')

        video_path = self.session_path.joinpath(self.device_collection)
        mp4_files = (file for file in video_path.rglob('*.mp4') if label_from_path(file) in self.cameras)

        self.extractor = camera.CameraTimestampsBpod(self.session_path)
        return self.extractor.extract(video_path=next(mp4_files), save=save, task_collection=self.collection)

    def run_qc(self, camera_data=None, update=True):
        """
        Run the camera QC for the left camera.

        :param camera_data: extracted camera data; when None the extraction is run first (without saving)
        :param update: bool, forwarded to the QC object's `run` method
        :return: the CameraQC object after it was run
        :raises NotImplementedError: when the task cameras are anything but ['left']
        """
        if self.cameras != ['left']:
            raise NotImplementedError('Bpod camera currently only supports a left camera')
        if camera_data is None:
            camera_data, _ = self.extract_camera(save=False)
        qc = CameraQC(
            self.session_path, 'left', sync_type='bpod', sync_collection=self.collection, one=self.one)
        qc.run(update=update)
        return qc

    def _run(self, update=True, **kwargs):
        """
        Extract and save the camera timestamps, then run the video QC.

        :param update: bool, forwarded to `run_qc`
        :return: the output files of the extraction
        """
        # Video timestamps extraction
        data, output_files = self.extract_camera(save=True)

        # Video QC
        self.run_qc(data, update=update)

        return output_files
class VideoSyncQcNidq(base_tasks.VideoTask):
    """
    Task to sync camera timestamps to main DAQ timestamps
    N.B Signatures only reflect new daq naming convention, non-compatible with ephys when not running on server
    """
    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        """dict: the task signature; video, raw camera, sync and wheel data in, one camera times file out per camera."""
        per_camera_inputs = (
            [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] +
            [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] +
            [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] +
            [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] +
            [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras]
        )
        session_inputs = [
            (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_*.wiring.json', self.sync_collection, False),
            (f'_{self.sync_namespace}_*.meta', self.sync_collection, True),
            ('*wheel.position.npy', 'alf', False),
            ('*wheel.timestamps.npy', 'alf', False),
            ('*experiment.description*', '', False),
        ]
        camera_times = [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]
        return {'input_files': per_camera_inputs + session_inputs, 'output_files': camera_times}

    def extract_camera(self, save=True):
        """
        Run the FPGA timestamps extractor for every camera.

        The sync and channel map are loaded from the timeline file when the sync namespace is
        'timeline', and through `get_sync_and_chn_map` otherwise.

        :param save: bool, forwarded to the extractors
        :return: whatever `run_extractor_classes` returns (unpacked by the callers as data, output files)
        """
        extractors = [partial(camera.CameraTimestampsFPGA, label) for label in self.cameras or []]
        extractor_kwargs = dict(sync_type=self.sync, sync_collection=self.sync_collection, save=save)
        if self.sync_namespace != 'timeline':
            sync, chmap = get_sync_and_chn_map(self.session_path, self.sync_collection)
        else:
            # Load sync from timeline file
            alf_path = self.session_path / self.sync_collection
            sync, chmap = raw_daq_loaders.load_timeline_sync_and_chmap(alf_path)
        extractor_kwargs['sync'] = sync
        extractor_kwargs['chmap'] = chmap
        return run_extractor_classes(extractors, session_path=self.session_path, **extractor_kwargs)

    def run_qc(self, camera_data=None, update=True):
        """
        Run the camera QC for all cameras of this task.

        :param camera_data: extracted camera data; when None the extraction is run first (without saving)
        :param update: bool, forwarded to the QC routine
        :return: the result of `run_camera_qc`
        """
        if camera_data is None:
            camera_data, _ = self.extract_camera(save=False)
        return run_camera_qc(
            self.session_path,
            self.cameras,
            one=self.one,
            sync_collection=self.sync_collection,
            sync_type=self.sync,
            update=update,
        )

    def _run(self, update=True, **kwargs):
        """
        Extract and save the camera timestamps, then run the video QC.

        :param update: bool, forwarded to `run_qc`
        :return: the output files of the extraction
        """
        # Video timestamps extraction
        camera_data, extracted_files = self.extract_camera(save=True)

        # Video QC
        self.run_qc(camera_data, update=update)

        return extracted_files
class DLC(base_tasks.VideoTask):
    """
    This task relies on a correctly installed dlc environment as per
    https://docs.google.com/document/d/1g0scP6_3EmaXCU4SsDNZWwDTaD9MG0es_grLA-d0gh0/edit#

    If your environment is set up otherwise, make sure that you set the respective attributes:
    t = DLC(session_path)
    t.dlcenv = Path('/path/to/your/dlcenv/bin/activate')
    t.scripts = Path('/path/to/your/iblscripts/deploy/serverpc/dlc')
    """
    gpu = 1
    cpu = 4
    io_charge = 100
    level = 2
    force = True
    job_size = 'large'

    # Default locations of the environment activation script and of the shell/python scripts
    dlcenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'dlcenv', 'bin', 'activate')
    scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'dlc')

    @property
    def signature(self):
        """dict: the task signature; one required mp4 in, DLC traces and motion energy datasets out per camera."""
        signature = {
            'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras],
            'output_files': [(f'_ibl_{cam}Camera.dlc.pqt', 'alf', True) for cam in self.cameras] +
                            [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', True) for cam in self.cameras] +
                            [(f'{cam}ROIMotionEnergy.position.npy', 'alf', True) for cam in self.cameras]
        }

        return signature

    def _check_dlcenv(self):
        """
        Check that scripts are present, dlcenv can be activated and get iblvideo version

        :return: str, the last line printed when importing iblvideo and printing its version
        :raises AssertionError: if scripts or environment are missing, or the import command fails
        """
        assert len(list(self.scripts.rglob('run_dlc.*'))) == 2, \
            f'Scripts run_dlc.sh and run_dlc.py do not exist in {self.scripts}'
        assert len(list(self.scripts.rglob('run_motion.*'))) == 2, \
            f'Scripts run_motion.sh and run_motion.py do not exist in {self.scripts}'
        assert self.dlcenv.exists(), f'DLC environment does not exist in assumed location {self.dlcenv}'
        command2run = f"source {self.dlcenv}; python -c 'import iblvideo; print(iblvideo.__version__)'"
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable='/bin/bash'
        )
        info, error = process.communicate()
        if process.returncode != 0:
            raise AssertionError(f"DLC environment check failed\n{error.decode('utf-8')}")
        # Only the last printed line is kept as the version
        version = info.decode('utf-8').strip().split('\n')[-1]
        return version

    @staticmethod
    def _video_intact(file_mp4):
        """Checks that the downloaded video can be opened and is not empty"""
        cap = cv2.VideoCapture(str(file_mp4))
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        cap.release()
        return frame_count > 0

    def _run(self, cams=None, overwrite=False):
        """
        Run DLC and then motion energy on the raw video of each valid camera.

        Failures of one camera are logged and reflected in `self.status`; the remaining cameras
        are still processed.

        :param cams: list of camera labels to process, defaults to `self.cameras`
        :param overwrite: bool, passed on to the DLC script; when False and all expected outputs
         already exist, these are returned without any computation
        :return: list of output files, or None if a video was missing and nothing was produced
        """
        # Check that the cams are valid for DLC, remove the ones that aren't
        candidate_cams = cams or self.cameras
        cams = []
        for cam in candidate_cams:
            try:
                cams.append(assert_valid_label(cam))
            except ValueError:
                _logger.warning(f'{cam} is not a valid video label, this video will be skipped')
        # Set up
        self.session_id = self.one.path2eid(self.session_path)
        actual_outputs = []

        # Loop through cams
        for cam in cams:
            # Catch exceptions so that following cameras can still run
            try:
                # If all results exist and overwrite is False, skip computation
                expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True)
                if overwrite is False and expected_outputs_present is True:
                    actual_outputs.extend(expected_outputs)
                    return actual_outputs

                # Use a default of None: a bare next() raises StopIteration when no video matches,
                # which the broad except below would turn into an error (-1) instead of the
                # intended Incomplete status (-3)
                file_mp4 = next(
                    self.session_path.joinpath('raw_video_data').glob(f'_iblrig_{cam}Camera.raw*.mp4'), None)
                if file_mp4 is None or not file_mp4.exists():
                    # In this case we set the status to Incomplete.
                    _logger.error(f'No raw video file available for {cam}, skipping.')
                    self.status = -3
                    continue
                if not self._video_intact(file_mp4):
                    _logger.error(f'Corrupt raw video file {file_mp4}')
                    self.status = -1
                    continue
                # Check that dlc environment is ok, shell scripts exists, and get iblvideo version, GPU addressable
                self.version = self._check_dlcenv()
                _logger.info(f'iblvideo version {self.version}')
                check_nvidia_driver()

                _logger.info(f'Running DLC on {cam}Camera.')
                command2run = f"{self.scripts.joinpath('run_dlc.sh')} {str(self.dlcenv)} {file_mp4} {overwrite}"
                _logger.info(command2run)
                process = subprocess.Popen(
                    command2run,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    executable='/bin/bash',
                )
                info, error = process.communicate()
                if process.returncode != 0:
                    error_str = error.decode('utf-8').strip()
                    _logger.error(f'DLC failed for {cam}Camera.\n\n'
                                  f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                                  f'{error_str}\n'
                                  f'++++++++++++++++++++++++++++++++++++++++++++\n')
                    self.status = -1
                    # We don't run motion energy, or add any files if dlc failed to run
                    continue
                dlc_result = next(self.session_path.joinpath('alf').glob(f'_ibl_{cam}Camera.dlc*.pqt'))
                actual_outputs.append(dlc_result)

                _logger.info(f'Computing motion energy for {cam}Camera')
                command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.dlcenv)} {file_mp4} {dlc_result}"
                _logger.info(command2run)
                process = subprocess.Popen(
                    command2run,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    executable='/bin/bash',
                )
                info, error = process.communicate()
                if process.returncode != 0:
                    error_str = error.decode('utf-8').strip()
                    _logger.error(f'Motion energy failed for {cam}Camera.\n\n'
                                  f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                                  f'{error_str}\n'
                                  f'++++++++++++++++++++++++++++++++++++++++++++\n')
                    self.status = -1
                    continue
                actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                    f'{cam}Camera.ROIMotionEnergy*.npy')))
                actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                    f'{cam}ROIMotionEnergy.position*.npy')))
            except Exception:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        # If status is Incomplete, check that there is at least one output.
        # Otherwise make sure it gets set to Empty (outputs = None), and set status to -1 to make sure it doesn't slip
        if self.status == -3 and len(actual_outputs) == 0:
            actual_outputs = None
            self.status = -1
        return actual_outputs
class EphysPostDLC(base_tasks.VideoTask):
    """
    The post_dlc task takes dlc traces as input and computes useful quantities, as well as qc.
    """
    io_charge = 90
    level = 3
    force = True

    def __init__(self, *args, **kwargs):
        """Accepts the parent task arguments plus an optional `trials_collection` keyword (default 'alf')."""
        super().__init__(*args, **kwargs)
        self.trials_collection = kwargs.get('trials_collection', 'alf')

    @property
    def signature(self):
        """dict: the task signature; DLC traces, camera times and plotting inputs in, features and lick times out."""
        cams = self.cameras
        dlc_traces = [(f'_ibl_{cam}Camera.dlc.pqt', 'alf', True) for cam in cams]
        camera_times = [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in cams]
        # the following are required for the DLC plot only
        # they are not strictly required, some plots just might be skipped
        # In particular the raw videos don't need to be downloaded as they can be streamed
        raw_videos = [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in cams]
        roi_positions = [(f'{cam}ROIMotionEnergy.position.npy', 'alf', False) for cam in cams]
        roi_energies = [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', False) for cam in cams]
        # The trials table is used in the DLC QC, however this is not an essential dataset
        trials_and_wheel = [
            ('_ibl_trials.table.pqt', self.trials_collection, False),
            ('_ibl_wheel.position.npy', self.trials_collection, False),
            ('_ibl_wheel.timestamps.npy', self.trials_collection, False),
        ]
        features = [(f'_ibl_{cam}Camera.features.pqt', 'alf', True) for cam in cams]
        return {
            'input_files': dlc_traces + camera_times + raw_videos + roi_positions + roi_energies + trials_and_wheel,
            'output_files': features + [('licks.times.npy', 'alf', True)],
        }

    def _run(self, overwrite=True, run_qc=True, plot_qc=True):
        """
        Run the PostDLC task. Returns a list of file locations for the output files in signature. The created plot
        (dlc_qc_plot.png) is not returned, but saved in session_path/snapshot and registered through ReportSnapshot.

        :param overwrite: bool, whether to recompute existing output files (default is True).
         Note that the dlc_qc_plot will be (re-)computed even if overwrite = False
        :param run_qc: bool, whether to run the DLC QC (default is True)
        :param plot_qc: bool, whether to create the dlc_qc_plot (default is True)
        :return: list of output files
        """
        # Check if output files exist locally
        exist, output_files = self.assert_expected(self.output_files, silent=True)
        if not (exist and not overwrite):
            if exist and overwrite:
                _logger.warning('EphysPostDLC outputs exist and overwrite=True, overwriting existing outputs.')
            # Find all available DLC files
            dlc_files = list(Path(self.session_path).joinpath('alf').rglob('_ibl_*Camera.dlc.*'))
            for found_file in dlc_files:
                _logger.debug(found_file)
            output_files = []
            combined_licks = []

            for dlc_file in dlc_files:
                # Catch unforeseen exceptions and move on to next cam
                try:
                    cam = label_from_path(dlc_file)
                    # load dlc trace and camera times
                    dlc_thresh = likelihood_threshold(pd.read_parquet(dlc_file), 0.9)
                    # try to load respective camera times; the state is True (usable), False (missing or
                    # empty) or 'short' (fewer time stamps than DLC frames)
                    try:
                        times_file = next(Path(self.session_path).joinpath('alf').rglob(f'_ibl_{cam}Camera.times.*npy'))
                        dlc_t = np.load(times_file)
                        times = True
                        n_times = dlc_t.shape[0]
                        if n_times == 0:
                            _logger.error(f'camera.times empty for {cam} camera. '
                                          f'Computations using camera.times will be skipped')
                            self.status = -1
                            times = False
                        elif n_times < len(dlc_thresh):
                            _logger.error(f'Camera times shorter than DLC traces for {cam} camera. '
                                          f'Computations using camera.times will be skipped')
                            self.status = -1
                            times = 'short'
                    except StopIteration:
                        self.status = -1
                        times = False
                        _logger.error(f'No camera.times for {cam} camera. '
                                      f'Computations using camera.times will be skipped')

                    # These features are only computed from left and right cam
                    if cam in ('left', 'right'):
                        features = pd.DataFrame()
                        # If camera times are available, get the lick time stamps for combined array
                        if times is True:
                            _logger.info(f'Computing lick times for {cam} camera.')
                            combined_licks.append(get_licks(dlc_thresh, dlc_t))
                        elif times is False:
                            _logger.warning(f'Skipping lick times for {cam} camera as no camera.times available')
                        elif times == 'short':
                            _logger.warning(f'Skipping lick times for {cam} camera as camera.times are too short')
                        # Compute pupil diameter, raw and smoothed
                        _logger.info(f'Computing raw pupil diameter for {cam} camera.')
                        features['pupilDiameter_raw'] = get_pupil_diameter(dlc_thresh)
                        try:
                            _logger.info(f'Computing smooth pupil diameter for {cam} camera.')
                            smooth = get_smooth_pupil_diameter(features['pupilDiameter_raw'], cam)
                            features['pupilDiameter_smooth'] = smooth
                        except Exception:
                            _logger.error(f'Computing smooth pupil diameter for {cam} camera failed, saving all NaNs.')
                            _logger.error(traceback.format_exc())
                            features['pupilDiameter_smooth'] = np.nan
                        # Save to parquet
                        features_file = Path(self.session_path).joinpath('alf', f'_ibl_{cam}Camera.features.pqt')
                        features.to_parquet(features_file)
                        output_files.append(features_file)

                    # For all cams, compute DLC QC if times available
                    if run_qc is True and times in [True, 'short']:
                        # Setting download_data to False because at this point the data should be there
                        DlcQC(self.session_path, side=cam, one=self.one, download_data=False).run(update=True)
                    else:
                        if times is False:
                            _logger.warning(f'Skipping QC for {cam} camera as no camera.times available')
                        if not run_qc:
                            _logger.warning(f'Skipping QC for {cam} camera as run_qc=False')

                except Exception:
                    _logger.error(traceback.format_exc())
                    self.status = -1
                    continue

            # Combined lick times
            if len(combined_licks) == 0:
                _logger.warning('No lick times computed for this session.')
            else:
                lick_times_file = Path(self.session_path).joinpath('alf', 'licks.times.npy')
                np.save(lick_times_file, sorted(np.concatenate(combined_licks)))
                output_files.append(lick_times_file)
        else:
            _logger.warning('EphysPostDLC outputs exist and overwrite=False, skipping computations of outputs.')

        if plot_qc:
            _logger.info('Creating DLC QC plot')
            try:
                session_id = self.one.path2eid(self.session_path)
                fig_path = self.session_path.joinpath('snapshot', 'dlc_qc_plot.png')
                snapshot_dir = fig_path.parent
                if not snapshot_dir.exists():
                    snapshot_dir.mkdir(parents=True, exist_ok=True)
                fig = dlc_qc_plot(self.session_path, one=self.one, cameras=self.cameras,
                                  device_collection=self.device_collection,
                                  trials_collection=self.trials_collection)
                fig.savefig(fig_path)
                fig.clf()
                snp = ReportSnapshot(self.session_path, session_id, one=self.one)
                snp.outputs = [fig_path]
                # The image is registered under the fully qualified name of the plotting function
                plot_function = str(dlc_qc_plot.__module__) + '.' + str(dlc_qc_plot.__name__)
                snp.register_images(widths=['orig'], function=plot_function)
            except Exception:
                _logger.error('Could not create and/or upload DLC QC Plot')
                _logger.error(traceback.format_exc())
                self.status = -1

        return output_files
class LightningPose(base_tasks.VideoTask):
    """
    Task that runs Lightning Pose and then motion energy on the raw videos of a session.

    The computation is delegated to the shell scripts found in `scripts`, which are given the
    environment activation script `lpenv` as their first argument.
    """
    # TODO: make one task per cam?
    # TODO: separate pose and motion energy
    gpu = 1
    io_charge = 100
    level = 2
    force = True
    job_size = 'large'
    env = 'litpose'

    # Default locations of the environment activation script and of the shell/python scripts
    lpenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'litpose', 'bin', 'activate')
    scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'litpose')

    @property
    def signature(self):
        """dict: the task signature; one required mp4 in, pose traces and motion energy datasets out per camera."""
        signature = {
            'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras],
            'output_files': [(f'_ibl_{cam}Camera.lightningPose.pqt', 'alf', True) for cam in self.cameras] +
                            [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', True) for cam in self.cameras] +
                            [(f'{cam}ROIMotionEnergy.position.npy', 'alf', True) for cam in self.cameras]
        }

        return signature

    @staticmethod
    def _video_intact(file_mp4):
        """Checks that the downloaded video can be opened and is not empty"""
        cap = cv2.VideoCapture(str(file_mp4))
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        cap.release()
        return frame_count > 0

    def _check_env(self):
        """
        Check that scripts are present, env can be activated and get iblvideo version

        :return: str, the last line printed when importing iblvideo and printing its version
        :raises AssertionError: if scripts or environment are missing, or the import command fails
        """
        assert len(list(self.scripts.rglob('run_litpose.*'))) == 2, \
            f'Scripts run_litpose.sh and run_litpose.py do not exist in {self.scripts}'
        assert self.lpenv.exists(), f"environment does not exist in assumed location {self.lpenv}"
        command2run = f"source {self.lpenv}; python -c 'import iblvideo; print(iblvideo.__version__)'"
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash"
        )
        info, error = process.communicate()
        if process.returncode != 0:
            raise AssertionError(f"environment check failed\n{error.decode('utf-8')}")
        # Only the last printed line is kept as the version
        version = info.decode("utf-8").strip().split('\n')[-1]
        return version

    def _run(self, overwrite=True, **kwargs):
        """
        Run Lightning Pose and then motion energy on every raw video present for the task cameras.

        Failures of one camera are logged and set `self.status` to -1; the remaining cameras are
        still processed.

        :param overwrite: bool, passed on to the pose script; when False and all expected outputs
         already exist, these are returned without any computation
        :return: list of output files, or None if nothing was produced
        """
        # Gather video files
        self.session_path = Path(self.session_path)
        mp4_files = [
            self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.mp4') for cam in self.cameras
            if self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.mp4').exists()
        ]

        labels = [label_from_path(x) for x in mp4_files]
        _logger.info(f'Running on {labels} videos')

        # Check the environment
        self.version = self._check_env()
        _logger.info(f'iblvideo version {self.version}')

        # If all results exist and overwrite is False, skip computation
        expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True)
        if overwrite is False and expected_outputs_present is True:
            actual_outputs = expected_outputs
            return actual_outputs

        # Else, loop over videos
        actual_outputs = []
        for label, mp4_file in zip(labels, mp4_files):
            # Catch exceptions so that the other cams can still run but set status to Errored
            try:
                # Check that the GPU is (still) accessible
                check_nvidia_driver()
                # Check that the video can be loaded
                if not self._video_intact(mp4_file):
                    _logger.error(f"Corrupt raw video file {mp4_file}")
                    self.status = -1
                    continue

                # ---------------------------
                # Run pose estimation
                # ---------------------------
                t0 = time.time()
                _logger.info(f'Running Lightning Pose on {label}Camera.')
                command2run = f"{self.scripts.joinpath('run_litpose.sh')} {str(self.lpenv)} {mp4_file} {overwrite}"
                _logger.info(command2run)
                process = subprocess.Popen(
                    command2run,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    executable="/bin/bash",
                )
                info, error = process.communicate()
                if process.returncode != 0:
                    error_str = error.decode("utf-8").strip()
                    _logger.error(
                        f'Lightning pose failed for {label}Camera.\n\n'
                        f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                        f'{error_str}\n'
                        f'++++++++++++++++++++++++++++++++++++++++++++\n'
                    )
                    self.status = -1
                    # We don't run motion energy, or add any files if LP failed to run
                    continue
                else:
                    _logger.info(f'{label} camera took {(time.time() - t0)} seconds')
                    result = next(self.session_path.joinpath('alf').glob(f'_ibl_{label}Camera.lightningPose*.pqt'))
                    actual_outputs.append(result)

                # ---------------------------
                # Run motion energy
                # ---------------------------
                t1 = time.time()
                _logger.info(f'Computing motion energy for {label}Camera')
                command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.lpenv)} {mp4_file} {result}"
                _logger.info(command2run)
                process = subprocess.Popen(
                    command2run,
                    shell=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    executable='/bin/bash',
                )
                info, error = process.communicate()
                if process.returncode != 0:
                    error_str = error.decode('utf-8').strip()
                    _logger.error(
                        f'Motion energy failed for {label}Camera.\n\n'
                        f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                        f'{error_str}\n'
                        f'++++++++++++++++++++++++++++++++++++++++++++\n'
                    )
                    self.status = -1
                    continue
                else:
                    _logger.info(f'{label} camera took {(time.time() - t1)} seconds')
                    actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                        f'{label}Camera.ROIMotionEnergy*.npy')))
                    actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                        f'{label}ROIMotionEnergy.position*.npy')))

            # Only catch regular errors: KeyboardInterrupt and SystemExit must stop the task
            # rather than be logged and followed by the next camera
            except Exception:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue

        # catch here if there are no raw videos present
        if len(actual_outputs) == 0:
            _logger.info('Did not find any videos for this session')
            actual_outputs = None
            self.status = -1

        return actual_outputs