Coverage for ibllib/pipes/video_tasks.py: 58%
226 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-10-11 11:13 +0100
« prev ^ index » next coverage.py v7.3.2, created at 2023-10-11 11:13 +0100
1import logging
2import subprocess
3import cv2
4import traceback
5from pathlib import Path
7from ibllib.io import ffmpeg, raw_daq_loaders
8from ibllib.pipes import base_tasks
9from ibllib.io.video import get_video_meta
10from ibllib.io.extractors import camera
11from ibllib.qc.camera import run_all_qc as run_camera_qc
12from ibllib.misc import check_nvidia_driver
13from ibllib.io.video import label_from_path, assert_valid_label
# Module logger. It is bound to the shared 'ibllib' logger name (not __name__), so all task
# messages from this module are emitted under that logger.
_logger = logging.getLogger('ibllib')
class VideoRegisterRaw(base_tasks.VideoTask, base_tasks.RegisterRawDataTask):
    """
    Task to register raw video data. Builds up list of files to register from list of cameras given in session params file
    """

    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        """
        dict: Expected input and output files.

        There are no inputs. The outputs are, for every camera in ``self.cameras``, the raw
        timestamps / GPIO / frame counter / frameData files, plus the video code files. All are
        in ``self.device_collection`` and all are flagged as not required (``False``).
        ``assert_expected_outputs`` adds the check that at least one timestamps or frameData
        file is present.
        """
        signature = {
            'input_files': [],
            'output_files':
                [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] +
                [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] +
                [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] +
                [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras] +
                [('_iblrig_videoCodeFiles.raw*', self.device_collection, False)]
        }
        return signature

    def assert_expected_outputs(self, raise_error=True):
        """
        Check the registered output files after the task has run.

        frameData replaces the timestamps file. Therefore if frameData is present, timestamps is
        optional and vice versa. On top of the signature check, at least one found file must
        contain ``Camera.frameData`` or ``Camera.timestamps`` in its path.

        Parameters
        ----------
        raise_error : bool
            If True, raise when outputs are missing; otherwise only log the problem.

        Returns
        -------
        bool
            True only if the signature check passed AND a timestamps/frameData file was found.
        list
            The output files found by ``self.assert_expected``.

        Raises
        ------
        FileNotFoundError
            If outputs are missing and `raise_error` is True.
        """
        assert self.status == 0
        _logger.info('Checking output files')
        everything_is_fine, files = self.assert_expected(self.output_files)

        # Either the frameData or the timestamps file must exist, whichever the rig produced.
        has_timestamps = any('Camera.frameData' in x or 'Camera.timestamps' in x for x in map(str, files))
        if not has_timestamps:
            _logger.error('Neither a Camera.frameData nor a Camera.timestamps file was found')
        # Fold the timestamps requirement into the returned flag. Previously a missing
        # timestamps/frameData file was reported but the returned flag could still be True.
        everything_is_fine = everything_is_fine and has_timestamps

        if not everything_is_fine:
            for out in self.outputs:
                _logger.error(f"{out}")
            if raise_error:
                raise FileNotFoundError("Missing outputs after task completion")

        return everything_is_fine, files
class VideoCompress(base_tasks.VideoTask):
    """
    Compress the raw camera videos of a session into ``_iblrig_{cam}Camera.raw.mp4`` files.

    Compression is delegated to ``ffmpeg.iblrig_video_compression``. Bpod-synced sessions are
    encoded with ``-crf 29 -nostats``; every other sync type uses ``-crf 17 -loglevel 0``.
    """
    priority = 90
    job_size = 'large'

    @property
    def signature(self):
        """dict: One required raw video input and one required raw .mp4 output per camera."""
        raw_videos = [(f'_iblrig_{cam}Camera.raw.*', self.device_collection, True) for cam in self.cameras]
        mp4_videos = [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras]
        return {'input_files': raw_videos, 'output_files': mp4_videos}

    def _run(self):
        """
        Compress every raw video found in the session.

        Returns
        -------
        list or None
            Whatever ``ffmpeg.iblrig_video_compression`` returned, or None if it returned nothing.
        """
        # TODO different compression parameters based on whether it is training or not based on number of cameras?
        # The {file_in}/{file_out} placeholders are deliberately NOT f-string fields; they are
        # presumably substituted by ffmpeg.iblrig_video_compression (confirm there).
        command = (
            ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 29 '
             '-nostats -codec:a copy {file_out}')
            if self.sync == 'bpod' else
            ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 17 '
             '-loglevel 0 -codec:a copy {file_out}')
        )

        compressed = ffmpeg.iblrig_video_compression(self.session_path, command)
        if len(compressed) > 0:
            return compressed

        _logger.info('No compressed videos found')
        return None
class VideoConvert(base_tasks.VideoTask):
    """
    Task that converts compressed avi to mp4 format and renames video and camlog files. Specific to UCLA widefield implementation
    """
    priority = 90
    job_size = 'small'

    @property
    def signature(self):
        """
        dict: Expected input and output files.

        For each camera, the inputs in the device collection are the ``{cam}_cam*.avi`` video
        (required) and the ``{cam}_cam*.camlog`` file (flagged not required). The outputs are
        the renamed ``_iblrig_{cam}Camera.raw.mp4`` and ``_iblrig_{cam}Camera.raw.camlog``.
        """
        signature = {
            'input_files': [(f'{cam}_cam*.avi', self.device_collection, True) for cam in self.cameras] +
                           [(f'{cam}_cam*.camlog', self.device_collection, False) for cam in self.cameras],
            'output_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] +
                            [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, True) for cam in self.cameras]
        }
        return signature

    @staticmethod
    def _find_first(folder, pattern):
        """Return the first file in `folder` matching glob `pattern`; raise FileNotFoundError if none."""
        found = next(folder.glob(pattern), None)
        if found is None:
            # Explicit error instead of a bare StopIteration escaping from next()
            raise FileNotFoundError(f'No file matching {pattern} found in {folder}')
        return found

    def _run(self):
        """
        For each camera, rename the camlog file and remux the avi video into an mp4 container.

        The original avi is deleted only when ffmpeg succeeds and the metadata of both files
        (except file size) agree.

        Returns
        -------
        list of pathlib.Path
            The renamed camlog files and the successfully converted mp4 files.

        Raises
        ------
        FileNotFoundError
            If a camera has no matching camlog or avi file in the device collection.
        """
        output_files = []
        device_path = self.session_path.joinpath(self.device_collection)
        for cam in self.cameras:

            # rename and register the camlog files
            camlog_file = self._find_first(device_path, f'{cam}_cam*.camlog')
            new_camlog_file = device_path.joinpath(f'_iblrig_{cam}Camera.raw.camlog')
            camlog_file.replace(new_camlog_file)
            output_files.append(new_camlog_file)

            # convert the avi files to mp4 (stream copy of video and audio, no re-encoding)
            avi_file = self._find_first(device_path, f'{cam}_cam*.avi')
            mp4_file = device_path.joinpath(f'_iblrig_{cam}Camera.raw.mp4')
            # Argument list without a shell, so paths with spaces or shell metacharacters are safe
            command = ['ffmpeg', '-i', str(avi_file), '-c:v', 'copy', '-c:a', 'copy', '-y', str(mp4_file)]
            process = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            if process.returncode != 0:
                error = process.stderr.decode('utf-8', errors='replace')
                _logger.error(f'conversion to mp4 failed for {avi_file}: {error}')
                continue

            # check the video meta matched and remove the original file
            meta_avi = get_video_meta(avi_file)
            meta_avi.pop('size')  # the file size legitimately differs between containers
            meta_mp4 = get_video_meta(mp4_file)
            # A key missing from the mp4 metadata counts as a mismatch rather than raising KeyError
            match = all(meta_mp4.get(key) == value for key, value in meta_avi.items())

            # if all checks out we can remove the original avi
            if match:
                avi_file.unlink()
                output_files.append(mp4_file)
            else:
                _logger.error(f'avi and mp4 meta data do not match for {avi_file}')

        return output_files
class VideoSyncQcCamlog(base_tasks.VideoTask):
    """
    Extract camera timestamps from camlog files, aligned using the main DAQ sync, then optionally run camera QC.

    Specific to UCLA widefield implementation.
    """
    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        """
        dict: Expected input and output files.

        The inputs are, per camera, the raw mp4 (required) and camlog (not required), plus the
        DAQ sync channels/polarities/times and wiring file from the sync collection and the
        optional wheel position/timestamps from alf. The outputs are one
        ``_ibl_{cam}Camera.times.npy`` per camera in alf.
        """
        video_inputs = [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras]
        camlog_inputs = [(f'_iblrig_{cam}Camera.raw.camlog', self.device_collection, False) for cam in self.cameras]
        sync_and_wheel_inputs = [
            (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True),
            ('*.wiring.json', self.sync_collection, True),
            ('*wheel.position.npy', 'alf', False),
            ('*wheel.timestamps.npy', 'alf', False),
        ]
        times_outputs = [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]
        return {
            'input_files': video_inputs + camlog_inputs + sync_and_wheel_inputs,
            'output_files': times_outputs,
        }

    def _run(self, qc=True, **kwargs):
        """
        Extract and save camera times for every mp4 found in the device collection.

        Parameters
        ----------
        qc : bool
            If True, also run the camera QC (with ``update=True``) on the same labels.

        Returns
        -------
        list
            The files returned by ``camera.extract_all``.
        """
        # One label per mp4 found (recursively) under the device collection
        video_folder = self.session_path.joinpath(self.device_collection)
        labels = list(map(label_from_path, video_folder.rglob('*.mp4')))

        # Video timestamps extraction
        _, extracted = camera.extract_all(self.session_path, sync_type=self.sync,
                                          sync_collection=self.sync_collection,
                                          save=True, labels=labels, camlog=True)
        output_files = list(extracted)

        # Video QC
        if qc:
            run_camera_qc(self.session_path, update=True, one=self.one, cameras=labels, camlog=True,
                          sync_collection=self.sync_collection, sync_type=self.sync)

        return output_files
class VideoSyncQcBpod(base_tasks.VideoTask):
    """
    Extract camera timestamps for a Bpod-synced session and run the camera QC.

    The behaviour task collection and protocol are resolved at construction from the optional
    ``collection`` and ``protocol`` keyword arguments.

    N.B Signatures only reflect new daq naming convention, non compatible with ephys when not running on server
    """
    priority = 40
    job_size = 'small'

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Task collection (this needs to be specified in the task kwargs)
        requested_collection = kwargs.get('collection', None)
        self.collection = self.get_task_collection(requested_collection)
        # Task type (protocol), looked up within the resolved task collection
        requested_protocol = kwargs.get('protocol', None)
        self.protocol = self.get_protocol(requested_protocol, task_collection=self.collection)

    @property
    def signature(self):
        """
        dict: Expected input and output files.

        The inputs are, per camera, the raw mp4 (required) and the optional timestamps / GPIO /
        frame counter / frameData files, plus the Bpod task data and settings from
        ``self.collection`` and the optional wheel files from alf. The outputs are one
        ``_ibl_{cam}Camera.times.npy`` per camera in alf.
        """
        camera_inputs = (
            [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] +
            [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] +
            [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] +
            [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] +
            [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras]
        )
        task_inputs = [
            ('_iblrig_taskData.raw.*', self.collection, True),
            ('_iblrig_taskSettings.raw.*', self.collection, True),
            ('*wheel.position.npy', 'alf', False),
            ('*wheel.timestamps.npy', 'alf', False),
        ]
        times_outputs = [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]
        return {'input_files': camera_inputs + task_inputs, 'output_files': times_outputs}

    def _run(self, **kwargs):
        """
        Extract and save camera times for every mp4 in the device collection, then run camera QC.

        Returns
        -------
        list
            The files returned by ``camera.extract_all``.
        """
        # One label per mp4 found (recursively) under the device collection
        video_folder = self.session_path.joinpath(self.device_collection)
        labels = [label_from_path(mp4) for mp4 in video_folder.rglob('*.mp4')]

        # Video timestamps extraction, reading Bpod data from the task collection
        _, extracted = camera.extract_all(self.session_path, sync_type=self.sync,
                                          sync_collection=self.sync_collection,
                                          save=True, labels=labels, task_collection=self.collection)
        output_files = [*extracted]

        # Video QC
        run_camera_qc(self.session_path, update=True, one=self.one, cameras=labels,
                      sync_collection=self.sync_collection, sync_type=self.sync)

        return output_files
class VideoSyncQcNidq(base_tasks.VideoTask):
    """
    Extract camera timestamps aligned with the main DAQ sync (e.g. NIDQ or Timeline) and run camera QC.

    N.B Signatures only reflect new daq naming convention, non compatible with ephys when not running on server
    """
    priority = 40
    job_size = 'small'

    @property
    def signature(self):
        """
        dict: Expected input and output files.

        The inputs are, per camera, the raw mp4 (required) and the optional timestamps / GPIO /
        frame counter / frameData files, plus the DAQ sync arrays and wiring file, the optional
        wheel files and the optional experiment description. The outputs are one
        ``_ibl_{cam}Camera.times.npy`` per camera in alf.
        """
        camera_inputs = (
            [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras] +
            [(f'_iblrig_{cam}Camera.timestamps*', self.device_collection, False) for cam in self.cameras] +
            [(f'_iblrig_{cam}Camera.GPIO.bin', self.device_collection, False) for cam in self.cameras] +
            [(f'_iblrig_{cam}Camera.frame_counter.bin', self.device_collection, False) for cam in self.cameras] +
            [(f'_iblrig_{cam}Camera.frameData.bin', self.device_collection, False) for cam in self.cameras]
        )
        sync_inputs = [
            (f'_{self.sync_namespace}_sync.channels.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.polarities.npy', self.sync_collection, True),
            (f'_{self.sync_namespace}_sync.times.npy', self.sync_collection, True),
            ('*.wiring.json', self.sync_collection, True),
            ('*wheel.position.npy', 'alf', False),
            ('*wheel.timestamps.npy', 'alf', False),
            ('*experiment.description*', '', False),
        ]
        times_outputs = [(f'_ibl_{cam}Camera.times.npy', 'alf', True) for cam in self.cameras]
        return {'input_files': camera_inputs + sync_inputs, 'output_files': times_outputs}

    def _run(self, **kwargs):
        """
        Extract and save camera times for the left/right/body/belly videos, then run camera QC.

        Returns
        -------
        list
            The files returned by ``camera.extract_all``.
        """
        # Only mp4 files directly in the device collection (non-recursive), restricted to known labels
        video_folder = self.session_path.joinpath(self.device_collection)
        known_labels = ('left', 'right', 'body', 'belly')
        labels = [lab for lab in map(label_from_path, video_folder.glob('*.mp4')) if lab in known_labels]

        # Extra keyword arguments for the extractor (named so as not to shadow the **kwargs parameter)
        extract_kwargs = {}
        if self.sync_namespace == 'timeline':
            # Load sync from timeline file
            timeline_path = self.session_path / self.sync_collection
            extract_kwargs['sync'], extract_kwargs['chmap'] = \
                raw_daq_loaders.load_timeline_sync_and_chmap(timeline_path)

        # Video timestamps extraction
        _, extracted = camera.extract_all(self.session_path, sync_type=self.sync,
                                          sync_collection=self.sync_collection,
                                          save=True, labels=labels, **extract_kwargs)
        output_files = list(extracted)

        # Video QC
        run_camera_qc(self.session_path, update=True, one=self.one, cameras=labels,
                      sync_collection=self.sync_collection, sync_type=self.sync)

        return output_files
class DLC(base_tasks.VideoTask):
    """
    Run DeepLabCut pose estimation followed by ROI motion energy on the session videos.

    This task relies on a correctly installed dlc environment as per
    https://docs.google.com/document/d/1g0scP6_3EmaXCU4SsDNZWwDTaD9MG0es_grLA-d0gh0/edit#

    If your environment is set up otherwise, make sure that you set the respective attributes:
    t = DLC(session_path)
    t.dlcenv = Path('/path/to/your/dlcenv/bin/activate')
    t.scripts = Path('/path/to/your/iblscripts/deploy/serverpc/dlc')
    """
    gpu = 1
    cpu = 4
    io_charge = 100
    level = 2
    force = True
    job_size = 'large'

    dlcenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'dlcenv', 'bin', 'activate')
    scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'dlc')

    @property
    def signature(self):
        """dict: Raw mp4 input per camera; DLC parquet and ROI motion energy outputs per camera in alf."""
        signature = {
            'input_files': [(f'_iblrig_{cam}Camera.raw.mp4', self.device_collection, True) for cam in self.cameras],
            'output_files': [(f'_ibl_{cam}Camera.dlc.pqt', 'alf', True) for cam in self.cameras] +
                            [(f'{cam}Camera.ROIMotionEnergy.npy', 'alf', True) for cam in self.cameras] +
                            [(f'{cam}ROIMotionEnergy.position.npy', 'alf', True) for cam in self.cameras]
        }
        return signature

    def _check_dlcenv(self):
        """
        Check that scripts are present and that the dlcenv can be activated, then return the iblvideo version.

        Returns
        -------
        str
            The last line printed by ``iblvideo.__version__`` inside the DLC environment.

        Raises
        ------
        AssertionError
            If the run_dlc / run_motion scripts or the environment activation file are missing,
            or if importing iblvideo inside the environment fails.
        """
        import shlex  # only needed to quote the environment path for the shell

        # Explicit raises rather than `assert` so that the checks still run under `python -O`
        if len(list(self.scripts.rglob('run_dlc.*'))) != 2:
            raise AssertionError(f'Scripts run_dlc.sh and run_dlc.py do not exist in {self.scripts}')
        if len(list(self.scripts.rglob('run_motion.*'))) != 2:
            raise AssertionError(f'Scripts run_motion.sh and run_motion.py do not exist in {self.scripts}')
        if not self.dlcenv.exists():
            raise AssertionError(f"DLC environment does not exist in assumed location {self.dlcenv}")
        command2run = (f"source {shlex.quote(str(self.dlcenv))}; "
                       f"python -c 'import iblvideo; print(iblvideo.__version__)'")
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash"
        )
        info, error = process.communicate()
        if process.returncode != 0:
            raise AssertionError(f"DLC environment check failed\n{error.decode('utf-8', errors='replace')}")
        version = info.decode("utf-8").strip().split('\n')[-1]
        return version

    @staticmethod
    def _video_intact(file_mp4):
        """Return True if OpenCV can open `file_mp4` and reports a positive frame count."""
        cap = cv2.VideoCapture(str(file_mp4))
        try:
            return cap.get(cv2.CAP_PROP_FRAME_COUNT) > 0
        finally:
            # Release the capture handle even if querying the frame count raises
            cap.release()

    def _run_script(self, script_name, *args):
        """
        Run a bash script from ``self.scripts``, passing the DLC environment path as first argument.

        Every argument is shell-quoted, so paths containing spaces are passed through intact.

        Returns
        -------
        int
            The process return code.
        str
            The decoded and stripped stderr output.
        """
        import shlex  # only needed to quote the command arguments for the shell

        parts = [self.scripts.joinpath(script_name), self.dlcenv, *args]
        command2run = ' '.join(shlex.quote(str(part)) for part in parts)
        _logger.info(command2run)
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash",
        )
        _, error = process.communicate()
        return process.returncode, error.decode("utf-8", errors='replace').strip()

    @staticmethod
    def _log_subprocess_error(message, error_str):
        """Log `message` followed by the subprocess stderr `error_str` inside a debugging banner."""
        _logger.error(f'{message}\n\n'
                      f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                      f'{error_str}\n'
                      f'++++++++++++++++++++++++++++++++++++++++++++\n')

    def _run(self, cams=None, overwrite=False):
        """
        Run DLC and then motion energy on each valid camera.

        Parameters
        ----------
        cams : list of str, optional
            Camera labels to process; defaults to ``self.cameras``. Labels rejected by
            ``assert_valid_label`` are skipped with a warning.
        overwrite : bool
            If falsy and all expected outputs already exist, skip computation and return them.
            The value is also forwarded to ``run_dlc.sh``.

        Returns
        -------
        list of pathlib.Path or None
            The DLC and motion energy files produced. It is None when the status ended as -3
            (missing video) and nothing was produced; in that case the status is set to -1.
        """
        # Check that the cams are valid for DLC, remove the ones that aren't
        candidate_cams = cams or self.cameras
        cams = []
        for cam in candidate_cams:
            try:
                cams.append(assert_valid_label(cam))
            except ValueError:
                _logger.warning(f'{cam} is not a valid video label, this video will be skipped')
        # Set up
        self.session_id = self.one.path2eid(self.session_path)

        # If all results exist and overwrite is False, skip computation. The check is the same for
        # every camera, so it is done once up front. Repeating it inside the loop could return
        # early with already-collected outputs duplicated.
        if cams and not overwrite:
            expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True)
            if expected_outputs_present:
                return list(expected_outputs)

        actual_outputs = []
        video_path = self.session_path.joinpath(self.device_collection)
        alf_path = self.session_path.joinpath('alf')
        dlcenv_checked = False
        # Loop through cams
        for cam in cams:
            # Catch exceptions so that following cameras can still run
            try:
                file_mp4 = next(video_path.glob(f'_iblrig_{cam}Camera.raw*.mp4'), None)
                if file_mp4 is None:
                    # In this case we set the status to Incomplete.
                    _logger.error(f"No raw video file available for {cam}, skipping.")
                    self.status = -3
                    continue
                if not self._video_intact(file_mp4):
                    _logger.error(f"Corrupt raw video file {file_mp4}")
                    self.status = -1
                    continue
                # Check that dlc environment is ok, shell scripts exists, and get iblvideo version, GPU addressable.
                # Done once, for the first camera that needs it. A failed check is retried on the next camera.
                if not dlcenv_checked:
                    self.version = self._check_dlcenv()
                    _logger.info(f'iblvideo version {self.version}')
                    check_nvidia_driver()
                    dlcenv_checked = True

                _logger.info(f'Running DLC on {cam}Camera.')
                returncode, error_str = self._run_script('run_dlc.sh', file_mp4, overwrite)
                if returncode != 0:
                    self._log_subprocess_error(f'DLC failed for {cam}Camera.', error_str)
                    self.status = -1
                    # We don't run motion energy, or add any files if dlc failed to run
                    continue
                dlc_result = next(alf_path.glob(f'_ibl_{cam}Camera.dlc*.pqt'))
                actual_outputs.append(dlc_result)

                _logger.info(f'Computing motion energy for {cam}Camera')
                returncode, error_str = self._run_script('run_motion.sh', file_mp4, dlc_result)
                if returncode != 0:
                    self._log_subprocess_error(f'Motion energy failed for {cam}Camera.', error_str)
                    self.status = -1
                    continue
                actual_outputs.append(next(alf_path.glob(f'{cam}Camera.ROIMotionEnergy*.npy')))
                actual_outputs.append(next(alf_path.glob(f'{cam}ROIMotionEnergy.position*.npy')))
            except Exception:
                # Exception rather than BaseException, so KeyboardInterrupt / SystemExit still propagate
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        # If status is Incomplete, check that there is at least one output.
        # Otherwise make sure it gets set to Empty (outputs = None), and set status to -1 to make sure it doesn't slip
        if self.status == -3 and not actual_outputs:
            actual_outputs = None
            self.status = -1
        return actual_outputs