Coverage for ibllib/pipes/ephys_preprocessing.py: 51%

783 statements  

coverage.py v7.5.4, created at 2024-07-08 17:16 +0100

1"""(Deprecated) Electrophysiology data preprocessing tasks. 

2 

3These tasks are part of the old pipeline. This module has been replaced by the `ephys_tasks` module 

4and the dynamic pipeline. 

5""" 

6import logging 

7import re 

8import shutil 

9import subprocess 

10from collections import OrderedDict 

11import traceback 

12from pathlib import Path 

13import warnings 

14 

15import cv2 

16import numpy as np 

17import pandas as pd 

18import packaging.version 

19 

20import one.alf.io as alfio 

21from ibldsp.utils import rms 

22import spikeglx 

23 

24from ibllib.misc import check_nvidia_driver 

25from ibllib.ephys import ephysqc, spikes, sync_probes 

26from ibllib.io import ffmpeg 

27from ibllib.io.video import label_from_path, assert_valid_label 

28from ibllib.io.extractors import ephys_fpga, ephys_passive, camera 

29from ibllib.pipes import tasks, base_tasks 

30import ibllib.pipes.training_preprocessing as tpp 

31from ibllib.pipes.misc import create_alyx_probe_insertions 

32from ibllib.qc.alignment_qc import get_aligned_channels 

33from ibllib.qc.task_extractors import TaskQCExtractor 

34from ibllib.qc.task_metrics import TaskQC 

35from ibllib.qc.camera import run_all_qc as run_camera_qc 

36from ibllib.qc.dlc import DlcQC 

37from ibllib.plots.figures import dlc_qc_plot, BehaviourPlots, LfpPlots, ApPlots, BadChannelsAp 

38from ibllib.plots.figures import SpikeSorting as SpikeSortingPlots 

39from ibllib.plots.snapshot import ReportSnapshot 

40from brainbox.behavior.dlc import likelihood_threshold, get_licks, get_pupil_diameter, get_smooth_pupil_diameter 

41 

42_logger = logging.getLogger('ibllib') 

43warnings.warn('`pipes.ephys_preprocessing` to be removed in favour of dynamic pipeline', FutureWarning) 

44 

45 
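
# Editor's sketch (not in the original module): as per the deprecation warning above,
# sessions should now be processed with the dynamic pipeline instead, along the lines of:
#
#     from ibllib.pipes.dynamic_pipeline import make_pipeline
#     pipeline = make_pipeline(session_path)  # builds tasks from the experiment description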


# level 0
class EphysPulses(tasks.Task):
    """
    Extract pulses from raw electrophysiology data into numpy arrays.
    Perform the probe synchronisation with the nidq (3B) or the main probe (3A).
    """
    cpu = 2
    io_charge = 30  # this job reads raw ap files
    priority = 90  # a lot of jobs depend on this one
    level = 0  # this job doesn't depend on anything
    force = False  # whether to force download of missing data on local server if outputs already exist
    signature = {
        'input_files': [('*ap.meta', 'raw_ephys_data/probe*', True),
                        ('*ap.ch', 'raw_ephys_data/probe*', False),  # not necessary when we have a .bin file
                        ('*ap.*bin', 'raw_ephys_data/probe*', True),
                        ('*nidq.meta', 'raw_ephys_data', True),
                        ('*nidq.ch', 'raw_ephys_data', False),  # not necessary when we have a .bin file
                        ('*nidq.*bin', 'raw_ephys_data', True)],
        'output_files': [('_spikeglx_sync*.npy', 'raw_ephys_data*', True),
                         ('_spikeglx_sync.polarities*.npy', 'raw_ephys_data*', True),
                         ('_spikeglx_sync.times*.npy', 'raw_ephys_data*', True)]
    }

    def get_signatures(self, **kwargs):
        """
        Find the input and output signatures specific to the local filesystem.
        :return:
        """
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = spikeglx.get_probes_from_folder(self.session_path)

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            else:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], sig[1], sig[2]))

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            if neuropixel_version != '3A':
                full_output_files.append((sig[0], 'raw_ephys_data', sig[2]))
            for probe in probes:
                full_output_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))

        self.output_files = full_output_files

    def _run(self, overwrite=False):
        # outputs numpy
        syncs, out_files = ephys_fpga.extract_sync(self.session_path, overwrite=overwrite)
        for out_file in out_files:
            _logger.info(f"extracted pulses for {out_file}")

        status, sync_files = sync_probes.sync(self.session_path)
        return out_files + sync_files
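
# Example (editor's sketch, not in the original module): once EphysPulses has run, the
# synchronisation pulses can be read back as an ALF object, e.g.
#
#     import one.alf.io as alfio
#     sync = alfio.load_object(session_path / 'raw_ephys_data', 'sync', namespace='spikeglx')
#     # sync.channels, sync.polarities and sync.times are the arrays written above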


class RawEphysQC(tasks.Task):
    """
    Computes raw electrophysiology QC.
    """
    cpu = 2
    io_charge = 30  # this job reads raw ap files
    priority = 10  # a lot of jobs depend on this one
    level = 0  # this job doesn't depend on anything
    force = False
    signature = {
        'input_files': [('*ap.meta', 'raw_ephys_data/probe*', True),
                        ('*lf.meta', 'raw_ephys_data/probe*', True),  # not necessary to run task as optional computation
                        ('*lf.ch', 'raw_ephys_data/probe*', False),  # not required if we have the .bin file
                        ('*lf.*bin', 'raw_ephys_data/probe*', True)],  # not necessary to run task as optional computation
        'output_files': [('_iblqc_ephysChannels.apRMS.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysChannels.rawSpikeRates.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysChannels.labels.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysSpectralDensityLF.freqs.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysSpectralDensityLF.power.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysSpectralDensityAP.freqs.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysSpectralDensityAP.power.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysTimeRmsLF.rms.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysTimeRmsLF.timestamps.npy', 'raw_ephys_data/probe*', True)]
    }

    def _run(self, overwrite=False):
        eid = self.one.path2eid(self.session_path)
        probes = [(x['id'], x['name']) for x in self.one.alyx.rest('insertions', 'list', session=eid)]
        # Usually there should be two probes; if there are fewer, check whether all probes are registered
        if len(probes) < 2:
            _logger.warning(f"{len(probes)} probes registered for session {eid}, trying to register from local data")
            probes = [(p['id'], p['name']) for p in create_alyx_probe_insertions(self.session_path, one=self.one)]
        qc_files = []
        for pid, pname in probes:
            _logger.info(f"\nRunning QC for probe insertion {pname}")
            try:
                eqc = ephysqc.EphysQC(pid, session_path=self.session_path, one=self.one)
                qc_files.extend(eqc.run(update=True, overwrite=overwrite))
                _logger.info("Creating LFP QC plots")
                plot_task = LfpPlots(pid, session_path=self.session_path, one=self.one)
                _ = plot_task.run()
                self.plot_tasks.append(plot_task)
                plot_task = BadChannelsAp(pid, session_path=self.session_path, one=self.one)
                _ = plot_task.run()
                self.plot_tasks.append(plot_task)

            except AssertionError:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        return qc_files

    def get_signatures(self, **kwargs):
        probes = spikeglx.get_probes_from_folder(self.session_path)

        full_output_files = []
        for sig in self.signature['output_files']:
            if 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    full_output_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))

        self.output_files = full_output_files

        # whether the lf input signature is required depends on the outputs we already have;
        # we need to be agile here to avoid unnecessary downloading of lf.cbin files
        expected_count = 0
        count = 0
        # check to see if we have lfp qc datasets
        for expected_file in full_output_files:
            if 'LF' in expected_file[0]:
                expected_count += 1
                actual_files = list(Path(self.session_path).rglob(str(Path(expected_file[1]).joinpath(expected_file[0]))))
                if len(actual_files) == 1:
                    count += 1

        lf_required = count != expected_count

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    if 'lf' in sig[0]:
                        full_input_files.append((sig[0], f'raw_ephys_data/{probe}', lf_required if sig[2] else sig[2]))
                    else:
                        full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))

        self.input_files = full_input_files
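
# Example (editor's sketch): the channel-level QC written by RawEphysQC can be inspected
# directly from the probe folder, e.g.
#
#     import numpy as np
#     probe_path = session_path / 'raw_ephys_data' / 'probe00'
#     ap_rms = np.load(probe_path / '_iblqc_ephysChannels.apRMS.npy')
#     labels = np.load(probe_path / '_iblqc_ephysChannels.labels.npy')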


class EphysAudio(tasks.Task):
    """
    Compresses the microphone wav file into a lossless flac file.
    """
    # TESTS DONE
    cpu = 2
    priority = 10  # a lot of jobs depend on this one
    level = 0  # this job doesn't depend on anything
    force = False
    signature = {
        'input_files': [('_iblrig_micData.raw.wav', 'raw_behavior_data', True)],
        'output_files': [('_iblrig_micData.raw.flac', 'raw_behavior_data', True)],
    }

    def _run(self, overwrite=False):
        command = "ffmpeg -i {file_in} -y -nostdin -c:a flac -nostats {file_out}"
        file_in = next(self.session_path.rglob("_iblrig_micData.raw.wav"), None)
        if file_in is None:
            return
        file_out = file_in.with_suffix(".flac")
        status, output_file = ffmpeg.compress(file_in=file_in, file_out=file_out, command=command)
        return [output_file]
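
# For reference (editor's sketch): after substituting the placeholders, the command
# template above expands to something like
#
#     ffmpeg -i _iblrig_micData.raw.wav -y -nostdin -c:a flac -nostats _iblrig_micData.raw.flac
#
# i.e. a lossless re-encode of the microphone recording.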


class SpikeSorting(tasks.Task):
    """
    (DEPRECATED) Pykilosort 2.5 pipeline
    """
    gpu = 1
    io_charge = 100  # this job reads raw ap files
    priority = 60
    level = 1  # this job depends on the EphysPulses and EphysMtscomp jobs
    force = True
    job_size = 'large'

    SHELL_SCRIPT = Path.home().joinpath(
        "Documents/PYTHON/iblscripts/deploy/serverpc/kilosort2/run_pykilosort.sh"
    )
    SPIKE_SORTER_NAME = 'pykilosort'
    PYKILOSORT_REPO = Path.home().joinpath('Documents/PYTHON/SPIKE_SORTING/pykilosort')
    signature = {
        'input_files': [],  # see setUp method for declaration of inputs
        'output_files': []  # see setUp method for declaration of outputs
    }

    def __init__(self, *args, **kwargs):
        warnings.warn('`pipes.ephys_preprocessing.SpikeSorting` to be removed '
                      'in favour of `pipes.ephys_tasks.SpikeSorting`',
                      FutureWarning)
        super().__init__(*args, **kwargs)

    @staticmethod
    def spike_sorting_signature(pname=None):
        pname = pname if pname is not None else "probe*"
        input_signature = [('*ap.meta', f'raw_ephys_data/{pname}', True),
                           ('*ap.ch', f'raw_ephys_data/{pname}', True),
                           ('*ap.cbin', f'raw_ephys_data/{pname}', True),
                           ('*nidq.meta', 'raw_ephys_data', False),
                           ('_spikeglx_sync.channels.*', 'raw_ephys_data*', True),
                           ('_spikeglx_sync.polarities.*', 'raw_ephys_data*', True),
                           ('_spikeglx_sync.times.*', 'raw_ephys_data*', True),
                           ('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
                           ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True)]
        output_signature = [('spike_sorting_pykilosort.log', f'spike_sorters/pykilosort/{pname}', True),
                            ('_iblqc_ephysTimeRmsAP.rms.npy', f'raw_ephys_data/{pname}', True),  # new ibllib 2.5
                            ('_iblqc_ephysTimeRmsAP.timestamps.npy', f'raw_ephys_data/{pname}', True)]  # new ibllib 2.5
        return input_signature, output_signature

    @staticmethod
    def _sample2v(ap_file):
        md = spikeglx.read_meta_data(ap_file.with_suffix(".meta"))
        s2v = spikeglx._conversion_sample2v_from_meta(md)
        return s2v["ap"][0]

    @staticmethod
    def _fetch_pykilosort_version(repo_path):
        init_file = Path(repo_path).joinpath('pykilosort', '__init__.py')
        version = SpikeSorting._fetch_ks2_commit_hash(repo_path)  # default
        try:
            with open(init_file) as fid:
                lines = fid.readlines()
                for line in lines:
                    if line.startswith("__version__ = "):
                        version = line.split('=')[-1].strip().replace('"', '').replace("'", '')
        except Exception:
            pass
        return f"pykilosort_{version}"

    @staticmethod
    def _fetch_pykilosort_run_version(log_file):
        """
        Parse the following line (2 formats depending on version) from the log files to get the version:
        '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.2.1, output in gnagga^[[0m\n'
        '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.3.0^[[0m\n'
        """
        with open(log_file) as fid:
            line = fid.readline()
        version = re.search('version (.*), output', line)
        version = version or re.search('version (.*)', line)  # old versions have output, new ones just have a version
        version = re.sub('\\^[[0-9]+m', '', version.group(1))  # removes the coloring tags
        return f"pykilosort_{version}"
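
    # Worked example (editor's sketch): for the second log line shown in the docstring,
    # 'version (.*), output' does not match, so the fallback pattern captures the trailing
    # colour tag, which the re.sub then strips:
    #
    #     >>> line = 'Starting Pykilosort version ibl_1.3.0^[[0m\n'
    #     >>> re.sub('\\^[[0-9]+m', '', re.search('version (.*)', line).group(1))
    #     'ibl_1.3.0'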

    @staticmethod
    def _fetch_ks2_commit_hash(repo_path):
        command2run = f"git --git-dir {repo_path}/.git rev-parse --verify HEAD"
        process = subprocess.Popen(
            command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        info, error = process.communicate()
        if process.returncode != 0:
            _logger.error(
                f"Can't fetch pykilosort commit hash, will still attempt to run\n"
                f"Error: {error.decode('utf-8')}"
            )
            return ""
        return info.decode("utf-8").strip()

    @staticmethod
    def parse_version(v) -> packaging.version.Version:
        """
        Extracts and parses a semantic version (major.minor.patch) from a version string.

        Parameters
        ----------
        v : str
            A version string containing a semantic version.

        Returns
        -------
        packaging.version.Version
            The parsed semantic version number.

        Examples
        --------
        >>> SpikeSorting.parse_version('ibl_1.2')
        <Version('1.2')>
        >>> SpikeSorting.parse_version('pykilosort_ibl_1.2.0-new')
        <Version('1.2.0')>
        >>> SpikeSorting.parse_version('ibl_0.2') < SpikeSorting.parse_version('pykilosort_v1')
        True
        """
        m = re.search(r'(\d+\.?){1,3}', v)
        if not m:
            raise packaging.version.InvalidVersion(f'Failed to detect SemVer in "{v}"')
        return packaging.version.parse(m.group())

    def setUp(self, probes=None):
        """
        Overwrite the setup method to allow inputs and outputs to be restricted to a single probe.
        :param probes: list of probes, e.g. ['probe00']
        :return:
        """
        if not probes or len(probes) == 2:
            self.signature['input_files'], self.signature['output_files'] = self.spike_sorting_signature()
        else:
            self.signature['input_files'], self.signature['output_files'] = self.spike_sorting_signature(probes[0])

        return super().setUp(probes=probes)

    def _run_pykilosort(self, ap_file):
        """
        Runs the pykilosort spike sorting for one probe dataset.
        The raw spike sorting output goes in the session_path/spike_sorters/{self.SPIKE_SORTER_NAME}/probeXX folder
        (discontinued support for old spike sortings in the probe folder <1.5.5).
        :return: path of the folder containing the spike sorting output
        """
        self.version = self._fetch_pykilosort_version(self.PYKILOSORT_REPO)
        label = ap_file.parts[-2]  # this is usually the probe name
        sorter_dir = self.session_path.joinpath("spike_sorters", self.SPIKE_SORTER_NAME, label)
        self.FORCE_RERUN = False
        if not self.FORCE_RERUN:
            log_file = sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
            if log_file.exists():
                run_version = self._fetch_pykilosort_run_version(log_file)
                if self.parse_version(run_version) > self.parse_version('pykilosort_ibl_1.1.0'):
                    _logger.info(f"Already ran: spike_sorting_{self.SPIKE_SORTER_NAME}.log"
                                 f" found in {sorter_dir}, skipping.")
                    return sorter_dir
                else:
                    self.FORCE_RERUN = True

        print(sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log"))
        # get the scratch drive from the shell script
        with open(self.SHELL_SCRIPT) as fid:
            lines = fid.readlines()
            line = [line for line in lines if line.startswith("SCRATCH_DRIVE=")][0]
        m = re.search(r"\=(.*?)(\#|\n)", line)[0]
        scratch_drive = Path(m[1:-1].strip())
        assert scratch_drive.exists()

        # clean up and create directory, this also checks write permissions
        # temp_dir has the following shape: pykilosort/ZM_3003_2020-07-29_001_probe00
        # first makes sure the tmp dir is clean
        shutil.rmtree(scratch_drive.joinpath(self.SPIKE_SORTER_NAME), ignore_errors=True)
        temp_dir = scratch_drive.joinpath(
            self.SPIKE_SORTER_NAME, "_".join(list(self.session_path.parts[-3:]) + [label])
        )
        if temp_dir.exists():  # hmmm this has to be decided, we may want to restart ?
            # But failed sessions may then clog the scratch dir and have users run out of space
            shutil.rmtree(temp_dir, ignore_errors=True)
        temp_dir.mkdir(parents=True, exist_ok=True)

        check_nvidia_driver()
        command2run = f"{self.SHELL_SCRIPT} {ap_file} {temp_dir}"
        _logger.info(command2run)
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash",
        )
        info, error = process.communicate()
        info_str = info.decode("utf-8").strip()
        _logger.info(info_str)
        if process.returncode != 0:
            error_str = error.decode("utf-8").strip()
            # try and get the kilosort log if any
            for log_file in temp_dir.rglob('*_kilosort.log'):
                with open(log_file) as fid:
                    log = fid.read()
                    _logger.error(log)
                break
            raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}")

        shutil.copytree(temp_dir.joinpath('output'), sorter_dir, dirs_exist_ok=True)
        shutil.rmtree(temp_dir, ignore_errors=True)
        return sorter_dir

    def _run(self, probes=None):
        """
        Multiple steps. For each probe:
        - runs pykilosort (skips if it already ran)
        - synchronizes the spike sorting
        - outputs the probe description files
        :param probes: (list of str) if provided, will only run spike sorting for the specified probe names
        :return: list of files to be registered on the database
        """
        efiles = spikeglx.glob_ephys_files(self.session_path)
        ap_files = [(ef.get("ap"), ef.get("label")) for ef in efiles if "ap" in ef.keys()]
        out_files = []
        for ap_file, label in ap_files:
            if isinstance(probes, list) and label not in probes:
                continue
            try:
                # if the file is part of a sequence, handles the run accordingly
                sequence_file = ap_file.parent.joinpath(ap_file.stem.replace('ap', 'sequence.json'))
                # temporary just skips for now
                if sequence_file.exists():
                    continue
                ks2_dir = self._run_pykilosort(ap_file)  # runs pykilosort, skips if it already ran
                probe_out_path = self.session_path.joinpath("alf", label, self.SPIKE_SORTER_NAME)
                shutil.rmtree(probe_out_path, ignore_errors=True)
                probe_out_path.mkdir(parents=True, exist_ok=True)
                spikes.ks2_to_alf(
                    ks2_dir,
                    bin_path=ap_file.parent,
                    out_path=probe_out_path,
                    bin_file=ap_file,
                    ampfactor=self._sample2v(ap_file),
                )
                logfile = ks2_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
                if logfile.exists():
                    shutil.copyfile(logfile, probe_out_path.joinpath(
                        f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log"))
                out, _ = spikes.sync_spike_sorting(ap_file=ap_file, out_path=probe_out_path)
                out_files.extend(out)
                # convert the sorter output into a tar file and also register it
                # Make this in case spike sorting is in old raw_ephys_data folders, for new
                # sessions it should already exist
                tar_dir = self.session_path.joinpath(
                    'spike_sorters', self.SPIKE_SORTER_NAME, label)
                tar_dir.mkdir(parents=True, exist_ok=True)
                out = spikes.ks2_to_tar(ks2_dir, tar_dir, force=self.FORCE_RERUN)
                out_files.extend(out)

                if self.one:
                    eid = self.one.path2eid(self.session_path, query_type='remote')
                    ins = self.one.alyx.rest('insertions', 'list', session=eid, name=label, query_type='remote')
                    if len(ins) != 0:
                        _logger.info("Creating SpikeSorting QC plots")
                        plot_task = ApPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
                        _ = plot_task.run()
                        self.plot_tasks.append(plot_task)

                        plot_task = SpikeSortingPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
                        _ = plot_task.run(collection=str(probe_out_path.relative_to(self.session_path)))
                        self.plot_tasks.append(plot_task)

                        resolved = ins[0].get('json', {'temp': 0}).get('extended_qc', {'temp': 0}). \
                            get('alignment_resolved', False)
                        if resolved:
                            chns = np.load(probe_out_path.joinpath('channels.localCoordinates.npy'))
                            out = get_aligned_channels(ins[0], chns, one=self.one, save_dir=probe_out_path)
                            out_files.extend(out)

            except Exception:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        probe_files = spikes.probes_description(self.session_path, one=self.one)
        return out_files + probe_files

    def get_signatures(self, probes=None, **kwargs):
        """
        This transforms all wildcards in the collection to an exact match.
        :param probes:
        :return:
        """

        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = probes or spikeglx.get_probes_from_folder(self.session_path)

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data*' in sig[1]:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))

            elif 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            elif 'raw_ephys_data' in sig[1]:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], 'raw_ephys_data', sig[2]))
            else:
                full_input_files.append(sig)

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            if 'probe*' in sig[1]:
                for probe in probes:
                    col = sig[1].split('/')[:-1] + [probe]
                    full_output_files.append((sig[0], '/'.join(col), sig[2]))
            else:
                full_output_files.append(sig)

        self.output_files = full_output_files
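
# Example (editor's sketch): spike sorting a single probe with this deprecated task,
# assuming the pykilosort repo and shell script are deployed as per the class attributes,
# would go along the lines of:
#
#     task = SpikeSorting(session_path, one=one)
#     task.setUp(probes=['probe00'])          # restricts the signatures to probe00
#     status = task.run(probes=['probe00'])   # 0 on success; outputs in task.outputs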


class EphysVideoCompress(tasks.Task):
    priority = 90
    level = 0
    force = False
    job_size = 'large'
    io_charge = 100

    signature = {
        'input_files': [('_iblrig_*Camera.raw.*', 'raw_video_data', True)],
        'output_files': [('_iblrig_*Camera.raw.mp4', 'raw_video_data', True)]
    }

    def _run(self, **kwargs):
        # avi to mp4 compression
        command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 17 '
                   '-loglevel 0 -codec:a copy {file_out}')
        output_files = ffmpeg.iblrig_video_compression(self.session_path, command)

        if len(output_files) == 0:
            _logger.info('No compressed videos found')
            return

        return output_files

    def get_signatures(self, **kwargs):
        # need to detect the number of cameras
        output_files = Path(self.session_path).joinpath('raw_video_data').glob('*')
        labels = {label_from_path(x) for x in output_files}

        full_input_files = []
        for sig in self.signature['input_files']:
            for label in labels:
                full_input_files.append((sig[0].replace('*Camera', f'{label}Camera'), sig[1], sig[2]))

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            for label in labels:
                full_output_files.append((sig[0].replace('*Camera', f'{label}Camera'), sig[1], sig[2]))

        self.output_files = full_output_files


class EphysVideoSyncQc(tasks.Task):
    priority = 40
    level = 2
    force = True
    signature = {
        'input_files': [('_iblrig_*Camera.raw.mp4', 'raw_video_data', True),
                        ('_iblrig_*Camera.timestamps.ssv', 'raw_video_data', False),
                        ('_iblrig_*Camera.timestamps.npy', 'raw_video_data', False),
                        ('_iblrig_*Camera.frameData.bin', 'raw_video_data', False),
                        ('_iblrig_*Camera.GPIO.bin', 'raw_video_data', False),
                        ('_iblrig_*Camera.frame_counter.bin', 'raw_video_data', False),
                        ('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
                        ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True),
                        ('_spikeglx_sync.channels.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.polarities.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.times.*', 'raw_ephys_data*', True),
                        ('*wheel.position.npy', 'alf', False),
                        ('*wheel.timestamps.npy', 'alf', False),
                        ('*wiring.json', 'raw_ephys_data*', False),
                        ('*.meta', 'raw_ephys_data*', True)],

        'output_files': [('_ibl_*Camera.times.npy', 'alf', True)]
    }

    def _run(self, **kwargs):
        mp4_files = self.session_path.joinpath('raw_video_data').rglob('*.mp4')
        labels = [label_from_path(x) for x in mp4_files]
        # Video timestamps extraction
        output_files = []
        data, files = camera.extract_all(self.session_path, save=True, labels=labels)
        output_files.extend(files)

        # Video QC
        run_camera_qc(self.session_path, update=True, one=self.one, cameras=labels)

        return output_files

    def get_signatures(self, **kwargs):
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = spikeglx.get_probes_from_folder(self.session_path)
        # need to detect the number of cameras
        output_files = Path(self.session_path).joinpath('raw_video_data').rglob('*')
        labels = np.unique([label_from_path(x) for x in output_files])

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data*' in sig[1]:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            elif 'Camera' in sig[0]:
                for lab in labels:
                    full_input_files.append((sig[0].replace('*Camera', f'{lab}Camera'), sig[1], sig[2]))
            else:
                full_input_files.append((sig[0], sig[1], sig[2]))

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            if 'raw_ephys_data*' in sig[1]:
                if neuropixel_version != '3A':
                    full_output_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_output_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            elif 'Camera' in sig[0]:
                for lab in labels:
                    full_output_files.append((sig[0].replace('*Camera', f'{lab}Camera'), sig[1], sig[2]))
            else:
                full_output_files.append((sig[0], sig[1], sig[2]))

        self.output_files = full_output_files
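
# Example (editor's sketch): the extracted camera times hold one timestamp per video
# frame, so frame i of the left camera occurred at
#
#     t_i = np.load(session_path / 'alf' / '_ibl_leftCamera.times.npy')[i]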


# level 1
class EphysTrials(tasks.Task):
    priority = 90
    level = 1
    force = False
    signature = {
        'input_files': [('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
                        ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True),
                        ('_spikeglx_sync.channels.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.polarities.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.times.*', 'raw_ephys_data*', True),
                        ('_iblrig_encoderEvents.raw*', 'raw_behavior_data', True),
                        ('_iblrig_encoderPositions.raw*', 'raw_behavior_data', True),
                        ('*wiring.json', 'raw_ephys_data*', False),
                        ('*.meta', 'raw_ephys_data*', True)],
        'output_files': [('*trials.table.pqt', 'alf', True),
                         ('*trials.goCueTrigger_times.npy', 'alf', True),
                         ('*trials.intervals_bpod.npy', 'alf', True),
                         ('*trials.stimOff_times.npy', 'alf', True),
                         ('*trials.quiescencePeriod.npy', 'alf', True),
                         ('*wheel.position.npy', 'alf', True),
                         ('*wheel.timestamps.npy', 'alf', True),
                         ('*wheelMoves.intervals.npy', 'alf', True),
                         ('*wheelMoves.peakAmplitude.npy', 'alf', True)]
    }

    def _behaviour_criterion(self):
        """
        Computes and updates the behaviour criterion on Alyx.
        """
        from brainbox.behavior import training

        trials = alfio.load_object(self.session_path.joinpath("alf"), "trials")
        good_enough = training.criterion_delay(
            n_trials=trials["intervals"].shape[0],
            perf_easy=training.compute_performance_easy(trials),
        )
        eid = self.one.path2eid(self.session_path, query_type='remote')
        self.one.alyx.json_field_update(
            "sessions", eid, "extended_qc", {"behavior": int(good_enough)}
        )

    def extract_behaviour(self, save=True):
        dsets, out_files, self.extractor = ephys_fpga.extract_all(
            self.session_path, save=save, return_extractor=True)

        return dsets, out_files

    def run_qc(self, trials_data=None, update=True, plot_qc=False):
        if trials_data is None:
            trials_data, _ = self.extract_behaviour(save=False)
        if not trials_data:
            raise ValueError('No trials data found')

        # Run the task QC
        qc = TaskQC(self.session_path, one=self.one, log=_logger)
        qc.extractor = TaskQCExtractor(self.session_path, lazy=True, one=qc.one)
        # Extract extra datasets required for QC
        qc.extractor.data = qc.extractor.rename_data(trials_data)
        wheel_ts_bpod = self.extractor.bpod2fpga(self.extractor.bpod_trials['wheel_timestamps'])
        qc.extractor.data['wheel_timestamps_bpod'] = wheel_ts_bpod
        qc.extractor.data['wheel_position_bpod'] = self.extractor.bpod_trials['wheel_position']
        qc.extractor.wheel_encoding = 'X4'
        qc.extractor.settings = self.extractor.settings
        qc.extractor.frame_ttls = self.extractor.frame2ttl
        qc.extractor.audio_ttls = self.extractor.audio
        qc.extractor.bpod_ttls = self.extractor.bpod

        # Aggregate and update Alyx QC fields
        qc.run(update=update)

        if plot_qc:
            _logger.info("Creating Trials QC plots")
            try:
                session_id = self.one.path2eid(self.session_path)
                plot_task = BehaviourPlots(session_id, self.session_path, one=self.one)
                _ = plot_task.run()
                self.plot_tasks.append(plot_task)
            except Exception:
                _logger.error('Could not create Trials QC Plot')
                _logger.error(traceback.format_exc())
                self.status = -1
        return qc

    def _run(self, plot_qc=True):
        dsets, out_files = self.extract_behaviour()

        if self.one and not self.one.offline:
            self._behaviour_criterion()
            self.run_qc(trials_data=dsets, update=True, plot_qc=plot_qc)
        return out_files

    def get_signatures(self, **kwargs):
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = spikeglx.get_probes_from_folder(self.session_path)

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data*' in sig[1]:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            else:
                full_input_files.append(sig)

        self.input_files = full_input_files

        self.output_files = self.signature['output_files']
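
# Example (editor's sketch): the extracted trials can be read back as an ALF object,
# with the trials table columns expanded into attributes, e.g.
#
#     trials = alfio.load_object(session_path / 'alf', 'trials')
#     print(trials['intervals'].shape[0], 'trials extracted')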


class LaserTrialsLegacy(EphysTrials):
    """This is the legacy extractor for Guido's ephys optogenetic stimulation protocol.

    This is legacy because personal project extractors should be in a separate repository.
    """
    def extract_behaviour(self, save=True):
        dsets, out_files = super().extract_behaviour(save=save)

        # Re-extract the laser datasets as the above default extractor discards them
        from ibllib.io.extractors import opto_trials
        laser = opto_trials.LaserBool(self.session_path)
        dsets_laser, out_files_laser = laser.extract(save=save)
        dsets.update({k: v for k, v in zip(laser.var_names, dsets_laser)})
        out_files.extend(out_files_laser)
        return dsets, out_files


class EphysCellsQc(tasks.Task):
    priority = 90
    level = 3
    force = False

    signature = {
        'input_files': [('spikes.times.npy', 'alf/probe*', True),
                        ('spikes.clusters.npy', 'alf/probe*', True),
                        ('spikes.amps.npy', 'alf/probe*', True),
                        ('spikes.depths.npy', 'alf/probe*', True),
                        ('clusters.channels.npy', 'alf/probe*', True)],
        'output_files': [('clusters.metrics.pqt', 'alf/probe*', True)]
    }

    def _compute_cell_qc(self, folder_probe):
        """
        Computes the cell QC given an extracted probe alf path.
        :param folder_probe: folder
        :return:
        """
        # compute the straight qc
        _logger.info(f"Computing cluster qc for {folder_probe}")
        spikes = alfio.load_object(folder_probe, 'spikes')
        clusters = alfio.load_object(folder_probe, 'clusters')
        df_units, drift = ephysqc.spike_sorting_metrics(
            spikes.times, spikes.clusters, spikes.amps, spikes.depths,
            cluster_ids=np.arange(clusters.channels.size))
        # if the ks2 labels file exists, load it and add the column
        file_labels = folder_probe.joinpath('cluster_KSLabel.tsv')
        if file_labels.exists():
            ks2_labels = pd.read_csv(file_labels, sep='\t')
            ks2_labels.rename(columns={'KSLabel': 'ks2_label'}, inplace=True)
            df_units = pd.concat(
                [df_units, ks2_labels['ks2_label'].reindex(df_units.index)], axis=1)
        # save as parquet file
        df_units.to_parquet(folder_probe.joinpath("clusters.metrics.pqt"))
        return folder_probe.joinpath("clusters.metrics.pqt"), df_units, drift

    def _label_probe_qc(self, folder_probe, df_units, drift):
        """
        Labels the json field of the corresponding Alyx probe insertion.
        :param folder_probe:
        :param df_units:
        :param drift:
        :return:
        """
        eid = self.one.path2eid(self.session_path, query_type='remote')
        # the probe name is the first folder after alf: {session_path}/alf/{probe_name}/{spike_sorter_name}
        probe_name = Path(folder_probe).relative_to(self.session_path.joinpath('alf')).parts[0]
        pdict = self.one.alyx.rest('insertions', 'list', session=eid, name=probe_name, no_cache=True)
        if len(pdict) != 1:
            _logger.warning(f'No probe found for probe name: {probe_name}')
            return
        isok = df_units['label'] == 1
        qcdict = {'n_units': int(df_units.shape[0]),
                  'n_units_qc_pass': int(np.sum(isok)),
                  'firing_rate_max': np.max(df_units['firing_rate'][isok]),
                  'firing_rate_median': np.median(df_units['firing_rate'][isok]),
                  'amplitude_max_uV': np.max(df_units['amp_max'][isok]) * 1e6,
                  'amplitude_median_uV': np.median(df_units['amp_median'][isok]) * 1e6,
                  'drift_rms_um': rms(drift['drift_um']),
                  }
        file_wm = folder_probe.joinpath('_kilosort_whitening.matrix.npy')
        if file_wm.exists():
            wm = np.load(file_wm)
            qcdict['whitening_matrix_conditioning'] = np.linalg.cond(wm)
        # groom qc dict (this function will eventually go directly into the json field update)
        for k in qcdict:
            if isinstance(qcdict[k], np.int64):
                qcdict[k] = int(qcdict[k])
            elif isinstance(qcdict[k], float):
                qcdict[k] = np.round(qcdict[k], 2)
        self.one.alyx.json_field_update("insertions", pdict[0]["id"], "json", qcdict)

    def _run(self):
        """
        Post spike-sorting quality control at the cluster level.
        Outputs a QC table in the clusters ALF object and labels corresponding probes in Alyx.
        """
        files_spikes = Path(self.session_path).joinpath('alf').rglob('spikes.times.npy')
        folder_probes = [f.parent for f in files_spikes]
        out_files = []
        for folder_probe in folder_probes:
            try:
                qc_file, df_units, drift = self._compute_cell_qc(folder_probe)
                out_files.append(qc_file)
                self._label_probe_qc(folder_probe, df_units, drift)
            except Exception:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        return out_files

    def get_signatures(self, **kwargs):
        files_spikes = Path(self.session_path).joinpath('alf').rglob('spikes.times.npy')
        folder_probes = [f.parent for f in files_spikes]

        full_input_files = []
        for sig in self.signature['input_files']:
            for folder in folder_probes:
                full_input_files.append((sig[0], str(folder.relative_to(self.session_path)), sig[2]))

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            for folder in folder_probes:
                full_output_files.append((sig[0], str(folder.relative_to(self.session_path)), sig[2]))

        self.output_files = full_output_files
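
# Example (editor's sketch, paths hypothetical): the per-cluster metrics written above
# can be filtered for passing units straight from the parquet file, e.g.
#
#     import pandas as pd
#     metrics = pd.read_parquet(session_path / 'alf' / 'probe00' / 'pykilosort' / 'clusters.metrics.pqt')
#     good_units = metrics[metrics['label'] == 1]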


class EphysMtscomp(tasks.Task):
    priority = 50  # ideally after spike sorting
    level = 0
    force = False
    signature = {
        'input_files': [('*ap.meta', 'raw_ephys_data/probe*', True),
                        ('*ap.*bin', 'raw_ephys_data/probe*', True),
                        ('*lf.meta', 'raw_ephys_data/probe*', False),  # NP2 doesn't have lf files
                        ('*lf.*bin', 'raw_ephys_data/probe*', False),  # NP2 doesn't have lf files
                        ('*nidq.meta', 'raw_ephys_data', True),
                        ('*nidq.*bin', 'raw_ephys_data', True)],
        'output_files': [('*ap.meta', 'raw_ephys_data/probe*', True),
                         ('*ap.cbin', 'raw_ephys_data/probe*', False),  # may not be present on local server anymore
                         ('*ap.ch', 'raw_ephys_data/probe*', True),
                         ('*lf.meta', 'raw_ephys_data/probe*', False),  # NP2 doesn't have lf files # TODO detect from meta
                         ('*lf.cbin', 'raw_ephys_data/probe*', False),  # may not be present on local server anymore
                         ('*lf.ch', 'raw_ephys_data/probe*', False),
                         ('*nidq.meta', 'raw_ephys_data', True),
                         ('*nidq.cbin', 'raw_ephys_data', False),  # may not be present on local server anymore
                         ('*nidq.ch', 'raw_ephys_data', True)]
    }

    def _run(self):
        """
        Compress ephys files, looking for `compress_ephys.flag` within the probes folder.
        The original bin file will be removed.
        The registration flag created contains targeted file names at the root of the session.
        """

        out_files = []
        ephys_files = spikeglx.glob_ephys_files(self.session_path)
        ephys_files += spikeglx.glob_ephys_files(self.session_path, ext="ch")
        ephys_files += spikeglx.glob_ephys_files(self.session_path, ext="meta")

        for ef in ephys_files:
            for typ in ["ap", "lf", "nidq"]:
                bin_file = ef.get(typ)
                if not bin_file:
                    continue
                if bin_file.suffix.find("bin") == 1:
                    with spikeglx.Reader(bin_file) as sr:
                        if sr.is_mtscomp:
                            out_files.append(bin_file)
                        else:
                            _logger.info(f"Compressing binary file {bin_file}")
                            out_files.append(sr.compress_file(keep_original=False))
                            out_files.append(bin_file.with_suffix('.ch'))
                else:
                    out_files.append(bin_file)

        return out_files

    def get_signatures(self, **kwargs):
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = spikeglx.get_probes_from_folder(self.session_path)

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            else:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], sig[1], sig[2]))

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            if 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    full_output_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            else:
                if neuropixel_version != '3A':
                    full_output_files.append((sig[0], sig[1], sig[2]))

        self.output_files = full_output_files
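
# Example (editor's sketch, file name hypothetical): mtscomp-compressed .cbin files stay
# readable through the same spikeglx.Reader interface as the raw .bin files, e.g.
#
#     cbin = session_path / 'raw_ephys_data' / 'probe00' / '_spikeglx_ephysData_g0_t0.imec0.ap.cbin'
#     with spikeglx.Reader(cbin) as sr:
#         chunk = sr[:30000, :]  # first second of data at 30 kHz, returned scaled to volts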


class EphysDLC(tasks.Task):
    """
    This task relies on a correctly installed dlc environment as per
    https://docs.google.com/document/d/1g0scP6_3EmaXCU4SsDNZWwDTaD9MG0es_grLA-d0gh0/edit#

    If your environment is set up otherwise, make sure that you set the respective attributes:
    t = EphysDLC(session_path)
    t.dlcenv = Path('/path/to/your/dlcenv/bin/activate')
    t.scripts = Path('/path/to/your/iblscripts/deploy/serverpc/dlc')
    """
    gpu = 1
    cpu = 4
    io_charge = 100
    level = 2
    force = True
    job_size = 'large'

    dlcenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'dlcenv', 'bin', 'activate')
    scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'dlc')
    signature = {
        'input_files': [
            ('_iblrig_leftCamera.raw.mp4', 'raw_video_data', True),
            ('_iblrig_rightCamera.raw.mp4', 'raw_video_data', True),
            ('_iblrig_bodyCamera.raw.mp4', 'raw_video_data', True),
        ],
        'output_files': [
            ('_ibl_leftCamera.dlc.pqt', 'alf', True),
            ('_ibl_rightCamera.dlc.pqt', 'alf', True),
            ('_ibl_bodyCamera.dlc.pqt', 'alf', True),
            ('leftCamera.ROIMotionEnergy.npy', 'alf', True),
            ('rightCamera.ROIMotionEnergy.npy', 'alf', True),
            ('bodyCamera.ROIMotionEnergy.npy', 'alf', True),
            ('leftROIMotionEnergy.position.npy', 'alf', True),
            ('rightROIMotionEnergy.position.npy', 'alf', True),
            ('bodyROIMotionEnergy.position.npy', 'alf', True),
        ],
    }

    def _check_dlcenv(self):
        """Check that the scripts are present, that dlcenv can be activated, and get the iblvideo version."""
        assert len(list(self.scripts.rglob('run_dlc.*'))) == 2, \
            f'Scripts run_dlc.sh and run_dlc.py do not exist in {self.scripts}'
        assert len(list(self.scripts.rglob('run_motion.*'))) == 2, \
            f'Scripts run_motion.sh and run_motion.py do not exist in {self.scripts}'
        assert self.dlcenv.exists(), f"DLC environment does not exist in assumed location {self.dlcenv}"
        command2run = f"source {self.dlcenv}; python -c 'import iblvideo; print(iblvideo.__version__)'"
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash"
        )
        info, error = process.communicate()
        if process.returncode != 0:
            raise AssertionError(f"DLC environment check failed\n{error.decode('utf-8')}")
        version = info.decode("utf-8").strip().split('\n')[-1]
        return version

    @staticmethod
    def _video_intact(file_mp4):
        """Checks that the downloaded video can be opened and is not empty."""
        cap = cv2.VideoCapture(str(file_mp4))
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        intact = frame_count > 0
        cap.release()
        return intact

    def _run(self, cams=None, overwrite=False):
        # Default to all three cams
        cams = cams or ['left', 'right', 'body']
        cams = assert_valid_label(cams)
        # Set up
        self.session_id = self.one.path2eid(self.session_path)
        actual_outputs = []

        # Loop through cams
        for cam in cams:
            # Catch exceptions so that the following cameras can still run
            try:
                # If all results exist and overwrite is False, skip computation
                expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True)
                if overwrite is False and expected_outputs_present is True:
                    actual_outputs.extend(expected_outputs)
                    return actual_outputs
                else:
                    file_mp4 = next(self.session_path.joinpath('raw_video_data').glob(f'_iblrig_{cam}Camera.raw*.mp4'))
                    if not file_mp4.exists():
                        # In this case we set the status to Incomplete.
                        _logger.error(f"No raw video file available for {cam}, skipping.")
                        self.status = -3
                        continue
                    if not self._video_intact(file_mp4):
                        _logger.error(f"Corrupt raw video file {file_mp4}")
                        self.status = -1
                        continue
                    # Check that the dlc environment is ok and the shell scripts exist, get the iblvideo version,
                    # and check the GPU is addressable
                    self.version = self._check_dlcenv()
                    _logger.info(f'iblvideo version {self.version}')
                    check_nvidia_driver()

                    _logger.info(f'Running DLC on {cam}Camera.')
                    command2run = f"{self.scripts.joinpath('run_dlc.sh')} {str(self.dlcenv)} {file_mp4} {overwrite}"
                    _logger.info(command2run)
                    process = subprocess.Popen(
                        command2run,
                        shell=True,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        executable="/bin/bash",
                    )
                    info, error = process.communicate()
                    # info_str = info.decode("utf-8").strip()
                    # _logger.info(info_str)
                    if process.returncode != 0:
                        error_str = error.decode("utf-8").strip()
                        _logger.error(f'DLC failed for {cam}Camera.\n\n'
                                      f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                                      f'{error_str}\n'
                                      f'++++++++++++++++++++++++++++++++++++++++++++\n')
                        self.status = -1
                        # We don't run motion energy, or add any files, if dlc failed to run
                        continue
                    dlc_result = next(self.session_path.joinpath('alf').glob(f'_ibl_{cam}Camera.dlc*.pqt'))
                    actual_outputs.append(dlc_result)

                    _logger.info(f'Computing motion energy for {cam}Camera')
                    command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.dlcenv)} {file_mp4} {dlc_result}"
                    _logger.info(command2run)
                    process = subprocess.Popen(
                        command2run,
                        shell=True,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        executable="/bin/bash",
                    )
                    info, error = process.communicate()
                    # info_str = info.decode("utf-8").strip()
                    # _logger.info(info_str)
                    if process.returncode != 0:
                        error_str = error.decode("utf-8").strip()
                        _logger.error(f'Motion energy failed for {cam}Camera.\n\n'
                                      f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                                      f'{error_str}\n'
                                      f'++++++++++++++++++++++++++++++++++++++++++++\n')
                        self.status = -1
                        continue
                    actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                        f'{cam}Camera.ROIMotionEnergy*.npy')))
                    actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                        f'{cam}ROIMotionEnergy.position*.npy')))
            except BaseException:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        # If the status is Incomplete, check that there is at least one output.
        # Otherwise make sure it gets set to Empty (outputs = None), and set the status to -1 to make sure it doesn't slip
        if self.status == -3 and len(actual_outputs) == 0:
            actual_outputs = None
            self.status = -1
        return actual_outputs
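
# Example (editor's sketch): the DLC traces are stored as a parquet table with one
# (x, y, likelihood) triplet per tracked point, e.g.
#
#     dlc = pd.read_parquet(session_path / 'alf' / '_ibl_leftCamera.dlc.pqt')
#     print([c for c in dlc.columns if c.endswith('_likelihood')])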


class EphysPostDLC(tasks.Task):
    """
    The post_dlc task takes dlc traces as input and computes useful quantities, as well as QC.
    """
    io_charge = 90
    level = 3
    force = True
    signature = {'input_files': [('_ibl_leftCamera.dlc.pqt', 'alf', True),
                                 ('_ibl_bodyCamera.dlc.pqt', 'alf', True),
                                 ('_ibl_rightCamera.dlc.pqt', 'alf', True),
                                 ('_ibl_rightCamera.times.npy', 'alf', True),
                                 ('_ibl_leftCamera.times.npy', 'alf', True),
                                 ('_ibl_bodyCamera.times.npy', 'alf', True),
                                 # the following are required for the DLC plot only
                                 # they are not strictly required, some plots just might be skipped
                                 # In particular the raw videos don't need to be downloaded as they can be streamed
                                 ('_iblrig_bodyCamera.raw.mp4', 'raw_video_data', True),
                                 ('_iblrig_leftCamera.raw.mp4', 'raw_video_data', True),
                                 ('_iblrig_rightCamera.raw.mp4', 'raw_video_data', True),
                                 ('rightROIMotionEnergy.position.npy', 'alf', False),
                                 ('leftROIMotionEnergy.position.npy', 'alf', False),
                                 ('bodyROIMotionEnergy.position.npy', 'alf', False),
                                 ('_ibl_trials.table.pqt', 'alf', True),
                                 ('_ibl_wheel.position.npy', 'alf', True),
                                 ('_ibl_wheel.timestamps.npy', 'alf', True),
                                 ],
                 # More files are required for all panels of the DLC QC plot to function
                 'output_files': [('_ibl_leftCamera.features.pqt', 'alf', True),
                                  ('_ibl_rightCamera.features.pqt', 'alf', True),
                                  ('licks.times.npy', 'alf', True),
                                  # ('dlc_qc_plot.png', 'snapshot', False)
                                  ]
                 }

    def _run(self, overwrite=True, run_qc=True, plot_qc=True):
        """
        Run the EphysPostDLC task. Returns a list of file locations for the output files in signature. The created plot
        (dlc_qc_plot.png) is not returned, but saved in session_path/snapshots and uploaded to Alyx as a note.

        :param overwrite: bool, whether to recompute existing output files (default is True).
                          Note that the dlc_qc_plot will be (re-)computed even if overwrite = False
        :param run_qc: bool, whether to run the DLC QC (default is True)
        :param plot_qc: bool, whether to create the dlc_qc_plot (default is True)

        """
        # Check if output files exist locally
        exist, output_files = self.assert_expected(self.signature['output_files'], silent=True)
        if exist and not overwrite:
            _logger.warning('EphysPostDLC outputs exist and overwrite=False, skipping computations of outputs.')
        else:
            if exist and overwrite:
                _logger.warning('EphysPostDLC outputs exist and overwrite=True, overwriting existing outputs.')
            # Find all available dlc files
            dlc_files = list(Path(self.session_path).joinpath('alf').glob('_ibl_*Camera.dlc.*'))
            for dlc_file in dlc_files:
                _logger.debug(dlc_file)
            output_files = []
            combined_licks = []

            for dlc_file in dlc_files:
                # Catch unforeseen exceptions and move on to the next cam
                try:
                    cam = label_from_path(dlc_file)
                    # load dlc trace and camera times
                    dlc = pd.read_parquet(dlc_file)
                    dlc_thresh = likelihood_threshold(dlc, 0.9)
                    # try to load the respective camera times
                    try:
                        dlc_t = np.load(next(Path(self.session_path).joinpath('alf').glob(f'_ibl_{cam}Camera.times.*npy')))
                        times = True
                        if dlc_t.shape[0] == 0:
                            _logger.error(f'camera.times empty for {cam} camera. '
                                          f'Computations using camera.times will be skipped')
                            self.status = -1
                            times = False
                        elif dlc_t.shape[0] < len(dlc_thresh):
                            _logger.error(f'Camera times shorter than DLC traces for {cam} camera. '
                                          f'Computations using camera.times will be skipped')
                            self.status = -1
                            times = 'short'
                    except StopIteration:
                        self.status = -1
                        times = False
                        _logger.error(f'No camera.times for {cam} camera. '
                                      f'Computations using camera.times will be skipped')
                    # These features are only computed from the left and right cams
                    if cam in ('left', 'right'):
                        features = pd.DataFrame()
                        # If camera times are available, get the lick time stamps for the combined array
                        if times is True:
                            _logger.info(f"Computing lick times for {cam} camera.")
                            combined_licks.append(get_licks(dlc_thresh, dlc_t))
                        elif times is False:
                            _logger.warning(f"Skipping lick times for {cam} camera as no camera.times available")
                        elif times == 'short':
                            _logger.warning(f"Skipping lick times for {cam} camera as camera.times are too short")
                        # Compute pupil diameter, raw and smoothed
                        _logger.info(f"Computing raw pupil diameter for {cam} camera.")
                        features['pupilDiameter_raw'] = get_pupil_diameter(dlc_thresh)
                        try:
                            _logger.info(f"Computing smooth pupil diameter for {cam} camera.")
                            features['pupilDiameter_smooth'] = get_smooth_pupil_diameter(features['pupilDiameter_raw'],
                                                                                         cam)
                        except BaseException:
                            _logger.error(f"Computing smooth pupil diameter for {cam} camera failed, saving all NaNs.")
                            _logger.error(traceback.format_exc())
                            features['pupilDiameter_smooth'] = np.nan
                        # Save to pqt
                        features_file = Path(self.session_path).joinpath('alf', f'_ibl_{cam}Camera.features.pqt')
                        features.to_parquet(features_file)
                        output_files.append(features_file)

                    # For all cams, compute DLC qc if times are available
                    if run_qc is True and times in [True, 'short']:
                        # Setting download_data to False because at this point the data should be there
                        qc = DlcQC(self.session_path, side=cam, one=self.one, download_data=False)
                        qc.run(update=True)
                    else:
                        if times is False:
                            _logger.warning(f"Skipping QC for {cam} camera as no camera.times available")
                        if not run_qc:
                            _logger.warning(f"Skipping QC for {cam} camera as run_qc=False")

                except BaseException:
                    _logger.error(traceback.format_exc())
                    self.status = -1
                    continue

            # Combined lick times
            if len(combined_licks) > 0:
                lick_times_file = Path(self.session_path).joinpath('alf', 'licks.times.npy')
                np.save(lick_times_file, sorted(np.concatenate(combined_licks)))
                output_files.append(lick_times_file)
            else:
                _logger.warning("No lick times computed for this session.")

        if plot_qc:
            _logger.info("Creating DLC QC plot")
            try:
                session_id = self.one.path2eid(self.session_path)
                fig_path = self.session_path.joinpath('snapshot', 'dlc_qc_plot.png')
                if not fig_path.parent.exists():
                    fig_path.parent.mkdir(parents=True, exist_ok=True)
                fig = dlc_qc_plot(self.session_path, one=self.one)
                fig.savefig(fig_path)
                fig.clf()
                snp = ReportSnapshot(self.session_path, session_id, one=self.one)
                snp.outputs = [fig_path]
                snp.register_images(widths=['orig'],
                                    function=str(dlc_qc_plot.__module__) + '.' + str(dlc_qc_plot.__name__))
            except BaseException:
                _logger.error('Could not create and/or upload DLC QC Plot')
                _logger.error(traceback.format_exc())
                self.status = -1

        return output_files
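
# Example (editor's sketch): the combined lick times can be related back to trial
# epochs, e.g. counting licks within each trial interval:
#
#     licks = np.load(session_path / 'alf' / 'licks.times.npy')
#     trials = alfio.load_object(session_path / 'alf', 'trials')
#     n_licks = [np.sum((licks >= t0) & (licks <= t1)) for t0, t1 in trials['intervals']]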


class EphysPassive(tasks.Task):
    cpu = 1
    io_charge = 90
    level = 1
    force = False
    signature = {
        'input_files': [('_iblrig_taskSettings.raw*', 'raw_behavior_data', True),
                        ('_spikeglx_sync.channels.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.polarities.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.times.*', 'raw_ephys_data*', True),
                        ('*.meta', 'raw_ephys_data*', True),
                        ('*wiring.json', 'raw_ephys_data*', False),
                        ('_iblrig_RFMapStim.raw*', 'raw_passive_data', True)],
        'output_files': [('_ibl_passiveGabor.table.csv', 'alf', True),
                         ('_ibl_passivePeriods.intervalsTable.csv', 'alf', True),
                         ('_ibl_passiveRFM.times.npy', 'alf', True),
                         ('_ibl_passiveStims.table.csv', 'alf', True)]}

    def _run(self):
        """Returns a list of pathlib.Paths."""
        data, paths = ephys_passive.PassiveChoiceWorld(self.session_path).extract(save=True)
        if any([x is None for x in paths]):
            self.status = -1
        # Register?
        return paths

    def get_signatures(self, **kwargs):
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = spikeglx.get_probes_from_folder(self.session_path)

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data*' in sig[1]:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            else:
                full_input_files.append(sig)

        self.input_files = full_input_files

        self.output_files = self.signature['output_files']


class EphysExtractionPipeline(tasks.Pipeline):
    label = __name__

    def __init__(self, session_path=None, **kwargs):
        super(EphysExtractionPipeline, self).__init__(session_path, **kwargs)
        tasks = OrderedDict()
        self.session_path = session_path
        # level 0
        tasks['ExperimentDescriptionRegisterRaw'] = base_tasks.ExperimentDescriptionRegisterRaw(self.session_path)
        tasks["EphysRegisterRaw"] = tpp.TrainingRegisterRaw(self.session_path)
        tasks["EphysPulses"] = EphysPulses(self.session_path)
        tasks["EphysRawQC"] = RawEphysQC(self.session_path)
        tasks["EphysAudio"] = EphysAudio(self.session_path)
        tasks["EphysMtscomp"] = EphysMtscomp(self.session_path)
        tasks['EphysVideoCompress'] = EphysVideoCompress(self.session_path)
        # level 1
        tasks["SpikeSorting"] = SpikeSorting(
            self.session_path, parents=[tasks["EphysMtscomp"], tasks["EphysPulses"]])
        tasks["EphysTrials"] = EphysTrials(self.session_path, parents=[tasks["EphysPulses"]])

        tasks["EphysPassive"] = EphysPassive(self.session_path, parents=[tasks["EphysPulses"]])
        # level 2
        tasks["EphysVideoSyncQc"] = EphysVideoSyncQc(
            self.session_path, parents=[tasks["EphysVideoCompress"], tasks["EphysPulses"], tasks["EphysTrials"]])
        tasks["EphysCellsQc"] = EphysCellsQc(self.session_path, parents=[tasks["SpikeSorting"]])
        tasks["EphysDLC"] = EphysDLC(self.session_path, parents=[tasks["EphysVideoCompress"]])
        tasks['EphysTrainingStatus'] = tpp.TrainingStatus(self.session_path, parents=[tasks["EphysTrials"]])
        # level 3
        tasks["EphysPostDLC"] = EphysPostDLC(self.session_path, parents=[tasks["EphysDLC"], tasks["EphysTrials"],
                                                                         tasks["EphysVideoSyncQc"]])
        self.tasks = tasks
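
# Example (editor's sketch): the full legacy pipeline was typically created and run on a
# session folder along the lines of:
#
#     from one.api import ONE
#     pipeline = EphysExtractionPipeline(session_path, one=ONE())
#     # pipeline.make_graph()  # visualise the task dependency graph
#     # pipeline.run()         # run the tasks in dependency order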