Coverage for ibllib/pipes/video_tasks.py: 45%
396 statements
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
1import logging
2import subprocess
3import time
4import traceback
5from pathlib import Path
7import cv2
8import pandas as pd
9import numpy as np
11from ibllib.qc.dlc import DlcQC
12from ibllib.io import ffmpeg, raw_daq_loaders
13from ibllib.pipes import base_tasks
14from ibllib.io.video import get_video_meta
15from ibllib.io.extractors import camera
16from ibllib.qc.camera import run_all_qc as run_camera_qc
17from ibllib.misc import check_nvidia_driver
18from ibllib.io.video import label_from_path, assert_valid_label
19from ibllib.plots.snapshot import ReportSnapshot
20from ibllib.plots.figures import dlc_qc_plot
21from brainbox.behavior.dlc import likelihood_threshold, get_licks, get_pupil_diameter, get_smooth_pupil_diameter
23_logger = logging.getLogger('ibllib')
26class VideoRegisterRaw(base_tasks.VideoTask, base_tasks.RegisterRawDataTask):
27 """
28 Task to register raw video data. Builds up list of files to register from list of cameras given in session params file
29 """
31 priority = 100
32 job_size = 'small'
34 @property
35 def signature(self):
36 signature = { 1adijklemc
37 'input_files': [],
38 'output_files':
39 [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] +
40 [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] +
41 [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] +
42 [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] +
43 [('_iblrig_videoCodeFiles.raw*', self.device_collection, False)]
44 }
45 return signature 1adijklemc
47 def assert_expected_outputs(self, raise_error=True):
48 """
49 frameData replaces the timestamps file. Therefore if frameData is present, timestamps is
50 optional and vice versa.
51 """
52 assert self.status == 0
53 _logger.info('Checking output files')
54 everything_is_fine, files = self.assert_expected(self.output_files)
56 required = any('Camera.frameData' in x or 'Camera.timestamps' in x for x in map(str, files))
57 if not (everything_is_fine and required):
58 for out in self.outputs:
59 _logger.error(f'{out}')
60 if raise_error:
61 raise FileNotFoundError('Missing outputs after task completion')
63 return everything_is_fine, files
66class VideoCompress(base_tasks.VideoTask):
67 """
68 Task to compress raw video data from .avi to .mp4 format.
69 """
70 priority = 90
71 job_size = 'large'
73 @property
74 def signature(self):
75 signature = { 1adijklempqc
76 'input_files': [(f'_iblrig_{cam}Camera.raw.*', self.device_collection, True) for cam in self.cameras],
77 'output_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras]
78 }
79 return signature 1adijklempqc
81 def _run(self):
82 # TODO different compression parameters based on whether it is training or not based on number of cameras?
83 # avi to mp4 compression
84 if self.sync == 'bpod': 1adpqc
85 command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 29 ' 1dpc
86 '-nostats -codec:a copy {file_out}')
87 else:
88 command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 17 ' 1aq
89 '-loglevel 0 -codec:a copy {file_out}')
91 output_files = ffmpeg.iblrig_video_compression(self.session_path, command) 1adpqc
93 if len(output_files) == 0: 1adpqc
94 _logger.info('No compressed videos found') 1c
95 return 1c
97 return output_files 1adpqc
100class VideoConvert(base_tasks.VideoTask):
101 """
102 Task that converts compressed avi to mp4 format and renames video and camlog files. Specific to UCLA widefield implementation
103 """
104 priority = 90
105 job_size = 'small'
107 @property
108 def signature(self):
109 signature = { 1ob
110 'input_files': [(f'{cam}_cam*.avi', self.device_collection, True) for cam in self.cameras] +
111 [(f'{cam}_cam*.camlog', self.device_collection, False) for cam in self.cameras],
112 'output_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] +
113 [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, True) for cam in self.cameras]
114 }
116 return signature 1ob
118 def _run(self):
119 output_files = [] 1b
120 for cam in self.cameras: 1b
122 # rename and register the camlog files
123 camlog_file = next(self.session_path.joinpath(self.device_collection).glob(f'{cam}_cam*.camlog')) 1b
124 new_camlog_file = self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.camlog') 1b
125 camlog_file.replace(new_camlog_file) 1b
126 output_files.append(new_camlog_file) 1b
128 # convert the avi files to mp4
129 avi_file = next(self.session_path.joinpath(self.device_collection).glob(f'{cam}_cam*.avi')) 1b
130 mp4_file = self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.mp4') 1b
131 command2run = f'ffmpeg -i {str(avi_file)} -c:v copy -c:a copy -y {str(mp4_file)}' 1b
133 process = subprocess.Popen(command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1b
134 info, error = process.communicate() 1b
135 if process.returncode == 0: 1b
136 # check the video meta matched and remove the original file
137 meta_avi = get_video_meta(avi_file) 1b
138 _ = meta_avi.pop('size') 1b
139 meta_mp4 = get_video_meta(mp4_file) 1b
140 match = True 1b
141 for key in meta_avi.keys(): 1b
142 if meta_avi[key] != meta_mp4[key]: 1b
143 match = False
145 # if all checks out we can remove the original avi
146 if match: 1b
147 avi_file.unlink() 1b
148 output_files.append(mp4_file) 1b
149 else:
150 _logger.error(f'avi and mp4 meta data do not match for {avi_file}')
151 else:
152 _logger.error(f'conversion to mp4 failed for {avi_file}: {error}')
154 return output_files 1b
157class VideoSyncQcCamlog(base_tasks.VideoTask):
158 """
159 Task to sync camera timestamps to main DAQ timestamps when camlog files are used. Specific to UCLA widefield implementation
160 """
161 priority = 40
162 job_size = 'small'
164 @property
165 def signature(self):
166 signature = { 1on
167 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] +
168 [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, False) for cam in self.cameras] +
169 [(f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True),
170 (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
171 (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True),
172 ('*.wiring.json', self.sync_collection, True),
173 ('*wheel.position.npy', 'alf', False),
174 ('*wheel.timestamps.npy', 'alf', False)],
175 'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]
176 }
178 return signature 1on
180 def _run(self, qc=True, **kwargs):
182 mp4_files = self.session_path.joinpath(self.device_collection).rglob('*.mp4') 1n
183 labels = [label_from_path(x) for x in mp4_files] 1n
185 # Video timestamps extraction
186 output_files = [] 1n
187 data, files = camera.extract_all(self.session_path, sync_type=self.sync, sync_collection=self.sync_collection, 1n
188 save=True, labels=labels, camlog=True)
189 output_files.extend(files) 1n
191 # Video QC
192 if qc: 1n
193 run_camera_qc(self.session_path, update=True, one=self.one, cameras=labels, camlog=True,
194 sync_collection=self.sync_collection, sync_type=self.sync)
196 return output_files 1n
199class VideoSyncQcBpod(base_tasks.VideoTask):
200 """
201 Task to sync camera timestamps to main DAQ timestamps
202 N.B Signatures only reflect new daq naming convention, non-compatible with ephys when not running on server
203 """
204 priority = 40
205 job_size = 'small'
207 def __init__(self, *args, **kwargs):
208 super().__init__(*args, **kwargs) 1dersfc
209 # Task collection (this needs to be specified in the task kwargs)
210 self.collection = self.get_task_collection(kwargs.get('collection', None)) 1dersfc
211 # Task type (protocol)
212 self.protocol = self.get_protocol(kwargs.get('protocol', None), task_collection=self.collection) 1dersfc
214 @property
215 def signature(self):
216 signature = { 1dfc
217 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] +
218 [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] +
219 [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] +
220 [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] +
221 [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] +
222 [('_iblrig_taskData.raw.*', self.collection, True),
223 ('_iblrig_taskSettings.raw.*', self.collection, True),
224 ('*wheel.position.npy', 'alf', False),
225 ('*wheel.timestamps.npy', 'alf', False)],
226 'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]
227 }
229 return signature 1dfc
231 def _run(self, **kwargs):
233 mp4_files = self.session_path.joinpath(self.device_collection).rglob('*.mp4') 1dfc
234 labels = [label_from_path(x) for x in mp4_files] 1dfc
236 # Video timestamps extraction
237 output_files = [] 1dfc
238 data, files = camera.extract_all(self.session_path, sync_type=self.sync, sync_collection=self.sync_collection, 1dfc
239 save=True, labels=labels, task_collection=self.collection)
240 output_files.extend(files) 1dfc
242 # Video QC
243 run_camera_qc(self.session_path, update=True, one=self.one, cameras=labels, 1dfc
244 sync_collection=self.sync_collection, sync_type=self.sync)
246 return output_files 1dfc
249class VideoSyncQcNidq(base_tasks.VideoTask):
250 """
251 Task to sync camera timestamps to main DAQ timestamps
252 N.B Signatures only reflect new daq naming convention, non-compatible with ephys when not running on server
253 """
254 priority = 40
255 job_size = 'small'
257 @property
258 def signature(self):
259 signature = { 1aijklemg
260 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] +
261 [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] +
262 [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] +
263 [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] +
264 [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] +
265 [(f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True),
266 (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
267 (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True),
268 ('*.wiring.json', self.sync_collection, True),
269 ('*wheel.position.npy', 'alf', False),
270 ('*wheel.timestamps.npy', 'alf', False),
271 ('*experiment.description*', '', False)],
272 'output_files': [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]
273 }
275 return signature 1aijklemg
277 def _run(self, **kwargs):
279 mp4_files = self.session_path.joinpath(self.device_collection).glob('*.mp4') 1g
280 labels = [label_from_path(x) for x in mp4_files] 1g
281 labels = [lab for lab in labels if lab in ('left', 'right', 'body', 'belly')] 1g
283 kwargs = {} 1g
284 if self.sync_namespace == 'timeline': 1g
285 # Load sync from timeline file
286 alf_path = self.session_path / self.sync_collection
287 kwargs['sync'], kwargs['chmap'] = raw_daq_loaders.load_timeline_sync_and_chmap(alf_path)
289 # Video timestamps extraction
290 output_files = [] 1g
291 data, files = camera.extract_all(self.session_path, sync_type=self.sync, sync_collection=self.sync_collection, 1g
292 save=True, labels=labels, **kwargs)
293 output_files.extend(files) 1g
295 # Video QC
296 run_camera_qc(self.session_path, update=True, one=self.one, cameras=labels, 1g
297 sync_collection=self.sync_collection, sync_type=self.sync)
299 return output_files 1g
302class DLC(base_tasks.VideoTask):
303 """
304 This task relies on a correctly installed dlc environment as per
305 https://docs.google.com/document/d/1g0scP6_3EmaXCU4SsDNZWwDTaD9MG0es_grLA-d0gh0/edit#
307 If your environment is set up otherwise, make sure that you set the respective attributes:
308 t = EphysDLC(session_path)
309 t.dlcenv = Path('/path/to/your/dlcenv/bin/activate')
310 t.scripts = Path('/path/to/your/iblscripts/deploy/serverpc/dlc')
311 """
312 gpu = 1
313 cpu = 4
314 io_charge = 100
315 level = 2
316 force = True
317 job_size = 'large'
319 dlcenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'dlcenv', 'bin', 'activate')
320 scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'dlc')
322 @property
323 def signature(self):
324 signature = { 1aijklemo
325 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras],
326 'output_files': [(f'_ibl_{cam}Camera.dlc.pqt', 'alf', True) for cam in self.cameras] +
327 [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', True) for cam in self.cameras] +
328 [(f'{cam}ROIMotionEnergy.position.npy', 'alf', True)for cam in self.cameras]
329 }
331 return signature 1aijklemo
333 def _check_dlcenv(self):
334 """Check that scripts are present, dlcenv can be activated and get iblvideo version"""
335 assert len(list(self.scripts.rglob('run_dlc.*'))) == 2, \
336 f'Scripts run_dlc.sh and run_dlc.py do not exist in {self.scripts}'
337 assert len(list(self.scripts.rglob('run_motion.*'))) == 2, \
338 f'Scripts run_motion.sh and run_motion.py do not exist in {self.scripts}'
339 assert self.dlcenv.exists(), f'DLC environment does not exist in assumed location {self.dlcenv}'
340 command2run = f"source {self.dlcenv}; python -c 'import iblvideo; print(iblvideo.__version__)'"
341 process = subprocess.Popen(
342 command2run,
343 shell=True,
344 stdout=subprocess.PIPE,
345 stderr=subprocess.PIPE,
346 executable='/bin/bash'
347 )
348 info, error = process.communicate()
349 if process.returncode != 0:
350 raise AssertionError(f"DLC environment check failed\n{error.decode('utf-8')}")
351 version = info.decode('utf-8').strip().split('\n')[-1]
352 return version
354 @staticmethod
355 def _video_intact(file_mp4):
356 """Checks that the downloaded video can be opened and is not empty"""
357 cap = cv2.VideoCapture(str(file_mp4))
358 frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
359 intact = True if frame_count > 0 else False
360 cap.release()
361 return intact
363 def _run(self, cams=None, overwrite=False):
364 # Check that the cams are valid for DLC, remove the ones that aren't
365 candidate_cams = cams or self.cameras
366 cams = []
367 for cam in candidate_cams:
368 try:
369 cams.append(assert_valid_label(cam))
370 except ValueError:
371 _logger.warning(f'{cam} is not a valid video label, this video will be skipped')
372 # Set up
373 self.session_id = self.one.path2eid(self.session_path)
374 actual_outputs = []
376 # Loop through cams
377 for cam in cams:
378 # Catch exceptions so that following cameras can still run
379 try:
380 # If all results exist and overwrite is False, skip computation
381 expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True)
382 if overwrite is False and expected_outputs_present is True:
383 actual_outputs.extend(expected_outputs)
384 return actual_outputs
385 else:
386 file_mp4 = next(self.session_path.joinpath('raw_video_data').glob(f'_iblrig_{cam}Camera.raw*.mp4'))
387 if not file_mp4.exists():
388 # In this case we set the status to Incomplete.
389 _logger.error(f'No raw video file available for {cam}, skipping.')
390 self.status = -3
391 continue
392 if not self._video_intact(file_mp4):
393 _logger.error(f'Corrupt raw video file {file_mp4}')
394 self.status = -1
395 continue
396 # Check that dlc environment is ok, shell scripts exists, and get iblvideo version, GPU addressable
397 self.version = self._check_dlcenv()
398 _logger.info(f'iblvideo version {self.version}')
399 check_nvidia_driver()
401 _logger.info(f'Running DLC on {cam}Camera.')
402 command2run = f"{self.scripts.joinpath('run_dlc.sh')} {str(self.dlcenv)} {file_mp4} {overwrite}"
403 _logger.info(command2run)
404 process = subprocess.Popen(
405 command2run,
406 shell=True,
407 stdout=subprocess.PIPE,
408 stderr=subprocess.PIPE,
409 executable='/bin/bash',
410 )
411 info, error = process.communicate()
412 # info_str = info.decode("utf-8").strip()
413 # _logger.info(info_str)
414 if process.returncode != 0:
415 error_str = error.decode('utf-8').strip()
416 _logger.error(f'DLC failed for {cam}Camera.\n\n'
417 f'++++++++ Output of subprocess for debugging ++++++++\n\n'
418 f'{error_str}\n'
419 f'++++++++++++++++++++++++++++++++++++++++++++\n')
420 self.status = -1
421 # We dont' run motion energy, or add any files if dlc failed to run
422 continue
423 dlc_result = next(self.session_path.joinpath('alf').glob(f'_ibl_{cam}Camera.dlc*.pqt'))
424 actual_outputs.append(dlc_result)
426 _logger.info(f'Computing motion energy for {cam}Camera')
427 command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.dlcenv)} {file_mp4} {dlc_result}"
428 _logger.info(command2run)
429 process = subprocess.Popen(
430 command2run,
431 shell=True,
432 stdout=subprocess.PIPE,
433 stderr=subprocess.PIPE,
434 executable='/bin/bash',
435 )
436 info, error = process.communicate()
437 # info_str = info.decode('utf-8').strip()
438 # _logger.info(info_str)
439 if process.returncode != 0:
440 error_str = error.decode('utf-8').strip()
441 _logger.error(f'Motion energy failed for {cam}Camera.\n\n'
442 f'++++++++ Output of subprocess for debugging ++++++++\n\n'
443 f'{error_str}\n'
444 f'++++++++++++++++++++++++++++++++++++++++++++\n')
445 self.status = -1
446 continue
447 actual_outputs.append(next(self.session_path.joinpath('alf').glob(
448 f'{cam}Camera.ROIMotionEnergy*.npy')))
449 actual_outputs.append(next(self.session_path.joinpath('alf').glob(
450 f'{cam}ROIMotionEnergy.position*.npy')))
451 except Exception:
452 _logger.error(traceback.format_exc())
453 self.status = -1
454 continue
455 # If status is Incomplete, check that there is at least one output.
456 # # Otherwise make sure it gets set to Empty (outputs = None), and set status to -1 to make sure it doesn't slip
457 if self.status == -3 and len(actual_outputs) == 0:
458 actual_outputs = None
459 self.status = -1
460 return actual_outputs
463class EphysPostDLC(base_tasks.VideoTask):
464 """
465 The post_dlc task takes dlc traces as input and computes useful quantities, as well as qc.
466 """
467 io_charge = 90
468 level = 3
469 force = True
471 def __init__(self, *args, **kwargs):
472 super().__init__(*args, **kwargs) 1aijklemo
473 self.trials_collection = kwargs.get('trials_collection', 'alf') 1aijklemo
475 @property
476 def signature(self):
477 return {
478 'input_files': [(f'_ibl_{cam}Camera.dlc.pqt', 'alf', True) for cam in self.cameras] +
479 [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras] +
480 # the following are required for the DLC plot only
481 # they are not strictly required, some plots just might be skipped
482 # In particular the raw videos don't need to be downloaded as they can be streamed
483 [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] +
484 [(f'{cam}ROIMotionEnergy.position.npy', 'alf', False) for cam in self.cameras] +
485 # The trials table is used in the DLC QC, however this is not an essential dataset
486 [('_ibl_trials.table.pqt', self.trials_collection, False)],
487 'output_files': [(f'_ibl_{cam}Camera.features.pqt', 'alf', True) for cam in self.cameras] +
488 [('licks.times.npy', 'alf', True)]
489 }
491 def _run(self, overwrite=True, run_qc=True, plot_qc=True):
492 """
493 Run the PostDLC task. Returns a list of file locations for the output files in signature. The created plot
494 (dlc_qc_plot.png) is not returned, but saved in session_path/snapshots and uploaded to Alyx as a note.
496 :param overwrite: bool, whether to recompute existing output files (default is False).
497 Note that the dlc_qc_plot will be (re-)computed even if overwrite = False
498 :param run_qc: bool, whether to run the DLC QC (default is True)
499 :param plot_qc: book, whether to create the dlc_qc_plot (default is True)
501 """
502 # Check if output files exist locally
503 exist, output_files = self.assert_expected(self.signature['output_files'], silent=True)
504 if exist and not overwrite:
505 _logger.warning('EphysPostDLC outputs exist and overwrite=False, skipping computations of outputs.')
506 else:
507 if exist and overwrite:
508 _logger.warning('EphysPostDLC outputs exist and overwrite=True, overwriting existing outputs.')
509 # Find all available DLC files
510 dlc_files = list(Path(self.session_path).joinpath('alf').rglob('_ibl_*Camera.dlc.*'))
511 for dlc_file in dlc_files:
512 _logger.debug(dlc_file)
513 output_files = []
514 combined_licks = []
516 for dlc_file in dlc_files:
517 # Catch unforeseen exceptions and move on to next cam
518 try:
519 cam = label_from_path(dlc_file)
520 # load dlc trace and camera times
521 dlc = pd.read_parquet(dlc_file)
522 dlc_thresh = likelihood_threshold(dlc, 0.9)
523 # try to load respective camera times
524 try:
525 dlc_t = np.load(next(Path(self.session_path).joinpath('alf').rglob(f'_ibl_{cam}Camera.times.*npy')))
526 times = True
527 if dlc_t.shape[0] == 0:
528 _logger.error(f'camera.times empty for {cam} camera. '
529 f'Computations using camera.times will be skipped')
530 self.status = -1
531 times = False
532 elif dlc_t.shape[0] < len(dlc_thresh):
533 _logger.error(f'Camera times shorter than DLC traces for {cam} camera. '
534 f'Computations using camera.times will be skipped')
535 self.status = -1
536 times = 'short'
537 except StopIteration:
538 self.status = -1
539 times = False
540 _logger.error(f'No camera.times for {cam} camera. '
541 f'Computations using camera.times will be skipped')
542 # These features are only computed from left and right cam
543 if cam in ('left', 'right'):
544 features = pd.DataFrame()
545 # If camera times are available, get the lick time stamps for combined array
546 if times is True:
547 _logger.info(f'Computing lick times for {cam} camera.')
548 combined_licks.append(get_licks(dlc_thresh, dlc_t))
549 elif times is False:
550 _logger.warning(f'Skipping lick times for {cam} camera as no camera.times available')
551 elif times == 'short':
552 _logger.warning(f'Skipping lick times for {cam} camera as camera.times are too short')
553 # Compute pupil diameter, raw and smoothed
554 _logger.info(f'Computing raw pupil diameter for {cam} camera.')
555 features['pupilDiameter_raw'] = get_pupil_diameter(dlc_thresh)
556 try:
557 _logger.info(f'Computing smooth pupil diameter for {cam} camera.')
558 features['pupilDiameter_smooth'] = get_smooth_pupil_diameter(features['pupilDiameter_raw'],
559 cam)
560 except Exception:
561 _logger.error(f'Computing smooth pupil diameter for {cam} camera failed, saving all NaNs.')
562 _logger.error(traceback.format_exc())
563 features['pupilDiameter_smooth'] = np.nan
564 # Save to parquet
565 features_file = Path(self.session_path).joinpath('alf', f'_ibl_{cam}Camera.features.pqt')
566 features.to_parquet(features_file)
567 output_files.append(features_file)
569 # For all cams, compute DLC QC if times available
570 if run_qc is True and times in [True, 'short']:
571 # Setting download_data to False because at this point the data should be there
572 qc = DlcQC(self.session_path, side=cam, one=self.one, download_data=False)
573 qc.run(update=True)
574 else:
575 if times is False:
576 _logger.warning(f'Skipping QC for {cam} camera as no camera.times available')
577 if not run_qc:
578 _logger.warning(f'Skipping QC for {cam} camera as run_qc=False')
580 except Exception:
581 _logger.error(traceback.format_exc())
582 self.status = -1
583 continue
585 # Combined lick times
586 if len(combined_licks) > 0:
587 lick_times_file = Path(self.session_path).joinpath('alf', 'licks.times.npy')
588 np.save(lick_times_file, sorted(np.concatenate(combined_licks)))
589 output_files.append(lick_times_file)
590 else:
591 _logger.warning('No lick times computed for this session.')
593 if plot_qc:
594 _logger.info('Creating DLC QC plot')
595 try:
596 session_id = self.one.path2eid(self.session_path)
597 fig_path = self.session_path.joinpath('snapshot', 'dlc_qc_plot.png')
598 if not fig_path.parent.exists():
599 fig_path.parent.mkdir(parents=True, exist_ok=True)
600 fig = dlc_qc_plot(self.session_path, one=self.one, cameras=self.cameras, device_collection=self.device_collection,
601 trials_collection=self.trials_collection)
602 fig.savefig(fig_path)
603 fig.clf()
604 snp = ReportSnapshot(self.session_path, session_id, one=self.one)
605 snp.outputs = [fig_path]
606 snp.register_images(widths=['orig'],
607 function=str(dlc_qc_plot.__module__) + '.' + str(dlc_qc_plot.__name__))
608 except Exception:
609 _logger.error('Could not create and/or upload DLC QC Plot')
610 _logger.error(traceback.format_exc())
611 self.status = -1
613 return output_files
616class LightningPose(base_tasks.VideoTask):
617 # TODO: make one task per cam?
618 gpu = 1
619 io_charge = 100
620 level = 2
621 force = True
622 job_size = 'large'
624 env = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'litpose', 'bin', 'activate')
625 scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'litpose')
627 @property
628 def signature(self):
629 signature = { 1h
630 'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras],
631 'output_files': [(f'_ibl_{cam}Camera.lightningPose.pqt', 'alf', True) for cam in self.cameras]
632 }
634 return signature 1h
636 @staticmethod
637 def _video_intact(file_mp4):
638 """Checks that the downloaded video can be opened and is not empty"""
639 cap = cv2.VideoCapture(str(file_mp4))
640 frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
641 intact = True if frame_count > 0 else False
642 cap.release()
643 return intact
645 def _check_env(self):
646 """Check that scripts are present, env can be activated and get iblvideo version"""
647 assert len(list(self.scripts.rglob('run_litpose.*'))) == 2, \
648 f'Scripts run_litpose.sh and run_litpose.py do not exist in {self.scripts}'
649 assert self.env.exists(), f"environment does not exist in assumed location {self.env}"
650 command2run = f"source {self.env}; python -c 'import iblvideo; print(iblvideo.__version__)'"
651 process = subprocess.Popen(
652 command2run,
653 shell=True,
654 stdout=subprocess.PIPE,
655 stderr=subprocess.PIPE,
656 executable="/bin/bash"
657 )
658 info, error = process.communicate()
659 if process.returncode != 0:
660 raise AssertionError(f"environment check failed\n{error.decode('utf-8')}")
661 version = info.decode("utf-8").strip().split('\n')[-1]
662 return version
664 def _run(self, overwrite=True, **kwargs):
666 # Gather video files
667 self.session_path = Path(self.session_path) 1h
668 mp4_files = [ 1h
669 self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.mp4') for cam in self.cameras
670 if self.session_path.joinpath(self.device_collection, f'_iblrig_{cam}Camera.raw.mp4').exists()
671 ]
673 labels = [label_from_path(x) for x in mp4_files] 1h
674 _logger.info(f'Running on {labels} videos') 1h
676 # Check the environment
677 self.version = self._check_env() 1h
678 _logger.info(f'iblvideo version {self.version}') 1h
680 # If all results exist and overwrite is False, skip computation
681 expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True) 1h
682 if overwrite is False and expected_outputs_present is True: 1h
683 actual_outputs = expected_outputs 1h
684 return actual_outputs 1h
686 # Else, loop over videos
687 actual_outputs = []
688 for label, mp4_file in zip(labels, mp4_files):
689 # Catch exceptions so that the other cams can still run but set status to Errored
690 try:
691 # Check that the GPU is (still) accessible
692 check_nvidia_driver()
693 # Check that the video can be loaded
694 if not self._video_intact(mp4_file):
695 _logger.error(f"Corrupt raw video file {mp4_file}")
696 self.status = -1
697 continue
698 t0 = time.time()
699 _logger.info(f'Running Lightning Pose on {label}Camera.')
700 command2run = f"{self.scripts.joinpath('run_litpose.sh')} {str(self.env)} {mp4_file} {overwrite}"
701 _logger.info(command2run)
702 process = subprocess.Popen(
703 command2run,
704 shell=True,
705 stdout=subprocess.PIPE,
706 stderr=subprocess.PIPE,
707 executable="/bin/bash",
708 )
709 info, error = process.communicate()
710 if process.returncode != 0:
711 error_str = error.decode("utf-8").strip()
712 _logger.error(f'Lightning pose failed for {label}Camera.\n\n'
713 f'++++++++ Output of subprocess for debugging ++++++++\n\n'
714 f'{error_str}\n'
715 f'++++++++++++++++++++++++++++++++++++++++++++\n')
716 self.status = -1
717 continue
718 else:
719 _logger.info(f'{label} camera took {(time.time() - t0)} seconds')
720 result = next(self.session_path.joinpath('alf').glob(f'_ibl_{label}Camera.lightningPose*.pqt'))
721 actual_outputs.append(result)
723 except BaseException:
724 _logger.error(traceback.format_exc())
725 self.status = -1
726 continue
728 return actual_outputs