Coverage for ibllib/pipes/ephys_tasks.py: 57% of 456 statements

import logging
import traceback
from pathlib import Path
import subprocess
import re
import shutil

import packaging.version
import numpy as np
import pandas as pd
import spikeglx
import neuropixel
from neurodsp.utils import rms
import one.alf.io as alfio

from ibllib.misc import check_nvidia_driver
from ibllib.pipes import base_tasks
from ibllib.pipes.sync_tasks import SyncPulses
from ibllib.ephys import ephysqc, spikes
from ibllib.qc.alignment_qc import get_aligned_channels
from ibllib.plots.figures import LfpPlots, ApPlots, BadChannelsAp
from ibllib.plots.figures import SpikeSorting as SpikeSortingPlots
from ibllib.io.extractors.ephys_fpga import extract_sync
from ibllib.ephys.spikes import sync_probes

_logger = logging.getLogger("ibllib")

class EphysRegisterRaw(base_tasks.DynamicTask):
    """
    Creates the probe insertions and writes and registers the probes description file.
    """

    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [],
            'output_files': [('probes.description.json', 'alf', True)]
        }
        return signature

    def _run(self):

        out_files = spikes.probes_description(self.session_path, self.one)

        return out_files
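
# A minimal usage sketch for running this task standalone (hedged: the
# `session_path` below and the ONE credentials are hypothetical placeholders,
# not values defined in this module):
#
#     from one.api import ONE
#     task = EphysRegisterRaw(session_path, one=ONE())
#     task.run()  # writes and registers alf/probes.description.json
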
class EphysSyncRegisterRaw(base_tasks.DynamicTask):
    """
    Task to rename, compress and register raw DAQ data in .bin format collected using a NIDAQ.
    """

    priority = 90
    cpu = 2
    io_charge = 30  # this job reads raw DAQ files
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*.*bin', self.sync_collection, True),
                            ('*.meta', self.sync_collection, True),
                            ('*.wiring.json', self.sync_collection, True)],
            'output_files': [('*nidq.cbin', self.sync_collection, True),
                             ('*nidq.ch', self.sync_collection, True),
                             ('*nidq.meta', self.sync_collection, True),
                             ('*nidq.wiring.json', self.sync_collection, True)]
        }
        return signature

    def _run(self):

        out_files = []

        # Detect the wiring file
        wiring_file = next(self.session_path.joinpath(self.sync_collection).glob('*.wiring.json'), None)
        if wiring_file is not None:
            out_files.append(wiring_file)

        # Search for .bin files in the sync_collection folder
        files = list(self.session_path.joinpath(self.sync_collection).glob('*nidq.*bin'))
        bin_file = files[0] if len(files) == 1 else None

        # If we don't have a .bin / .cbin file any more, see if we can still find the .ch and .meta files
        if bin_file is None:
            for ext in ['ch', 'meta']:
                files = list(self.session_path.joinpath(self.sync_collection).glob(f'*nidq.{ext}'))
                ext_file = files[0] if len(files) == 1 else None
                if ext_file is not None:
                    out_files.append(ext_file)

            return out_files if len(out_files) > 0 else None

        # If we do find the .bin file, compress it (only if it hasn't already been compressed)
        sr = spikeglx.Reader(bin_file)
        if sr.is_mtscomp:
            sr.close()
            cbin_file = bin_file
            assert cbin_file.suffix == '.cbin'
        else:
            cbin_file = sr.compress_file()
            sr.close()
            bin_file.unlink()

        meta_file = cbin_file.with_suffix('.meta')
        ch_file = cbin_file.with_suffix('.ch')

        out_files.append(cbin_file)
        out_files.append(ch_file)
        out_files.append(meta_file)

        return out_files
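
# Compression sketch: the core of `_run` above for a single uncompressed nidq
# .bin file (hedged: `bin_file` is a hypothetical pathlib.Path; the mtscomp
# round-trip produces a .cbin plus its .ch index next to the existing .meta):
#
#     with spikeglx.Reader(bin_file) as sr:
#         if not sr.is_mtscomp:
#             cbin_file = sr.compress_file()  # .bin -> .cbin + .ch
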
class EphysCompressNP1(base_tasks.EphysTask):
    priority = 90
    cpu = 2
    io_charge = 100  # this job reads raw ap files
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*lf.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*lf.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*wiring.json', f'{self.device_collection}/{self.pname}', False)],
            'output_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                             ('*ap.cbin', f'{self.device_collection}/{self.pname}', True),
                             ('*ap.ch', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.meta', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.cbin', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.ch', f'{self.device_collection}/{self.pname}', True),
                             ('*wiring.json', f'{self.device_collection}/{self.pname}', False)]
        }
        return signature

    def _run(self):

        out_files = []

        # Detect and register the wiring file
        wiring_file = next(self.session_path.joinpath(self.device_collection, self.pname).glob('*wiring.json'), None)
        if wiring_file is not None:
            out_files.append(wiring_file)

        ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
        ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="ch")
        ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="meta")

        for ef in ephys_files:
            for typ in ["ap", "lf"]:
                bin_file = ef.get(typ)
                if not bin_file:
                    continue
                if bin_file.suffix.find("bin") == 1:
                    with spikeglx.Reader(bin_file) as sr:
                        if sr.is_mtscomp:
                            out_files.append(bin_file)
                        else:
                            _logger.info(f"Compressing binary file {bin_file}")
                            cbin_file = sr.compress_file()
                            sr.close()
                            bin_file.unlink()
                            out_files.append(cbin_file)
                            out_files.append(cbin_file.with_suffix('.ch'))
                else:
                    out_files.append(bin_file)

        return out_files
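
# Worked example of the suffix test above: Path('probe00.ap.bin').suffix is
# '.bin', so .find('bin') == 1 flags an uncompressed binary, while '.cbin'
# yields 2 (already compressed) and '.meta' or '.ch' yield -1. Only the first
# case opens a Reader and compresses; the other files are registered as-is.
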
class EphysCompressNP21(base_tasks.EphysTask):
    priority = 90
    cpu = 2
    io_charge = 100  # this job reads raw ap files
    job_size = 'large'

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*wiring.json', f'{self.device_collection}/{self.pname}', False)],
            'output_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                             ('*ap.cbin', f'{self.device_collection}/{self.pname}', True),
                             ('*ap.ch', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.meta', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.cbin', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.ch', f'{self.device_collection}/{self.pname}', True),
                             ('*wiring.json', f'{self.device_collection}/{self.pname}', False)]
        }
        return signature

    def _run(self):

        out_files = []
        # Detect wiring files
        wiring_file = next(self.session_path.joinpath(self.device_collection, self.pname).glob('*wiring.json'), None)
        if wiring_file is not None:
            out_files.append(wiring_file)

        ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
        if len(ephys_files) > 0:
            bin_file = ephys_files[0].get('ap', None)

        # This is the case where no ap.bin / .cbin file exists
        if len(ephys_files) == 0 or not bin_file:
            ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="ch")
            ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="meta")
            for ef in ephys_files:
                for typ in ["ap", "lf"]:
                    bin_file = ef.get(typ)
                    if bin_file:
                        out_files.append(bin_file)

            return out_files

        # If the ap.bin / .cbin file does exist, instantiate the NP2Converter
        np_conv = neuropixel.NP2Converter(bin_file, compress=True)
        np_conv_status = np_conv.process()
        np_conv_files = np_conv.get_processed_files_NP21()
        np_conv.sr.close()

        # Status = 1 - conversion and compression completed successfully
        if np_conv_status == 1:
            out_files += np_conv_files
            return out_files
        # Status = 0 - files were already processed on a previous run
        elif np_conv_status == 0:
            ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
            ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="ch")
            ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="meta")
            for ef in ephys_files:
                for typ in ["ap", "lf"]:
                    bin_file = ef.get(typ)
                    if bin_file and bin_file.suffix != '.bin':
                        out_files.append(bin_file)

            return out_files

        else:
            return
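
# NP2Converter status convention, as handled above (an assumption inferred
# from this module's handling rather than from the neuropixel API docs):
# 1 means the conversion and compression ran to completion, 0 means the files
# were already processed on a previous run, and any other value is treated as
# a failure, in which case the task returns None and registers no outputs.
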
class EphysCompressNP24(base_tasks.EphysTask):
    """
    Compresses NP2.4 data by splitting them into N binary files, corresponding to N shanks
    :param pname: a probe name string
    :param device_collection: the collection containing the probes (usually 'raw_ephys_data')
    :param nshanks: number of shanks used (usually 4 but it may be less depending on electrode map), optional
    """

    priority = 90
    cpu = 2
    io_charge = 100  # this job reads raw ap files
    job_size = 'large'

    def __init__(self, session_path, *args, pname=None, device_collection='raw_ephys_data', nshanks=None, **kwargs):
        assert pname, "pname is a required argument"
        if nshanks is None:
            meta_file = next(session_path.joinpath(device_collection, pname).glob('*ap.meta'))
            nshanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_file))
        assert nshanks > 1
        super(EphysCompressNP24, self).__init__(
            session_path, *args, pname=pname, device_collection=device_collection, nshanks=nshanks, **kwargs)

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*wiring.json', f'{self.device_collection}/{self.pname}', False)],
            'output_files': [('*ap.meta', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*ap.cbin', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*ap.ch', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*lf.meta', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*lf.cbin', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*lf.ch', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*wiring.json', f'{self.device_collection}/{self.pname}{pext}', False) for pext in self.pextra]
        }
        return signature

    def _run(self, delete_original=True):

        # Do we need the ability to register the files once they have already been processed
        # and the original file deleted?
        files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
        assert len(files) == 1
        bin_file = files[0].get('ap', None)

        np_conv = neuropixel.NP2Converter(bin_file, post_check=True, compress=True, delete_original=delete_original)
        np_conv_status = np_conv.process()
        out_files = np_conv.get_processed_files_NP24()
        np_conv.sr.close()

        if np_conv_status == 1:  # conversion and compression completed successfully
            wiring_file = next(self.session_path.joinpath(self.device_collection, self.pname).glob('*wiring.json'), None)
            if wiring_file is not None:
                # copy the wiring file to each sub-probe directory and add it to the output files
                for pext in self.pextra:
                    new_wiring_file = self.session_path.joinpath(self.device_collection, f'{self.pname}{pext}', wiring_file.name)
                    shutil.copyfile(wiring_file, new_wiring_file)
                    out_files.append(new_wiring_file)
            return out_files
        elif np_conv_status == 0:  # files were already processed on a previous run
            for pext in self.pextra:
                ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, f'{self.pname}{pext}'))
                ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection,
                                                                                    f'{self.pname}{pext}'), ext="ch")
                ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection,
                                                                                    f'{self.pname}{pext}'), ext="meta")
                for ef in ephys_files:
                    for typ in ["ap", "lf"]:
                        bin_file = ef.get(typ)
                        if bin_file and bin_file.suffix != '.bin':
                            out_files.append(bin_file)

                wiring_file = next(self.session_path.joinpath(self.device_collection,
                                                              f'{self.pname}{pext}').glob('*wiring.json'), None)
                if wiring_file is None:
                    # See if we have the original wiring file
                    orig_wiring_file = next(self.session_path.joinpath(self.device_collection,
                                                                       self.pname).glob('*wiring.json'), None)
                    if orig_wiring_file is not None:
                        # copy the original wiring file to the sub-probe directory and add it to the output files
                        new_wiring_file = self.session_path.joinpath(self.device_collection, f'{self.pname}{pext}',
                                                                     orig_wiring_file.name)
                        shutil.copyfile(orig_wiring_file, new_wiring_file)
                        out_files.append(new_wiring_file)
                else:
                    out_files.append(wiring_file)

            return out_files
        else:
            return
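
# Usage sketch for a four-shank NP2.4 recording (hedged: paths are
# hypothetical; when `nshanks` is omitted it is read from the ap.meta file):
#
#     task = EphysCompressNP24(session_path, pname='probe00',
#                              device_collection='raw_ephys_data', nshanks=4)
#     task.run()  # splits into one sub-folder per shank (the self.pextra suffixes)
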
class EphysSyncPulses(SyncPulses):
    priority = 90
    cpu = 2
    io_charge = 30  # this job reads raw DAQ files
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*nidq.cbin', self.sync_collection, True),
                            ('*nidq.ch', self.sync_collection, False),
                            ('*nidq.meta', self.sync_collection, True),
                            ('*nidq.wiring.json', self.sync_collection, True)],
            'output_files': [('_spikeglx_sync.times.npy', self.sync_collection, True),
                             ('_spikeglx_sync.polarities.npy', self.sync_collection, True),
                             ('_spikeglx_sync.channels.npy', self.sync_collection, True)]
        }

        return signature


class EphysPulses(base_tasks.EphysTask):
    """
    Extracts pulses from raw electrophysiology data into numpy arrays and
    performs the probes synchronisation with the nidq (3B) or main probe (3A).
    The job first extracts the sync pulses from all probes, then performs the
    synchronisation with the nidq.

    :param pname: a list of probe names or a single probe name string
    :param device_collection: the collection containing the probes (usually 'raw_ephys_data')
    :param sync_collection: the collection containing the synchronisation device - nidq (usually 'raw_ephys_data')
    """
    priority = 90
    cpu = 2
    io_charge = 30  # this job reads raw ap files
    job_size = 'small'

    def __init__(self, *args, **kwargs):
        super(EphysPulses, self).__init__(*args, **kwargs)
        assert self.device_collection, "device_collection is a required argument"
        assert self.sync_collection, "sync_collection is a required argument"
        self.pname = [self.pname] if isinstance(self.pname, str) else self.pname
        assert isinstance(self.pname, list), 'pname task argument should be a list or a string'

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
                           [('*ap.cbin', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
                           [('*ap.ch', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
                           [('*ap.wiring.json', f'{self.device_collection}/{pname}', False) for pname in self.pname] +
                           [('_spikeglx_sync.times.npy', self.sync_collection, True),
                            ('_spikeglx_sync.polarities.npy', self.sync_collection, True),
                            ('_spikeglx_sync.channels.npy', self.sync_collection, True)],
            'output_files': [(f'_spikeglx_sync.times.{pname}.npy', f'{self.device_collection}/{pname}', True)
                             for pname in self.pname] +
                            [(f'_spikeglx_sync.polarities.{pname}.npy', f'{self.device_collection}/{pname}', True)
                             for pname in self.pname] +
                            [(f'_spikeglx_sync.channels.{pname}.npy', f'{self.device_collection}/{pname}', True)
                             for pname in self.pname] +
                            [('*sync.npy', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
                            [('*timestamps.npy', f'{self.device_collection}/{pname}', True) for pname in self.pname]
        }

        return signature

    def _run(self, overwrite=False):
        out_files = []
        for probe in self.pname:
            files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, probe))
            assert len(files) == 1  # will error if the session is split
            bin_file = files[0].get('ap', None)
            if not bin_file:
                return []
            _, out = extract_sync(self.session_path, ephys_files=files, overwrite=overwrite)
            out_files += out

        status, sync_files = sync_probes.sync(self.session_path, probe_names=self.pname)

        return out_files + sync_files
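
# Usage sketch (hedged: a hypothetical session with two probes synced to nidq):
#
#     task = EphysPulses(session_path, pname=['probe00', 'probe01'],
#                        device_collection='raw_ephys_data',
#                        sync_collection='raw_ephys_data')
#     task.run()  # extracts per-probe pulses, then syncs them to the nidq clock
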
class RawEphysQC(base_tasks.EphysTask):

    cpu = 2
    io_charge = 30  # this job reads raw ap files
    priority = 10  # a lot of jobs depend on this one
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*lf.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*lf.ch', f'{self.device_collection}/{self.pname}', False),
                            ('*lf.*bin', f'{self.device_collection}/{self.pname}', False)],
            'output_files': [('_iblqc_ephysChannels.apRMS.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysChannels.rawSpikeRates.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysChannels.labels.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysSpectralDensityLF.freqs.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysSpectralDensityLF.power.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysSpectralDensityAP.freqs.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysSpectralDensityAP.power.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysTimeRmsLF.rms.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysTimeRmsLF.timestamps.npy', f'{self.device_collection}/{self.pname}', True)]
        }
        return signature

    # TODO make sure this works with NP2 probes (at the moment not sure it will due to raiseError mapping)
    def _run(self, overwrite=False):
        eid = self.one.path2eid(self.session_path)
        probe = self.one.alyx.rest('insertions', 'list', session=eid, name=self.pname)

        # We expect there to be only one probe
        if len(probe) != 1:
            _logger.warning(f"{self.pname} for {eid} not found")  # Should we create it?
            self.status = -1
            return

        pid = probe[0]['id']
        qc_files = []
        _logger.info(f"\nRunning QC for probe insertion {self.pname}")
        try:
            eqc = ephysqc.EphysQC(pid, session_path=self.session_path, one=self.one)
            qc_files.extend(eqc.run(update=True, overwrite=overwrite))
            _logger.info("Creating LFP QC plots")
            plot_task = LfpPlots(pid, session_path=self.session_path, one=self.one)
            _ = plot_task.run()
            self.plot_tasks.append(plot_task)
            plot_task = BadChannelsAp(pid, session_path=self.session_path, one=self.one)
            _ = plot_task.run()
            self.plot_tasks.append(plot_task)

        except AssertionError:
            _logger.error(traceback.format_exc())
            self.status = -1

        return qc_files
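
# Standalone sketch (hedged: requires a configured ONE/Alyx connection and an
# existing probe insertion; `one` below is a hypothetical ONE instance):
#
#     task = RawEphysQC(session_path, pname='probe00',
#                       device_collection='raw_ephys_data', one=one)
#     task.run()  # writes the _iblqc_* datasets next to the raw data
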
class SpikeSorting(base_tasks.EphysTask):
    """
    Pykilosort 2.5 pipeline
    """
    gpu = 1
    io_charge = 100  # this job reads raw ap files
    priority = 60
    job_size = 'large'
    force = True

    SHELL_SCRIPT = Path.home().joinpath(
        "Documents/PYTHON/iblscripts/deploy/serverpc/kilosort2/run_pykilosort.sh"
    )
    SPIKE_SORTER_NAME = 'pykilosort'
    PYKILOSORT_REPO = Path.home().joinpath('Documents/PYTHON/SPIKE_SORTING/pykilosort')

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.cbin', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.ch', f'{self.device_collection}/{self.pname}', True),
                            ('*sync.npy', f'{self.device_collection}/{self.pname}', True)],
            'output_files': [('spike_sorting_pykilosort.log', f'spike_sorters/pykilosort/{self.pname}', True),
                             ('_iblqc_ephysTimeRmsAP.rms.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysTimeRmsAP.timestamps.npy', f'{self.device_collection}/{self.pname}', True)]
        }
        return signature

    @staticmethod
    def _sample2v(ap_file):
        md = spikeglx.read_meta_data(ap_file.with_suffix(".meta"))
        s2v = spikeglx._conversion_sample2v_from_meta(md)
        return s2v["ap"][0]

    @staticmethod
    def _fetch_pykilosort_version(repo_path):
        init_file = Path(repo_path).joinpath('pykilosort', '__init__.py')
        version = SpikeSorting._fetch_ks2_commit_hash(repo_path)  # default
        try:
            with open(init_file) as fid:
                lines = fid.readlines()
                for line in lines:
                    if line.startswith("__version__ = "):
                        version = line.split('=')[-1].strip().replace('"', '').replace("'", '')
        except Exception:
            pass
        return f"pykilosort_{version}"

    @staticmethod
    def _fetch_pykilosort_run_version(log_file):
        """
        Parse the following line (2 formats depending on the version) from the log files to get the version
        '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.2.1, output in gnagga^[[0m\n'
        '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.3.0^[[0m\n'
        """
        with open(log_file) as fid:
            line = fid.readline()
        version = re.search('version (.*), output', line)
        version = version or re.search('version (.*)', line)  # old versions have an output clause, new ones only a version
        version = re.sub('\\^[[0-9]+m', '', version.group(1))  # removes the colouring tags
        return f"pykilosort_{version}"
    @staticmethod
    def _fetch_ks2_commit_hash(repo_path):
        command2run = f"git --git-dir {repo_path}/.git rev-parse --verify HEAD"
        process = subprocess.Popen(
            command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        info, error = process.communicate()
        if process.returncode != 0:
            _logger.error(
                f"Can't fetch the pykilosort commit hash, will still attempt to run\n"
                f"Error: {error.decode('utf-8')}"
            )
            return ""
        return info.decode("utf-8").strip()

    def _run_pykilosort(self, ap_file):
        """
        Runs the pykilosort spike sorting for one probe dataset.
        The raw spike sorting output goes to the session_path/spike_sorters/{self.SPIKE_SORTER_NAME}/probeXX folder
        (discontinued support for old spike sortings in the probe folder <1.5.5)
        :return: path of the folder containing the spike sorting output
        """
        self.version = self._fetch_pykilosort_version(self.PYKILOSORT_REPO)
        label = ap_file.parts[-2]  # this is usually the probe name
        sorter_dir = self.session_path.joinpath("spike_sorters", self.SPIKE_SORTER_NAME, label)
        self.FORCE_RERUN = False
        if not self.FORCE_RERUN:
            log_file = sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
            if log_file.exists():
                run_version = self._fetch_pykilosort_run_version(log_file)
                if packaging.version.parse(run_version) > packaging.version.parse('pykilosort_ibl_1.1.0'):
                    _logger.info(f"Already ran: spike_sorting_{self.SPIKE_SORTER_NAME}.log"
                                 f" found in {sorter_dir}, skipping.")
                    return sorter_dir
                else:
                    self.FORCE_RERUN = True
        # get the scratch drive from the shell script
        with open(self.SHELL_SCRIPT) as fid:
            lines = fid.readlines()
        line = [line for line in lines if line.startswith("SCRATCH_DRIVE=")][0]
        m = re.search(r"\=(.*?)(\#|\n)", line)[0]
        scratch_drive = Path(m[1:-1].strip())
        assert scratch_drive.exists()
        # clean up and create the directory, this also checks write permissions
        # temp_dir has the following shape: pykilosort/ZM_3003_2020-07-29_001_probe00
        # first make sure the tmp dir is clean
        shutil.rmtree(scratch_drive.joinpath(self.SPIKE_SORTER_NAME), ignore_errors=True)
        temp_dir = scratch_drive.joinpath(
            self.SPIKE_SORTER_NAME, "_".join(list(self.session_path.parts[-3:]) + [label])
        )
        if temp_dir.exists():  # hmmm this has to be decided, we may want to restart?
            # But failed sessions may then clog the scratch dir and have users run out of space
            shutil.rmtree(temp_dir, ignore_errors=True)
        log_file = temp_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
        _logger.info(f"job progress command: tail -f {temp_dir} *.log")
        temp_dir.mkdir(parents=True, exist_ok=True)
        check_nvidia_driver()
        command2run = f"{self.SHELL_SCRIPT} {ap_file} {temp_dir}"
        _logger.info(command2run)
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash",
        )
        info, error = process.communicate()
        info_str = info.decode("utf-8").strip()
        _logger.info(info_str)
        if process.returncode != 0:
            error_str = error.decode("utf-8").strip()
            # try and get the kilosort log if any
            for log_file in temp_dir.rglob('*_kilosort.log'):
                with open(log_file) as fid:
                    log = fid.read()
                    _logger.error(log)
                break
            raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}")

        shutil.copytree(temp_dir.joinpath('output'), sorter_dir, dirs_exist_ok=True)
        shutil.rmtree(temp_dir, ignore_errors=True)

        return sorter_dir
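
    # Worked example of the SCRATCH_DRIVE parsing above (hedged: a
    # hypothetical line from the deployment shell script, on a POSIX system):
    #
    #     >>> line = 'SCRATCH_DRIVE=/mnt/h0  # fast local SSD\n'
    #     >>> m = re.search(r"\=(.*?)(\#|\n)", line)[0]  # '=/mnt/h0  #'
    #     >>> Path(m[1:-1].strip())
    #     PosixPath('/mnt/h0')
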
    def _run(self):
        """
        Multiple steps. For each probe:
        - runs ks2 (skips if it already ran)
        - synchronizes the spike sorting
        - outputs the probe description files
        :return: list of files to be registered on the database
        """
        efiles = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
        ap_files = [(ef.get("ap"), ef.get("label")) for ef in efiles if "ap" in ef.keys()]
        out_files = []
        for ap_file, label in ap_files:
            try:
                # if the file is part of a sequence, handle the run accordingly
                sequence_file = ap_file.parent.joinpath(ap_file.stem.replace('ap', 'sequence.json'))
                # temporary: just skip for now
                if sequence_file.exists():
                    continue
                ks2_dir = self._run_pykilosort(ap_file)  # runs ks2, skips if it already ran
                probe_out_path = self.session_path.joinpath("alf", label, self.SPIKE_SORTER_NAME)
                shutil.rmtree(probe_out_path, ignore_errors=True)
                probe_out_path.mkdir(parents=True, exist_ok=True)
                spikes.ks2_to_alf(
                    ks2_dir,
                    bin_path=ap_file.parent,
                    out_path=probe_out_path,
                    bin_file=ap_file,
                    ampfactor=self._sample2v(ap_file),
                )
                logfile = ks2_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
                if logfile.exists():
                    shutil.copyfile(logfile, probe_out_path.joinpath(f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log"))
                # For now leave the syncing here
                out, _ = spikes.sync_spike_sorting(ap_file=ap_file, out_path=probe_out_path)
                out_files.extend(out)
                # convert the ks2 output into a tar file and also register it.
                # Make this in case the spike sorting is in old raw_ephys_data folders, for new
                # sessions it should already exist
                tar_dir = self.session_path.joinpath(
                    'spike_sorters', self.SPIKE_SORTER_NAME, label)
                tar_dir.mkdir(parents=True, exist_ok=True)
                out = spikes.ks2_to_tar(ks2_dir, tar_dir, force=self.FORCE_RERUN)
                out_files.extend(out)

                if self.one:
                    eid = self.one.path2eid(self.session_path, query_type='remote')
                    ins = self.one.alyx.rest('insertions', 'list', session=eid, name=label, query_type='remote')
                    if len(ins) != 0:
                        _logger.info("Creating SpikeSorting QC plots")
                        plot_task = ApPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
                        _ = plot_task.run()
                        self.plot_tasks.append(plot_task)

                        plot_task = SpikeSortingPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
                        _ = plot_task.run(collection=str(probe_out_path.relative_to(self.session_path)))
                        self.plot_tasks.append(plot_task)

                        resolved = ins[0].get('json', {'temp': 0}).get('extended_qc', {'temp': 0}). \
                            get('alignment_resolved', False)
                        if resolved:
                            chns = np.load(probe_out_path.joinpath('channels.localCoordinates.npy'))
                            out = get_aligned_channels(ins[0], chns, one=self.one, save_dir=probe_out_path)
                            out_files.extend(out)

            except BaseException:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue

        return out_files
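
# End-to-end sketch for one probe (hedged: requires a CUDA GPU, the
# run_pykilosort.sh deployment script at SHELL_SCRIPT and a configured ONE
# instance; paths are hypothetical):
#
#     task = SpikeSorting(session_path, pname='probe00',
#                         device_collection='raw_ephys_data', one=one)
#     task.run()  # sorts, converts to ALF, syncs and tars the sorter output
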
class EphysCellsQc(base_tasks.EphysTask):
    priority = 90
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('spikes.times.npy', f'alf/{self.pname}*', True),
                            ('spikes.clusters.npy', f'alf/{self.pname}*', True),
                            ('spikes.amps.npy', f'alf/{self.pname}*', True),
                            ('spikes.depths.npy', f'alf/{self.pname}*', True),
                            ('clusters.channels.npy', f'alf/{self.pname}*', True)],
            'output_files': [('clusters.metrics.pqt', f'alf/{self.pname}*', True)]
        }
        return signature

    def _compute_cell_qc(self, folder_probe):
        """
        Computes the cell QC given an extracted probe alf path
        :param folder_probe: folder containing the alf spikes and clusters datasets
        :return: path to the parquet metrics file, the units dataframe and the drift dictionary
        """
        # compute the straight qc
        _logger.info(f"Computing cluster qc for {folder_probe}")
        spikes = alfio.load_object(folder_probe, 'spikes')
        clusters = alfio.load_object(folder_probe, 'clusters')
        df_units, drift = ephysqc.spike_sorting_metrics(
            spikes.times, spikes.clusters, spikes.amps, spikes.depths,
            cluster_ids=np.arange(clusters.channels.size))
        # if the ks2 labels file exists, load it and add the column
        file_labels = folder_probe.joinpath('cluster_KSLabel.tsv')
        if file_labels.exists():
            ks2_labels = pd.read_csv(file_labels, sep='\t')
            ks2_labels.rename(columns={'KSLabel': 'ks2_label'}, inplace=True)
            df_units = pd.concat(
                [df_units, ks2_labels['ks2_label'].reindex(df_units.index)], axis=1)
        # save as a parquet file
        df_units.to_parquet(folder_probe.joinpath("clusters.metrics.pqt"))
        return folder_probe.joinpath("clusters.metrics.pqt"), df_units, drift
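
    # Minimal illustration of the reindex above with hypothetical labels: rows
    # missing from the TSV become NaN rather than shifting the alignment:
    #
    #     >>> ks2_labels = pd.DataFrame({'ks2_label': ['good', 'mua']})
    #     >>> ks2_labels['ks2_label'].reindex(pd.RangeIndex(3)).tolist()
    #     ['good', 'mua', nan]
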
    def _label_probe_qc(self, folder_probe, df_units, drift):
        """
        Labels the json field of the corresponding Alyx probe insertion
        :param folder_probe: folder containing the alf spikes and clusters datasets
        :param df_units: units dataframe from `_compute_cell_qc`
        :param drift: drift dictionary from `_compute_cell_qc`
        :return:
        """
        eid = self.one.path2eid(self.session_path, query_type='remote')
        pdict = self.one.alyx.rest('insertions', 'list', session=eid, name=self.pname, no_cache=True)
        if len(pdict) != 1:
            _logger.warning(f'No probe found for probe name: {self.pname}')
            return
        isok = df_units['label'] == 1
        qcdict = {'n_units': int(df_units.shape[0]),
                  'n_units_qc_pass': int(np.sum(isok)),
                  'firing_rate_max': np.max(df_units['firing_rate'][isok]),
                  'firing_rate_median': np.median(df_units['firing_rate'][isok]),
                  'amplitude_max_uV': np.max(df_units['amp_max'][isok]) * 1e6,
                  'amplitude_median_uV': np.median(df_units['amp_median'][isok]) * 1e6,
                  'drift_rms_um': rms(drift['drift_um']),
                  }
        file_wm = folder_probe.joinpath('_kilosort_whitening.matrix.npy')
        if file_wm.exists():
            wm = np.load(file_wm)
            qcdict['whitening_matrix_conditioning'] = np.linalg.cond(wm)
        # groom the qc dict (this function will eventually go directly into the json field update)
        for k in qcdict:
            if isinstance(qcdict[k], np.int64):
                qcdict[k] = int(qcdict[k])
            elif isinstance(qcdict[k], float):
                qcdict[k] = np.round(qcdict[k], 2)
        self.one.alyx.json_field_update("insertions", pdict[0]["id"], "json", qcdict)
    def _run(self):
        """
        Post spike-sorting quality control at the cluster level.
        Outputs a QC table in the clusters ALF object and labels the corresponding probes in Alyx.
        """
        files_spikes = Path(self.session_path).joinpath('alf', self.pname).rglob('spikes.times.npy')
        folder_probes = [f.parent for f in files_spikes]
        out_files = []
        for folder_probe in folder_probes:
            try:
                qc_file, df_units, drift = self._compute_cell_qc(folder_probe)
                out_files.append(qc_file)
                self._label_probe_qc(folder_probe, df_units, drift)
            except Exception:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        return out_files
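
# Usage sketch (hedged: hypothetical paths and ONE instance):
#
#     task = EphysCellsQc(session_path, pname='probe00', one=one)
#     task.run()  # writes clusters.metrics.pqt and updates the Alyx insertion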