Coverage for ibllib/pipes/ephys_preprocessing.py: 78%

767 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-10-11 11:13 +0100

1import logging 

2import re 

3import shutil 

4import subprocess 

5from collections import OrderedDict 

6import traceback 

7from pathlib import Path 

8import warnings 

9 

10import cv2 

11import numpy as np 

12import pandas as pd 

13import packaging.version 

14 

15import one.alf.io as alfio 

16from neurodsp.utils import rms 

17import spikeglx 

18 

19from ibllib.misc import check_nvidia_driver 

20from ibllib.ephys import ephysqc, spikes, sync_probes 

21from ibllib.io import ffmpeg 

22from ibllib.io.video import label_from_path, assert_valid_label 

23from ibllib.io.extractors import ephys_fpga, ephys_passive, camera 

24from ibllib.pipes import tasks, base_tasks 

25import ibllib.pipes.training_preprocessing as tpp 

26from ibllib.pipes.misc import create_alyx_probe_insertions 

27from ibllib.qc.alignment_qc import get_aligned_channels 

28from ibllib.qc.task_extractors import TaskQCExtractor 

29from ibllib.qc.task_metrics import TaskQC 

30from ibllib.qc.camera import run_all_qc as run_camera_qc 

31from ibllib.qc.dlc import DlcQC 

32from ibllib.plots.figures import dlc_qc_plot, BehaviourPlots, LfpPlots, ApPlots, BadChannelsAp 

33from ibllib.plots.figures import SpikeSorting as SpikeSortingPlots 

34from ibllib.plots.snapshot import ReportSnapshot 

35from brainbox.behavior.dlc import likelihood_threshold, get_licks, get_pupil_diameter, get_smooth_pupil_diameter 

36 

# Module-level logger shared by every task in this pipeline.
_logger = logging.getLogger("ibllib")
# NOTE(review): this deprecation message names `pipes.training_preprocessing`,
# presumably because this module imports it (as `tpp`) above — confirm the
# warning is meant to fire on import of this ephys module as well.
warnings.warn('`pipes.training_preprocessing` to be removed in favour of dynamic pipeline')

39 

40 

# level 0
class EphysPulses(tasks.Task):
    """
    Extract Pulses from raw electrophysiology data into numpy arrays
    Perform the probes synchronisation with nidq (3B) or main probe (3A)
    """
    cpu = 2
    io_charge = 30  # this jobs reads raw ap files
    priority = 90  # a lot of jobs depend on this one
    level = 0  # this job doesn't depend on anything
    force = False  # whether or not to force download of missing data on local server if outputs already exist
    signature = {
        'input_files': [('*ap.meta', 'raw_ephys_data/probe*', True),
                        ('*ap.ch', 'raw_ephys_data/probe*', False),  # not necessary when we have .bin file
                        ('*ap.*bin', 'raw_ephys_data/probe*', True),
                        ('*nidq.meta', 'raw_ephys_data', True),
                        ('*nidq.ch', 'raw_ephys_data', False),  # not necessary when we have .bin file
                        ('*nidq.*bin', 'raw_ephys_data', True)],
        'output_files': [('_spikeglx_sync*.npy', 'raw_ephys_data*', True),
                         ('_spikeglx_sync.polarities*.npy', 'raw_ephys_data*', True),
                         ('_spikeglx_sync.times*.npy', 'raw_ephys_data*', True)]
    }

    def get_signatures(self, **kwargs):
        """
        Find the input and output signatures specific for local filesystem

        Expands the ``probe*`` wildcard to the probe folders actually present on
        disk; the session-level nidq entries are only expected for non-3A rigs.
        """
        version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probe_labels = spikeglx.get_probes_from_folder(self.session_path)

        expanded_inputs = []
        for name, collection, required in self.signature['input_files']:
            if 'raw_ephys_data/probe*' in collection:
                expanded_inputs.extend(
                    (name, f'raw_ephys_data/{label}', required) for label in probe_labels)
            elif version != '3A':
                # session-level (nidq) files do not exist on 3A rigs
                expanded_inputs.append((name, collection, required))

        self.input_files = expanded_inputs

        expanded_outputs = []
        for name, _, required in self.signature['output_files']:
            if version != '3A':
                expanded_outputs.append((name, 'raw_ephys_data', required))
            expanded_outputs.extend(
                (name, f'raw_ephys_data/{label}', required) for label in probe_labels)

        self.output_files = expanded_outputs

    def _run(self, overwrite=False):
        """Extract sync pulses to numpy arrays, then synchronise the probes."""
        _, pulse_files = ephys_fpga.extract_sync(self.session_path, overwrite=overwrite)
        for pulse_file in pulse_files:
            _logger.info(f"extracted pulses for {pulse_file}")

        _, probe_sync_files = sync_probes.sync(self.session_path)
        return pulse_files + probe_sync_files

100 

101 

class RawEphysQC(tasks.Task):
    """
    Computes raw electrophysiology QC
    """
    cpu = 2
    io_charge = 30  # this jobs reads raw ap files
    priority = 10  # a lot of jobs depend on this one
    level = 0  # this job doesn't depend on anything
    force = False
    signature = {
        'input_files': [('*ap.meta', 'raw_ephys_data/probe*', True),
                        ('*lf.meta', 'raw_ephys_data/probe*', True),  # not necessary to run task as optional computation
                        ('*lf.ch', 'raw_ephys_data/probe*', False),  # not required it .bin file
                        ('*lf.*bin', 'raw_ephys_data/probe*', True)],  # not necessary to run task as optional computation
        'output_files': [('_iblqc_ephysChannels.apRMS.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysChannels.rawSpikeRates.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysChannels.labels.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysSpectralDensityLF.freqs.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysSpectralDensityLF.power.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysSpectralDensityAP.freqs.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysSpectralDensityAP.power.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysTimeRmsLF.rms.npy', 'raw_ephys_data/probe*', True),
                         ('_iblqc_ephysTimeRmsLF.timestamps.npy', 'raw_ephys_data/probe*', True)]
    }

    def _run(self, overwrite=False):
        """
        Run the ephys QC on every probe insertion of the session and generate the
        LFP and bad-channel QC plots.

        :param overwrite: if True, recompute QC datasets that already exist
        :return: list of generated QC files
        """
        eid = self.one.path2eid(self.session_path)
        probes = [(x['id'], x['name']) for x in self.one.alyx.rest('insertions', 'list', session=eid)]
        # Usually there should be two probes, if there are less, check if all probes are registered
        if len(probes) < 2:
            _logger.warning(f"{len(probes)} probes registered for session {eid}, trying to register from local data")
            probes = [(p['id'], p['name']) for p in create_alyx_probe_insertions(self.session_path, one=self.one)]
        qc_files = []
        for pid, pname in probes:
            _logger.info(f"\nRunning QC for probe insertion {pname}")
            try:
                eqc = ephysqc.EphysQC(pid, session_path=self.session_path, one=self.one)
                qc_files.extend(eqc.run(update=True, overwrite=overwrite))
                _logger.info("Creating LFP QC plots")
                plot_task = LfpPlots(pid, session_path=self.session_path, one=self.one)
                _ = plot_task.run()
                self.plot_tasks.append(plot_task)
                plot_task = BadChannelsAp(pid, session_path=self.session_path, one=self.one)
                _ = plot_task.run()
                self.plot_tasks.append(plot_task)

            except AssertionError:
                # a failure on one probe should not prevent QC on the remaining probes
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        return qc_files

    def get_signatures(self, **kwargs):
        """
        Expand the ``probe*`` wildcards to the probe folders present on disk.

        The LF input files are only marked as required when the LFP QC outputs have
        not all been produced yet, to avoid unnecessary downloads of lf.cbin files.
        """
        probes = spikeglx.get_probes_from_folder(self.session_path)

        full_output_files = []
        for sig in self.signature['output_files']:
            if 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    full_output_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))

        self.output_files = full_output_files

        # input lf signature required or not required status is going to depend on the output we have, need to be agile here to
        # avoid unnecessary downloading of lf.cbin files
        expected_count = 0
        count = 0
        # check to see if we have lfp qc datasets
        for expected_file in full_output_files:
            if 'LF' in expected_file[0]:
                expected_count += 1
                actual_files = list(Path(self.session_path).rglob(str(Path(expected_file[1]).joinpath(expected_file[0]))))
                if len(actual_files) == 1:
                    count += 1

        # lf inputs only required when at least one expected LF output is missing
        lf_required = False if count == expected_count else True

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    if 'lf' in sig[0]:
                        # keep optional entries optional; required lf entries become
                        # optional once all LF outputs already exist
                        full_input_files.append((sig[0], f'raw_ephys_data/{probe}', lf_required if sig[2] else sig[2]))
                    else:
                        full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))

        self.input_files = full_input_files

189 

190 

class EphysAudio(tasks.Task):
    """
    Compresses the microphone wav file in a lossless flac file
    """
    # TESTS DONE
    cpu = 2
    priority = 10  # a lot of jobs depend on this one
    level = 0  # this job doesn't depend on anything
    force = False
    signature = {
        'input_files': [('_iblrig_micData.raw.wav', 'raw_behavior_data', True)],
        'output_files': [('_iblrig_micData.raw.flac', 'raw_behavior_data', True)],
    }

    def _run(self, overwrite=False):
        """Convert the raw microphone wav into flac; no-op when no wav is found."""
        src = next(self.session_path.rglob("_iblrig_micData.raw.wav"), None)
        if src is None:
            return
        dst = src.with_suffix(".flac")
        command = "ffmpeg -i {file_in} -y -nostdin -c:a flac -nostats {file_out}"
        _, compressed_file = ffmpeg.compress(file_in=src, file_out=dst, command=command)
        return [compressed_file]

213 

214 

class SpikeSorting(tasks.Task):
    """
    Pykilosort 2.5 pipeline

    For each probe of the session: runs pykilosort through a shell script on a
    scratch drive, converts the raw sorter output to ALF datasets, synchronises
    the spike times and registers QC plots.
    """
    gpu = 1
    io_charge = 100  # this jobs reads raw ap files
    priority = 60
    level = 1  # this job doesn't depend on anything
    force = True
    job_size = 'large'

    SHELL_SCRIPT = Path.home().joinpath(
        "Documents/PYTHON/iblscripts/deploy/serverpc/kilosort2/run_pykilosort.sh"
    )
    SPIKE_SORTER_NAME = 'pykilosort'
    PYKILOSORT_REPO = Path.home().joinpath('Documents/PYTHON/SPIKE_SORTING/pykilosort')
    signature = {
        'input_files': [],  # see setUp method for declaration of inputs
        'output_files': []  # see setUp method for declaration of inputs
    }

    @staticmethod
    def spike_sorting_signature(pname=None):
        """
        Return the (input, output) dataset signatures for one probe.

        :param pname: probe name, e.g. 'probe00'; defaults to the 'probe*' wildcard
        :return: tuple of (input_signature, output_signature) lists
        """
        pname = pname if pname is not None else "probe*"
        input_signature = [('*ap.meta', f'raw_ephys_data/{pname}', True),
                           ('*ap.ch', f'raw_ephys_data/{pname}', True),
                           ('*ap.cbin', f'raw_ephys_data/{pname}', True),
                           ('*nidq.meta', 'raw_ephys_data', False),
                           ('_spikeglx_sync.channels.*', 'raw_ephys_data*', True),
                           ('_spikeglx_sync.polarities.*', 'raw_ephys_data*', True),
                           ('_spikeglx_sync.times.*', 'raw_ephys_data*', True),
                           ('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
                           ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True)]
        output_signature = [('spike_sorting_pykilosort.log', f'spike_sorters/pykilosort/{pname}', True),
                            ('_iblqc_ephysTimeRmsAP.rms.npy', f'raw_ephys_data/{pname}', True),  # new ibllib 2.5
                            ('_iblqc_ephysTimeRmsAP.timestamps.npy', f'raw_ephys_data/{pname}', True)]  # new ibllib 2.5
        return input_signature, output_signature

    @staticmethod
    def _sample2v(ap_file):
        """Return the AP band sample-to-volts conversion factor from the .meta file."""
        md = spikeglx.read_meta_data(ap_file.with_suffix(".meta"))
        s2v = spikeglx._conversion_sample2v_from_meta(md)
        return s2v["ap"][0]

    @staticmethod
    def _fetch_pykilosort_version(repo_path):
        """
        Return 'pykilosort_<version>' read from the repo's __init__.py, falling back
        to the git commit hash when no __version__ line can be parsed.
        """
        init_file = Path(repo_path).joinpath('pykilosort', '__init__.py')
        version = SpikeSorting._fetch_ks2_commit_hash(repo_path)  # default
        try:
            with open(init_file) as fid:
                lines = fid.readlines()
                for line in lines:
                    if line.startswith("__version__ = "):
                        version = line.split('=')[-1].strip().replace('"', '').replace("'", '')
        except Exception:
            # best effort: keep the commit-hash default if the file is unreadable
            pass
        return f"pykilosort_{version}"

    @staticmethod
    def _fetch_pykilosort_run_version(log_file):
        """
        Parse the following line (2 formats depending on version) from the log files to get the version
        '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.2.1, output in gnagga^[[0m\n'
        '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.3.0^[[0m\n'
        """
        with open(log_file) as fid:
            line = fid.readline()
        version = re.search('version (.*), output', line)
        version = version or re.search('version (.*)', line)  # old versions have output, new have a version line
        version = re.sub('\\^[[0-9]+m', '', version.group(1))  # removes the coloring tags
        return f"pykilosort_{version}"

    @staticmethod
    def _fetch_ks2_commit_hash(repo_path):
        """Return the HEAD commit hash of the sorter repo, or '' on failure."""
        command2run = f"git --git-dir {repo_path}/.git rev-parse --verify HEAD"
        process = subprocess.Popen(
            command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        info, error = process.communicate()
        if process.returncode != 0:
            _logger.error(
                f"Can't fetch pykilsort commit hash, will still attempt to run \n"
                f"Error: {error.decode('utf-8')}"
            )
            return ""
        return info.decode("utf-8").strip()

    @staticmethod
    def parse_version(v) -> packaging.version.Version:
        """
        Extracts and parses semantic version (major.minor.patch) from a version string.

        Parameters
        ----------
        v : str
            A version string containing a semantic version.

        Returns
        -------
        packaging.version.Version
            The parsed semantic version number

        Examples
        --------
        >>> SpikeSorting.parse_version('ibl_1.2')
        <Version('1.2')>
        >>> SpikeSorting.parse_version('pykilosort_ibl_1.2.0-new')
        <Version('1.2.0')>
        >>> SpikeSorting.parse_version('ibl_0.2') < SpikeSorting.parse_version('pykilosort_v1')
        True
        """
        m = re.search(r'(\d+\.?){1,3}', v)
        if not m:
            raise packaging.version.InvalidVersion(f'Failed to detect SemVer in "{v}"')
        return packaging.version.parse(m.group())

    def setUp(self, probes=None):
        """
        Overwrite setup method to allow inputs and outputs to be only one probe
        :param probes: list of probes e.g ['probe00']
        :return:
        """
        if not probes or len(probes) == 2:
            self.signature['input_files'], self.signature['output_files'] = self.spike_sorting_signature()
        else:
            self.signature['input_files'], self.signature['output_files'] = self.spike_sorting_signature(probes[0])

        return super().setUp(probes=probes)

    def _run_pykilosort(self, ap_file):
        """
        Runs the ks2 matlab spike sorting for one probe dataset
        the raw spike sorting output is in session_path/spike_sorters/{self.SPIKE_SORTER_NAME}/probeXX folder
        (discontinued support for old spike sortings in the probe folder <1.5.5)
        :return: path of the folder containing ks2 spike sorting output
        """
        self.version = self._fetch_pykilosort_version(self.PYKILOSORT_REPO)
        label = ap_file.parts[-2]  # this is usually the probe name
        sorter_dir = self.session_path.joinpath("spike_sorters", self.SPIKE_SORTER_NAME, label)
        self.FORCE_RERUN = False
        if not self.FORCE_RERUN:
            log_file = sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
            if log_file.exists():
                run_version = self._fetch_pykilosort_run_version(log_file)
                # only accept a previous run if it was made with a recent enough sorter
                if self.parse_version(run_version) > self.parse_version('pykilosort_ibl_1.1.0'):
                    _logger.info(f"Already ran: spike_sorting_{self.SPIKE_SORTER_NAME}.log"
                                 f" found in {sorter_dir}, skipping.")
                    return sorter_dir
                else:
                    self.FORCE_RERUN = True

        print(sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log"))
        # get the scratch drive from the shell script
        with open(self.SHELL_SCRIPT) as fid:
            lines = fid.readlines()
        line = [line for line in lines if line.startswith("SCRATCH_DRIVE=")][0]
        m = re.search(r"\=(.*?)(\#|\n)", line)[0]
        scratch_drive = Path(m[1:-1].strip())
        assert scratch_drive.exists()

        # clean up and create directory, this also checks write permissions
        # temp_dir has the following shape: pykilosort/ZM_3003_2020-07-29_001_probe00
        # first makes sure the tmp dir is clean
        shutil.rmtree(scratch_drive.joinpath(self.SPIKE_SORTER_NAME), ignore_errors=True)
        temp_dir = scratch_drive.joinpath(
            self.SPIKE_SORTER_NAME, "_".join(list(self.session_path.parts[-3:]) + [label])
        )
        if temp_dir.exists():  # hmmm this has to be decided, we may want to restart ?
            # But failed sessions may then clog the scratch dir and have users run out of space
            shutil.rmtree(temp_dir, ignore_errors=True)
        temp_dir.mkdir(parents=True, exist_ok=True)

        check_nvidia_driver()
        command2run = f"{self.SHELL_SCRIPT} {ap_file} {temp_dir}"
        _logger.info(command2run)
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash",
        )
        info, error = process.communicate()
        info_str = info.decode("utf-8").strip()
        _logger.info(info_str)
        if process.returncode != 0:
            error_str = error.decode("utf-8").strip()
            # try and get the kilosort log if any
            for log_file in temp_dir.rglob('*_kilosort.log'):
                with open(log_file) as fid:
                    log = fid.read()
                    _logger.error(log)
                break
            raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}")

        shutil.copytree(temp_dir.joinpath('output'), sorter_dir, dirs_exist_ok=True)
        shutil.rmtree(temp_dir, ignore_errors=True)
        return sorter_dir

    def _run(self, probes=None):
        """
        Multiple steps. For each probe:
        - Runs ks2 (skips if it already ran)
        - synchronize the spike sorting
        - output the probe description files
        :param probes: (list of str) if provided, will only run spike sorting for specified probe names
        :return: list of files to be registered on database
        """
        efiles = spikeglx.glob_ephys_files(self.session_path)
        ap_files = [(ef.get("ap"), ef.get("label")) for ef in efiles if "ap" in ef.keys()]
        out_files = []
        for ap_file, label in ap_files:
            if isinstance(probes, list) and label not in probes:
                continue
            try:
                # if the file is part of a sequence, handles the run accordingly
                sequence_file = ap_file.parent.joinpath(ap_file.stem.replace('ap', 'sequence.json'))
                # temporary just skips for now
                if sequence_file.exists():
                    continue
                ks2_dir = self._run_pykilosort(ap_file)  # runs ks2, skips if it already ran
                probe_out_path = self.session_path.joinpath("alf", label, self.SPIKE_SORTER_NAME)
                shutil.rmtree(probe_out_path, ignore_errors=True)
                probe_out_path.mkdir(parents=True, exist_ok=True)
                spikes.ks2_to_alf(
                    ks2_dir,
                    bin_path=ap_file.parent,
                    out_path=probe_out_path,
                    bin_file=ap_file,
                    ampfactor=self._sample2v(ap_file),
                )
                logfile = ks2_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
                if logfile.exists():
                    shutil.copyfile(logfile, probe_out_path.joinpath(
                        f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log"))
                out, _ = spikes.sync_spike_sorting(ap_file=ap_file, out_path=probe_out_path)
                out_files.extend(out)
                # convert ks2_output into tar file and also register
                # Make this in case spike sorting is in old raw_ephys_data folders, for new
                # sessions it should already exist
                tar_dir = self.session_path.joinpath(
                    'spike_sorters', self.SPIKE_SORTER_NAME, label)
                tar_dir.mkdir(parents=True, exist_ok=True)
                out = spikes.ks2_to_tar(ks2_dir, tar_dir, force=self.FORCE_RERUN)
                out_files.extend(out)

                if self.one:
                    eid = self.one.path2eid(self.session_path, query_type='remote')
                    ins = self.one.alyx.rest('insertions', 'list', session=eid, name=label, query_type='remote')
                    if len(ins) != 0:
                        _logger.info("Creating SpikeSorting QC plots")
                        plot_task = ApPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
                        _ = plot_task.run()
                        self.plot_tasks.append(plot_task)

                        plot_task = SpikeSortingPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
                        _ = plot_task.run(collection=str(probe_out_path.relative_to(self.session_path)))
                        self.plot_tasks.append(plot_task)

                        resolved = ins[0].get('json', {'temp': 0}).get('extended_qc', {'temp': 0}). \
                            get('alignment_resolved', False)
                        if resolved:
                            chns = np.load(probe_out_path.joinpath('channels.localCoordinates.npy'))
                            out = get_aligned_channels(ins[0], chns, one=self.one, save_dir=probe_out_path)
                            out_files.extend(out)

            except Exception:
                # carry on with the remaining probes; the task is flagged as errored
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        probe_files = spikes.probes_description(self.session_path, one=self.one)
        return out_files + probe_files

    def get_signatures(self, probes=None, **kwargs):
        """
        This transforms all wildcards in collection to exact match
        :param probes: list of probe names to restrict to; defaults to probes on disk
        :return:
        """

        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = probes or spikeglx.get_probes_from_folder(self.session_path)

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data*' in sig[1]:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))

            elif 'raw_ephys_data/probe*' in sig[1]:
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            elif 'raw_ephys_data' in sig[1]:
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], sig[1], sig[2]))
            else:
                full_input_files.append(sig)

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            if 'probe*' in sig[1]:
                for probe in probes:
                    col = sig[1].split('/')[:-1] + [probe]
                    full_output_files.append((sig[0], '/'.join(col), sig[2]))
            else:
                # BUGFIX: wildcard-free entries were previously appended to
                # full_input_files, dropping them from the expected outputs
                full_output_files.append(sig)

        self.output_files = full_output_files

527 

528 

class EphysVideoCompress(tasks.Task):
    """Compress the raw camera videos of the session to mp4."""
    priority = 90
    level = 0
    force = False
    job_size = 'large'
    io_charge = 100

    signature = {
        'input_files': [('_iblrig_*Camera.raw.*', 'raw_video_data', True)],
        'output_files': [('_iblrig_*Camera.raw.mp4', 'raw_video_data', True)]
    }

    def _run(self, **kwargs):
        """Run the avi -> mp4 compression; returns the compressed files (or None)."""
        command = ('ffmpeg -i {file_in} -y -nostdin -codec:v libx264 -preset slow -crf 17 '
                   '-loglevel 0 -codec:a copy {file_out}')
        compressed = ffmpeg.iblrig_video_compression(self.session_path, command)

        if not compressed:
            _logger.info('No compressed videos found')
            return

        return compressed

    def get_signatures(self, **kwargs):
        """Expand the camera wildcard to the camera labels found on disk."""
        video_files = Path(self.session_path).joinpath('raw_video_data').glob('*')
        labels = {label_from_path(f) for f in video_files}

        self.input_files = [
            (name.replace('*Camera', f'{label}Camera'), collection, required)
            for name, collection, required in self.signature['input_files']
            for label in labels
        ]

        self.output_files = [
            (name.replace('*Camera', f'{label}Camera'), collection, required)
            for name, collection, required in self.signature['output_files']
            for label in labels
        ]

571 

572 

class EphysVideoSyncQc(tasks.Task):
    """Extract FPGA-synchronised camera timestamps and run the camera QC."""
    priority = 40
    level = 2
    force = True
    signature = {
        'input_files': [('_iblrig_*Camera.raw.mp4', 'raw_video_data', True),
                        ('_iblrig_*Camera.timestamps.ssv', 'raw_video_data', False),
                        ('_iblrig_*Camera.timestamps.npy', 'raw_video_data', False),
                        ('_iblrig_*Camera.frameData.bin', 'raw_video_data', False),
                        ('_iblrig_*Camera.GPIO.bin', 'raw_video_data', False),
                        ('_iblrig_*Camera.frame_counter.bin', 'raw_video_data', False),
                        ('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
                        ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True),
                        ('_spikeglx_sync.channels.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.polarities.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.times.*', 'raw_ephys_data*', True),
                        ('*wheel.position.npy', 'alf', False),
                        ('*wheel.timestamps.npy', 'alf', False),
                        ('*wiring.json', 'raw_ephys_data*', False),
                        ('*.meta', 'raw_ephys_data*', True)],

        'output_files': [('_ibl_*Camera.times.npy', 'alf', True)]
    }

    def _run(self, **kwargs):
        """
        Extract camera timestamps for every mp4 video present and run the camera QC.

        :return: list of extracted camera times files
        """
        mp4_files = self.session_path.joinpath('raw_video_data').rglob('*.mp4')
        labels = [label_from_path(x) for x in mp4_files]
        # Video timestamps extraction
        output_files = []
        data, files = camera.extract_all(self.session_path, save=True, labels=labels)
        output_files.extend(files)

        # Video QC
        run_camera_qc(self.session_path, update=True, one=self.one, cameras=labels)

        return output_files

    def get_signatures(self, **kwargs):
        """
        Expand the camera and raw_ephys_data wildcards in the signatures to the
        cameras and probes actually present on disk.
        """
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = spikeglx.get_probes_from_folder(self.session_path)
        # need to detect the number of cameras
        output_files = Path(self.session_path).joinpath('raw_video_data').rglob('*')
        labels = np.unique([label_from_path(x) for x in output_files])

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data*' in sig[1]:
                # session-level sync files only exist on non-3A rigs
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            elif 'Camera' in sig[0]:
                for lab in labels:
                    full_input_files.append((sig[0].replace('*Camera', f'{lab}Camera'), sig[1], sig[2]))
            else:
                full_input_files.append((sig[0], sig[1], sig[2]))

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            if 'raw_ephys_data*' in sig[1]:
                # NOTE(review): dead branch with the current output signature (only an
                # 'alf' collection is declared above); kept for symmetry with the inputs
                if neuropixel_version != '3A':
                    full_output_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_output_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            elif 'Camera' in sig[0]:
                for lab in labels:
                    full_output_files.append((sig[0].replace('*Camera', f'{lab}Camera'), sig[1], sig[2]))
            else:
                full_output_files.append((sig[0], sig[1], sig[2]))

        self.output_files = full_output_files

647 

648 

# level 1
class EphysTrials(tasks.Task):
    """Extract the trials and wheel data from the FPGA sync and run the task QC."""
    priority = 90
    level = 1
    force = False
    signature = {
        'input_files': [('_iblrig_taskData.raw.*', 'raw_behavior_data', True),
                        ('_iblrig_taskSettings.raw.*', 'raw_behavior_data', True),
                        ('_spikeglx_sync.channels.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.polarities.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.times.*', 'raw_ephys_data*', True),
                        ('_iblrig_encoderEvents.raw*', 'raw_behavior_data', True),
                        ('_iblrig_encoderPositions.raw*', 'raw_behavior_data', True),
                        ('*wiring.json', 'raw_ephys_data*', False),
                        ('*.meta', 'raw_ephys_data*', True)],
        'output_files': [('*trials.table.pqt', 'alf', True),
                         ('*trials.goCueTrigger_times.npy', 'alf', True),
                         ('*trials.intervals_bpod.npy', 'alf', True),
                         ('*trials.stimOff_times.npy', 'alf', True),
                         ('*trials.quiescencePeriod.npy', 'alf', True),
                         ('*wheel.position.npy', 'alf', True),
                         ('*wheel.timestamps.npy', 'alf', True),
                         ('*wheelMoves.intervals.npy', 'alf', True),
                         ('*wheelMoves.peakAmplitude.npy', 'alf', True)]
    }

    def _behaviour_criterion(self):
        """
        Computes and update the behaviour criterion on Alyx
        """
        from brainbox.behavior import training

        trials = alfio.load_object(self.session_path.joinpath("alf"), "trials")
        good_enough = training.criterion_delay(
            n_trials=trials["intervals"].shape[0],
            perf_easy=training.compute_performance_easy(trials),
        )
        eid = self.one.path2eid(self.session_path, query_type='remote')
        # stored as 0/1 in the session's extended_qc JSON field
        self.one.alyx.json_field_update(
            "sessions", eid, "extended_qc", {"behavior": int(good_enough)}
        )

    def _extract_behaviour(self):
        # FPGA-based trial extraction; subclasses (e.g. LaserTrialsLegacy) override
        # this to append protocol-specific datasets
        dsets, out_files = ephys_fpga.extract_all(self.session_path, save=True)

        return dsets, out_files

    def _run(self, plot_qc=True):
        """
        Extract the behaviour datasets; when a connected ONE instance is available,
        additionally update the behaviour criterion on Alyx, run the task QC and
        generate the behaviour QC plots.

        :param plot_qc: if True, generate the trials QC plots
        :return: list of extracted output files
        """
        dsets, out_files = self._extract_behaviour()

        # offline mode: extraction only, no Alyx updates or QC
        if not self.one or self.one.offline:
            return out_files

        self._behaviour_criterion()
        # Run the task QC
        qc = TaskQC(self.session_path, one=self.one, log=_logger)
        qc.extractor = TaskQCExtractor(self.session_path, lazy=True, one=qc.one)
        # Extract extra datasets required for QC
        qc.extractor.data = dsets
        qc.extractor.extract_data()
        # Aggregate and update Alyx QC fields
        qc.run(update=True)

        if plot_qc:
            _logger.info("Creating Trials QC plots")
            try:
                session_id = self.one.path2eid(self.session_path)
                plot_task = BehaviourPlots(session_id, self.session_path, one=self.one)
                _ = plot_task.run()
                self.plot_tasks.append(plot_task)
            except Exception:
                # plotting failures flag the task but do not discard the extraction
                _logger.error('Could not create Trials QC Plot')
                _logger.error(traceback.format_exc())
                self.status = -1

        return out_files

    def get_signatures(self, **kwargs):
        """
        Expand the raw_ephys_data wildcard in the input signatures to the probes on
        disk; the output signatures need no expansion.
        """
        neuropixel_version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probes = spikeglx.get_probes_from_folder(self.session_path)

        full_input_files = []
        for sig in self.signature['input_files']:
            if 'raw_ephys_data*' in sig[1]:
                # session-level sync files only exist on non-3A rigs
                if neuropixel_version != '3A':
                    full_input_files.append((sig[0], 'raw_ephys_data', sig[2]))
                for probe in probes:
                    full_input_files.append((sig[0], f'raw_ephys_data/{probe}', sig[2]))
            else:
                full_input_files.append(sig)

        self.input_files = full_input_files

        self.output_files = self.signature['output_files']

743 

744 

class LaserTrialsLegacy(EphysTrials):
    """This is the legacy extractor for Guido's ephys optogenetic stimulation protocol.

    This is legacy because personal project extractors should be in a separate repository.
    """
    def _extract_behaviour(self):
        """Run the default FPGA extraction, then re-attach the laser datasets."""
        dsets, out_files = super()._extract_behaviour()

        # Re-extract the laser datasets as the above default extractor discards them
        from ibllib.io.extractors import opto_trials
        extractor = opto_trials.LaserBool(self.session_path)
        laser_dsets, laser_files = extractor.extract(save=True)
        dsets.update(dict(zip(extractor.var_names, laser_dsets)))
        out_files.extend(laser_files)
        return dsets, out_files

760 

761 

class EphysCellsQc(tasks.Task):
    """
    Post spike-sorting quality control at the cluster level.

    For each spike-sorted probe, computes cluster metrics, saves them as a
    ``clusters.metrics.pqt`` table and pushes summary statistics to the JSON
    field of the corresponding Alyx probe insertion.
    """
    priority = 90
    level = 3
    force = False

    signature = {
        'input_files': [('spikes.times.npy', 'alf/probe*', True),
                        ('spikes.clusters.npy', 'alf/probe*', True),
                        ('spikes.amps.npy', 'alf/probe*', True),
                        ('spikes.depths.npy', 'alf/probe*', True),
                        ('clusters.channels.npy', 'alf/probe*', True)],
        'output_files': [('clusters.metrics.pqt', 'alf/probe*', True)]
    }

    def _compute_cell_qc(self, folder_probe):
        """
        Computes the cell QC given an extracted probe alf path

        :param folder_probe: folder containing the extracted spikes/clusters ALF objects
        :return: (path to saved clusters.metrics.pqt, metrics DataFrame, drift dict)
        """
        # compute the straight qc
        _logger.info(f"Computing cluster qc for {folder_probe}")
        spikes = alfio.load_object(folder_probe, 'spikes')
        clusters = alfio.load_object(folder_probe, 'clusters')
        df_units, drift = ephysqc.spike_sorting_metrics(
            spikes.times, spikes.clusters, spikes.amps, spikes.depths,
            cluster_ids=np.arange(clusters.channels.size))
        # if the ks2 labels file exist, load them and add the column
        file_labels = folder_probe.joinpath('cluster_KSLabel.tsv')
        if file_labels.exists():
            ks2_labels = pd.read_csv(file_labels, sep='\t')
            ks2_labels.rename(columns={'KSLabel': 'ks2_label'}, inplace=True)
            # align on cluster index so a row exists for every unit in df_units
            df_units = pd.concat(
                [df_units, ks2_labels['ks2_label'].reindex(df_units.index)], axis=1)
        # save as parquet file
        df_units.to_parquet(folder_probe.joinpath("clusters.metrics.pqt"))
        return folder_probe.joinpath("clusters.metrics.pqt"), df_units, drift

    def _label_probe_qc(self, folder_probe, df_units, drift):
        """
        Labels the json field of the alyx corresponding probe insertion

        :param folder_probe: probe alf folder, e.g. {session_path}/alf/{probe_name}/{sorter}
        :param df_units: cluster metrics DataFrame as returned by _compute_cell_qc
        :param drift: drift dict as returned by _compute_cell_qc (contains 'drift_um')
        :return: None; writes to the insertion's json field via Alyx
        """
        eid = self.one.path2eid(self.session_path, query_type='remote')
        # the probe name is the first folder after alf: {session_path}/alf/{probe_name}/{spike_sorter_name}
        probe_name = Path(folder_probe).relative_to(self.session_path.joinpath('alf')).parts[0]
        pdict = self.one.alyx.rest('insertions', 'list', session=eid, name=probe_name, no_cache=True)
        if len(pdict) != 1:
            _logger.warning(f'No probe found for probe name: {probe_name}')
            return
        # units passing QC have label == 1
        isok = df_units['label'] == 1
        qcdict = {'n_units': int(df_units.shape[0]),
                  'n_units_qc_pass': int(np.sum(isok)),
                  'firing_rate_max': np.max(df_units['firing_rate'][isok]),
                  'firing_rate_median': np.median(df_units['firing_rate'][isok]),
                  'amplitude_max_uV': np.max(df_units['amp_max'][isok]) * 1e6,
                  # BUGFIX: this previously used np.max, which reported the maximum of the
                  # per-unit median amplitudes under a key labelled "median"
                  'amplitude_median_uV': np.median(df_units['amp_median'][isok]) * 1e6,
                  'drift_rms_um': rms(drift['drift_um']),
                  }
        file_wm = folder_probe.joinpath('_kilosort_whitening.matrix.npy')
        if file_wm.exists():
            wm = np.load(file_wm)
            # ill-conditioned whitening matrices are a symptom of bad channel handling
            qcdict['whitening_matrix_conditioning'] = np.linalg.cond(wm)
        # groom qc dict (this function will eventually go directly into the json field update)
        for k in qcdict:
            if isinstance(qcdict[k], np.int64):
                qcdict[k] = int(qcdict[k])
            elif isinstance(qcdict[k], float):
                qcdict[k] = np.round(qcdict[k], 2)
        self.one.alyx.json_field_update("insertions", pdict[0]["id"], "json", qcdict)

    def _run(self):
        """
        Post spike-sorting quality control at the cluster level.
        Outputs a QC table in the clusters ALF object and labels corresponding probes in Alyx
        """
        files_spikes = Path(self.session_path).joinpath('alf').rglob('spikes.times.npy')
        folder_probes = [f.parent for f in files_spikes]
        out_files = []
        for folder_probe in folder_probes:
            # one probe failing should not prevent the others from being processed
            try:
                qc_file, df_units, drift = self._compute_cell_qc(folder_probe)
                out_files.append(qc_file)
                self._label_probe_qc(folder_probe, df_units, drift)
            except Exception:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        return out_files

    def get_signatures(self, **kwargs):
        """Expand the ``alf/probe*`` wildcard signatures to the actual probe folders."""
        files_spikes = Path(self.session_path).joinpath('alf').rglob('spikes.times.npy')
        folder_probes = [f.parent for f in files_spikes]

        full_input_files = []
        for sig in self.signature['input_files']:
            for folder in folder_probes:
                full_input_files.append((sig[0], str(folder.relative_to(self.session_path)), sig[2]))

        self.input_files = full_input_files

        full_output_files = []
        for sig in self.signature['output_files']:
            for folder in folder_probes:
                full_output_files.append((sig[0], str(folder.relative_to(self.session_path)), sig[2]))

        self.output_files = full_output_files

872 

873 

class EphysMtscomp(tasks.Task):
    """Compress raw ephys binary files with mtscomp, removing the original .bin files."""
    priority = 50  # ideally after spike sorting
    level = 0
    force = False
    signature = {
        'input_files': [('*ap.meta', 'raw_ephys_data/probe*', True),
                        ('*ap.*bin', 'raw_ephys_data/probe*', True),
                        ('*lf.meta', 'raw_ephys_data/probe*', False),  # NP2 doesn't have lf files
                        ('*lf.*bin', 'raw_ephys_data/probe*', False),  # NP2 doesn't have lf files
                        ('*nidq.meta', 'raw_ephys_data', True),
                        ('*nidq.*bin', 'raw_ephys_data', True)],
        'output_files': [('*ap.meta', 'raw_ephys_data/probe*', True),
                         ('*ap.cbin', 'raw_ephys_data/probe*', False),  # may not be present on local server anymore
                         ('*ap.ch', 'raw_ephys_data/probe*', True),
                         ('*lf.meta', 'raw_ephys_data/probe*', False),  # NP2 doesn't have lf files # TODO detect from meta
                         ('*lf.cbin', 'raw_ephys_data/probe*', False),  # may not be present on local server anymore
                         ('*lf.ch', 'raw_ephys_data/probe*', False),
                         ('*nidq.meta', 'raw_ephys_data', True),
                         ('*nidq.cbin', 'raw_ephys_data', False),  # may not be present on local server anymore
                         ('*nidq.ch', 'raw_ephys_data', True)]
    }

    def _run(self):
        """
        Compress ephys files looking for `compress_ephys.flag` within the probes folder
        Original bin file will be removed
        The registration flag created contains targeted file names at the root of the session
        """
        out_files = []
        ephys_files = spikeglx.glob_ephys_files(self.session_path)
        for extension in ("ch", "meta"):
            ephys_files += spikeglx.glob_ephys_files(self.session_path, ext=extension)

        for efile in ephys_files:
            for ftype in ("ap", "lf", "nidq"):
                bin_file = efile.get(ftype)
                if not bin_file:
                    continue
                # suffix '.bin' has 'bin' at index 1; '.cbin' (already compressed) does not
                if bin_file.suffix.find("bin") != 1:
                    out_files.append(bin_file)
                    continue
                with spikeglx.Reader(bin_file) as sr:
                    if sr.is_mtscomp:
                        out_files.append(bin_file)
                    else:
                        _logger.info(f"Compressing binary file {bin_file}")
                        # keep_original=False deletes the raw .bin after compression
                        out_files.append(sr.compress_file(keep_original=False))
                        out_files.append(bin_file.with_suffix('.ch'))

        return out_files

    def get_signatures(self, **kwargs):
        """Expand the wildcard signatures to the probe folders of this session."""
        version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probe_labels = spikeglx.get_probes_from_folder(self.session_path)

        def _expand(signatures):
            # per-probe wildcard entries are duplicated for every probe folder;
            # session-root (nidq) entries only exist on non-3A rigs
            expanded = []
            for name, collection, required in signatures:
                if 'raw_ephys_data/probe*' in collection:
                    expanded.extend((name, f'raw_ephys_data/{p}', required) for p in probe_labels)
                elif version != '3A':
                    expanded.append((name, collection, required))
            return expanded

        self.input_files = _expand(self.signature['input_files'])
        self.output_files = _expand(self.signature['output_files'])

951 

952 

class EphysDLC(tasks.Task):
    """
    This task relies on a correctly installed dlc environment as per
    https://docs.google.com/document/d/1g0scP6_3EmaXCU4SsDNZWwDTaD9MG0es_grLA-d0gh0/edit#

    If your environment is set up otherwise, make sure that you set the respective attributes:
    t = EphysDLC(session_path)
    t.dlcenv = Path('/path/to/your/dlcenv/bin/activate')
    t.scripts = Path('/path/to/your/iblscripts/deploy/serverpc/dlc')
    """
    gpu = 1
    cpu = 4
    io_charge = 100
    level = 2
    force = True
    job_size = 'large'

    # default locations of the DLC virtualenv activation script and the deploy shell scripts
    dlcenv = Path.home().joinpath('Documents', 'PYTHON', 'envs', 'dlcenv', 'bin', 'activate')
    scripts = Path.home().joinpath('Documents', 'PYTHON', 'iblscripts', 'deploy', 'serverpc', 'dlc')
    signature = {
        'input_files': [
            ('_iblrig_leftCamera.raw.mp4', 'raw_video_data', True),
            ('_iblrig_rightCamera.raw.mp4', 'raw_video_data', True),
            ('_iblrig_bodyCamera.raw.mp4', 'raw_video_data', True),
        ],
        'output_files': [
            ('_ibl_leftCamera.dlc.pqt', 'alf', True),
            ('_ibl_rightCamera.dlc.pqt', 'alf', True),
            ('_ibl_bodyCamera.dlc.pqt', 'alf', True),
            ('leftCamera.ROIMotionEnergy.npy', 'alf', True),
            ('rightCamera.ROIMotionEnergy.npy', 'alf', True),
            ('bodyCamera.ROIMotionEnergy.npy', 'alf', True),
            ('leftROIMotionEnergy.position.npy', 'alf', True),
            ('rightROIMotionEnergy.position.npy', 'alf', True),
            ('bodyROIMotionEnergy.position.npy', 'alf', True),
        ],
    }

    def _check_dlcenv(self):
        """Check that scripts are present, dlcenv can be activated and get iblvideo version"""
        assert len(list(self.scripts.rglob('run_dlc.*'))) == 2, \
            f'Scripts run_dlc.sh and run_dlc.py do not exist in {self.scripts}'
        assert len(list(self.scripts.rglob('run_motion.*'))) == 2, \
            f'Scripts run_motion.sh and run_motion.py do not exist in {self.scripts}'
        assert self.dlcenv.exists(), f"DLC environment does not exist in assumed location {self.dlcenv}"
        # run a python one-liner inside the dlcenv to retrieve the installed iblvideo version
        command2run = f"source {self.dlcenv}; python -c 'import iblvideo; print(iblvideo.__version__)'"
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash"
        )
        info, error = process.communicate()
        if process.returncode != 0:
            raise AssertionError(f"DLC environment check failed\n{error.decode('utf-8')}")
        # last line of stdout is the version string printed by the one-liner
        version = info.decode("utf-8").strip().split('\n')[-1]
        return version

    @staticmethod
    def _video_intact(file_mp4):
        """Checks that the downloaded video can be opened and is not empty"""
        cap = cv2.VideoCapture(str(file_mp4))
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        intact = True if frame_count > 0 else False
        cap.release()
        return intact

    def _run(self, cams=None, overwrite=False):
        """
        Run DLC and motion energy on the raw videos of the session.

        :param cams: iterable of camera labels to process; defaults to left, right and body
        :param overwrite: bool, whether to recompute results that already exist
        :return: list of produced output file paths, or None if nothing was produced
        """
        # Default to all three cams
        cams = cams or ['left', 'right', 'body']
        cams = assert_valid_label(cams)
        # Set up
        self.session_id = self.one.path2eid(self.session_path)
        actual_outputs = []

        # Loop through cams
        for cam in cams:
            # Catch exceptions so that following cameras can still run
            try:
                # If all results exist and overwrite is False, skip computation
                expected_outputs_present, expected_outputs = self.assert_expected(self.output_files, silent=True)
                if overwrite is False and expected_outputs_present is True:
                    actual_outputs.extend(expected_outputs)
                    # NOTE(review): returns from inside the loop — if every expected output
                    # exists, no camera is (re)processed and all outputs are returned at once
                    return actual_outputs
                else:
                    file_mp4 = next(self.session_path.joinpath('raw_video_data').glob(f'_iblrig_{cam}Camera.raw*.mp4'))
                    if not file_mp4.exists():
                        # In this case we set the status to Incomplete.
                        _logger.error(f"No raw video file available for {cam}, skipping.")
                        self.status = -3
                        continue
                    if not self._video_intact(file_mp4):
                        _logger.error(f"Corrupt raw video file {file_mp4}")
                        self.status = -1
                        continue
                    # Check that dlc environment is ok, shell scripts exists, and get iblvideo version, GPU addressable
                    self.version = self._check_dlcenv()
                    _logger.info(f'iblvideo version {self.version}')
                    check_nvidia_driver()

                    _logger.info(f'Running DLC on {cam}Camera.')
                    command2run = f"{self.scripts.joinpath('run_dlc.sh')} {str(self.dlcenv)} {file_mp4} {overwrite}"
                    _logger.info(command2run)
                    process = subprocess.Popen(
                        command2run,
                        shell=True,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        executable="/bin/bash",
                    )
                    info, error = process.communicate()
                    # info_str = info.decode("utf-8").strip()
                    # _logger.info(info_str)
                    if process.returncode != 0:
                        error_str = error.decode("utf-8").strip()
                        _logger.error(f'DLC failed for {cam}Camera.\n\n'
                                      f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                                      f'{error_str}\n'
                                      f'++++++++++++++++++++++++++++++++++++++++++++\n')
                        self.status = -1
                        # We dont' run motion energy, or add any files if dlc failed to run
                        continue
                    dlc_result = next(self.session_path.joinpath('alf').glob(f'_ibl_{cam}Camera.dlc*.pqt'))
                    actual_outputs.append(dlc_result)

                    _logger.info(f'Computing motion energy for {cam}Camera')
                    command2run = f"{self.scripts.joinpath('run_motion.sh')} {str(self.dlcenv)} {file_mp4} {dlc_result}"
                    _logger.info(command2run)
                    process = subprocess.Popen(
                        command2run,
                        shell=True,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        executable="/bin/bash",
                    )
                    info, error = process.communicate()
                    # info_str = info.decode("utf-8").strip()
                    # _logger.info(info_str)
                    if process.returncode != 0:
                        error_str = error.decode("utf-8").strip()
                        _logger.error(f'Motion energy failed for {cam}Camera.\n\n'
                                      f'++++++++ Output of subprocess for debugging ++++++++\n\n'
                                      f'{error_str}\n'
                                      f'++++++++++++++++++++++++++++++++++++++++++++\n')
                        self.status = -1
                        continue
                    actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                        f'{cam}Camera.ROIMotionEnergy*.npy')))
                    actual_outputs.append(next(self.session_path.joinpath('alf').glob(
                        f'{cam}ROIMotionEnergy.position*.npy')))
            except BaseException:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        # If status is Incomplete, check that there is at least one output.
        # # Otherwise make sure it gets set to Empty (outputs = None), and set status to -1 to make sure it doesn't slip
        if self.status == -3 and len(actual_outputs) == 0:
            actual_outputs = None
            self.status = -1
        return actual_outputs

1114 

1115 

class EphysPostDLC(tasks.Task):
    """
    The post_dlc task takes dlc traces as input and computes useful quantities, as well as qc.
    """
    io_charge = 90
    level = 3
    force = True
    signature = {'input_files': [('_ibl_leftCamera.dlc.pqt', 'alf', True),
                                 ('_ibl_bodyCamera.dlc.pqt', 'alf', True),
                                 ('_ibl_rightCamera.dlc.pqt', 'alf', True),
                                 ('_ibl_rightCamera.times.npy', 'alf', True),
                                 ('_ibl_leftCamera.times.npy', 'alf', True),
                                 ('_ibl_bodyCamera.times.npy', 'alf', True),
                                 # the following are required for the DLC plot only
                                 # they are not strictly required, some plots just might be skipped
                                 # In particular the raw videos don't need to be downloaded as they can be streamed
                                 ('_iblrig_bodyCamera.raw.mp4', 'raw_video_data', True),
                                 ('_iblrig_leftCamera.raw.mp4', 'raw_video_data', True),
                                 ('_iblrig_rightCamera.raw.mp4', 'raw_video_data', True),
                                 ('rightROIMotionEnergy.position.npy', 'alf', False),
                                 ('leftROIMotionEnergy.position.npy', 'alf', False),
                                 ('bodyROIMotionEnergy.position.npy', 'alf', False),
                                 ('_ibl_trials.table.pqt', 'alf', True),
                                 ('_ibl_wheel.position.npy', 'alf', True),
                                 ('_ibl_wheel.timestamps.npy', 'alf', True),
                                 ],
                 # More files are required for all panels of the DLC QC plot to function
                 'output_files': [('_ibl_leftCamera.features.pqt', 'alf', True),
                                  ('_ibl_rightCamera.features.pqt', 'alf', True),
                                  ('licks.times.npy', 'alf', True),
                                  # ('dlc_qc_plot.png', 'snapshot', False)
                                  ]
                 }

    def _run(self, overwrite=True, run_qc=True, plot_qc=True):
        """
        Run the EphysPostDLC task. Returns a list of file locations for the output files in signature. The created plot
        (dlc_qc_plot.png) is not returned, but saved in session_path/snapshots and uploaded to Alyx as a note.

        :param overwrite: bool, whether to recompute existing output files (default is True).
            Note that the dlc_qc_plot will be (re-)computed even if overwrite = False
        :param run_qc: bool, whether to run the DLC QC (default is True)
        :param plot_qc: book, whether to create the dlc_qc_plot (default is True)

        """
        # Check if output files exist locally
        exist, output_files = self.assert_expected(self.signature['output_files'], silent=True)
        if exist and not overwrite:
            _logger.warning('EphysPostDLC outputs exist and overwrite=False, skipping computations of outputs.')
        else:
            if exist and overwrite:
                _logger.warning('EphysPostDLC outputs exist and overwrite=True, overwriting existing outputs.')
            # Find all available dlc files
            dlc_files = list(Path(self.session_path).joinpath('alf').glob('_ibl_*Camera.dlc.*'))
            for dlc_file in dlc_files:
                _logger.debug(dlc_file)
            output_files = []
            combined_licks = []

            for dlc_file in dlc_files:
                # Catch unforeseen exceptions and move on to next cam
                try:
                    cam = label_from_path(dlc_file)
                    # load dlc trace and camera times
                    dlc = pd.read_parquet(dlc_file)
                    # discard tracking points below 0.9 likelihood
                    dlc_thresh = likelihood_threshold(dlc, 0.9)
                    # try to load respective camera times
                    # `times` is a tri-state flag: True (usable), False (missing/empty),
                    # 'short' (fewer timestamps than DLC frames)
                    try:
                        dlc_t = np.load(next(Path(self.session_path).joinpath('alf').glob(f'_ibl_{cam}Camera.times.*npy')))
                        times = True
                        if dlc_t.shape[0] == 0:
                            _logger.error(f'camera.times empty for {cam} camera. '
                                          f'Computations using camera.times will be skipped')
                            self.status = -1
                            times = False
                        elif dlc_t.shape[0] < len(dlc_thresh):
                            _logger.error(f'Camera times shorter than DLC traces for {cam} camera. '
                                          f'Computations using camera.times will be skipped')
                            self.status = -1
                            times = 'short'
                    except StopIteration:
                        # no camera.times file found by the glob above
                        self.status = -1
                        times = False
                        _logger.error(f'No camera.times for {cam} camera. '
                                      f'Computations using camera.times will be skipped')
                    # These features are only computed from left and right cam
                    if cam in ('left', 'right'):
                        features = pd.DataFrame()
                        # If camera times are available, get the lick time stamps for combined array
                        if times is True:
                            _logger.info(f"Computing lick times for {cam} camera.")
                            combined_licks.append(get_licks(dlc_thresh, dlc_t))
                        elif times is False:
                            _logger.warning(f"Skipping lick times for {cam} camera as no camera.times available")
                        elif times == 'short':
                            _logger.warning(f"Skipping lick times for {cam} camera as camera.times are too short")
                        # Compute pupil diameter, raw and smoothed
                        _logger.info(f"Computing raw pupil diameter for {cam} camera.")
                        features['pupilDiameter_raw'] = get_pupil_diameter(dlc_thresh)
                        try:
                            _logger.info(f"Computing smooth pupil diameter for {cam} camera.")
                            features['pupilDiameter_smooth'] = get_smooth_pupil_diameter(features['pupilDiameter_raw'],
                                                                                         cam)
                        except BaseException:
                            # smoothing is best-effort: fall back to NaNs rather than failing the cam
                            _logger.error(f"Computing smooth pupil diameter for {cam} camera failed, saving all NaNs.")
                            _logger.error(traceback.format_exc())
                            features['pupilDiameter_smooth'] = np.nan
                        # Safe to pqt
                        features_file = Path(self.session_path).joinpath('alf', f'_ibl_{cam}Camera.features.pqt')
                        features.to_parquet(features_file)
                        output_files.append(features_file)

                    # For all cams, compute DLC qc if times available
                    if run_qc is True and times in [True, 'short']:
                        # Setting download_data to False because at this point the data should be there
                        qc = DlcQC(self.session_path, side=cam, one=self.one, download_data=False)
                        qc.run(update=True)
                    else:
                        if times is False:
                            _logger.warning(f"Skipping QC for {cam} camera as no camera.times available")
                        if not run_qc:
                            _logger.warning(f"Skipping QC for {cam} camera as run_qc=False")

                except BaseException:
                    _logger.error(traceback.format_exc())
                    self.status = -1
                    continue

            # Combined lick times
            if len(combined_licks) > 0:
                lick_times_file = Path(self.session_path).joinpath('alf', 'licks.times.npy')
                # merge lick times from both cameras into one sorted array
                np.save(lick_times_file, sorted(np.concatenate(combined_licks)))
                output_files.append(lick_times_file)
            else:
                _logger.warning("No lick times computed for this session.")

        if plot_qc:
            _logger.info("Creating DLC QC plot")
            try:
                session_id = self.one.path2eid(self.session_path)
                fig_path = self.session_path.joinpath('snapshot', 'dlc_qc_plot.png')
                if not fig_path.parent.exists():
                    fig_path.parent.mkdir(parents=True, exist_ok=True)
                fig = dlc_qc_plot(self.session_path, one=self.one)
                fig.savefig(fig_path)
                fig.clf()
                # upload the figure to Alyx as a session note
                snp = ReportSnapshot(self.session_path, session_id, one=self.one)
                snp.outputs = [fig_path]
                snp.register_images(widths=['orig'],
                                    function=str(dlc_qc_plot.__module__) + '.' + str(dlc_qc_plot.__name__))
            except BaseException:
                # the plot is optional: log the failure but keep the computed outputs
                _logger.error('Could not create and/or upload DLC QC Plot')
                _logger.error(traceback.format_exc())
                self.status = -1

        return output_files

1272 

1273 

class EphysPassive(tasks.Task):
    """Extract the passive protocol events (gabor patches, RF mapping, replayed stims)."""
    cpu = 1
    io_charge = 90
    level = 1
    force = False
    signature = {
        'input_files': [('_iblrig_taskSettings.raw*', 'raw_behavior_data', True),
                        ('_spikeglx_sync.channels.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.polarities.*', 'raw_ephys_data*', True),
                        ('_spikeglx_sync.times.*', 'raw_ephys_data*', True),
                        ('*.meta', 'raw_ephys_data*', True),
                        ('*wiring.json', 'raw_ephys_data*', False),
                        ('_iblrig_RFMapStim.raw*', 'raw_passive_data', True)],
        'output_files': [('_ibl_passiveGabor.table.csv', 'alf', True),
                         ('_ibl_passivePeriods.intervalsTable.csv', 'alf', True),
                         ('_ibl_passiveRFM.times.npy', 'alf', True),
                         ('_ibl_passiveStims.table.csv', 'alf', True)]}

    def _run(self):
        """returns a list of pathlib.Paths. """
        data, paths = ephys_passive.PassiveChoiceWorld(self.session_path).extract(save=True)
        # a missing output marks the task as errored, but all extracted paths are still returned
        if any(p is None for p in paths):
            self.status = -1
        # Register?
        return paths

    def get_signatures(self, **kwargs):
        """Resolve the ``raw_ephys_data*`` wildcard collections for this rig geometry."""
        version = spikeglx.get_neuropixel_version_from_folder(self.session_path)
        probe_labels = spikeglx.get_probes_from_folder(self.session_path)

        resolved = []
        for file_sig in self.signature['input_files']:
            name, collection, required = file_sig
            if 'raw_ephys_data*' not in collection:
                resolved.append(file_sig)
                continue
            if version != '3A':
                # 3B rigs have an extra nidq sync collection at the session root
                resolved.append((name, 'raw_ephys_data', required))
            resolved.extend((name, f'raw_ephys_data/{probe}', required) for probe in probe_labels)

        self.input_files = resolved
        self.output_files = self.signature['output_files']

1317 

1318 

class EphysExtractionPipeline(tasks.Pipeline):
    """Static extraction pipeline assembling all ephys preprocessing tasks for a session."""
    label = __name__

    def __init__(self, session_path=None, **kwargs):
        super().__init__(session_path, **kwargs)
        self.session_path = session_path
        # note: named `jobs` rather than `tasks` to avoid shadowing the tasks module
        jobs = OrderedDict()
        # level 0
        jobs['ExperimentDescriptionRegisterRaw'] = base_tasks.ExperimentDescriptionRegisterRaw(self.session_path)
        jobs["EphysRegisterRaw"] = tpp.TrainingRegisterRaw(self.session_path)
        jobs["EphysPulses"] = EphysPulses(self.session_path)
        jobs["EphysRawQC"] = RawEphysQC(self.session_path)
        jobs["EphysAudio"] = EphysAudio(self.session_path)
        jobs["EphysMtscomp"] = EphysMtscomp(self.session_path)
        jobs['EphysVideoCompress'] = EphysVideoCompress(self.session_path)
        # level 1
        jobs["SpikeSorting"] = SpikeSorting(
            self.session_path, parents=[jobs["EphysMtscomp"], jobs["EphysPulses"]])
        jobs["EphysTrials"] = EphysTrials(self.session_path, parents=[jobs["EphysPulses"]])
        jobs["EphysPassive"] = EphysPassive(self.session_path, parents=[jobs["EphysPulses"]])
        # level 2
        jobs["EphysVideoSyncQc"] = EphysVideoSyncQc(
            self.session_path, parents=[jobs["EphysVideoCompress"], jobs["EphysPulses"], jobs["EphysTrials"]])
        jobs["EphysCellsQc"] = EphysCellsQc(self.session_path, parents=[jobs["SpikeSorting"]])
        jobs["EphysDLC"] = EphysDLC(self.session_path, parents=[jobs["EphysVideoCompress"]])
        jobs['EphysTrainingStatus'] = tpp.TrainingStatus(self.session_path, parents=[jobs["EphysTrials"]])
        # level 3
        jobs["EphysPostDLC"] = EphysPostDLC(self.session_path, parents=[jobs["EphysDLC"], jobs["EphysTrials"],
                                                                        jobs["EphysVideoSyncQc"]])
        self.tasks = jobs