Coverage for ibllib/pipes/ephys_preprocessing.py: 51%
783 statements
« prev ^ index » next coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
1"""(Deprecated) Electrophysiology data preprocessing tasks.
3These tasks are part of the old pipeline. This module has been replaced by the `ephys_tasks` module
4and the dynamic pipeline.
5"""
6import logging
7import re
8import shutil
9import subprocess
10from collections import OrderedDict
11import traceback
12from pathlib import Path
13import warnings
15import cv2
16import numpy as np
17import pandas as pd
18import packaging.version
20import one.alf.io as alfio
21from ibldsp.utils import rms
22import spikeglx
24from ibllib.misc import check_nvidia_driver
25from ibllib.ephys import ephysqc, spikes, sync_probes
26from ibllib.io import ffmpeg
27from ibllib.io.video import label_from_path, assert_valid_label
28from ibllib.io.extractors import ephys_fpga, ephys_passive, camera
29from ibllib.pipes import tasks, base_tasks
30import ibllib.pipes.training_preprocessing as tpp
31from ibllib.pipes.misc import create_alyx_probe_insertions
32from ibllib.qc.alignment_qc import get_aligned_channels
33from ibllib.qc.task_extractors import TaskQCExtractor
34from ibllib.qc.task_metrics import TaskQC
35from ibllib.qc.camera import run_all_qc as run_camera_qc
36from ibllib.qc.dlc import DlcQC
37from ibllib.plots.figures import dlc_qc_plot, BehaviourPlots, LfpPlots, ApPlots, BadChannelsAp
38from ibllib.plots.figures import SpikeSorting as SpikeSortingPlots
39from ibllib.plots.snapshot import ReportSnapshot
40from brainbox.behavior.dlc import likelihood_threshold, get_licks, get_pupil_diameter, get_smooth_pupil_diameter
# Module-level logger shared by every task in this file
_logger = logging.getLogger('ibllib')
# Emitted once at import time: this whole module is legacy and superseded by the dynamic pipeline
warnings.warn('`pipes.ephys_preprocessing` to be removed in favour of dynamic pipeline', FutureWarning)
46# level 0
class EphysPulses(tasks.Task):
    """
    Extract Pulses from raw electrophysiology data into numpy arrays
    Perform the probes synchronisation with nidq (3B) or main probe (3A)
    """
    cpu = 2
    io_charge = 30  # this jobs reads raw ap files
    priority = 90  # a lot of jobs depend on this one
    level = 0  # this job doesn't depend on anything
    force = False  # whether to force download of missing data on local server if outputs already exist
    signature = {
        'input_files': [('*ap.meta', 'raw_ephys_data/probe*', True),
                        ('*ap.ch', 'raw_ephys_data/probe*', False),  # not necessary when we have .bin file
                        ('*ap.*bin', 'raw_ephys_data/probe*', True),
                        ('*nidq.meta', 'raw_ephys_data', True),
                        ('*nidq.ch', 'raw_ephys_data', False),  # not necessary when we have .bin file
                        ('*nidq.*bin', 'raw_ephys_data', True)],
        'output_files': [('_spikeglx_sync*.npy', 'raw_ephys_data*', True),
                         ('_spikeglx_sync.polarities*.npy', 'raw_ephys_data*', True),
                         ('_spikeglx_sync.times*.npy', 'raw_ephys_data*', True)]
    }

    def get_signatures(self, **kwargs):
        """
        Expand the generic signature wildcards into the exact collections present
        on the local filesystem (one entry per probe folder; nidq entries are
        dropped for 3A recordings, which have no nidq stream).
        :return:
        """
        version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = spikeglx.get_probes_from_folder(self.session_path)

        inputs = []
        for name, collection, required in self.signature['input_files']:
            if 'raw_ephys_data/probe*' in collection:
                # one concrete entry per probe folder
                inputs.extend((name, f'raw_ephys_data/{probe}', required) for probe in probes)
            elif version != '3A':
                # nidq files only exist for 3B setups
                inputs.append((name, collection, required))
        self.input_files = inputs

        outputs = []
        for name, _, required in self.signature['output_files']:
            if version != '3A':
                # 3B setups additionally produce sync files at the session level
                outputs.append((name, 'raw_ephys_data', required))
            outputs.extend((name, f'raw_ephys_data/{probe}', required) for probe in probes)
        self.output_files = outputs

    def _run(self, overwrite=False):
        """Extract sync pulses to numpy files, then synchronise the probe clocks."""
        syncs, pulse_files = ephys_fpga.extract_sync(self.session_path, overwrite=overwrite)
        for pulse_file in pulse_files:
            _logger.info(f"extracted pulses for {pulse_file}")

        status, probe_sync_files = sync_probes.sync(self.session_path)
        return pulse_files + probe_sync_files
class RawEphysQC(tasks.Task):
    """
    Computes raw electrophysiology QC
    """
    cpu = 2
    io_charge = 30  # this jobs reads raw ap files
    priority = 10  # a lot of jobs depend on this one
    level = 0  # this job doesn't depend on anything
    force = False
    signature = {
        'input_files': [('*ap.meta', 'raw_ephys_data/probe*', True),
                        ('*lf.meta', 'raw_ephys_data/probe*', True),  # not necessary to run task as optional computation
                        ('*lf.ch', 'raw_ephys_data/probe*', False),  # not required it .bin file
                        ('*lf.*bin', 'raw_ephys_data/probe*', True)],  # not necessary to run task as optional computation
        'output_files': [('_iblqc_ephysChannels.apRMS.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysChannels.rawSpikeRates.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysChannels.labels.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysSpectralDensityLF.freqs.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysSpectralDensityLF.power.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysSpectralDensityAP.freqs.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysSpectralDensityAP.power.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysTimeRmsLF.rms.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysTimeRmsLF.timestamps.npy', 'raw_ephys_data/probe*', True)]
    }

    def _run(self, overwrite=False):
        """
        Run EphysQC for each registered probe insertion of the session.

        Also generates the LFP and bad-channels AP plots for each probe.
        A failing probe sets ``self.status = -1`` but does not abort the others.

        :param overwrite: bool, recompute QC datasets even if they already exist
        :return: list of generated QC file paths
        """
        eid = self.one.path2eid(self.session_path)
        probes = [(x['id'], x['name']) for x in self.one.alyx.rest('insertions', 'list', session=eid)]
        # Usually there should be two probes, if there are less, check if all probes are registered
        if len(probes) < 2:
            _logger.warning(f"{len(probes)} probes registered for session {eid}, trying to register from local data")
            probes = [(p['id'], p['name']) for p in create_alyx_probe_insertions(self.session_path, one=self.one)]
        qc_files = []
        for pid, pname in probes:
            _logger.info(f"\nRunning QC for probe insertion {pname}")
            try:
                eqc = ephysqc.EphysQC(pid, session_path=self.session_path, one=self.one)
                qc_files.extend(eqc.run(update=True, overwrite=overwrite))
                _logger.info("Creating LFP QC plots")
                plot_task = LfpPlots(pid, session_path=self.session_path, one=self.one)
                _ = plot_task.run()
                self.plot_tasks.append(plot_task)
                plot_task = BadChannelsAp(pid, session_path=self.session_path, one=self.one)
                _ = plot_task.run()
                self.plot_tasks.append(plot_task)

            except AssertionError:
                # flag the failure on the task but keep processing the remaining probes
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        return qc_files

    def get_signatures(self, **kwargs):
        """
        Expand signature wildcards to one entry per local probe folder.

        The lf input files are only marked required when the LF QC outputs are
        not all present yet, to avoid unnecessary downloads of lf.cbin files.
        """
        probes = spikeglx.get_probes_from_folder(self.session_path)

        full_output_files = []
        for sig in self.signature['output_files']:
            if 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    full_output_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))

        self.output_files = full_output_files

        # input lf signature required or not required status is going to depend on the output we have, need to be agile here to
        # avoid unnecessary downloading of lf.cbin files
        expected_count = 0
        count = 0
        # check to see if we have lfp qc datasets
        for expected_file in full_output_files:
            if 'LF' in expected_file[0]:
                expected_count += 1
                actual_files = list(Path(self.session_path).rglob(str(Path(expected_file[1]).joinpath(expected_file[0]))))
                if len(actual_files) == 1:
                    count += 1

        # lf inputs are only needed when at least one LF QC output is missing
        lf_required = False if count == expected_count else True

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    if 'lf' in sig[0]:
                        # keep optional inputs optional; required lf inputs follow lf_required
                        full_input_files.append((sig[0], f'raw_ephys_data/{probe}', lf_required if sig[2] else sig[2]))
                    else:
                        full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))

        self.input_files = full_input_files
class EphysAudio(tasks.Task):
    """
    Compresses the microphone wav file in a lossless flac file
    """
    # TESTS DONE
    cpu = 2
    priority = 10  # a lot of jobs depend on this one
    level = 0  # this job doesn't depend on anything
    force = False
    signature = {
        'input_files': [('_iblrig_micData.raw.wav', 'raw_behavior_data', True)],
        'output_files': [('_iblrig_micData.raw.flac', 'raw_behavior_data', True)],
    }

    def _run(self, overwrite=False):
        """Locate the raw microphone recording and compress it to flac with ffmpeg."""
        source = next(self.session_path.rglob("_iblrig_micData.raw.wav"), None)
        if source is None:
            # no microphone recording for this session, nothing to do
            return
        target = source.with_suffix(".flac")
        cmd = "ffmpeg -i {file_in} -y -nostdin -c:a flac -nostats {file_out}"
        status, compressed_file = ffmpeg.compress(file_in=source, file_out=target, command=cmd)
        return [compressed_file]
class SpikeSorting(tasks.Task):
    """
    (DEPRECATED) Pykilosort 2.5 pipeline
    """
    gpu = 1
    io_charge = 100  # this jobs reads raw ap files
    priority = 60
    level = 1  # depends on the level 0 EphysPulses task for the sync files
    force = True
    job_size = 'large'

    SHELL_SCRIPT = Path.home().joinpath(
        "Documents/PYTHON/iblscripts/deploy/serverpc/kilosort2/run_pykilosort.sh"
    )
    SPIKE_SORTER_NAME = 'pykilosort'
    PYKILOSORT_REPO = Path.home().joinpath('Documents/PYTHON/SPIKE_SORTING/pykilosort')
    signature = {
        'input_files': [],  # see setUp method for declaration of inputs
        'output_files': []  # see setUp method for declaration of inputs
    }

    def __init__(self, *args, **kwargs):
        warnings.warn('`pipes.ephys_preprocessing.SpikeSorting` to be removed '
                      'in favour of `pipes.ephys_tasks.SpikeSorting`',
                      FutureWarning)
        super().__init__(*args, **kwargs)

    @staticmethod
    def spike_sorting_signature(pname=None):
        """
        Build the input/output file signatures for one probe (or the probe* wildcard).

        :param pname: probe name e.g. 'probe00'; defaults to the 'probe*' wildcard
        :return: (input_signature, output_signature) lists of (pattern, collection, required) tuples
        """
        pname = pname if pname is not None else "probe*"
        input_signature = [('*ap.meta', f'raw_ephys_data/{pname}', True),
                           ('*ap.ch', f'raw_ephys_data/{pname}', True),
                           ('*ap.cbin', f'raw_ephys_data/{pname}', True),
                           ('*nidq.meta', 'raw_ephys_data', False),
                           ('_spikeglx_sync.channels.*', 'raw_ephys_data*', True),
                           ('_spikeglx_sync.polarities.*', 'raw_ephys_data*', True),
                           ('_spikeglx_sync.times.*', 'raw_ephys_data*', True),
                           ('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
                           ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True)]
        output_signature = [('spike_sorting_pykilosort.log', f'spike_sorters/pykilosort/{pname}', True),
                            ('_iblqc_ephysTimeRmsAP.rms.npy', f'raw_ephys_data/{pname}', True),  # new ibllib 2.5
                            ('_iblqc_ephysTimeRmsAP.timestamps.npy', f'raw_ephys_data/{pname}', True)]  # new ibllib 2.5
        return input_signature, output_signature

    @staticmethod
    def _sample2v(ap_file):
        """Return the sample-to-volts conversion factor of the first AP channel from the .meta file."""
        md = spikeglx.read_meta_data(ap_file.with_suffix(".meta"))
        s2v = spikeglx._conversion_sample2v_from_meta(md)
        return s2v["ap"][0]

    @staticmethod
    def _fetch_pykilosort_version(repo_path):
        """Read `__version__` from the pykilosort repo, falling back to the git commit hash."""
        init_file = Path(repo_path).joinpath('pykilosort', '__init__.py')
        version = SpikeSorting._fetch_ks2_commit_hash(repo_path)  # default
        try:
            with open(init_file) as fid:
                lines = fid.readlines()
                for line in lines:
                    if line.startswith("__version__ = "):
                        version = line.split('=')[-1].strip().replace('"', '').replace("'", '')
        except Exception:
            # best effort: keep the commit-hash fallback if the init file can't be parsed
            pass
        return f"pykilosort_{version}"

    @staticmethod
    def _fetch_pykilosort_run_version(log_file):
        """
        Parse the following line (2 formats depending on version) from the log files to get the version
        '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.2.1, output in gnagga^[[0m\n'
        '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.3.0^[[0m\n'
        """
        with open(log_file) as fid:
            line = fid.readline()
        version = re.search('version (.*), output', line)
        version = version or re.search('version (.*)', line)  # old versions have output, new have a version line
        version = re.sub('\\^[[0-9]+m', '', version.group(1))  # removes the coloring tags
        return f"pykilosort_{version}"

    @staticmethod
    def _fetch_ks2_commit_hash(repo_path):
        """Return the HEAD commit hash of the spike sorter repo, or '' if git fails."""
        command2run = f"git --git-dir {repo_path}/.git rev-parse --verify HEAD"
        process = subprocess.Popen(
            command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        info, error = process.communicate()
        if process.returncode != 0:
            _logger.error(
                f"Can't fetch pykilsort commit hash, will still attempt to run \n"
                f"Error: {error.decode('utf-8')}"
            )
            return ""
        return info.decode("utf-8").strip()

    @staticmethod
    def parse_version(v) -> packaging.version.Version:
        """
        Extracts and parses semantic version (major.minor.patch) from a version string.

        Parameters
        ----------
        v : str
            A version string containing a semantic version.

        Returns
        -------
        packaging.version.Version
            The parsed semantic version number

        Examples
        --------
        >>> SpikeSorting.parse_version('ibl_1.2')
        <Version('1.2')>
        >>> SpikeSorting.parse_version('pykilosort_ibl_1.2.0-new')
        <Version('1.2.0')>
        >>> SpikeSorting.parse_version('ibl_0.2') < SpikeSorting.parse_version('pykilosort_v1')
        True
        """
        m = re.search(r'(\d+\.?){1,3}', v)
        if not m:
            raise packaging.version.InvalidVersion(f'Failed to detect SemVer in "{v}"')
        return packaging.version.parse(m.group())

    def setUp(self, probes=None):
        """
        Overwrite setup method to allow inputs and outputs to be only one probe
        :param probes: list of probes e.g ['probe00']
        :return:
        """
        if not probes or len(probes) == 2:
            self.signature['input_files'], self.signature['output_files'] = self.spike_sorting_signature()
        else:
            self.signature['input_files'], self.signature['output_files'] = self.spike_sorting_signature(probes[0])

        return super().setUp(probes=probes)

    def _run_pykilosort(self, ap_file):
        """
        Runs the ks2 matlab spike sorting for one probe dataset
        the raw spike sorting output is in session_path/spike_sorters/{self.SPIKE_SORTER_NAME}/probeXX folder
        (discontinued support for old spike sortings in the probe folder <1.5.5)
        :return: path of the folder containing ks2 spike sorting output
        """
        self.version = self._fetch_pykilosort_version(self.PYKILOSORT_REPO)
        label = ap_file.parts[-2]  # this is usually the probe name
        sorter_dir = self.session_path.joinpath("spike_sorters", self.SPIKE_SORTER_NAME, label)
        self.FORCE_RERUN = False
        if not self.FORCE_RERUN:
            log_file = sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
            if log_file.exists():
                run_version = self._fetch_pykilosort_run_version(log_file)
                # only skip if a recent-enough sorter version already produced this output
                if self.parse_version(run_version) > self.parse_version('pykilosort_ibl_1.1.0'):
                    _logger.info(f"Already ran: spike_sorting_{self.SPIKE_SORTER_NAME}.log"
                                 f" found in {sorter_dir}, skipping.")
                    return sorter_dir
                else:
                    self.FORCE_RERUN = True

        print(sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log"))
        # get the scratch drive from the shell script
        with open(self.SHELL_SCRIPT) as fid:
            lines = fid.readlines()
        line = [line for line in lines if line.startswith("SCRATCH_DRIVE=")][0]
        m = re.search(r"\=(.*?)(\#|\n)", line)[0]
        scratch_drive = Path(m[1:-1].strip())
        assert scratch_drive.exists()

        # clean up and create directory, this also checks write permissions
        # temp_dir has the following shape: pykilosort/ZM_3003_2020-07-29_001_probe00
        # first makes sure the tmp dir is clean
        shutil.rmtree(scratch_drive.joinpath(self.SPIKE_SORTER_NAME), ignore_errors=True)
        temp_dir = scratch_drive.joinpath(
            self.SPIKE_SORTER_NAME, "_".join(list(self.session_path.parts[-3:]) + [label])
        )
        if temp_dir.exists():  # hmmm this has to be decided, we may want to restart ?
            # But failed sessions may then clog the scratch dir and have users run out of space
            shutil.rmtree(temp_dir, ignore_errors=True)
        temp_dir.mkdir(parents=True, exist_ok=True)

        check_nvidia_driver()
        command2run = f"{self.SHELL_SCRIPT} {ap_file} {temp_dir}"
        _logger.info(command2run)
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash",
        )
        info, error = process.communicate()
        info_str = info.decode("utf-8").strip()
        _logger.info(info_str)
        if process.returncode != 0:
            error_str = error.decode("utf-8").strip()
            # try and get the kilosort log if any
            for log_file in temp_dir.rglob('*_kilosort.log'):
                with open(log_file) as fid:
                    log = fid.read()
                    _logger.error(log)
                break
            raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}")

        shutil.copytree(temp_dir.joinpath('output'), sorter_dir, dirs_exist_ok=True)
        shutil.rmtree(temp_dir, ignore_errors=True)
        return sorter_dir

    def _run(self, probes=None):
        """
        Multiple steps. For each probe:
        - Runs ks2 (skips if it already ran)
        - synchronize the spike sorting
        - output the probe description files
        :param probes: (list of str) if provided, will only run spike sorting for specified probe names
        :return: list of files to be registered on database
        """
        efiles = spikeglx.glob_ephys_files(self.session_path)
        ap_files = [(ef.get("ap"), ef.get("label")) for ef in efiles if "ap" in ef.keys()]
        out_files = []
        for ap_file, label in ap_files:
            if isinstance(probes, list) and label not in probes:
                continue
            try:
                # if the file is part of a sequence, handles the run accordingly
                sequence_file = ap_file.parent.joinpath(ap_file.stem.replace('ap', 'sequence.json'))
                # temporary just skips for now
                if sequence_file.exists():
                    continue
                ks2_dir = self._run_pykilosort(ap_file)  # runs ks2, skips if it already ran
                probe_out_path = self.session_path.joinpath("alf", label, self.SPIKE_SORTER_NAME)
                shutil.rmtree(probe_out_path, ignore_errors=True)
                probe_out_path.mkdir(parents=True, exist_ok=True)
                spikes.ks2_to_alf(
                    ks2_dir,
                    bin_path=ap_file.parent,
                    out_path=probe_out_path,
                    bin_file=ap_file,
                    ampfactor=self._sample2v(ap_file),
                )
                logfile = ks2_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
                if logfile.exists():
                    shutil.copyfile(logfile, probe_out_path.joinpath(
                        f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log"))
                out, _ = spikes.sync_spike_sorting(ap_file=ap_file, out_path=probe_out_path)
                out_files.extend(out)
                # convert ks2_output into tar file and also register
                # Make this in case spike sorting is in old raw_ephys_data folders, for new
                # sessions it should already exist
                tar_dir = self.session_path.joinpath(
                    'spike_sorters', self.SPIKE_SORTER_NAME, label)
                tar_dir.mkdir(parents=True, exist_ok=True)
                out = spikes.ks2_to_tar(ks2_dir, tar_dir, force=self.FORCE_RERUN)
                out_files.extend(out)

                if self.one:
                    eid = self.one.path2eid(self.session_path, query_type='remote')
                    ins = self.one.alyx.rest('insertions', 'list', session=eid, name=label, query_type='remote')
                    if len(ins) != 0:
                        _logger.info("Creating SpikeSorting QC plots")
                        plot_task = ApPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
                        _ = plot_task.run()
                        self.plot_tasks.append(plot_task)

                        plot_task = SpikeSortingPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
                        _ = plot_task.run(collection=str(probe_out_path.relative_to(self.session_path)))
                        self.plot_tasks.append(plot_task)

                        resolved = ins[0].get('json', {'temp': 0}).get('extended_qc', {'temp': 0}). \
                            get('alignment_resolved', False)
                        if resolved:
                            chns = np.load(probe_out_path.joinpath('channels.localCoordinates.npy'))
                            out = get_aligned_channels(ins[0], chns, one=self.one, save_dir=probe_out_path)
                            out_files.extend(out)

            except Exception:
                # log the failure for this probe and carry on with the remaining ones
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        probe_files = spikes.probes_description(self.session_path, one=self.one)
        return out_files + probe_files

    def get_signatures(self, probes=None, **kwargs):
        """
        This transforms all wildcards in collection to exact match
        :param probes:
        :return:
        """
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = probes or spikeglx.get_probes_from_folder(self.session_path)

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data*' in sig[1]:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))

            elif 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            elif 'raw_ephys_data' in sig[1]:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], sig[1], sig[2]))
            else:
                full_input_files.append(sig)

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            if 'probe*' in sig[1]:
                for probe in probes:
                    col = sig[1].split('/')[:-1] + [probe]
                    full_output_files.append((sig[0], '/'.join(col), sig[2]))
            else:
                # BUG FIX: this previously appended to full_input_files, silently dropping
                # non-probe output signatures and corrupting the input list
                full_output_files.append(sig)

        self.output_files = full_output_files
class EphysVideoCompress(tasks.Task):
    """Compress the raw camera videos of the session to mp4 (libx264)."""
    priority = 90
    level = 0
    force = False
    job_size = 'large'
    io_charge = 100

    signature = {
        'input_files': [('_iblrig_*Camera.raw.*', 'raw_video_data', True)],
        'output_files': [('_iblrig_*Camera.raw.mp4', 'raw_video_data', True)]
    }

    def _run(self, **kwargs):
        """Run the avi to mp4 compression for every camera video found."""
        command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 17 '
                   '-loglevel 0 -codec:a copy {file_out}')
        compressed = ffmpeg.iblrig_video_compression(self.session_path, command)

        if not compressed:
            _logger.info('No compressed videos found')
            return

        return compressed

    def get_signatures(self, **kwargs):
        """Replace the camera wildcard with the labels actually present on disk."""
        video_files = Path(self.session_path).joinpath('raw_video_data').glob('*')
        labels = {label_from_path(x) for x in video_files}

        self.input_files = [
            (pattern.replace('*Camera', f'{label}Camera'), collection, required)
            for pattern, collection, required in self.signature['input_files']
            for label in labels]

        self.output_files = [
            (pattern.replace('*Camera', f'{label}Camera'), collection, required)
            for pattern, collection, required in self.signature['output_files']
            for label in labels]
class EphysVideoSyncQc(tasks.Task):
    """Extract camera frame timestamps from the sync pulses and run the camera QC."""
    priority = 40
    level = 2  # depends on video compression and sync pulse extraction
    force = True
    signature = {
        'input_files': [('_iblrig_*Camera.raw.mp4', 'raw_video_data', True),
                        ('_iblrig_*Camera.timestamps.ssv', 'raw_video_data', False),
                        ('_iblrig_*Camera.timestamps.npy', 'raw_video_data', False),
                        ('_iblrig_*Camera.frameData.bin', 'raw_video_data', False),
                        ('_iblrig_*Camera.GPIO.bin', 'raw_video_data', False),
                        ('_iblrig_*Camera.frame_counter.bin', 'raw_video_data', False),
                        ('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
                        ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True),
                        ('_spikeglx_sync.channels.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.polarities.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.times.*', 'raw_ephys_data*', True),
                        ('*wheel.position.npy', 'alf', False),
                        ('*wheel.timestamps.npy', 'alf', False),
                        ('*wiring.json', 'raw_ephys_data*', False),
                        ('*.meta', 'raw_ephys_data*', True)],

        'output_files': [('_ibl_*Camera.times.npy', 'alf', True)]
    }

    def _run(self, **kwargs):
        """
        Extract timestamps for every camera found in raw_video_data, then run the
        camera QC for those same cameras.

        :return: list of extracted camera.times files
        """
        mp4_files = self.session_path.joinpath('raw_video_data').rglob('*.mp4')
        labels = [label_from_path(x) for x in mp4_files]
        # Video timestamps extraction
        output_files = []
        data, files = camera.extract_all(self.session_path, save=True, labels=labels)
        output_files.extend(files)

        # Video QC
        run_camera_qc(self.session_path, update=True, one=self.one, cameras=labels)

        return output_files

    def get_signatures(self, **kwargs):
        """
        Expand the signature wildcards: one ephys entry per probe folder (plus the
        session-level folder for non-3A rigs) and one camera entry per video label
        found on disk.
        """
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = spikeglx.get_probes_from_folder(self.session_path)
        # need to detect the number of cameras
        output_files = Path(self.session_path).joinpath('raw_video_data').rglob('*')
        labels = np.unique([label_from_path(x) for x in output_files])

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data*' in sig[1]:
                if neuropixel_version != '3A':
                    # 3B setups additionally have session-level nidq sync files
                    full_input_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            elif 'Camera' in sig[0]:
                for lab in labels:
                    full_input_files.append((sig[0].replace('*Camera', f'{lab}Camera'), sig[1], sig[2]))
            else:
                full_input_files.append((sig[0], sig[1], sig[2]))

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            if 'raw_ephys_data*' in sig[1]:
                if neuropixel_version != '3A':
                    full_output_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_output_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            elif 'Camera' in sig[0]:
                for lab in labels:
                    full_output_files.append((sig[0].replace('*Camera', f'{lab}Camera'), sig[1], sig[2]))
            else:
                full_output_files.append((sig[0], sig[1], sig[2]))

        self.output_files = full_output_files
660# level 1
class EphysTrials(tasks.Task):
    """Extract trials and wheel data from the FPGA sync pulses and run the task QC."""
    priority = 90
    level = 1  # depends on the sync pulses extracted by EphysPulses
    force = False
    signature = {
        'input_files': [('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
                        ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True),
                        ('_spikeglx_sync.channels.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.polarities.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.times.*', 'raw_ephys_data*', True),
                        ('_iblrig_encoderEvents.raw*', 'raw_behavior_data', True),
                        ('_iblrig_encoderPositions.raw*', 'raw_behavior_data', True),
                        ('*wiring.json', 'raw_ephys_data*', False),
                        ('*.meta', 'raw_ephys_data*', True)],
        'output_files': [('*trials.table.pqt', 'alf', True),
                         ('*trials.goCueTrigger_times.npy', 'alf', True),
                         ('*trials.intervals_bpod.npy', 'alf', True),
                         ('*trials.stimOff_times.npy', 'alf', True),
                         ('*trials.quiescencePeriod.npy', 'alf', True),
                         ('*wheel.position.npy', 'alf', True),
                         ('*wheel.timestamps.npy', 'alf', True),
                         ('*wheelMoves.intervals.npy', 'alf', True),
                         ('*wheelMoves.peakAmplitude.npy', 'alf', True)]
    }

    def _behaviour_criterion(self):
        """
        Computes and update the behaviour criterion on Alyx
        """
        from brainbox.behavior import training

        trials = alfio.load_object(self.session_path.joinpath("alf"), "trials")
        good_enough = training.criterion_delay(
            n_trials=trials["intervals"].shape[0],
            perf_easy=training.compute_performance_easy(trials),
        )
        eid = self.one.path2eid(self.session_path, query_type='remote')
        self.one.alyx.json_field_update(
            "sessions", eid, "extended_qc", {"behavior": int(good_enough)}
        )

    def extract_behaviour(self, save=True):
        """
        Extract trials datasets from the FPGA sync.

        Also keeps the extractor instance on ``self.extractor`` for later use in run_qc.
        :param save: bool, whether to write the extracted datasets to disk
        :return: (dsets, out_files) extracted datasets and written file paths
        """
        dsets, out_files, self.extractor = ephys_fpga.extract_all(
            self.session_path, save=save, return_extractor=True)

        return dsets, out_files

    def run_qc(self, trials_data=None, update=True, plot_qc=False):
        """
        Run the task QC on the extracted trials data.

        :param trials_data: pre-extracted trials datasets; re-extracted (without saving) if None
        :param update: bool, whether to update the QC fields on Alyx
        :param plot_qc: bool, whether to also generate the behaviour QC plots
        :return: the TaskQC object
        :raises ValueError: if no trials data could be extracted
        """
        if trials_data is None:
            trials_data, _ = self.extract_behaviour(save=False)
        if not trials_data:
            raise ValueError('No trials data found')

        # Run the task QC
        qc = TaskQC(self.session_path, one=self.one, log=_logger)
        qc.extractor = TaskQCExtractor(self.session_path, lazy=True, one=qc.one)
        # Extract extra datasets required for QC
        qc.extractor.data = qc.extractor.rename_data(trials_data)
        # convert bpod wheel timestamps to the FPGA clock for the QC comparison
        wheel_ts_bpod = self.extractor.bpod2fpga(self.extractor.bpod_trials['wheel_timestamps'])
        qc.extractor.data['wheel_timestamps_bpod'] = wheel_ts_bpod
        qc.extractor.data['wheel_position_bpod'] = self.extractor.bpod_trials['wheel_position']
        qc.extractor.wheel_encoding = 'X4'
        qc.extractor.settings = self.extractor.settings
        qc.extractor.frame_ttls = self.extractor.frame2ttl
        qc.extractor.audio_ttls = self.extractor.audio
        qc.extractor.bpod_ttls = self.extractor.bpod

        # Aggregate and update Alyx QC fields
        qc.run(update=update)

        if plot_qc:
            _logger.info("Creating Trials QC plots")
            try:
                session_id = self.one.path2eid(self.session_path)
                plot_task = BehaviourPlots(session_id, self.session_path, one=self.one)
                _ = plot_task.run()
                self.plot_tasks.append(plot_task)
            except Exception:
                # plotting failures are logged and flagged but don't abort the task
                _logger.error('Could not create Trials QC Plot')
                _logger.error(traceback.format_exc())
                self.status = -1
        return qc

    def _run(self, plot_qc=True):
        """Extract behaviour then, when online, update the criterion and run the QC."""
        dsets, out_files = self.extract_behaviour()

        if self.one and not self.one.offline:
            self._behaviour_criterion()
            self.run_qc(trials_data=dsets, update=True, plot_qc=plot_qc)
        return out_files

    def get_signatures(self, **kwargs):
        """Expand the ephys wildcards to the probe folders present on the local filesystem."""
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = spikeglx.get_probes_from_folder(self.session_path)

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data*' in sig[1]:
                if neuropixel_version != '3A':
                    # 3B setups additionally have session-level nidq sync files
                    full_input_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            else:
                full_input_files.append(sig)

        self.input_files = full_input_files

        self.output_files = self.signature['output_files']
class LaserTrialsLegacy(EphysTrials):
    """This is the legacy extractor for Guido's ephys optogenetic stimulation protocol.

    This is legacy because personal project extractors should be in a separate repository.
    """
    def extract_behaviour(self, save=True):
        """
        Extract trials plus the laser stimulation datasets.

        BUG FIX: this override previously dropped the parent's `save` keyword, so the
        inherited ``run_qc`` call ``self.extract_behaviour(save=False)`` raised a
        TypeError. The parameter is now accepted and forwarded.

        :param save: bool, whether to write the extracted datasets to disk
        :return: (dsets, out_files) extracted datasets and written file paths
        """
        dsets, out_files = super().extract_behaviour(save=save)

        # Re-extract the laser datasets as the above default extractor discards them
        from ibllib.io.extractors import opto_trials
        laser = opto_trials.LaserBool(self.session_path)
        dsets_laser, out_files_laser = laser.extract(save=save)
        dsets.update({k: v for k, v in zip(laser.var_names, dsets_laser)})
        out_files.extend(out_files_laser)
        return dsets, out_files
class EphysCellsQc(tasks.Task):
    """Post spike-sorting cluster-level quality control.

    Computes quality metrics for each spike-sorted probe found under ``alf/``, writes
    them to a ``clusters.metrics.pqt`` table and summarises them in the JSON field of
    the matching Alyx probe insertion.
    """
    priority = 90
    level = 3
    force = False

    signature = {
        'input_files': [('spikes.times.npy', 'alf/probe*', True),
                        ('spikes.clusters.npy', 'alf/probe*', True),
                        ('spikes.amps.npy', 'alf/probe*', True),
                        ('spikes.depths.npy', 'alf/probe*', True),
                        ('clusters.channels.npy', 'alf/probe*', True)],
        'output_files': [('clusters.metrics.pqt', 'alf/probe*', True)]
    }

    def _compute_cell_qc(self, folder_probe):
        """
        Computes the cell QC given an extracted probe alf path
        :param folder_probe: folder (pathlib.Path) containing the spikes/clusters ALF objects
        :return: (path to clusters.metrics.pqt, metrics dataframe, drift dict)
        """
        # compute the straight qc
        _logger.info(f"Computing cluster qc for {folder_probe}")
        spikes = alfio.load_object(folder_probe, 'spikes')
        clusters = alfio.load_object(folder_probe, 'clusters')
        df_units, drift = ephysqc.spike_sorting_metrics(
            spikes.times, spikes.clusters, spikes.amps, spikes.depths,
            cluster_ids=np.arange(clusters.channels.size))
        # if the ks2 labels file exist, load them and add the column
        file_labels = folder_probe.joinpath('cluster_KSLabel.tsv')
        if file_labels.exists():
            ks2_labels = pd.read_csv(file_labels, sep='\t')
            ks2_labels.rename(columns={'KSLabel': 'ks2_label'}, inplace=True)
            df_units = pd.concat(
                [df_units, ks2_labels['ks2_label'].reindex(df_units.index)], axis=1)
        # save as parquet file
        df_units.to_parquet(folder_probe.joinpath("clusters.metrics.pqt"))
        return folder_probe.joinpath("clusters.metrics.pqt"), df_units, drift

    def _label_probe_qc(self, folder_probe, df_units, drift):
        """
        Labels the json field of the alyx corresponding probe insertion
        :param folder_probe: probe alf folder, e.g. {session_path}/alf/{probe_name}/{sorter}
        :param df_units: metrics dataframe as returned by `_compute_cell_qc`
        :param drift: drift dict as returned by `_compute_cell_qc`
        :return:
        """
        eid = self.one.path2eid(self.session_path, query_type='remote')
        # the probe name is the first folder after alf: {session_path}/alf/{probe_name}/{spike_sorter_name}
        probe_name = Path(folder_probe).relative_to(self.session_path.joinpath('alf')).parts[0]
        pdict = self.one.alyx.rest('insertions', 'list', session=eid, name=probe_name, no_cache=True)
        if len(pdict) != 1:
            _logger.warning(f'No probe found for probe name: {probe_name}')
            return
        isok = df_units['label'] == 1
        qcdict = {'n_units': int(df_units.shape[0]),
                  'n_units_qc_pass': int(np.sum(isok)),
                  'firing_rate_max': np.max(df_units['firing_rate'][isok]),
                  'firing_rate_median': np.median(df_units['firing_rate'][isok]),
                  'amplitude_max_uV': np.max(df_units['amp_max'][isok]) * 1e6,
                  # BUGFIX: previously used np.max here, reporting the maximum of the
                  # per-unit median amplitudes under a key named "median"
                  'amplitude_median_uV': np.median(df_units['amp_median'][isok]) * 1e6,
                  'drift_rms_um': rms(drift['drift_um']),
                  }
        file_wm = folder_probe.joinpath('_kilosort_whitening.matrix.npy')
        if file_wm.exists():
            wm = np.load(file_wm)
            qcdict['whitening_matrix_conditioning'] = np.linalg.cond(wm)
        # groom qc dict (this function will eventually go directly into the json field update)
        for k in qcdict:
            if isinstance(qcdict[k], np.int64):
                qcdict[k] = int(qcdict[k])
            elif isinstance(qcdict[k], float):
                qcdict[k] = np.round(qcdict[k], 2)
        self.one.alyx.json_field_update("insertions", pdict[0]["id"], "json", qcdict)

    def _run(self):
        """
        Post spike-sorting quality control at the cluster level.
        Outputs a QC table in the clusters ALF object and labels corresponding probes in Alyx
        """
        files_spikes = Path(self.session_path).joinpath('alf').rglob('spikes.times.npy')
        folder_probes = [f.parent for f in files_spikes]
        out_files = []
        for folder_probe in folder_probes:
            try:
                qc_file, df_units, drift = self._compute_cell_qc(folder_probe)
                out_files.append(qc_file)
                self._label_probe_qc(folder_probe, df_units, drift)
            except Exception:
                # a failing probe should not prevent the remaining probes from running
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        return out_files

    def get_signatures(self, **kwargs):
        """Expand the `alf/probe*` wildcard signatures to the probe folders found on disk."""
        files_spikes = Path(self.session_path).joinpath('alf').rglob('spikes.times.npy')
        folder_probes = [f.parent for f in files_spikes]

        full_input_files = []
        for sig in self.signature['input_files']:
            for folder in folder_probes:
                full_input_files.append((sig[0], str(folder.relative_to(self.session_path)), sig[2]))

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            for folder in folder_probes:
                full_output_files.append((sig[0], str(folder.relative_to(self.session_path)), sig[2]))

        self.output_files = full_output_files
class EphysMtscomp(tasks.Task):
    """Compress raw ephys binary files with mtscomp, removing the original .bin files."""
    priority = 50  # ideally after spike sorting
    level = 0
    force = False
    signature = {
        'input_files': [('*ap.meta', 'raw_ephys_data/probe*', True),
                        ('*ap.*bin', 'raw_ephys_data/probe*', True),
                        ('*lf.meta', 'raw_ephys_data/probe*', False),  # NP2 doesn't have lf files
                        ('*lf.*bin', 'raw_ephys_data/probe*', False),  # NP2 doesn't have lf files
                        ('*nidq.meta', 'raw_ephys_data', True),
                        ('*nidq.*bin', 'raw_ephys_data', True)],
        'output_files': [('*ap.meta', 'raw_ephys_data/probe*', True),
                         ('*ap.cbin', 'raw_ephys_data/probe*', False),  # may not be present on local server anymore
                         ('*ap.ch', 'raw_ephys_data/probe*', True),
                         ('*lf.meta', 'raw_ephys_data/probe*', False),  # NP2 doesn't have lf files # TODO detect from meta
                         ('*lf.cbin', 'raw_ephys_data/probe*', False),  # may not be present on local server anymore
                         ('*lf.ch', 'raw_ephys_data/probe*', False),
                         ('*nidq.meta', 'raw_ephys_data', True),
                         ('*nidq.cbin', 'raw_ephys_data', False),  # may not be present on local server anymore
                         ('*nidq.ch', 'raw_ephys_data', True)]
    }

    def _run(self):
        """
        Compress ephys files looking for `compress_ephys.flag` within the probes folder
        Original bin file will be removed
        The registration flag created contains targeted file names at the root of the session
        """
        out_files = []
        # gather the binary files plus their companion .ch and .meta files
        ephys_files = spikeglx.glob_ephys_files(self.session_path)
        for ext in ("ch", "meta"):
            ephys_files += spikeglx.glob_ephys_files(self.session_path, ext=ext)

        for ef in ephys_files:
            for typ in ("ap", "lf", "nidq"):
                bin_file = ef.get(typ)
                if not bin_file:
                    continue
                if bin_file.suffix.find("bin") != 1:
                    # .ch / .meta / .cbin companions pass through untouched
                    out_files.append(bin_file)
                    continue
                with spikeglx.Reader(bin_file) as sr:
                    if sr.is_mtscomp:
                        out_files.append(bin_file)
                    else:
                        _logger.info(f"Compressing binary file {bin_file}")
                        out_files.append(sr.compress_file(keep_original=False))
                        out_files.append(bin_file.with_suffix('.ch'))

        return out_files

    def get_signatures(self, **kwargs):
        """Expand the wildcard collections of the signature for the probes of this session."""
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = spikeglx.get_probes_from_folder(self.session_path)

        def _expand(signatures):
            # one entry per probe folder; session-level entries are skipped for 3A rigs
            expanded = []
            for filename, collection, required in signatures:
                if 'raw_ephys_data/probe*' in collection:
                    for probe in probes:
                        expanded.append((filename, f'raw_ephys_data/{probe}', required))
                elif neuropixel_version != '3A':
                    expanded.append((filename, collection, required))
            return expanded

        self.input_files = _expand(self.signature['input_files'])
        self.output_files = _expand(self.signature['output_files'])
class EphysDLC(tasks.Task):
    """
    This task relies on a correctly installed dlc environment as per
    https://docs.google.com/document/d/1g0scP6_3EmaXCU4SsDNZWwDTaD9MG0es_grLA-d0gh0/edit#

    If your environment is set up otherwise, make sure that you set the respective attributes:
    t = EphysDLC(session_path)
    t.dlcenv = Path('/path/to/your/dlcenv/bin/activate')
    t.scripts = Path('/path/to/your/iblscripts/deploy/serverpc/dlc')
    """
    gpu = 1
    cpu = 4
    io_charge = 100
    level = 2
    force = True
    job_size = 'large'

    # default locations of the DLC virtualenv activation script and the deployment scripts
    dlcenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'dlcenv', 'bin', 'activate')
    scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'dlc')
    signature = {
        'input_files': [
            ('_iblrig_leftCamera.raw.mp4', 'raw_video_data', True),
            ('_iblrig_rightCamera.raw.mp4', 'raw_video_data', True),
            ('_iblrig_bodyCamera.raw.mp4', 'raw_video_data', True),
        ],
        'output_files': [
            ('_ibl_leftCamera.dlc.pqt', 'alf', True),
            ('_ibl_rightCamera.dlc.pqt', 'alf', True),
            ('_ibl_bodyCamera.dlc.pqt', 'alf', True),
            ('leftCamera.ROIMotionEnergy.npy', 'alf', True),
            ('rightCamera.ROIMotionEnergy.npy', 'alf', True),
            ('bodyCamera.ROIMotionEnergy.npy', 'alf', True),
            ('leftROIMotionEnergy.position.npy', 'alf', True),
            ('rightROIMotionEnergy.position.npy', 'alf', True),
            ('bodyROIMotionEnergy.position.npy', 'alf', True),
        ],
    }

    def _check_dlcenv(self):
        """Check that scripts are present, dlcenv can be activated and get iblvideo version"""
        assert len(list(self.scripts.rglob('run_dlc.*'))) == 2, \
            f'Scripts run_dlc.sh and run_dlc.py do not exist in {self.scripts}'
        assert len(list(self.scripts.rglob('run_motion.*'))) == 2, \
            f'Scripts run_motion.sh and run_motion.py do not exist in {self.scripts}'
        assert self.dlcenv.exists(), f"DLC environment does not exist in assumed location {self.dlcenv}"
        # activate the environment and print the iblvideo version in a bash subshell
        command2run = f"source {self.dlcenv}; python -c 'import iblvideo; print(iblvideo.__version__)'"
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash"
        )
        info, error = process.communicate()
        if process.returncode != 0:
            raise AssertionError(f"DLC environment check failed\n{error.decode('utf-8')}")
        # the version is the last line of stdout (environment activation may print before it)
        version = info.decode("utf-8").strip().split('\n')[-1]
        return version

    @staticmethod
    def _video_intact(file_mp4):
        """Checks that the downloaded video can be opened and is not empty"""
        cap = cv2.VideoCapture(str(file_mp4))
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        intact = True if frame_count > 0 else False
        cap.release()
        return intact

    def _run(self, cams=None, overwrite=False):
        """Run DLC inference and motion energy computation on the session videos.

        :param cams: iterable of camera labels to process (default: left, right, body)
        :param overwrite: bool, whether to recompute outputs that already exist locally
        :return: list of output file paths, or None if nothing could be produced
        """
        # Default to all three cams
        cams = cams or ['left', 'right', 'body']
        cams = assert_valid_label(cams)
        # Set up
        self.session_id = self.one.path2eid(self.session_path)
        actual_outputs = []

        # Loop through cams
        for cam in cams:
            # Catch exceptions so that following cameras can still run
            try:
                # If all results exist and overwrite is False, skip computation
                expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True)
                if overwrite is False and expected_outputs_present is True:
                    actual_outputs.extend(expected_outputs)
                    # NOTE: early return from inside the loop — if all expected outputs
                    # already exist, no camera needs to be processed at all
                    return actual_outputs
                else:
                    file_mp4 = next(self.session_path.joinpath('raw_video_data').glob(f'_iblrig_{cam}Camera.raw*.mp4'))
                    if not file_mp4.exists():
                        # In this case we set the status to Incomplete.
                        _logger.error(f"No raw video file available for {cam}, skipping.")
                        self.status = -3
                        continue
                    if not self._video_intact(file_mp4):
                        _logger.error(f"Corrupt raw video file {file_mp4}")
                        self.status = -1
                        continue
                    # Check that dlc environment is ok, shell scripts exists, and get iblvideo version, GPU addressable
                    self.version = self._check_dlcenv()
                    _logger.info(f'iblvideo version {self.version}')
                    check_nvidia_driver()

                    _logger.info(f'Running DLC on {cam}Camera.')
                    command2run = f"{self.scripts.joinpath('run_dlc.sh')} {str(self.dlcenv)} {file_mp4} {overwrite}"
                    _logger.info(command2run)
                    process = subprocess.Popen(
                        command2run,
                        shell=True,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        executable="/bin/bash",
                    )
                    info, error = process.communicate()
                    # info_str = info.decode("utf-8").strip()
                    # _logger.info(info_str)
                    if process.returncode != 0:
                        error_str = error.decode("utf-8").strip()
                        _logger.error(f'DLC failed for {cam}Camera.\n\n'
                                      f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                                      f'{error_str}\n'
                                      f'++++++++++++++++++++++++++++++++++++++++++++\n')
                        self.status = -1
                        # We dont' run motion energy, or add any files if dlc failed to run
                        continue
                    dlc_result = next(self.session_path.joinpath('alf').glob(f'_ibl_{cam}Camera.dlc*.pqt'))
                    actual_outputs.append(dlc_result)

                    _logger.info(f'Computing motion energy for {cam}Camera')
                    command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.dlcenv)} {file_mp4} {dlc_result}"
                    _logger.info(command2run)
                    process = subprocess.Popen(
                        command2run,
                        shell=True,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        executable="/bin/bash",
                    )
                    info, error = process.communicate()
                    # info_str = info.decode("utf-8").strip()
                    # _logger.info(info_str)
                    if process.returncode != 0:
                        error_str = error.decode("utf-8").strip()
                        _logger.error(f'Motion energy failed for {cam}Camera.\n\n'
                                      f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                                      f'{error_str}\n'
                                      f'++++++++++++++++++++++++++++++++++++++++++++\n')
                        self.status = -1
                        continue
                    actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                        f'{cam}Camera.ROIMotionEnergy*.npy')))
                    actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                        f'{cam}ROIMotionEnergy.position*.npy')))
            except BaseException:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        # If status is Incomplete, check that there is at least one output.
        # # Otherwise make sure it gets set to Empty (outputs = None), and set status to -1 to make sure it doesn't slip
        if self.status == -3 and len(actual_outputs) == 0:
            actual_outputs = None
            self.status = -1
        return actual_outputs
class EphysPostDLC(tasks.Task):
    """
    The post_dlc task takes dlc traces as input and computes useful quantities, as well as qc.
    """
    io_charge = 90
    level = 3
    force = True
    signature = {'input_files': [('_ibl_leftCamera.dlc.pqt', 'alf', True),
                                 ('_ibl_bodyCamera.dlc.pqt', 'alf', True),
                                 ('_ibl_rightCamera.dlc.pqt', 'alf', True),
                                 ('_ibl_rightCamera.times.npy', 'alf', True),
                                 ('_ibl_leftCamera.times.npy', 'alf', True),
                                 ('_ibl_bodyCamera.times.npy', 'alf', True),
                                 # the following are required for the DLC plot only
                                 # they are not strictly required, some plots just might be skipped
                                 # In particular the raw videos don't need to be downloaded as they can be streamed
                                 ('_iblrig_bodyCamera.raw.mp4', 'raw_video_data', True),
                                 ('_iblrig_leftCamera.raw.mp4', 'raw_video_data', True),
                                 ('_iblrig_rightCamera.raw.mp4', 'raw_video_data', True),
                                 ('rightROIMotionEnergy.position.npy', 'alf', False),
                                 ('leftROIMotionEnergy.position.npy', 'alf', False),
                                 ('bodyROIMotionEnergy.position.npy', 'alf', False),
                                 ('_ibl_trials.table.pqt', 'alf', True),
                                 ('_ibl_wheel.position.npy', 'alf', True),
                                 ('_ibl_wheel.timestamps.npy', 'alf', True),
                                 ],
                 # More files are required for all panels of the DLC QC plot to function
                 'output_files': [('_ibl_leftCamera.features.pqt', 'alf', True),
                                  ('_ibl_rightCamera.features.pqt', 'alf', True),
                                  ('licks.times.npy', 'alf', True),
                                  # ('dlc_qc_plot.png', 'snapshot', False)
                                  ]
                 }

    def _run(self, overwrite=True, run_qc=True, plot_qc=True):
        """
        Run the EphysPostDLC task. Returns a list of file locations for the output files in signature. The created plot
        (dlc_qc_plot.png) is not returned, but saved in session_path/snapshots and uploaded to Alyx as a note.

        :param overwrite: bool, whether to recompute existing output files (default is True).
            Note that the dlc_qc_plot will be (re-)computed even if overwrite = False
        :param run_qc: bool, whether to run the DLC QC (default is True)
        :param plot_qc: bool, whether to create the dlc_qc_plot (default is True)

        """
        # Check if output files exist locally
        exist, output_files = self.assert_expected(self.signature['output_files'], silent=True)
        if exist and not overwrite:
            _logger.warning('EphysPostDLC outputs exist and overwrite=False, skipping computations of outputs.')
        else:
            if exist and overwrite:
                _logger.warning('EphysPostDLC outputs exist and overwrite=True, overwriting existing outputs.')
            # Find all available dlc files
            dlc_files = list(Path(self.session_path).joinpath('alf').glob('_ibl_*Camera.dlc.*'))
            for dlc_file in dlc_files:
                _logger.debug(dlc_file)
            output_files = []
            combined_licks = []

            for dlc_file in dlc_files:
                # Catch unforeseen exceptions and move on to next cam
                try:
                    cam = label_from_path(dlc_file)
                    # load dlc trace and camera times
                    dlc = pd.read_parquet(dlc_file)
                    dlc_thresh = likelihood_threshold(dlc, 0.9)
                    # try to load respective camera times
                    # `times` ends up as a tri-state flag: True (usable), False (missing/empty),
                    # or 'short' (fewer timestamps than DLC frames)
                    try:
                        dlc_t = np.load(next(Path(self.session_path).joinpath('alf').glob(f'_ibl_{cam}Camera.times.*npy')))
                        times = True
                        if dlc_t.shape[0] == 0:
                            _logger.error(f'camera.times empty for {cam} camera. '
                                          f'Computations using camera.times will be skipped')
                            self.status = -1
                            times = False
                        elif dlc_t.shape[0] < len(dlc_thresh):
                            _logger.error(f'Camera times shorter than DLC traces for {cam} camera. '
                                          f'Computations using camera.times will be skipped')
                            self.status = -1
                            times = 'short'
                    except StopIteration:
                        self.status = -1
                        times = False
                        _logger.error(f'No camera.times for {cam} camera. '
                                      f'Computations using camera.times will be skipped')
                    # These features are only computed from left and right cam
                    if cam in ('left', 'right'):
                        features = pd.DataFrame()
                        # If camera times are available, get the lick time stamps for combined array
                        if times is True:
                            _logger.info(f"Computing lick times for {cam} camera.")
                            combined_licks.append(get_licks(dlc_thresh, dlc_t))
                        elif times is False:
                            _logger.warning(f"Skipping lick times for {cam} camera as no camera.times available")
                        elif times == 'short':
                            _logger.warning(f"Skipping lick times for {cam} camera as camera.times are too short")
                        # Compute pupil diameter, raw and smoothed
                        _logger.info(f"Computing raw pupil diameter for {cam} camera.")
                        features['pupilDiameter_raw'] = get_pupil_diameter(dlc_thresh)
                        try:
                            _logger.info(f"Computing smooth pupil diameter for {cam} camera.")
                            features['pupilDiameter_smooth'] = get_smooth_pupil_diameter(features['pupilDiameter_raw'],
                                                                                         cam)
                        except BaseException:
                            _logger.error(f"Computing smooth pupil diameter for {cam} camera failed, saving all NaNs.")
                            _logger.error(traceback.format_exc())
                            features['pupilDiameter_smooth'] = np.nan
                        # Save features to parquet file
                        features_file = Path(self.session_path).joinpath('alf', f'_ibl_{cam}Camera.features.pqt')
                        features.to_parquet(features_file)
                        output_files.append(features_file)

                    # For all cams, compute DLC qc if times available
                    if run_qc is True and times in [True, 'short']:
                        # Setting download_data to False because at this point the data should be there
                        qc = DlcQC(self.session_path, side=cam, one=self.one, download_data=False)
                        qc.run(update=True)
                    else:
                        if times is False:
                            _logger.warning(f"Skipping QC for {cam} camera as no camera.times available")
                        if not run_qc:
                            _logger.warning(f"Skipping QC for {cam} camera as run_qc=False")

                except BaseException:
                    _logger.error(traceback.format_exc())
                    self.status = -1
                    continue

            # Combined lick times
            if len(combined_licks) > 0:
                lick_times_file = Path(self.session_path).joinpath('alf', 'licks.times.npy')
                np.save(lick_times_file, sorted(np.concatenate(combined_licks)))
                output_files.append(lick_times_file)
            else:
                _logger.warning("No lick times computed for this session.")

        if plot_qc:
            _logger.info("Creating DLC QC plot")
            try:
                session_id = self.one.path2eid(self.session_path)
                fig_path = self.session_path.joinpath('snapshot', 'dlc_qc_plot.png')
                if not fig_path.parent.exists():
                    fig_path.parent.mkdir(parents=True, exist_ok=True)
                fig = dlc_qc_plot(self.session_path, one=self.one)
                fig.savefig(fig_path)
                fig.clf()
                snp = ReportSnapshot(self.session_path, session_id, one=self.one)
                snp.outputs = [fig_path]
                snp.register_images(widths=['orig'],
                                    function=str(dlc_qc_plot.__module__) + '.' + str(dlc_qc_plot.__name__))
            except BaseException:
                # the plot is best-effort: a failure here must not discard the computed outputs
                _logger.error('Could not create and/or upload DLC QC Plot')
                _logger.error(traceback.format_exc())
                self.status = -1

        return output_files
class EphysPassive(tasks.Task):
    """Extract the passive stimulation protocol from the raw passive and sync data."""
    cpu = 1
    io_charge = 90
    level = 1
    force = False
    signature = {
        'input_files': [('_iblrig_taskSettings.raw*', 'raw_behavior_data', True),
                        ('_spikeglx_sync.channels.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.polarities.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.times.*', 'raw_ephys_data*', True),
                        ('*.meta', 'raw_ephys_data*', True),
                        ('*wiring.json', 'raw_ephys_data*', False),
                        ('_iblrig_RFMapStim.raw*', 'raw_passive_data', True)],
        'output_files': [('_ibl_passiveGabor.table.csv', 'alf', True),
                         ('_ibl_passivePeriods.intervalsTable.csv', 'alf', True),
                         ('_ibl_passiveRFM.times.npy', 'alf', True),
                         ('_ibl_passiveStims.table.csv', 'alf', True)]}

    def _run(self):
        """returns a list of pathlib.Paths. """
        data, paths = ephys_passive.PassiveChoiceWorld(self.session_path).extract(save=True)
        if any(p is None for p in paths):
            # at least one dataset failed to extract
            self.status = -1
        # Register?
        return paths

    def get_signatures(self, **kwargs):
        """Expand the `raw_ephys_data*` wildcard collections for this session's probes."""
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = spikeglx.get_probes_from_folder(self.session_path)

        full_input_files = []
        for filename, collection, required in self.signature['input_files']:
            if 'raw_ephys_data*' not in collection:
                full_input_files.append((filename, collection, required))
                continue
            # session-level raw_ephys_data entry is skipped on 3A systems
            if neuropixel_version != '3A':
                full_input_files.append((filename, 'raw_ephys_data', required))
            for probe in probes:
                full_input_files.append((filename, f'raw_ephys_data/{probe}', required))

        self.input_files = full_input_files

        self.output_files = self.signature['output_files']
class EphysExtractionPipeline(tasks.Pipeline):
    """(Deprecated) Legacy task pipeline for ephys session extraction."""
    label = __name__

    def __init__(self, session_path=None, **kwargs):
        """Build the ordered task graph for the ephys session at `session_path`."""
        super().__init__(session_path, **kwargs)
        self.session_path = session_path
        # `task_dict` avoids shadowing the imported `tasks` module
        task_dict = OrderedDict()
        # level 0: registration, sync pulses, raw QC and compression
        task_dict['ExperimentDescriptionRegisterRaw'] = base_tasks.ExperimentDescriptionRegisterRaw(self.session_path)
        task_dict['EphysRegisterRaw'] = tpp.TrainingRegisterRaw(self.session_path)
        task_dict['EphysPulses'] = EphysPulses(self.session_path)
        task_dict['EphysRawQC'] = RawEphysQC(self.session_path)
        task_dict['EphysAudio'] = EphysAudio(self.session_path)
        task_dict['EphysMtscomp'] = EphysMtscomp(self.session_path)
        task_dict['EphysVideoCompress'] = EphysVideoCompress(self.session_path)
        # level 1: depend on sync pulses and/or compressed data
        task_dict['SpikeSorting'] = SpikeSorting(
            self.session_path, parents=[task_dict['EphysMtscomp'], task_dict['EphysPulses']])
        task_dict['EphysTrials'] = EphysTrials(self.session_path, parents=[task_dict['EphysPulses']])
        task_dict['EphysPassive'] = EphysPassive(self.session_path, parents=[task_dict['EphysPulses']])
        # level 2
        task_dict['EphysVideoSyncQc'] = EphysVideoSyncQc(
            self.session_path, parents=[task_dict['EphysVideoCompress'], task_dict['EphysPulses'], task_dict['EphysTrials']])
        task_dict['EphysCellsQc'] = EphysCellsQc(self.session_path, parents=[task_dict['SpikeSorting']])
        task_dict['EphysDLC'] = EphysDLC(self.session_path, parents=[task_dict['EphysVideoCompress']])
        task_dict['EphysTrainingStatus'] = tpp.TrainingStatus(self.session_path, parents=[task_dict['EphysTrials']])
        # level 3
        task_dict['EphysPostDLC'] = EphysPostDLC(self.session_path, parents=[task_dict['EphysDLC'], task_dict['EphysTrials'],
                                                                            task_dict['EphysVideoSyncQc']])
        self.tasks = task_dict