Coverage for ibllib/pipes/ephys_preprocessing.py: 78%
767 statements
coverage.py v7.3.2, created at 2023-10-11 11:13 +0100
import logging
import re
import shutil
import subprocess
from collections import OrderedDict
import traceback
from pathlib import Path
import warnings

import cv2
import numpy as np
import pandas as pd
import packaging.version

import one.alf.io as alfio
from neurodsp.utils import rms
import spikeglx

from ibllib.misc import check_nvidia_driver
from ibllib.ephys import ephysqc, spikes, sync_probes
from ibllib.io import ffmpeg
from ibllib.io.video import label_from_path, assert_valid_label
from ibllib.io.extractors import ephys_fpga, ephys_passive, camera
from ibllib.pipes import tasks, base_tasks
import ibllib.pipes.training_preprocessing as tpp
from ibllib.pipes.misc import create_alyx_probe_insertions
from ibllib.qc.alignment_qc import get_aligned_channels
from ibllib.qc.task_extractors import TaskQCExtractor
from ibllib.qc.task_metrics import TaskQC
from ibllib.qc.camera import run_all_qc as run_camera_qc
from ibllib.qc.dlc import DlcQC
from ibllib.plots.figures import dlc_qc_plot, BehaviourPlots, LfpPlots, ApPlots, BadChannelsAp
from ibllib.plots.figures import SpikeSorting as SpikeSortingPlots
from ibllib.plots.snapshot import ReportSnapshot
from brainbox.behavior.dlc import likelihood_threshold, get_licks, get_pupil_diameter, get_smooth_pupil_diameter

_logger = logging.getLogger("ibllib")
warnings.warn('`pipes.ephys_preprocessing` to be removed in favour of dynamic pipeline')


# level 0
class EphysPulses(tasks.Task):
    """
    Extract pulses from raw electrophysiology data into numpy arrays.
    Perform the probes synchronisation with nidq (3B) or main probe (3A).
    """
    cpu = 2
    io_charge = 30  # this job reads raw ap files
    priority = 90  # a lot of jobs depend on this one
    level = 0  # this job doesn't depend on anything
    force = False  # whether or not to force download of missing data on local server if outputs already exist
    signature = {
        'input_files': [('*ap.meta', 'raw_ephys_data/probe*', True),
                        ('*ap.ch', 'raw_ephys_data/probe*', False),  # not necessary when we have a .bin file
                        ('*ap.*bin', 'raw_ephys_data/probe*', True),
                        ('*nidq.meta', 'raw_ephys_data', True),
                        ('*nidq.ch', 'raw_ephys_data', False),  # not necessary when we have a .bin file
                        ('*nidq.*bin', 'raw_ephys_data', True)],
        'output_files': [('_spikeglx_sync*.npy', 'raw_ephys_data*', True),
                         ('_spikeglx_sync.polarities*.npy', 'raw_ephys_data*', True),
                         ('_spikeglx_sync.times*.npy', 'raw_ephys_data*', True)]
    }

    def get_signatures(self, **kwargs):
        """
        Find the input and output signatures specific for local filesystem
        :return:
        """
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = spikeglx.get_probes_from_folder(self.session_path)

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            else:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], sig[1], sig[2]))

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            if neuropixel_version != '3A':
                full_output_files.append((sig[0], 'raw_ephys_data', sig[2]))
            for probe in probes:
                full_output_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))

        self.output_files = full_output_files

    def _run(self, overwrite=False):
        # outputs numpy
        syncs, out_files = ephys_fpga.extract_sync(self.session_path, overwrite=overwrite)
        for out_file in out_files:
            _logger.info(f"extracted pulses for {out_file}")

        status, sync_files = sync_probes.sync(self.session_path)
        return out_files + sync_files
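

# Editor's sketch (not part of the original module): minimal driving of the
# EphysPulses task on a local session. `session_path` is a hypothetical ALF
# session folder containing raw_ephys_data; Task.run() wrapping _run() is
# assumed from ibllib.pipes.tasks.
def _example_run_ephys_pulses(session_path):
    """Usage sketch: extract sync pulses and synchronise probes for one session."""
    task = EphysPulses(session_path)
    status = task.run()  # calls _run() with setup/teardown and error handling
    if status == 0:
        _logger.info(f'sync files: {task.outputs}')
    return task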


class RawEphysQC(tasks.Task):
    """
    Computes raw electrophysiology QC
    """
    cpu = 2
    io_charge = 30  # this job reads raw ap files
    priority = 10
    level = 0  # this job doesn't depend on anything
    force = False
    signature = {
        'input_files': [('*ap.meta', 'raw_ephys_data/probe*', True),
                        ('*lf.meta', 'raw_ephys_data/probe*', True),  # not necessary to run task as optional computation
                        ('*lf.ch', 'raw_ephys_data/probe*', False),  # not required if we have the .bin file
                        ('*lf.*bin', 'raw_ephys_data/probe*', True)],  # not necessary to run task as optional computation
        'output_files': [('_iblqc_ephysChannels.apRMS.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysChannels.rawSpikeRates.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysChannels.labels.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysSpectralDensityLF.freqs.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysSpectralDensityLF.power.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysSpectralDensityAP.freqs.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysSpectralDensityAP.power.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysTimeRmsLF.rms.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysTimeRmsLF.timestamps.npy', 'raw_ephys_data/probe*', True)]
    }

    def _run(self, overwrite=False):
        eid = self.one.path2eid(self.session_path)
        probes = [(x['id'], x['name']) for x in self.one.alyx.rest('insertions', 'list', session=eid)]
        # Usually there should be two probes; if there are fewer, check whether all probes are registered
        if len(probes) < 2:
            _logger.warning(f"{len(probes)} probes registered for session {eid}, trying to register from local data")
            probes = [(p['id'], p['name']) for p in create_alyx_probe_insertions(self.session_path, one=self.one)]
        qc_files = []
        for pid, pname in probes:
            _logger.info(f"\nRunning QC for probe insertion {pname}")
            try:
                eqc = ephysqc.EphysQC(pid, session_path=self.session_path, one=self.one)
                qc_files.extend(eqc.run(update=True, overwrite=overwrite))
                _logger.info("Creating LFP QC plots")
                plot_task = LfpPlots(pid, session_path=self.session_path, one=self.one)
                _ = plot_task.run()
                self.plot_tasks.append(plot_task)
                plot_task = BadChannelsAp(pid, session_path=self.session_path, one=self.one)
                _ = plot_task.run()
                self.plot_tasks.append(plot_task)
            except AssertionError:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        return qc_files

    def get_signatures(self, **kwargs):
        probes = spikeglx.get_probes_from_folder(self.session_path)

        full_output_files = []
        for sig in self.signature['output_files']:
            if 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    full_output_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))

        self.output_files = full_output_files

        # whether the lf input signature is required depends on the outputs we already have;
        # we need to be agile here to avoid unnecessary downloading of lf.cbin files
        expected_count = 0
        count = 0
        # check to see if we have the lfp qc datasets
        for expected_file in full_output_files:
            if 'LF' in expected_file[0]:
                expected_count += 1
                actual_files = list(Path(self.session_path).rglob(str(Path(expected_file[1]).joinpath(expected_file[0]))))
                if len(actual_files) == 1:
                    count += 1

        lf_required = False if count == expected_count else True

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    if 'lf' in sig[0]:
                        full_input_files.append((sig[0], f'raw_ephys_data/{probe}', lf_required if sig[2] else sig[2]))
                    else:
                        full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))

        self.input_files = full_input_files
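

# Editor's sketch (illustrative only): RawEphysQC needs an Alyx connection to
# look up probe insertions; `one` is assumed to be an authenticated one.api.ONE
# client and `session_path` a registered session.
def _example_run_raw_ephys_qc(session_path, one):
    """Usage sketch: compute raw ephys QC for all registered probes."""
    task = RawEphysQC(session_path, one=one)
    task.run()
    return task.outputs  # the _iblqc_* datasets listed in the signature above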


class EphysAudio(tasks.Task):
    """
    Compresses the microphone wav file into a lossless flac file
    """
    # TESTS DONE
    cpu = 2
    priority = 10
    level = 0  # this job doesn't depend on anything
    force = False
    signature = {
        'input_files': [('_iblrig_micData.raw.wav', 'raw_behavior_data', True)],
        'output_files': [('_iblrig_micData.raw.flac', 'raw_behavior_data', True)],
    }

    def _run(self, overwrite=False):
        command = "ffmpeg -i {file_in} -y -nostdin -c:a flac -nostats {file_out}"
        file_in = next(self.session_path.rglob("_iblrig_micData.raw.wav"), None)
        if file_in is None:
            return
        file_out = file_in.with_suffix(".flac")
        status, output_file = ffmpeg.compress(file_in=file_in, file_out=file_out, command=command)
        return [output_file]
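

# Editor's sketch (illustrative only): the concrete command EphysAudio._run
# builds for a hypothetical microphone file before handing it to ffmpeg.compress.
def _example_flac_command():
    """Usage sketch: render the flac compression command for one wav file."""
    command = "ffmpeg -i {file_in} -y -nostdin -c:a flac -nostats {file_out}"
    return command.format(file_in='_iblrig_micData.raw.wav',
                          file_out='_iblrig_micData.raw.flac')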


class SpikeSorting(tasks.Task):
    """
    Pykilosort 2.5 pipeline
    """
    gpu = 1
    io_charge = 100  # this job reads raw ap files
    priority = 60
    level = 1  # this job depends on the EphysMtscomp and EphysPulses jobs
    force = True
    job_size = 'large'

    SHELL_SCRIPT = Path.home().joinpath(
        "Documents/PYTHON/iblscripts/deploy/serverpc/kilosort2/run_pykilosort.sh"
    )
    SPIKE_SORTER_NAME = 'pykilosort'
    PYKILOSORT_REPO = Path.home().joinpath('Documents/PYTHON/SPIKE_SORTING/pykilosort')
    signature = {
        'input_files': [],  # see setUp method for declaration of inputs
        'output_files': []  # see setUp method for declaration of outputs
    }

    @staticmethod
    def spike_sorting_signature(pname=None):
        pname = pname if pname is not None else "probe*"
        input_signature = [('*ap.meta', f'raw_ephys_data/{pname}', True),
                           ('*ap.ch', f'raw_ephys_data/{pname}', True),
                           ('*ap.cbin', f'raw_ephys_data/{pname}', True),
                           ('*nidq.meta', 'raw_ephys_data', False),
                           ('_spikeglx_sync.channels.*', 'raw_ephys_data*', True),
                           ('_spikeglx_sync.polarities.*', 'raw_ephys_data*', True),
                           ('_spikeglx_sync.times.*', 'raw_ephys_data*', True),
                           ('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
                           ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True)]
        output_signature = [('spike_sorting_pykilosort.log', f'spike_sorters/pykilosort/{pname}', True),
                            ('_iblqc_ephysTimeRmsAP.rms.npy', f'raw_ephys_data/{pname}', True),  # new ibllib 2.5
                            ('_iblqc_ephysTimeRmsAP.timestamps.npy', f'raw_ephys_data/{pname}', True)]  # new ibllib 2.5
        return input_signature, output_signature

    @staticmethod
    def _sample2v(ap_file):
        md = spikeglx.read_meta_data(ap_file.with_suffix(".meta"))
        s2v = spikeglx._conversion_sample2v_from_meta(md)
        return s2v["ap"][0]
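
    # Editor's sketch (comment only, not in the original class): _sample2v
    # returns the scalar converting raw int16 samples to volts, e.g. for a
    # hypothetical file:
    #   s2v = SpikeSorting._sample2v(Path('raw_ephys_data/probe00/xx.ap.cbin'))
    #   volts = raw_samples * s2v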

    @staticmethod
    def _fetch_pykilosort_version(repo_path):
        init_file = Path(repo_path).joinpath('pykilosort', '__init__.py')
        version = SpikeSorting._fetch_ks2_commit_hash(repo_path)  # default
        try:
            with open(init_file) as fid:
                lines = fid.readlines()
                for line in lines:
                    if line.startswith("__version__ = "):
                        version = line.split('=')[-1].strip().replace('"', '').replace("'", '')
        except Exception:
            pass
        return f"pykilosort_{version}"

    @staticmethod
    def _fetch_pykilosort_run_version(log_file):
        """
        Parse the following line (2 formats depending on the version) from the log files to get the version
        '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.2.1, output in gnagga^[[0m\n'
        '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.3.0^[[0m\n'
        """
        with open(log_file) as fid:
            line = fid.readline()
        version = re.search('version (.*), output', line)
        version = version or re.search('version (.*)', line)  # old versions have an output field, new ones only the version
        version = re.sub('\\^[[0-9]+m', '', version.group(1))  # removes the coloring tags
        return f"pykilosort_{version}"
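
    # Editor's sketch (comment only): exercising the parsing above on the first
    # docstring example, with the colour codes omitted for brevity:
    #   line = 'Starting Pykilosort version ibl_1.2.1, output in gnagga'
    #   re.search('version (.*), output', line).group(1)  # -> 'ibl_1.2.1'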

    @staticmethod
    def _fetch_ks2_commit_hash(repo_path):
        command2run = f"git --git-dir {repo_path}/.git rev-parse --verify HEAD"
        process = subprocess.Popen(
            command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        info, error = process.communicate()
        if process.returncode != 0:
            _logger.error(
                f"Can't fetch pykilosort commit hash, will still attempt to run \n"
                f"Error: {error.decode('utf-8')}"
            )
            return ""
        return info.decode("utf-8").strip()

    @staticmethod
    def parse_version(v) -> packaging.version.Version:
        """
        Extracts and parses semantic version (major.minor.patch) from a version string.

        Parameters
        ----------
        v : str
            A version string containing a semantic version.

        Returns
        -------
        packaging.version.Version
            The parsed semantic version number

        Examples
        --------
        >>> SpikeSorting.parse_version('ibl_1.2')
        <Version('1.2')>
        >>> SpikeSorting.parse_version('pykilosort_ibl_1.2.0-new')
        <Version('1.2.0')>
        >>> SpikeSorting.parse_version('ibl_0.2') < SpikeSorting.parse_version('pykilosort_v1')
        True
        """
        m = re.search(r'(\d+\.?){1,3}', v)
        if not m:
            raise packaging.version.InvalidVersion(f'Failed to detect SemVer in "{v}"')
        return packaging.version.parse(m.group())

    def setUp(self, probes=None):
        """
        Overload the setUp method to allow the inputs and outputs to be restricted to a single probe
        :param probes: list of probes, e.g. ['probe00']
        :return:
        """
        if not probes or len(probes) == 2:
            self.signature['input_files'], self.signature['output_files'] = self.spike_sorting_signature()
        else:
            self.signature['input_files'], self.signature['output_files'] = self.spike_sorting_signature(probes[0])

        return super().setUp(probes=probes)

    def _run_pykilosort(self, ap_file):
        """
        Runs the pykilosort spike sorting for one probe dataset;
        the raw spike sorting output goes to the session_path/spike_sorters/{self.SPIKE_SORTER_NAME}/probeXX folder
        (support for old spike sortings in the probe folder <1.5.5 is discontinued)
        :return: path of the folder containing the raw spike sorting output
        """
        self.version = self._fetch_pykilosort_version(self.PYKILOSORT_REPO)
        label = ap_file.parts[-2]  # this is usually the probe name
        sorter_dir = self.session_path.joinpath("spike_sorters", self.SPIKE_SORTER_NAME, label)
        self.FORCE_RERUN = False
        if not self.FORCE_RERUN:
            log_file = sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
            if log_file.exists():
                run_version = self._fetch_pykilosort_run_version(log_file)
                if self.parse_version(run_version) > self.parse_version('pykilosort_ibl_1.1.0'):
                    _logger.info(f"Already ran: spike_sorting_{self.SPIKE_SORTER_NAME}.log"
                                 f" found in {sorter_dir}, skipping.")
                    return sorter_dir
                else:
                    self.FORCE_RERUN = True

        print(sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log"))
        # get the scratch drive from the shell script
        with open(self.SHELL_SCRIPT) as fid:
            lines = fid.readlines()
        line = [line for line in lines if line.startswith("SCRATCH_DRIVE=")][0]
        m = re.search(r"\=(.*?)(\#|\n)", line)[0]
        scratch_drive = Path(m[1:-1].strip())
        assert scratch_drive.exists()

        # clean up and create directory, this also checks write permissions
        # temp_dir has the following shape: pykilosort/ZM_3003_2020-07-29_001_probe00
        # first makes sure the tmp dir is clean
        shutil.rmtree(scratch_drive.joinpath(self.SPIKE_SORTER_NAME), ignore_errors=True)
        temp_dir = scratch_drive.joinpath(
            self.SPIKE_SORTER_NAME, "_".join(list(self.session_path.parts[-3:]) + [label])
        )
        if temp_dir.exists():  # this has to be decided: we may want to restart instead,
            # but failed sessions may then clog the scratch dir and have users run out of space
            shutil.rmtree(temp_dir, ignore_errors=True)
        temp_dir.mkdir(parents=True, exist_ok=True)

        check_nvidia_driver()
        command2run = f"{self.SHELL_SCRIPT} {ap_file} {temp_dir}"
        _logger.info(command2run)
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash",
        )
        info, error = process.communicate()
        info_str = info.decode("utf-8").strip()
        _logger.info(info_str)
        if process.returncode != 0:
            error_str = error.decode("utf-8").strip()
            # try and get the kilosort log if any
            for log_file in temp_dir.rglob('*_kilosort.log'):
                with open(log_file) as fid:
                    log = fid.read()
                _logger.error(log)
                break
            raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}")

        shutil.copytree(temp_dir.joinpath('output'), sorter_dir, dirs_exist_ok=True)
        shutil.rmtree(temp_dir, ignore_errors=True)
        return sorter_dir

    def _run(self, probes=None):
        """
        Multiple steps. For each probe:
        - runs pykilosort (skips if it already ran)
        - synchronises the spike sorting
        - outputs the probe description files
        :param probes: (list of str) if provided, will only run spike sorting for the specified probe names
        :return: list of files to be registered on the database
        """
        efiles = spikeglx.glob_ephys_files(self.session_path)
        ap_files = [(ef.get("ap"), ef.get("label")) for ef in efiles if "ap" in ef.keys()]
        out_files = []
        for ap_file, label in ap_files:
            if isinstance(probes, list) and label not in probes:
                continue
            try:
                # if the file is part of a sequence, handle the run accordingly
                sequence_file = ap_file.parent.joinpath(ap_file.stem.replace('ap', 'sequence.json'))
                # temporary: just skip these for now
                if sequence_file.exists():
                    continue
                ks2_dir = self._run_pykilosort(ap_file)  # runs the sorter, skips if it already ran
                probe_out_path = self.session_path.joinpath("alf", label, self.SPIKE_SORTER_NAME)
                shutil.rmtree(probe_out_path, ignore_errors=True)
                probe_out_path.mkdir(parents=True, exist_ok=True)
                spikes.ks2_to_alf(
                    ks2_dir,
                    bin_path=ap_file.parent,
                    out_path=probe_out_path,
                    bin_file=ap_file,
                    ampfactor=self._sample2v(ap_file),
                )
                logfile = ks2_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
                if logfile.exists():
                    shutil.copyfile(logfile, probe_out_path.joinpath(
                        f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log"))
                out, _ = spikes.sync_spike_sorting(ap_file=ap_file, out_path=probe_out_path)
                out_files.extend(out)
                # convert the ks2 output into a tar file and also register it.
                # Create the folder in case the spike sorting is in an old raw_ephys_data folder;
                # for new sessions it should already exist
                tar_dir = self.session_path.joinpath(
                    'spike_sorters', self.SPIKE_SORTER_NAME, label)
                tar_dir.mkdir(parents=True, exist_ok=True)
                out = spikes.ks2_to_tar(ks2_dir, tar_dir, force=self.FORCE_RERUN)
                out_files.extend(out)

                if self.one:
                    eid = self.one.path2eid(self.session_path, query_type='remote')
                    ins = self.one.alyx.rest('insertions', 'list', session=eid, name=label, query_type='remote')
                    if len(ins) != 0:
                        _logger.info("Creating SpikeSorting QC plots")
                        plot_task = ApPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
                        _ = plot_task.run()
                        self.plot_tasks.append(plot_task)

                        plot_task = SpikeSortingPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
                        _ = plot_task.run(collection=str(probe_out_path.relative_to(self.session_path)))
                        self.plot_tasks.append(plot_task)

                        resolved = ins[0].get('json', {'temp': 0}).get('extended_qc', {'temp': 0}). \
                            get('alignment_resolved', False)
                        if resolved:
                            chns = np.load(probe_out_path.joinpath('channels.localCoordinates.npy'))
                            out = get_aligned_channels(ins[0], chns, one=self.one, save_dir=probe_out_path)
                            out_files.extend(out)

            except Exception:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        probe_files = spikes.probes_description(self.session_path, one=self.one)
        return out_files + probe_files

    def get_signatures(self, probes=None, **kwargs):
        """
        This transforms all wildcards in collection to exact match
        :param probes:
        :return:
        """
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = probes or spikeglx.get_probes_from_folder(self.session_path)

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data*' in sig[1]:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            elif 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            elif 'raw_ephys_data' in sig[1]:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], 'raw_ephys_data', sig[2]))
            else:
                full_input_files.append(sig)

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            if 'probe*' in sig[1]:
                for probe in probes:
                    col = sig[1].split('/')[:-1] + [probe]
                    full_output_files.append((sig[0], '/'.join(col), sig[2]))
            else:
                full_output_files.append(sig)

        self.output_files = full_output_files
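

# Editor's sketch (illustrative only): spike sorting restricted to one probe;
# `session_path` and `one` are assumed as in the earlier sketches, with the
# pykilosort shell script deployed as per the class attributes above.
def _example_spike_sort_one_probe(session_path, one=None):
    """Usage sketch: run pykilosort for probe00 only, then list the alf outputs."""
    task = SpikeSorting(session_path, one=one)
    task.run(probes=['probe00'])  # forwarded to setUp and _run
    return sorted(Path(session_path).joinpath('alf', 'probe00', 'pykilosort').glob('*'))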


class EphysVideoCompress(tasks.Task):
    priority = 90
    level = 0
    force = False
    job_size = 'large'
    io_charge = 100

    signature = {
        'input_files': [('_iblrig_*Camera.raw.*', 'raw_video_data', True)],
        'output_files': [('_iblrig_*Camera.raw.mp4', 'raw_video_data', True)]
    }

    def _run(self, **kwargs):
        # avi to mp4 compression
        command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 17 '
                   '-loglevel 0 -codec:a copy {file_out}')
        output_files = ffmpeg.iblrig_video_compression(self.session_path, command)

        if len(output_files) == 0:
            _logger.info('No compressed videos found')
            return

        return output_files

    def get_signatures(self, **kwargs):
        # need to detect the number of cameras
        output_files = Path(self.session_path).joinpath('raw_video_data').glob('*')
        labels = {label_from_path(x) for x in output_files}

        full_input_files = []
        for sig in self.signature['input_files']:
            for label in labels:
                full_input_files.append((sig[0].replace('*Camera', f'{label}Camera'), sig[1], sig[2]))

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            for label in labels:
                full_output_files.append((sig[0].replace('*Camera', f'{label}Camera'), sig[1], sig[2]))

        self.output_files = full_output_files
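

# Editor's sketch (illustrative only): the camera label detection used by
# get_signatures above; label_from_path maps e.g. '_iblrig_leftCamera.raw.mp4'
# to 'left'.
def _example_camera_labels(session_path):
    """Usage sketch: detect which cameras have raw videos for a session."""
    files = Path(session_path).joinpath('raw_video_data').glob('*Camera*')
    return {label_from_path(f) for f in files}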


class EphysVideoSyncQc(tasks.Task):
    priority = 40
    level = 2
    force = True
    signature = {
        'input_files': [('_iblrig_*Camera.raw.mp4', 'raw_video_data', True),
                        ('_iblrig_*Camera.timestamps.ssv', 'raw_video_data', False),
                        ('_iblrig_*Camera.timestamps.npy', 'raw_video_data', False),
                        ('_iblrig_*Camera.frameData.bin', 'raw_video_data', False),
                        ('_iblrig_*Camera.GPIO.bin', 'raw_video_data', False),
                        ('_iblrig_*Camera.frame_counter.bin', 'raw_video_data', False),
                        ('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
                        ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True),
                        ('_spikeglx_sync.channels.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.polarities.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.times.*', 'raw_ephys_data*', True),
                        ('*wheel.position.npy', 'alf', False),
                        ('*wheel.timestamps.npy', 'alf', False),
                        ('*wiring.json', 'raw_ephys_data*', False),
                        ('*.meta', 'raw_ephys_data*', True)],

        'output_files': [('_ibl_*Camera.times.npy', 'alf', True)]
    }

    def _run(self, **kwargs):

        mp4_files = self.session_path.joinpath('raw_video_data').rglob('*.mp4')
        labels = [label_from_path(x) for x in mp4_files]
        # Video timestamps extraction
        output_files = []
        data, files = camera.extract_all(self.session_path, save=True, labels=labels)
        output_files.extend(files)

        # Video QC
        run_camera_qc(self.session_path, update=True, one=self.one, cameras=labels)

        return output_files

    def get_signatures(self, **kwargs):
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = spikeglx.get_probes_from_folder(self.session_path)
        # need to detect the number of cameras
        output_files = Path(self.session_path).joinpath('raw_video_data').rglob('*')
        labels = np.unique([label_from_path(x) for x in output_files])

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data*' in sig[1]:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            elif 'Camera' in sig[0]:
                for lab in labels:
                    full_input_files.append((sig[0].replace('*Camera', f'{lab}Camera'), sig[1], sig[2]))
            else:
                full_input_files.append((sig[0], sig[1], sig[2]))

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            if 'raw_ephys_data*' in sig[1]:
                if neuropixel_version != '3A':
                    full_output_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_output_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            elif 'Camera' in sig[0]:
                for lab in labels:
                    full_output_files.append((sig[0].replace('*Camera', f'{lab}Camera'), sig[1], sig[2]))
            else:
                full_output_files.append((sig[0], sig[1], sig[2]))

        self.output_files = full_output_files


# level 1
class EphysTrials(tasks.Task):
    priority = 90
    level = 1
    force = False
    signature = {
        'input_files': [('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
                        ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True),
                        ('_spikeglx_sync.channels.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.polarities.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.times.*', 'raw_ephys_data*', True),
                        ('_iblrig_encoderEvents.raw*', 'raw_behavior_data', True),
                        ('_iblrig_encoderPositions.raw*', 'raw_behavior_data', True),
                        ('*wiring.json', 'raw_ephys_data*', False),
                        ('*.meta', 'raw_ephys_data*', True)],
        'output_files': [('*trials.table.pqt', 'alf', True),
                         ('*trials.goCueTrigger_times.npy', 'alf', True),
                         ('*trials.intervals_bpod.npy', 'alf', True),
                         ('*trials.stimOff_times.npy', 'alf', True),
                         ('*trials.quiescencePeriod.npy', 'alf', True),
                         ('*wheel.position.npy', 'alf', True),
                         ('*wheel.timestamps.npy', 'alf', True),
                         ('*wheelMoves.intervals.npy', 'alf', True),
                         ('*wheelMoves.peakAmplitude.npy', 'alf', True)]
    }

    def _behaviour_criterion(self):
        """
        Computes and updates the behaviour criterion on Alyx
        """
        from brainbox.behavior import training

        trials = alfio.load_object(self.session_path.joinpath("alf"), "trials")
        good_enough = training.criterion_delay(
            n_trials=trials["intervals"].shape[0],
            perf_easy=training.compute_performance_easy(trials),
        )
        eid = self.one.path2eid(self.session_path, query_type='remote')
        self.one.alyx.json_field_update(
            "sessions", eid, "extended_qc", {"behavior": int(good_enough)}
        )

    def _extract_behaviour(self):
        dsets, out_files = ephys_fpga.extract_all(self.session_path, save=True)

        return dsets, out_files

    def _run(self, plot_qc=True):
        dsets, out_files = self._extract_behaviour()

        if not self.one or self.one.offline:
            return out_files

        self._behaviour_criterion()
        # Run the task QC
        qc = TaskQC(self.session_path, one=self.one, log=_logger)
        qc.extractor = TaskQCExtractor(self.session_path, lazy=True, one=qc.one)
        # Extract extra datasets required for QC
        qc.extractor.data = dsets
        qc.extractor.extract_data()
        # Aggregate and update Alyx QC fields
        qc.run(update=True)

        if plot_qc:
            _logger.info("Creating Trials QC plots")
            try:
                session_id = self.one.path2eid(self.session_path)
                plot_task = BehaviourPlots(session_id, self.session_path, one=self.one)
                _ = plot_task.run()
                self.plot_tasks.append(plot_task)
            except Exception:
                _logger.error('Could not create Trials QC Plot')
                _logger.error(traceback.format_exc())
                self.status = -1

        return out_files

    def get_signatures(self, **kwargs):
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = spikeglx.get_probes_from_folder(self.session_path)

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data*' in sig[1]:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            else:
                full_input_files.append(sig)

        self.input_files = full_input_files

        self.output_files = self.signature['output_files']


class LaserTrialsLegacy(EphysTrials):
    """This is the legacy extractor for Guido's ephys optogenetic stimulation protocol.

    This is legacy because personal project extractors should be in a separate repository.
    """
    def _extract_behaviour(self):
        dsets, out_files = super()._extract_behaviour()

        # Re-extract the laser datasets as the above default extractor discards them
        from ibllib.io.extractors import opto_trials
        laser = opto_trials.LaserBool(self.session_path)
        dsets_laser, out_files_laser = laser.extract(save=True)
        dsets.update({k: v for k, v in zip(laser.var_names, dsets_laser)})
        out_files.extend(out_files_laser)
        return dsets, out_files


class EphysCellsQc(tasks.Task):
    priority = 90
    level = 3
    force = False

    signature = {
        'input_files': [('spikes.times.npy', 'alf/probe*', True),
                        ('spikes.clusters.npy', 'alf/probe*', True),
                        ('spikes.amps.npy', 'alf/probe*', True),
                        ('spikes.depths.npy', 'alf/probe*', True),
                        ('clusters.channels.npy', 'alf/probe*', True)],
        'output_files': [('clusters.metrics.pqt', 'alf/probe*', True)]
    }

    def _compute_cell_qc(self, folder_probe):
        """
        Computes the cell QC given an extracted probe alf path
        :param folder_probe: path to the probe's alf folder
        :return:
        """
        # compute the straight qc
        _logger.info(f"Computing cluster qc for {folder_probe}")
        spikes = alfio.load_object(folder_probe, 'spikes')
        clusters = alfio.load_object(folder_probe, 'clusters')
        df_units, drift = ephysqc.spike_sorting_metrics(
            spikes.times, spikes.clusters, spikes.amps, spikes.depths,
            cluster_ids=np.arange(clusters.channels.size))
        # if the ks2 labels file exists, load it and add the column
        file_labels = folder_probe.joinpath('cluster_KSLabel.tsv')
        if file_labels.exists():
            ks2_labels = pd.read_csv(file_labels, sep='\t')
            ks2_labels.rename(columns={'KSLabel': 'ks2_label'}, inplace=True)
            df_units = pd.concat(
                [df_units, ks2_labels['ks2_label'].reindex(df_units.index)], axis=1)
        # save as parquet file
        df_units.to_parquet(folder_probe.joinpath("clusters.metrics.pqt"))
        return folder_probe.joinpath("clusters.metrics.pqt"), df_units, drift

    def _label_probe_qc(self, folder_probe, df_units, drift):
        """
        Labels the json field of the corresponding Alyx probe insertion
        :param folder_probe:
        :param df_units:
        :param drift:
        :return:
        """
        eid = self.one.path2eid(self.session_path, query_type='remote')
        # the probe name is the first folder after alf: {session_path}/alf/{probe_name}/{spike_sorter_name}
        probe_name = Path(folder_probe).relative_to(self.session_path.joinpath('alf')).parts[0]
        pdict = self.one.alyx.rest('insertions', 'list', session=eid, name=probe_name, no_cache=True)
        if len(pdict) != 1:
            _logger.warning(f'No probe found for probe name: {probe_name}')
            return
        isok = df_units['label'] == 1
        qcdict = {'n_units': int(df_units.shape[0]),
                  'n_units_qc_pass': int(np.sum(isok)),
                  'firing_rate_max': np.max(df_units['firing_rate'][isok]),
                  'firing_rate_median': np.median(df_units['firing_rate'][isok]),
                  'amplitude_max_uV': np.max(df_units['amp_max'][isok]) * 1e6,
                  'amplitude_median_uV': np.median(df_units['amp_median'][isok]) * 1e6,
                  'drift_rms_um': rms(drift['drift_um']),
                  }
        file_wm = folder_probe.joinpath('_kilosort_whitening.matrix.npy')
        if file_wm.exists():
            wm = np.load(file_wm)
            qcdict['whitening_matrix_conditioning'] = np.linalg.cond(wm)
        # groom qc dict (this function will eventually go directly into the json field update)
        for k in qcdict:
            if isinstance(qcdict[k], np.int64):
                qcdict[k] = int(qcdict[k])
            elif isinstance(qcdict[k], float):
                qcdict[k] = np.round(qcdict[k], 2)
        self.one.alyx.json_field_update("insertions", pdict[0]["id"], "json", qcdict)

    def _run(self):
        """
        Post spike-sorting quality control at the cluster level.
        Outputs a QC table in the clusters ALF object and labels corresponding probes in Alyx
        """
        files_spikes = Path(self.session_path).joinpath('alf').rglob('spikes.times.npy')
        folder_probes = [f.parent for f in files_spikes]
        out_files = []
        for folder_probe in folder_probes:
            try:
                qc_file, df_units, drift = self._compute_cell_qc(folder_probe)
                out_files.append(qc_file)
                self._label_probe_qc(folder_probe, df_units, drift)
            except Exception:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        return out_files

    def get_signatures(self, **kwargs):
        files_spikes = Path(self.session_path).joinpath('alf').rglob('spikes.times.npy')
        folder_probes = [f.parent for f in files_spikes]

        full_input_files = []
        for sig in self.signature['input_files']:
            for folder in folder_probes:
                full_input_files.append((sig[0], str(folder.relative_to(self.session_path)), sig[2]))

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            for folder in folder_probes:
                full_output_files.append((sig[0], str(folder.relative_to(self.session_path)), sig[2]))

        self.output_files = full_output_files
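

# Editor's sketch (illustrative only): the core of _compute_cell_qc for a
# single, hypothetical probe alf folder, without the parquet save and the
# Alyx labelling step.
def _example_cluster_metrics(folder_probe):
    """Usage sketch: compute spike sorting metrics for one probe's alf folder."""
    spikes_ = alfio.load_object(folder_probe, 'spikes')  # trailing _ avoids shadowing the spikes module
    clusters_ = alfio.load_object(folder_probe, 'clusters')
    df_units, drift = ephysqc.spike_sorting_metrics(
        spikes_.times, spikes_.clusters, spikes_.amps, spikes_.depths,
        cluster_ids=np.arange(clusters_.channels.size))
    return df_units, drift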


class EphysMtscomp(tasks.Task):
    priority = 50  # ideally after spike sorting
    level = 0
    force = False
    signature = {
        'input_files': [('*ap.meta', 'raw_ephys_data/probe*', True),
                        ('*ap.*bin', 'raw_ephys_data/probe*', True),
                        ('*lf.meta', 'raw_ephys_data/probe*', False),  # NP2 doesn't have lf files
                        ('*lf.*bin', 'raw_ephys_data/probe*', False),  # NP2 doesn't have lf files
                        ('*nidq.meta', 'raw_ephys_data', True),
                        ('*nidq.*bin', 'raw_ephys_data', True)],
        'output_files': [('*ap.meta', 'raw_ephys_data/probe*', True),
                         ('*ap.cbin', 'raw_ephys_data/probe*', False),  # may not be present on local server anymore
                         ('*ap.ch', 'raw_ephys_data/probe*', True),
                         ('*lf.meta', 'raw_ephys_data/probe*', False),  # NP2 doesn't have lf files  # TODO detect from meta
                         ('*lf.cbin', 'raw_ephys_data/probe*', False),  # may not be present on local server anymore
                         ('*lf.ch', 'raw_ephys_data/probe*', False),
                         ('*nidq.meta', 'raw_ephys_data', True),
                         ('*nidq.cbin', 'raw_ephys_data', False),  # may not be present on local server anymore
                         ('*nidq.ch', 'raw_ephys_data', True)]
    }

    def _run(self):
        """
        Compresses ephys files, looking for `compress_ephys.flag` within the probes folders.
        The original .bin file is removed. The registration flag created at the root of the
        session contains the targeted file names.
        """

        out_files = []
        ephys_files = spikeglx.glob_ephys_files(self.session_path)
        ephys_files += spikeglx.glob_ephys_files(self.session_path, ext="ch")
        ephys_files += spikeglx.glob_ephys_files(self.session_path, ext="meta")

        for ef in ephys_files:
            for typ in ["ap", "lf", "nidq"]:
                bin_file = ef.get(typ)
                if not bin_file:
                    continue
                if bin_file.suffix.find("bin") == 1:
                    with spikeglx.Reader(bin_file) as sr:
                        if sr.is_mtscomp:
                            out_files.append(bin_file)
                        else:
                            _logger.info(f"Compressing binary file {bin_file}")
                            out_files.append(sr.compress_file(keep_original=False))
                            out_files.append(bin_file.with_suffix('.ch'))
                else:
                    out_files.append(bin_file)

        return out_files

    def get_signatures(self, **kwargs):
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = spikeglx.get_probes_from_folder(self.session_path)

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            else:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], sig[1], sig[2]))

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            if 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    full_output_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            else:
                if neuropixel_version != '3A':
                    full_output_files.append((sig[0], sig[1], sig[2]))

        self.output_files = full_output_files
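

# Editor's sketch (illustrative only): what _run does for a single uncompressed
# binary; `bin_file` is a hypothetical path to e.g. xx.ap.bin with its .meta
# file alongside.
def _example_compress_one_file(bin_file):
    """Usage sketch: mtscomp-compress one raw SpikeGLX binary in place."""
    with spikeglx.Reader(bin_file) as sr:
        if sr.is_mtscomp:
            return bin_file  # already a .cbin, nothing to do
        return sr.compress_file(keep_original=False)  # writes the .cbin and .ch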


class EphysDLC(tasks.Task):
    """
    This task relies on a correctly installed dlc environment as per
    https://docs.google.com/document/d/1g0scP6_3EmaXCU4SsDNZWwDTaD9MG0es_grLA-d0gh0/edit#

    If your environment is set up otherwise, make sure that you set the respective attributes:
    t = EphysDLC(session_path)
    t.dlcenv = Path('/path/to/your/dlcenv/bin/activate')
    t.scripts = Path('/path/to/your/iblscripts/deploy/serverpc/dlc')
    """
    gpu = 1
    cpu = 4
    io_charge = 100
    level = 2
    force = True
    job_size = 'large'

    dlcenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'dlcenv', 'bin', 'activate')
    scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'dlc')
    signature = {
        'input_files': [
            ('_iblrig_leftCamera.raw.mp4', 'raw_video_data', True),
            ('_iblrig_rightCamera.raw.mp4', 'raw_video_data', True),
            ('_iblrig_bodyCamera.raw.mp4', 'raw_video_data', True),
        ],
        'output_files': [
            ('_ibl_leftCamera.dlc.pqt', 'alf', True),
            ('_ibl_rightCamera.dlc.pqt', 'alf', True),
            ('_ibl_bodyCamera.dlc.pqt', 'alf', True),
            ('leftCamera.ROIMotionEnergy.npy', 'alf', True),
            ('rightCamera.ROIMotionEnergy.npy', 'alf', True),
            ('bodyCamera.ROIMotionEnergy.npy', 'alf', True),
            ('leftROIMotionEnergy.position.npy', 'alf', True),
            ('rightROIMotionEnergy.position.npy', 'alf', True),
            ('bodyROIMotionEnergy.position.npy', 'alf', True),
        ],
    }

    def _check_dlcenv(self):
        """Check that scripts are present, dlcenv can be activated and get iblvideo version"""
        assert len(list(self.scripts.rglob('run_dlc.*'))) == 2, \
            f'Scripts run_dlc.sh and run_dlc.py do not exist in {self.scripts}'
        assert len(list(self.scripts.rglob('run_motion.*'))) == 2, \
            f'Scripts run_motion.sh and run_motion.py do not exist in {self.scripts}'
        assert self.dlcenv.exists(), f"DLC environment does not exist in assumed location {self.dlcenv}"
        command2run = f"source {self.dlcenv}; python -c 'import iblvideo; print(iblvideo.__version__)'"
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash"
        )
        info, error = process.communicate()
        if process.returncode != 0:
            raise AssertionError(f"DLC environment check failed\n{error.decode('utf-8')}")
        version = info.decode("utf-8").strip().split('\n')[-1]
        return version

    @staticmethod
    def _video_intact(file_mp4):
        """Checks that the downloaded video can be opened and is not empty"""
        cap = cv2.VideoCapture(str(file_mp4))
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        intact = True if frame_count > 0 else False
        cap.release()
        return intact
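
    # Editor's sketch (comment only): typical use of the check above before
    # spending GPU time, for a hypothetical video path:
    #   file_mp4 = session_path / 'raw_video_data' / '_iblrig_leftCamera.raw.mp4'
    #   if not EphysDLC._video_intact(file_mp4):
    #       raise RuntimeError(f'corrupt video {file_mp4}')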

    def _run(self, cams=None, overwrite=False):
        # Default to all three cams
        cams = cams or ['left', 'right', 'body']
        cams = assert_valid_label(cams)
        # Set up
        self.session_id = self.one.path2eid(self.session_path)
        actual_outputs = []

        # Loop through cams
        for cam in cams:
            # Catch exceptions so that the following cameras can still run
            try:
                # If all results exist and overwrite is False, skip computation
                expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True)
                if overwrite is False and expected_outputs_present is True:
                    actual_outputs.extend(expected_outputs)
                    return actual_outputs
                else:
                    file_mp4 = next(self.session_path.joinpath('raw_video_data').glob(f'_iblrig_{cam}Camera.raw*.mp4'))
                    if not file_mp4.exists():
                        # In this case we set the status to Incomplete.
                        _logger.error(f"No raw video file available for {cam}, skipping.")
                        self.status = -3
                        continue
                    if not self._video_intact(file_mp4):
                        _logger.error(f"Corrupt raw video file {file_mp4}")
                        self.status = -1
                        continue
                    # Check that the dlc environment is ok, that the shell scripts exist, get the iblvideo version
                    # and make sure the GPU is addressable
                    self.version = self._check_dlcenv()
                    _logger.info(f'iblvideo version {self.version}')
                    check_nvidia_driver()

                    _logger.info(f'Running DLC on {cam}Camera.')
                    command2run = f"{self.scripts.joinpath('run_dlc.sh')} {str(self.dlcenv)} {file_mp4} {overwrite}"
                    _logger.info(command2run)
                    process = subprocess.Popen(
                        command2run,
                        shell=True,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        executable="/bin/bash",
                    )
                    info, error = process.communicate()
                    # info_str = info.decode("utf-8").strip()
                    # _logger.info(info_str)
                    if process.returncode != 0:
                        error_str = error.decode("utf-8").strip()
                        _logger.error(f'DLC failed for {cam}Camera.\n\n'
                                      f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                                      f'{error_str}\n'
                                      f'++++++++++++++++++++++++++++++++++++++++++++\n')
                        self.status = -1
                        # We don't run motion energy, or add any files, if dlc failed to run
                        continue
                    dlc_result = next(self.session_path.joinpath('alf').glob(f'_ibl_{cam}Camera.dlc*.pqt'))
                    actual_outputs.append(dlc_result)

                    _logger.info(f'Computing motion energy for {cam}Camera')
                    command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.dlcenv)} {file_mp4} {dlc_result}"
                    _logger.info(command2run)
                    process = subprocess.Popen(
                        command2run,
                        shell=True,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        executable="/bin/bash",
                    )
                    info, error = process.communicate()
                    # info_str = info.decode("utf-8").strip()
                    # _logger.info(info_str)
                    if process.returncode != 0:
                        error_str = error.decode("utf-8").strip()
                        _logger.error(f'Motion energy failed for {cam}Camera.\n\n'
                                      f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                                      f'{error_str}\n'
                                      f'++++++++++++++++++++++++++++++++++++++++++++\n')
                        self.status = -1
                        continue
                    actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                        f'{cam}Camera.ROIMotionEnergy*.npy')))
                    actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                        f'{cam}ROIMotionEnergy.position*.npy')))
            except BaseException:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        # If status is Incomplete, check that there is at least one output.
        # Otherwise make sure it gets set to Empty (outputs = None), and set status to -1 so it doesn't slip through
        if self.status == -3 and len(actual_outputs) == 0:
            actual_outputs = None
            self.status = -1
        return actual_outputs
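

# Editor's sketch (illustrative only): restricting the DLC task to one camera;
# `session_path` and `one` are assumed as before, with the dlcenv configured as
# described in the class docstring.
def _example_dlc_left_only(session_path, one):
    """Usage sketch: run DLC and motion energy for the left camera only."""
    task = EphysDLC(session_path, one=one)
    task.run(cams=['left'], overwrite=False)
    return task.outputs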


class EphysPostDLC(tasks.Task):
    """
    The post_dlc task takes dlc traces as input and computes useful quantities, as well as qc.
    """
    io_charge = 90
    level = 3
    force = True
    signature = {'input_files': [('_ibl_leftCamera.dlc.pqt', 'alf', True),
                                 ('_ibl_bodyCamera.dlc.pqt', 'alf', True),
                                 ('_ibl_rightCamera.dlc.pqt', 'alf', True),
                                 ('_ibl_rightCamera.times.npy', 'alf', True),
                                 ('_ibl_leftCamera.times.npy', 'alf', True),
                                 ('_ibl_bodyCamera.times.npy', 'alf', True),
                                 # the following are required for the DLC plot only
                                 # they are not strictly required, some plots just might be skipped
                                 # In particular the raw videos don't need to be downloaded as they can be streamed
                                 ('_iblrig_bodyCamera.raw.mp4', 'raw_video_data', True),
                                 ('_iblrig_leftCamera.raw.mp4', 'raw_video_data', True),
                                 ('_iblrig_rightCamera.raw.mp4', 'raw_video_data', True),
                                 ('rightROIMotionEnergy.position.npy', 'alf', False),
                                 ('leftROIMotionEnergy.position.npy', 'alf', False),
                                 ('bodyROIMotionEnergy.position.npy', 'alf', False),
                                 ('_ibl_trials.table.pqt', 'alf', True),
                                 ('_ibl_wheel.position.npy', 'alf', True),
                                 ('_ibl_wheel.timestamps.npy', 'alf', True),
                                 ],
                 # More files are required for all panels of the DLC QC plot to function
                 'output_files': [('_ibl_leftCamera.features.pqt', 'alf', True),
                                  ('_ibl_rightCamera.features.pqt', 'alf', True),
                                  ('licks.times.npy', 'alf', True),
                                  # ('dlc_qc_plot.png', 'snapshot', False)
                                  ]
                 }

    def _run(self, overwrite=True, run_qc=True, plot_qc=True):
        """
        Run the EphysPostDLC task. Returns a list of file locations for the output files in the signature. The created
        plot (dlc_qc_plot.png) is not returned, but saved in session_path/snapshot and uploaded to Alyx as a note.

        :param overwrite: bool, whether to recompute existing output files (default is True).
                          Note that the dlc_qc_plot will be (re-)computed even if overwrite is False
        :param run_qc: bool, whether to run the DLC QC (default is True)
        :param plot_qc: bool, whether to create the dlc_qc_plot (default is True)
        """
        # Check if output files exist locally
        exist, output_files = self.assert_expected(self.signature['output_files'], silent=True)
        if exist and not overwrite:
            _logger.warning('EphysPostDLC outputs exist and overwrite=False, skipping computations of outputs.')
        else:
            if exist and overwrite:
                _logger.warning('EphysPostDLC outputs exist and overwrite=True, overwriting existing outputs.')
            # Find all available dlc files
            dlc_files = list(Path(self.session_path).joinpath('alf').glob('_ibl_*Camera.dlc.*'))
            for dlc_file in dlc_files:
                _logger.debug(dlc_file)
            output_files = []
            combined_licks = []

            for dlc_file in dlc_files:
                # Catch unforeseen exceptions and move on to the next cam
                try:
                    cam = label_from_path(dlc_file)
                    # load dlc trace and camera times
                    dlc = pd.read_parquet(dlc_file)
                    dlc_thresh = likelihood_threshold(dlc, 0.9)
                    # try to load the respective camera times
                    try:
                        dlc_t = np.load(next(Path(self.session_path).joinpath('alf').glob(f'_ibl_{cam}Camera.times.*npy')))
                        times = True
                        if dlc_t.shape[0] == 0:
                            _logger.error(f'camera.times empty for {cam} camera. '
                                          f'Computations using camera.times will be skipped')
                            self.status = -1
                            times = False
                        elif dlc_t.shape[0] < len(dlc_thresh):
                            _logger.error(f'Camera times shorter than DLC traces for {cam} camera. '
                                          f'Computations using camera.times will be skipped')
                            self.status = -1
                            times = 'short'
                    except StopIteration:
                        self.status = -1
                        times = False
                        _logger.error(f'No camera.times for {cam} camera. '
                                      f'Computations using camera.times will be skipped')
                    # These features are only computed from the left and right cams
                    if cam in ('left', 'right'):
                        features = pd.DataFrame()
                        # If camera times are available, get the lick time stamps for the combined array
                        if times is True:
                            _logger.info(f"Computing lick times for {cam} camera.")
                            combined_licks.append(get_licks(dlc_thresh, dlc_t))
                        elif times is False:
                            _logger.warning(f"Skipping lick times for {cam} camera as no camera.times available")
                        elif times == 'short':
                            _logger.warning(f"Skipping lick times for {cam} camera as camera.times are too short")
                        # Compute pupil diameter, raw and smoothed
                        _logger.info(f"Computing raw pupil diameter for {cam} camera.")
                        features['pupilDiameter_raw'] = get_pupil_diameter(dlc_thresh)
                        try:
                            _logger.info(f"Computing smooth pupil diameter for {cam} camera.")
                            features['pupilDiameter_smooth'] = get_smooth_pupil_diameter(features['pupilDiameter_raw'],
                                                                                         cam)
                        except BaseException:
                            _logger.error(f"Computing smooth pupil diameter for {cam} camera failed, saving all NaNs.")
                            _logger.error(traceback.format_exc())
                            features['pupilDiameter_smooth'] = np.nan
                        # Save to pqt
                        features_file = Path(self.session_path).joinpath('alf', f'_ibl_{cam}Camera.features.pqt')
                        features.to_parquet(features_file)
                        output_files.append(features_file)

                    # For all cams, compute DLC qc if times are available
                    if run_qc is True and times in [True, 'short']:
                        # Setting download_data to False because at this point the data should be there
                        qc = DlcQC(self.session_path, side=cam, one=self.one, download_data=False)
                        qc.run(update=True)
                    else:
                        if times is False:
                            _logger.warning(f"Skipping QC for {cam} camera as no camera.times available")
                        if not run_qc:
                            _logger.warning(f"Skipping QC for {cam} camera as run_qc=False")

                except BaseException:
                    _logger.error(traceback.format_exc())
                    self.status = -1
                    continue

            # Combined lick times
            if len(combined_licks) > 0:
                lick_times_file = Path(self.session_path).joinpath('alf', 'licks.times.npy')
                np.save(lick_times_file, sorted(np.concatenate(combined_licks)))
                output_files.append(lick_times_file)
            else:
                _logger.warning("No lick times computed for this session.")

        if plot_qc:
            _logger.info("Creating DLC QC plot")
            try:
                session_id = self.one.path2eid(self.session_path)
                fig_path = self.session_path.joinpath('snapshot', 'dlc_qc_plot.png')
                if not fig_path.parent.exists():
                    fig_path.parent.mkdir(parents=True, exist_ok=True)
                fig = dlc_qc_plot(self.session_path, one=self.one)
                fig.savefig(fig_path)
                fig.clf()
                snp = ReportSnapshot(self.session_path, session_id, one=self.one)
                snp.outputs = [fig_path]
                snp.register_images(widths=['orig'],
                                    function=str(dlc_qc_plot.__module__) + '.' + str(dlc_qc_plot.__name__))
            except BaseException:
                _logger.error('Could not create and/or upload DLC QC Plot')
                _logger.error(traceback.format_exc())
                self.status = -1

        return output_files
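

# Editor's sketch (illustrative only): the lick extraction performed in _run
# above, for a hypothetical left-camera session; the 0.9 likelihood threshold
# mirrors the task code.
def _example_lick_times(session_path):
    """Usage sketch: lick times from one camera's DLC trace and camera times."""
    alf_path = Path(session_path).joinpath('alf')
    dlc = pd.read_parquet(alf_path.joinpath('_ibl_leftCamera.dlc.pqt'))
    dlc_t = np.load(alf_path.joinpath('_ibl_leftCamera.times.npy'))
    return get_licks(likelihood_threshold(dlc, 0.9), dlc_t)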


class EphysPassive(tasks.Task):
    cpu = 1
    io_charge = 90
    level = 1
    force = False
    signature = {
        'input_files': [('_iblrig_taskSettings.raw*', 'raw_behavior_data', True),
                        ('_spikeglx_sync.channels.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.polarities.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.times.*', 'raw_ephys_data*', True),
                        ('*.meta', 'raw_ephys_data*', True),
                        ('*wiring.json', 'raw_ephys_data*', False),
                        ('_iblrig_RFMapStim.raw*', 'raw_passive_data', True)],
        'output_files': [('_ibl_passiveGabor.table.csv', 'alf', True),
                         ('_ibl_passivePeriods.intervalsTable.csv', 'alf', True),
                         ('_ibl_passiveRFM.times.npy', 'alf', True),
                         ('_ibl_passiveStims.table.csv', 'alf', True)]}

    def _run(self):
        """returns a list of pathlib.Paths. """
        data, paths = ephys_passive.PassiveChoiceWorld(self.session_path).extract(save=True)
        if any(x is None for x in paths):
            self.status = -1
        # Register?
        return paths

    def get_signatures(self, **kwargs):
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = spikeglx.get_probes_from_folder(self.session_path)

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data*' in sig[1]:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            else:
                full_input_files.append(sig)

        self.input_files = full_input_files

        self.output_files = self.signature['output_files']


class EphysExtractionPipeline(tasks.Pipeline):
    label = __name__

    def __init__(self, session_path=None, **kwargs):
        super(EphysExtractionPipeline, self).__init__(session_path, **kwargs)
        tasks = OrderedDict()
        self.session_path = session_path
        # level 0
        tasks['ExperimentDescriptionRegisterRaw'] = base_tasks.ExperimentDescriptionRegisterRaw(self.session_path)
        tasks["EphysRegisterRaw"] = tpp.TrainingRegisterRaw(self.session_path)
        tasks["EphysPulses"] = EphysPulses(self.session_path)
        tasks["EphysRawQC"] = RawEphysQC(self.session_path)
        tasks["EphysAudio"] = EphysAudio(self.session_path)
        tasks["EphysMtscomp"] = EphysMtscomp(self.session_path)
        tasks['EphysVideoCompress'] = EphysVideoCompress(self.session_path)
        # level 1
        tasks["SpikeSorting"] = SpikeSorting(
            self.session_path, parents=[tasks["EphysMtscomp"], tasks["EphysPulses"]])
        tasks["EphysTrials"] = EphysTrials(self.session_path, parents=[tasks["EphysPulses"]])

        tasks["EphysPassive"] = EphysPassive(self.session_path, parents=[tasks["EphysPulses"]])
        # level 2
        tasks["EphysVideoSyncQc"] = EphysVideoSyncQc(
            self.session_path, parents=[tasks["EphysVideoCompress"], tasks["EphysPulses"], tasks["EphysTrials"]])
        tasks["EphysCellsQc"] = EphysCellsQc(self.session_path, parents=[tasks["SpikeSorting"]])
        tasks["EphysDLC"] = EphysDLC(self.session_path, parents=[tasks["EphysVideoCompress"]])
        tasks['EphysTrainingStatus'] = tpp.TrainingStatus(self.session_path, parents=[tasks["EphysTrials"]])
        # level 3
        tasks["EphysPostDLC"] = EphysPostDLC(self.session_path, parents=[tasks["EphysDLC"], tasks["EphysTrials"],
                                                                         tasks["EphysVideoSyncQc"]])
        self.tasks = tasks
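

# Editor's sketch (illustrative only): building the full pipeline for a session
# and inspecting the task graph; `session_path` and `one` are assumed as in the
# earlier sketches, with `one` accepted by the Pipeline constructor.
def _example_build_pipeline(session_path, one=None):
    """Usage sketch: instantiate the ephys pipeline and print the task levels."""
    pipeline = EphysExtractionPipeline(session_path, one=one)
    for name, task in pipeline.tasks.items():
        _logger.info(f'level {task.level}: {name}')
    return pipeline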