Coverage for ibllib/pipes/ephys_tasks.py: 59%
455 statements
import logging
import traceback
from pathlib import Path
import subprocess
import re
import shutil

import packaging.version
import numpy as np
import pandas as pd
import spikeglx
import neuropixel
from ibldsp.utils import rms
from ibldsp.waveform_extraction import extract_wfs_cbin
import one.alf.io as alfio

from ibllib.misc import check_nvidia_driver
from ibllib.pipes import base_tasks
from ibllib.pipes.sync_tasks import SyncPulses
from ibllib.ephys import ephysqc
import ibllib.ephys.spikes
from ibllib.qc.alignment_qc import get_aligned_channels
from ibllib.plots.figures import LfpPlots, ApPlots, BadChannelsAp
from ibllib.plots.figures import SpikeSorting as SpikeSortingPlots
from ibllib.io.extractors.ephys_fpga import extract_sync
from ibllib.ephys.spikes import sync_probes

_logger = logging.getLogger("ibllib")
class EphysRegisterRaw(base_tasks.DynamicTask):
    """
    Creates the probe insertions and uploads the probe description file; also compresses the nidq files and uploads them.
    """

    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [],
            'output_files': [('probes.description.json', 'alf', True)]
        }
        return signature

    def _run(self):
        out_files = ibllib.ephys.spikes.probes_description(self.session_path, self.one)
        return out_files
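
# Hedged usage sketch (not part of the pipeline; `session_path` and the ONE
# instance are assumed to exist): running the task registers the probe
# description file for the session.
def _example_register_raw(session_path, one):  # pragma: no cover
    task = EphysRegisterRaw(session_path, one=one)
    status = task.run()  # 0 on success
    return status, task.outputs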
class EphysSyncRegisterRaw(base_tasks.DynamicTask):
    """
    Task to rename, compress and register raw daq data with .bin format collected using NIDAQ
    """

    priority = 90
    cpu = 2
    io_charge = 30  # this job reads raw ap files
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*.*bin', self.sync_collection, True),
                            ('*.meta', self.sync_collection, True),
                            ('*.wiring.json', self.sync_collection, True)],
            'output_files': [('*nidq.cbin', self.sync_collection, True),
                             ('*nidq.ch', self.sync_collection, True),
                             ('*nidq.meta', self.sync_collection, True),
                             ('*nidq.wiring.json', self.sync_collection, True)]
        }
        return signature

    def _run(self):
        out_files = []

        # Detect the wiring file
        wiring_file = next(self.session_path.joinpath(self.sync_collection).glob('*.wiring.json'), None)
        if wiring_file is not None:
            out_files.append(wiring_file)

        # Search for .bin files in the sync_collection folder
        files = list(self.session_path.joinpath(self.sync_collection).glob('*nidq.*bin'))
        bin_file = files[0] if len(files) == 1 else None

        # If we no longer have a .bin/.cbin file, see if we can still find the .ch and .meta files
        if bin_file is None:
            for ext in ['ch', 'meta']:
                files = list(self.session_path.joinpath(self.sync_collection).glob(f'*nidq.{ext}'))
                ext_file = files[0] if len(files) == 1 else None
                if ext_file is not None:
                    out_files.append(ext_file)

            return out_files if len(out_files) > 0 else None

        # If we do find the .bin file, compress it (only if it hasn't already been compressed)
        sr = spikeglx.Reader(bin_file)
        if sr.is_mtscomp:
            sr.close()
            cbin_file = bin_file
            assert cbin_file.suffix == '.cbin'
        else:
            cbin_file = sr.compress_file()
            sr.close()
            bin_file.unlink()

        meta_file = cbin_file.with_suffix('.meta')
        ch_file = cbin_file.with_suffix('.ch')

        out_files.append(cbin_file)
        out_files.append(ch_file)
        out_files.append(meta_file)

        return out_files
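
# Hedged sketch (assumes `cbin_file` points at an existing compressed nidq
# file): a .cbin produced above can be read back transparently with
# spikeglx.Reader, so downstream consumers do not care about compression.
def _example_read_compressed(cbin_file):  # pragma: no cover
    with spikeglx.Reader(cbin_file) as sr:
        assert sr.is_mtscomp
        first_second = sr[:int(sr.fs)]  # samples of the first second, all channels
    return first_second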
class EphysCompressNP1(base_tasks.EphysTask):
    priority = 90
    cpu = 2
    io_charge = 100  # this job reads raw ap files
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*lf.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*lf.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*wiring.json', f'{self.device_collection}/{self.pname}', False)],
            'output_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                             ('*ap.cbin', f'{self.device_collection}/{self.pname}', True),
                             ('*ap.ch', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.meta', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.cbin', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.ch', f'{self.device_collection}/{self.pname}', True),
                             ('*wiring.json', f'{self.device_collection}/{self.pname}', False)]
        }
        return signature

    def _run(self):
        out_files = []

        # Detect and upload the wiring file
        wiring_file = next(self.session_path.joinpath(self.device_collection, self.pname).glob('*wiring.json'), None)
        if wiring_file is not None:
            out_files.append(wiring_file)

        ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
        ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="ch")
        ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="meta")

        for ef in ephys_files:
            for typ in ["ap", "lf"]:
                bin_file = ef.get(typ)
                if not bin_file:
                    continue
                if bin_file.suffix.find("bin") == 1:  # matches '.bin' only, not '.cbin'
                    with spikeglx.Reader(bin_file) as sr:
                        if sr.is_mtscomp:
                            out_files.append(bin_file)
                        else:
                            _logger.info(f"Compressing binary file {bin_file}")
                            cbin_file = sr.compress_file()
                            sr.close()
                            bin_file.unlink()
                            out_files.append(cbin_file)
                            out_files.append(cbin_file.with_suffix('.ch'))
                else:
                    out_files.append(bin_file)

        return out_files
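
# Hedged sketch (assumes a session with raw NP1 data under
# 'raw_ephys_data/probe00'): the per-probe compression tasks take the probe
# name and device collection as keyword arguments.
def _example_compress_np1(session_path, one):  # pragma: no cover
    task = EphysCompressNP1(session_path, one=one, pname='probe00',
                            device_collection='raw_ephys_data')
    task.run()
    return task.outputs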
class EphysCompressNP21(base_tasks.EphysTask):
    priority = 90
    cpu = 2
    io_charge = 100  # this job reads raw ap files
    job_size = 'large'

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*wiring.json', f'{self.device_collection}/{self.pname}', False)],
            'output_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                             ('*ap.cbin', f'{self.device_collection}/{self.pname}', True),
                             ('*ap.ch', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.meta', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.cbin', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.ch', f'{self.device_collection}/{self.pname}', True),
                             ('*wiring.json', f'{self.device_collection}/{self.pname}', False)]
        }
        return signature

    def _run(self):
        out_files = []
        # Detect wiring files
        wiring_file = next(self.session_path.joinpath(self.device_collection, self.pname).glob('*wiring.json'), None)
        if wiring_file is not None:
            out_files.append(wiring_file)

        ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
        if len(ephys_files) > 0:
            bin_file = ephys_files[0].get('ap', None)

        # This is the case where no ap.bin/.cbin file exists
        if len(ephys_files) == 0 or not bin_file:
            ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="ch")
            ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="meta")
            for ef in ephys_files:
                for typ in ["ap", "lf"]:
                    bin_file = ef.get(typ)
                    if bin_file:
                        out_files.append(bin_file)

            return out_files

        # If the ap.bin/.cbin file does exist, instantiate the NP2Converter
        np_conv = neuropixel.NP2Converter(bin_file, compress=True)
        np_conv_status = np_conv.process()
        np_conv_files = np_conv.get_processed_files_NP21()
        np_conv.sr.close()

        # Status = 1 - completed successfully
        if np_conv_status == 1:
            out_files += np_conv_files
            return out_files
        # Status = 0 - already processed
        elif np_conv_status == 0:
            ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
            ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="ch")
            ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="meta")
            for ef in ephys_files:
                for typ in ["ap", "lf"]:
                    bin_file = ef.get(typ)
                    if bin_file and bin_file.suffix != '.bin':
                        out_files.append(bin_file)

            return out_files

        else:
            return
class EphysCompressNP24(base_tasks.EphysTask):
    """
    Compresses NP2.4 data by splitting into N binary files, corresponding to N shanks
    :param pname: a probe name string
    :param device_collection: the collection containing the probes (usually 'raw_ephys_data')
    :param nshanks: number of shanks used (usually 4 but it may be less depending on electrode map), optional
    """

    priority = 90
    cpu = 2
    io_charge = 100  # this job reads raw ap files
    job_size = 'large'

    def __init__(self, session_path, *args, pname=None, device_collection='raw_ephys_data', nshanks=None, **kwargs):
        assert pname, "pname is a required argument"
        if nshanks is None:
            meta_file = next(session_path.joinpath(device_collection, pname).glob('*ap.meta'))
            nshanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_file))
        assert nshanks > 1
        super(EphysCompressNP24, self).__init__(
            session_path, *args, pname=pname, device_collection=device_collection, nshanks=nshanks, **kwargs)

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*wiring.json', f'{self.device_collection}/{self.pname}', False)],
            'output_files': [('*ap.meta', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*ap.cbin', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*ap.ch', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*lf.meta', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*lf.cbin', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*lf.ch', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*wiring.json', f'{self.device_collection}/{self.pname}{pext}', False) for pext in self.pextra]
        }
        return signature

    def _run(self, delete_original=True):
        # Do we need the ability to register the files once they have already been processed and the original deleted?
        files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
        assert len(files) == 1
        bin_file = files[0].get('ap', None)

        np_conv = neuropixel.NP2Converter(bin_file, post_check=True, compress=True, delete_original=delete_original)
        np_conv_status = np_conv.process()
        out_files = np_conv.get_processed_files_NP24()
        np_conv.sr.close()

        if np_conv_status == 1:
            wiring_file = next(self.session_path.joinpath(self.device_collection, self.pname).glob('*wiring.json'), None)
            if wiring_file is not None:
                # copy wiring file to each sub probe directory and add to output files
                for pext in self.pextra:
                    new_wiring_file = self.session_path.joinpath(self.device_collection, f'{self.pname}{pext}', wiring_file.name)
                    shutil.copyfile(wiring_file, new_wiring_file)
                    out_files.append(new_wiring_file)
            return out_files
        elif np_conv_status == 0:
            for pext in self.pextra:
                ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, f'{self.pname}{pext}'))
                ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection,
                                                                                    f'{self.pname}{pext}'), ext="ch")
                ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection,
                                                                                    f'{self.pname}{pext}'), ext="meta")
                for ef in ephys_files:
                    for typ in ["ap", "lf"]:
                        bin_file = ef.get(typ)
                        if bin_file and bin_file.suffix != '.bin':
                            out_files.append(bin_file)

                wiring_file = next(self.session_path.joinpath(self.device_collection,
                                                              f'{self.pname}{pext}').glob('*wiring.json'), None)
                if wiring_file is None:
                    # See if we have the original wiring file
                    orig_wiring_file = next(self.session_path.joinpath(self.device_collection,
                                                                       self.pname).glob('*wiring.json'), None)
                    if orig_wiring_file is not None:
                        # copy wiring file to sub probe directory and add to output files
                        new_wiring_file = self.session_path.joinpath(self.device_collection, f'{self.pname}{pext}',
                                                                     orig_wiring_file.name)
                        shutil.copyfile(orig_wiring_file, new_wiring_file)
                        out_files.append(new_wiring_file)
                else:
                    out_files.append(wiring_file)

            return out_files
        else:
            return
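
# Hedged sketch (assumes `ap_bin` points at an NP2.4 ap.bin file): the
# converter splits the recording into one file per shank and compresses each;
# process() returns 1 for a fresh conversion and 0 if already processed.
def _example_np24_convert(ap_bin):  # pragma: no cover
    np_conv = neuropixel.NP2Converter(ap_bin, post_check=True, compress=True)
    status = np_conv.process()
    files = np_conv.get_processed_files_NP24()
    np_conv.sr.close()
    return status, files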
class EphysSyncPulses(SyncPulses):
    priority = 90
    cpu = 2
    io_charge = 30  # this job reads raw ap files
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*nidq.cbin', self.sync_collection, True),
                            ('*nidq.ch', self.sync_collection, False),
                            ('*nidq.meta', self.sync_collection, True),
                            ('*nidq.wiring.json', self.sync_collection, True)],
            'output_files': [('_spikeglx_sync.times.npy', self.sync_collection, True),
                             ('_spikeglx_sync.polarities.npy', self.sync_collection, True),
                             ('_spikeglx_sync.channels.npy', self.sync_collection, True)]
        }
        return signature
class EphysPulses(base_tasks.EphysTask):
    """
    Extract pulses from raw electrophysiology data into numpy arrays.
    Perform the probe synchronisation with the nidq (3B) or the main probe (3A).
    The job first extracts the sync pulses from all probes, then performs the synchronisation with the nidq.

    :param pname: a list of probe names or a single probe name string
    :param device_collection: the collection containing the probes (usually 'raw_ephys_data')
    :param sync_collection: the collection containing the synchronisation device - nidq (usually 'raw_ephys_data')
    """

    priority = 90
    cpu = 2
    io_charge = 30  # this job reads raw ap files
    job_size = 'small'

    def __init__(self, *args, **kwargs):
        super(EphysPulses, self).__init__(*args, **kwargs)
        assert self.device_collection, "device_collection is a required argument"
        assert self.sync_collection, "sync_collection is a required argument"
        self.pname = [self.pname] if isinstance(self.pname, str) else self.pname
        assert isinstance(self.pname, list), 'pname task argument should be a list or a string'

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
                           [('*ap.cbin', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
                           [('*ap.ch', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
                           [('*ap.wiring.json', f'{self.device_collection}/{pname}', False) for pname in self.pname] +
                           [('_spikeglx_sync.times.npy', self.sync_collection, True),
                            ('_spikeglx_sync.polarities.npy', self.sync_collection, True),
                            ('_spikeglx_sync.channels.npy', self.sync_collection, True)],
            'output_files': [(f'_spikeglx_sync.times.{pname}.npy', f'{self.device_collection}/{pname}', True)
                             for pname in self.pname] +
                            [(f'_spikeglx_sync.polarities.{pname}.npy', f'{self.device_collection}/{pname}', True)
                             for pname in self.pname] +
                            [(f'_spikeglx_sync.channels.{pname}.npy', f'{self.device_collection}/{pname}', True)
                             for pname in self.pname] +
                            [('*sync.npy', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
                            [('*timestamps.npy', f'{self.device_collection}/{pname}', True) for pname in self.pname]
        }
        return signature

    def _run(self, overwrite=False):
        out_files = []
        for probe in self.pname:
            files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, probe))
            assert len(files) == 1  # will error if the session is split
            bin_file = files[0].get('ap', None)
            if not bin_file:
                return []
            _, out = extract_sync(self.session_path, ephys_files=files, overwrite=overwrite)
            out_files += out

        status, sync_files = sync_probes.sync(self.session_path, probe_names=self.pname)

        return out_files + sync_files
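
# Hedged sketch (assumes a session with two recorded probes): pulses are
# extracted per probe, then every probe is synchronised to the nidq clock.
def _example_ephys_pulses(session_path, one):  # pragma: no cover
    task = EphysPulses(session_path, one=one, pname=['probe00', 'probe01'],
                       device_collection='raw_ephys_data', sync_collection='raw_ephys_data')
    task.run()
    return task.outputs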
class RawEphysQC(base_tasks.EphysTask):
    cpu = 2
    io_charge = 30  # this job reads raw ap files
    priority = 10  # a lot of jobs depend on this one
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*lf.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*lf.ch', f'{self.device_collection}/{self.pname}', False),
                            ('*lf.*bin', f'{self.device_collection}/{self.pname}', False)],
            'output_files': [('_iblqc_ephysChannels.apRMS.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysChannels.rawSpikeRates.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysChannels.labels.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysSpectralDensityLF.freqs.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysSpectralDensityLF.power.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysSpectralDensityAP.freqs.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysSpectralDensityAP.power.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysTimeRmsLF.rms.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysTimeRmsLF.timestamps.npy', f'{self.device_collection}/{self.pname}', True)]
        }
        return signature

    # TODO make sure this works with NP2 probes (at the moment not sure it will due to raiseError mapping)
    def _run(self, overwrite=False):
        eid = self.one.path2eid(self.session_path)
        probe = self.one.alyx.rest('insertions', 'list', session=eid, name=self.pname)

        # We expect there to only be one probe
        if len(probe) != 1:
            _logger.warning(f"{self.pname} for {eid} not found")  # Should we create it?
            self.status = -1
            return

        pid = probe[0]['id']
        qc_files = []
        _logger.info(f"\nRunning QC for probe insertion {self.pname}")
        try:
            eqc = ephysqc.EphysQC(pid, session_path=self.session_path, one=self.one)
            qc_files.extend(eqc.run(update=True, overwrite=overwrite))
            _logger.info("Creating LFP QC plots")
            plot_task = LfpPlots(pid, session_path=self.session_path, one=self.one)
            _ = plot_task.run()
            self.plot_tasks.append(plot_task)
            plot_task = BadChannelsAp(pid, session_path=self.session_path, one=self.one)
            _ = plot_task.run()
            self.plot_tasks.append(plot_task)
        except AssertionError:
            _logger.error(traceback.format_exc())
            self.status = -1

        return qc_files
class CellQCMixin:
    """
    This mixin class computes the cell QC metrics and updates the json field of the probe insertion.
    The compute_cell_qc method is static and can be used independently.
    """

    @staticmethod
    def compute_cell_qc(folder_alf_probe):
        """
        Computes the cell QC given an extracted probe alf path
        :param folder_alf_probe: folder
        :return:
        """
        # compute the straight qc
        _logger.info(f"Computing cluster qc for {folder_alf_probe}")
        spikes = alfio.load_object(folder_alf_probe, 'spikes')
        clusters = alfio.load_object(folder_alf_probe, 'clusters')
        df_units, drift = ephysqc.spike_sorting_metrics(
            spikes.times, spikes.clusters, spikes.amps, spikes.depths,
            cluster_ids=np.arange(clusters.channels.size))
        # if the ks2 labels file exists, load it and add the column
        file_labels = folder_alf_probe.joinpath('cluster_KSLabel.tsv')
        if file_labels.exists():
            ks2_labels = pd.read_csv(file_labels, sep='\t')
            ks2_labels.rename(columns={'KSLabel': 'ks2_label'}, inplace=True)
            df_units = pd.concat(
                [df_units, ks2_labels['ks2_label'].reindex(df_units.index)], axis=1)
        # save as parquet file
        df_units.to_parquet(folder_alf_probe.joinpath("clusters.metrics.pqt"))
        return folder_alf_probe.joinpath("clusters.metrics.pqt"), df_units, drift
    def _label_probe_qc(self, folder_probe, df_units, drift):
        """
        Labels the json field of the alyx corresponding probe insertion
        :param folder_probe:
        :param df_units:
        :param drift:
        :return:
        """
        eid = self.one.path2eid(self.session_path, query_type='remote')
        pdict = self.one.alyx.rest('insertions', 'list', session=eid, name=self.pname, no_cache=True)
        if len(pdict) != 1:
            _logger.warning(f'No probe found for probe name: {self.pname}')
            return
        isok = df_units['label'] == 1
        qcdict = {'n_units': int(df_units.shape[0]),
                  'n_units_qc_pass': int(np.sum(isok)),
                  'firing_rate_max': np.max(df_units['firing_rate'][isok]),
                  'firing_rate_median': np.median(df_units['firing_rate'][isok]),
                  'amplitude_max_uV': np.max(df_units['amp_max'][isok]) * 1e6,
                  'amplitude_median_uV': np.median(df_units['amp_median'][isok]) * 1e6,
                  'drift_rms_um': rms(drift['drift_um']),
                  }
        file_wm = folder_probe.joinpath('_kilosort_whitening.matrix.npy')
        if file_wm.exists():
            wm = np.load(file_wm)
            qcdict['whitening_matrix_conditioning'] = np.linalg.cond(wm)
        # groom qc dict (this function will eventually go directly into the json field update)
        for k in qcdict:
            if isinstance(qcdict[k], np.int64):
                qcdict[k] = int(qcdict[k])
            elif isinstance(qcdict[k], float):
                qcdict[k] = np.round(qcdict[k], 2)
        self.one.alyx.json_field_update("insertions", pdict[0]["id"], "json", qcdict)
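
# Hedged sketch (assumes a spike-sorted ALF dataset in 'alf/probe00/iblsorter'):
# compute_cell_qc is static, so the metrics table can be produced outside the
# pipeline from any extracted probe folder.
def _example_cell_qc(session_path):  # pragma: no cover
    folder_alf_probe = Path(session_path).joinpath('alf', 'probe00', 'iblsorter')
    qc_file, df_units, drift = CellQCMixin.compute_cell_qc(folder_alf_probe)
    return qc_file, df_units, drift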
class SpikeSorting(base_tasks.EphysTask, CellQCMixin):
    """
    Pykilosort 2.5 pipeline
    """
    gpu = 1
    io_charge = 100  # this job reads raw ap files
    priority = 60
    job_size = 'large'
    force = True

    SHELL_SCRIPT = Path.home().joinpath(
        "Documents/PYTHON/iblscripts/deploy/serverpc/iblsorter/run_iblsorter.sh"
    )
    SPIKE_SORTER_NAME = 'iblsorter'
    PYKILOSORT_REPO = Path.home().joinpath('Documents/PYTHON/SPIKE_SORTING/ibl-sorter')

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.ch', f'{self.device_collection}/{self.pname}', False),
                            ('*sync.npy', f'{self.device_collection}/{self.pname}', True)],
            'output_files': [('spike_sorting_pykilosort.log', f'spike_sorters/pykilosort/{self.pname}', True),
                             ('_iblqc_ephysTimeRmsAP.rms.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysTimeRmsAP.timestamps.npy', f'{self.device_collection}/{self.pname}', True)]
        }
        return signature

    @staticmethod
    def _sample2v(ap_file):
        md = spikeglx.read_meta_data(ap_file.with_suffix(".meta"))
        s2v = spikeglx._conversion_sample2v_from_meta(md)
        return s2v["ap"][0]

    @staticmethod
    def _fetch_iblsorter_version(repo_path):
        try:
            import iblsorter
            return f"iblsorter_{iblsorter.__version__}"
        except ImportError:
            _logger.info('IBL-sorter not in environment, trying to locate the repository')
        init_file = Path(repo_path).joinpath('ibl-sorter', '__init__.py')
        version = 'unknown'
        try:
            with open(init_file) as fid:
                lines = fid.readlines()
                for line in lines:
                    if line.startswith("__version__ = "):
                        version = line.split('=')[-1].strip().replace('"', '').replace("'", '')
        except Exception:
            pass
        return f"iblsorter_{version}"

    @staticmethod
    def _fetch_iblsorter_run_version(log_file):
        """
        Parse the following line (2 formats depending on version) from the log files to get the version
        '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.2.1, output in gnagga^[[0m\n'
        '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.3.0^[[0m\n'
        """
        with open(log_file) as fid:
            line = fid.readline()
        version = re.search('version (.*), output', line)
        version = version or re.search('version (.*)', line)  # older versions mention the output, newer ones don't
        version = re.sub(r'\^\[{2}[0-9]+m', '', version.group(1))  # removes the colouring tags
        return version
    def _run_iblsort(self, ap_file):
        """
        Runs the iblsorter spike sorting for one probe dataset.
        The raw spike sorting output goes to the session_path/spike_sorters/{self.SPIKE_SORTER_NAME}/probeXX folder
        (discontinued support for old spike sortings in the probe folder <1.5.5)
        :return: path of the folder containing the spike sorting output
        """
        self.version = self._fetch_iblsorter_version(self.PYKILOSORT_REPO)
        label = ap_file.parts[-2]  # this is usually the probe name
        sorter_dir = self.session_path.joinpath("spike_sorters", self.SPIKE_SORTER_NAME, label)
        self.FORCE_RERUN = False
        if not self.FORCE_RERUN:
            log_file = sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
            if log_file.exists():
                run_version = self._fetch_iblsorter_run_version(log_file)
                if packaging.version.parse(run_version) >= packaging.version.parse('1.7.0'):
                    _logger.info(f"Already ran: spike_sorting_{self.SPIKE_SORTER_NAME}.log"
                                 f" found in {sorter_dir}, skipping.")
                    return sorter_dir
                else:
                    self.FORCE_RERUN = True
        # get the scratch drive from the shell script
        with open(self.SHELL_SCRIPT) as fid:
            lines = fid.readlines()
        line = [line for line in lines if line.startswith("SCRATCH_DRIVE=")][0]
        m = re.search(r"\=(.*?)(\#|\n)", line)[0]
        scratch_drive = Path(m[1:-1].strip())
        assert scratch_drive.exists()
        spikesorter_dir = f"{self.version}_{'_'.join(list(self.session_path.parts[-3:]))}_{self.pname}"
        temp_dir = scratch_drive.joinpath(spikesorter_dir)
        _logger.info(f"job progress command: tail -f {temp_dir} *.log")
        temp_dir.mkdir(parents=True, exist_ok=True)
        check_nvidia_driver()
        try:
            # if iblsorter is in the environment, use the installed version within the task
            import iblsorter.ibl  # noqa
            iblsorter.ibl.run_spike_sorting_ibl(bin_file=ap_file, scratch_dir=temp_dir)
        except ImportError:
            command2run = f"{self.SHELL_SCRIPT} {ap_file} {temp_dir}"
            _logger.info(command2run)
            process = subprocess.Popen(
                command2run,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                executable="/bin/bash",
            )
            info, error = process.communicate()
            info_str = info.decode("utf-8").strip()
            _logger.info(info_str)
            if process.returncode != 0:
                error_str = error.decode("utf-8").strip()
                # try and get the kilosort log if any
                for log_file in temp_dir.rglob('*_kilosort.log'):
                    with open(log_file) as fid:
                        log = fid.read()
                        _logger.error(log)
                    break
                raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}")

        shutil.copytree(temp_dir.joinpath('output'), sorter_dir, dirs_exist_ok=True)
        shutil.rmtree(temp_dir, ignore_errors=True)

        return sorter_dir
    def _run(self):
        """
        Multiple steps. For each probe:
        - runs the sorter (skips if it already ran)
        - synchronises the spike sorting
        - outputs the probe description files
        - computes the waveforms
        :return: list of files to be registered on database
        """
        efiles = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
        ap_files = [(ef.get("ap"), ef.get("label")) for ef in efiles if "ap" in ef.keys()]
        assert len(ap_files) == 1, f"Several bin files found for the same probe {ap_files}"
        ap_file, label = ap_files[0]
        out_files = []
        ks2_dir = self._run_iblsort(ap_file)  # runs the sorter, skips if it already ran
        probe_out_path = self.session_path.joinpath("alf", label, self.SPIKE_SORTER_NAME)
        shutil.rmtree(probe_out_path, ignore_errors=True)
        probe_out_path.mkdir(parents=True, exist_ok=True)
        ibllib.ephys.spikes.ks2_to_alf(
            ks2_dir,
            bin_path=ap_file.parent,
            out_path=probe_out_path,
            bin_file=ap_file,
            ampfactor=self._sample2v(ap_file),
        )
        logfile = ks2_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
        if logfile.exists():
            shutil.copyfile(logfile, probe_out_path.joinpath(f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log"))
        # Sync spike sorting with the main behaviour clock: the nidq for 3B+ and the main probe for 3A
        out, _ = ibllib.ephys.spikes.sync_spike_sorting(ap_file=ap_file, out_path=probe_out_path)
        out_files.extend(out)
        # Now compute the unit metrics
        self.compute_cell_qc(probe_out_path)
        # Convert the sorter output into a tar file and register it. Create the directory in case the
        # spike sorting is in old raw_ephys_data folders; for new sessions it should already exist
        tar_dir = self.session_path.joinpath('spike_sorters', self.SPIKE_SORTER_NAME, label)
        tar_dir.mkdir(parents=True, exist_ok=True)
        out = ibllib.ephys.spikes.ks2_to_tar(ks2_dir, tar_dir, force=self.FORCE_RERUN)
        out_files.extend(out)
        # run waveform extraction
        _logger.info("Running waveform extraction")
        spikes = alfio.load_object(probe_out_path, 'spikes', attribute=['samples', 'clusters'])
        clusters = alfio.load_object(probe_out_path, 'clusters', attribute=['channels'])
        channels = alfio.load_object(probe_out_path, 'channels')
        extract_wfs_cbin(
            cbin_file=ap_file,
            output_dir=probe_out_path,
            spike_samples=spikes['samples'],
            spike_clusters=spikes['clusters'],
            spike_channels=clusters['channels'][spikes['clusters']],
            h=None,  # TODO: the geometry needs to be set using the spikeglx object
            channel_labels=channels['labels'],
            max_wf=256,
            trough_offset=42,
            spike_length_samples=128,
            chunksize_samples=int(3000),
            n_jobs=None,
        )
        if self.one:
            eid = self.one.path2eid(self.session_path, query_type='remote')
            ins = self.one.alyx.rest('insertions', 'list', session=eid, name=label, query_type='remote')
            if len(ins) != 0:
                _logger.info("Creating SpikeSorting QC plots")
                plot_task = ApPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
                _ = plot_task.run()
                self.plot_tasks.append(plot_task)

                plot_task = SpikeSortingPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
                _ = plot_task.run(collection=str(probe_out_path.relative_to(self.session_path)))
                self.plot_tasks.append(plot_task)

                resolved = ins[0].get('json', {'temp': 0}).get(
                    'extended_qc', {'temp': 0}).get('alignment_resolved', False)
                if resolved:
                    chns = np.load(probe_out_path.joinpath('channels.localCoordinates.npy'))
                    out = get_aligned_channels(ins[0], chns, one=self.one, save_dir=probe_out_path)
                    out_files.extend(out)

        return out_files
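
# Hedged sketch: writing one of the docstring's sample log lines to a file and
# parsing it back should yield the bare version string (the trailing '^[[0m'
# colouring tag is stripped by the regex).
def _example_parse_run_version(tmp_dir):  # pragma: no cover
    log_file = Path(tmp_dir).joinpath('spike_sorting_iblsorter.log')
    log_file.write_text('\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.3.0^[[0m\n')
    return SpikeSorting._fetch_iblsorter_run_version(log_file)  # -> 'ibl_1.3.0'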
class EphysCellsQc(base_tasks.EphysTask, CellQCMixin):
    priority = 90
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('spikes.times.npy', f'alf/{self.pname}*', True),
                            ('spikes.clusters.npy', f'alf/{self.pname}*', True),
                            ('spikes.amps.npy', f'alf/{self.pname}*', True),
                            ('spikes.depths.npy', f'alf/{self.pname}*', True),
                            ('clusters.channels.npy', f'alf/{self.pname}*', True)],
            'output_files': [('clusters.metrics.pqt', f'alf/{self.pname}*', True)]
        }
        return signature

    def _run(self):
        """
        Post spike-sorting quality control at the cluster level.
        Outputs a QC table in the clusters ALF object and labels corresponding probes in Alyx
        """
        files_spikes = Path(self.session_path).joinpath('alf', self.pname).rglob('spikes.times.npy')
        folder_probes = [f.parent for f in files_spikes]
        out_files = []
        for folder_probe in folder_probes:
            try:
                qc_file, df_units, drift = self.compute_cell_qc(folder_probe)
                out_files.append(qc_file)
                self._label_probe_qc(folder_probe, df_units, drift)
            except Exception:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        return out_files
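
# Hedged sketch (assumes spike-sorted ALF data under 'alf/probe00'): the task
# writes clusters.metrics.pqt for every sorted folder of the probe and pushes
# the summary numbers to the Alyx insertion json.
def _example_cells_qc(session_path, one):  # pragma: no cover
    task = EphysCellsQc(session_path, one=one, pname='probe00',
                        device_collection='raw_ephys_data')
    task.run()
    return task.outputs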