Coverage for ibllib/pipes/ephys_tasks.py: 59%

455 statements

coverage.py v7.5.4, created at 2024-07-08 17:16 +0100
import logging
import traceback
from pathlib import Path
import subprocess
import re
import shutil

import packaging.version
import numpy as np
import pandas as pd
import spikeglx
import neuropixel
from ibldsp.utils import rms
from ibldsp.waveform_extraction import extract_wfs_cbin
import one.alf.io as alfio

from ibllib.misc import check_nvidia_driver
from ibllib.pipes import base_tasks
from ibllib.pipes.sync_tasks import SyncPulses
from ibllib.ephys import ephysqc
import ibllib.ephys.spikes
from ibllib.qc.alignment_qc import get_aligned_channels
from ibllib.plots.figures import LfpPlots, ApPlots, BadChannelsAp
from ibllib.plots.figures import SpikeSorting as SpikeSortingPlots
from ibllib.io.extractors.ephys_fpga import extract_sync
from ibllib.ephys.spikes import sync_probes


_logger = logging.getLogger("ibllib")


class EphysRegisterRaw(base_tasks.DynamicTask):
    """
    Creates the probe insertions and uploads the probe description file; also compresses the
    nidq files and uploads them.
    """

    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [],
            'output_files': [('probes.description.json', 'alf', True)]
        }
        return signature

    def _run(self):
        out_files = ibllib.ephys.spikes.probes_description(self.session_path, self.one)
        return out_files
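    # Usage sketch (not part of the original module; assumes `session_path` points at a
    # registered session and `one` is a connected ONE instance):
    #
    #   task = EphysRegisterRaw(session_path, one=one)
    #   task.run()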



class EphysSyncRegisterRaw(base_tasks.DynamicTask):
    """
    Task to rename, compress and register raw DAQ data in .bin format collected using a NIDAQ
    """

    priority = 90
    cpu = 2
    io_charge = 30  # this job reads raw ap files
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*.*bin', self.sync_collection, True),
                            ('*.meta', self.sync_collection, True),
                            ('*.wiring.json', self.sync_collection, True)],
            'output_files': [('*nidq.cbin', self.sync_collection, True),
                             ('*nidq.ch', self.sync_collection, True),
                             ('*nidq.meta', self.sync_collection, True),
                             ('*nidq.wiring.json', self.sync_collection, True)]
        }
        return signature

    def _run(self):
        out_files = []

        # Detect the wiring file
        wiring_file = next(self.session_path.joinpath(self.sync_collection).glob('*.wiring.json'), None)
        if wiring_file is not None:
            out_files.append(wiring_file)

        # Search for .bin files in the sync_collection folder
        files = list(self.session_path.joinpath(self.sync_collection).glob('*nidq.*bin'))
        bin_file = files[0] if len(files) == 1 else None

        # If we no longer have a .bin / .cbin file, see if we can still find the .ch and .meta files
        if bin_file is None:
            for ext in ['ch', 'meta']:
                files = list(self.session_path.joinpath(self.sync_collection).glob(f'*nidq.{ext}'))
                ext_file = files[0] if len(files) == 1 else None
                if ext_file is not None:
                    out_files.append(ext_file)

            return out_files if len(out_files) > 0 else None

        # If we do find the .bin file, compress it (only if it hasn't already been compressed)
        sr = spikeglx.Reader(bin_file)
        if sr.is_mtscomp:
            sr.close()
            cbin_file = bin_file
            assert cbin_file.suffix == '.cbin'
        else:
            cbin_file = sr.compress_file()
            sr.close()
            bin_file.unlink()

        meta_file = cbin_file.with_suffix('.meta')
        ch_file = cbin_file.with_suffix('.ch')

        out_files.append(cbin_file)
        out_files.append(ch_file)
        out_files.append(meta_file)

        return out_files


class EphysCompressNP1(base_tasks.EphysTask):
    priority = 90
    cpu = 2
    io_charge = 100  # this job reads raw ap files
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*lf.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*lf.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*wiring.json', f'{self.device_collection}/{self.pname}', False)],
            'output_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                             ('*ap.cbin', f'{self.device_collection}/{self.pname}', True),
                             ('*ap.ch', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.meta', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.cbin', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.ch', f'{self.device_collection}/{self.pname}', True),
                             ('*wiring.json', f'{self.device_collection}/{self.pname}', False)]
        }
        return signature

    def _run(self):
        out_files = []

        # Detect and upload the wiring file
        wiring_file = next(self.session_path.joinpath(self.device_collection, self.pname).glob('*wiring.json'), None)
        if wiring_file is not None:
            out_files.append(wiring_file)

        ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
        ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="ch")
        ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="meta")

        for ef in ephys_files:
            for typ in ["ap", "lf"]:
                bin_file = ef.get(typ)
                if not bin_file:
                    continue
                if bin_file.suffix.find("bin") == 1:
                    with spikeglx.Reader(bin_file) as sr:
                        if sr.is_mtscomp:
                            out_files.append(bin_file)
                        else:
                            _logger.info(f"Compressing binary file {bin_file}")
                            cbin_file = sr.compress_file()
                            sr.close()
                            bin_file.unlink()
                            out_files.append(cbin_file)
                            out_files.append(cbin_file.with_suffix('.ch'))
                else:
                    out_files.append(bin_file)

        return out_files


class EphysCompressNP21(base_tasks.EphysTask):
    priority = 90
    cpu = 2
    io_charge = 100  # this job reads raw ap files
    job_size = 'large'

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*wiring.json', f'{self.device_collection}/{self.pname}', False)],
            'output_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                             ('*ap.cbin', f'{self.device_collection}/{self.pname}', True),
                             ('*ap.ch', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.meta', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.cbin', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.ch', f'{self.device_collection}/{self.pname}', True),
                             ('*wiring.json', f'{self.device_collection}/{self.pname}', False)]
        }
        return signature

    def _run(self):
        out_files = []
        # Detect wiring files
        wiring_file = next(self.session_path.joinpath(self.device_collection, self.pname).glob('*wiring.json'), None)
        if wiring_file is not None:
            out_files.append(wiring_file)

        ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
        if len(ephys_files) > 0:
            bin_file = ephys_files[0].get('ap', None)

        # This is the case where no ap.bin / ap.cbin file exists
        if len(ephys_files) == 0 or not bin_file:
            ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="ch")
            ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="meta")
            for ef in ephys_files:
                for typ in ["ap", "lf"]:
                    bin_file = ef.get(typ)
                    if bin_file:
                        out_files.append(bin_file)

            return out_files

        # If the ap.bin / ap.cbin file does exist, instantiate the NP2Converter
        np_conv = neuropixel.NP2Converter(bin_file, compress=True)
        np_conv_status = np_conv.process()
        np_conv_files = np_conv.get_processed_files_NP21()
        np_conv.sr.close()

        # Status = 1 - processing completed successfully
        if np_conv_status == 1:
            out_files += np_conv_files
            return out_files
        # Status = 0 - already processed
        elif np_conv_status == 0:
            ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
            ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="ch")
            ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="meta")
            for ef in ephys_files:
                for typ in ["ap", "lf"]:
                    bin_file = ef.get(typ)
                    if bin_file and bin_file.suffix != '.bin':
                        out_files.append(bin_file)

            return out_files

        else:
            return
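    # Note on the NP2Converter.process() return codes handled above (summary inferred from
    # the branches): 1 means the conversion/compression completed in this run, 0 means the
    # files had already been processed, and any other value is treated as a failure, in
    # which case the task returns None.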



class EphysCompressNP24(base_tasks.EphysTask):
    """
    Compresses NP2.4 data by splitting it into N binary files, corresponding to N shanks
    :param pname: a probe name string
    :param device_collection: the collection containing the probes (usually 'raw_ephys_data')
    :param nshanks: number of shanks used (usually 4 but it may be less depending on electrode map), optional
    """

    priority = 90
    cpu = 2
    io_charge = 100  # this job reads raw ap files
    job_size = 'large'

    def __init__(self, session_path, *args, pname=None, device_collection='raw_ephys_data', nshanks=None, **kwargs):
        assert pname, "pname is a required argument"
        if nshanks is None:
            meta_file = next(session_path.joinpath(device_collection, pname).glob('*ap.meta'))
            nshanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_file))
        assert nshanks > 1
        super(EphysCompressNP24, self).__init__(
            session_path, *args, pname=pname, device_collection=device_collection, nshanks=nshanks, **kwargs)
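    # Minimal sketch of the shank-count detection performed above (assumes `meta_file` is
    # the path to an NP2.4 *.ap.meta file; both helpers are the ones used in __init__):
    #
    #   md = spikeglx.read_meta_data(meta_file)
    #   nshanks = spikeglx._get_nshanks_from_meta(md)  # typically 4 for NP2.4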


    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*wiring.json', f'{self.device_collection}/{self.pname}', False)],
            'output_files': [('*ap.meta', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*ap.cbin', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*ap.ch', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*lf.meta', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*lf.cbin', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*lf.ch', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*wiring.json', f'{self.device_collection}/{self.pname}{pext}', False) for pext in self.pextra]
        }
        return signature

    def _run(self, delete_original=True):

        # Do we need the ability to register the files once they have already been processed
        # and the original file deleted?

        files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
        assert len(files) == 1
        bin_file = files[0].get('ap', None)

        np_conv = neuropixel.NP2Converter(bin_file, post_check=True, compress=True, delete_original=delete_original)
        np_conv_status = np_conv.process()
        out_files = np_conv.get_processed_files_NP24()
        np_conv.sr.close()

        if np_conv_status == 1:
            wiring_file = next(self.session_path.joinpath(self.device_collection, self.pname).glob('*wiring.json'), None)
            if wiring_file is not None:
                # copy the wiring file to each sub-probe directory and add it to the output files
                for pext in self.pextra:
                    new_wiring_file = self.session_path.joinpath(self.device_collection, f'{self.pname}{pext}', wiring_file.name)
                    shutil.copyfile(wiring_file, new_wiring_file)
                    out_files.append(new_wiring_file)
            return out_files
        elif np_conv_status == 0:
            for pext in self.pextra:
                ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, f'{self.pname}{pext}'))
                ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection,
                                                                                    f'{self.pname}{pext}'), ext="ch")
                ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection,
                                                                                    f'{self.pname}{pext}'), ext="meta")
                for ef in ephys_files:
                    for typ in ["ap", "lf"]:
                        bin_file = ef.get(typ)
                        if bin_file and bin_file.suffix != '.bin':
                            out_files.append(bin_file)

                wiring_file = next(self.session_path.joinpath(self.device_collection,
                                                              f'{self.pname}{pext}').glob('*wiring.json'), None)
                if wiring_file is None:
                    # See if we have the original wiring file
                    orig_wiring_file = next(self.session_path.joinpath(self.device_collection,
                                                                       self.pname).glob('*wiring.json'), None)
                    if orig_wiring_file is not None:
                        # copy the wiring file to the sub-probe directory and add it to the output files
                        new_wiring_file = self.session_path.joinpath(self.device_collection, f'{self.pname}{pext}',
                                                                     orig_wiring_file.name)
                        shutil.copyfile(orig_wiring_file, new_wiring_file)
                        out_files.append(new_wiring_file)
                else:
                    out_files.append(wiring_file)

            return out_files
        else:
            return


class EphysSyncPulses(SyncPulses):

    priority = 90
    cpu = 2
    io_charge = 30  # this job reads raw ap files
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*nidq.cbin', self.sync_collection, True),
                            ('*nidq.ch', self.sync_collection, False),
                            ('*nidq.meta', self.sync_collection, True),
                            ('*nidq.wiring.json', self.sync_collection, True)],
            'output_files': [('_spikeglx_sync.times.npy', self.sync_collection, True),
                             ('_spikeglx_sync.polarities.npy', self.sync_collection, True),
                             ('_spikeglx_sync.channels.npy', self.sync_collection, True)]
        }

        return signature


class EphysPulses(base_tasks.EphysTask):
    """
    Extract pulses from raw electrophysiology data into numpy arrays.
    Performs the probe synchronisation with the nidq (3B) or the main probe (3A):
    the job first extracts the sync pulses from all probes, then performs the
    synchronisation with the nidq.

    :param pname: a list of probe names or a single probe name string
    :param device_collection: the collection containing the probes (usually 'raw_ephys_data')
    :param sync_collection: the collection containing the synchronisation device - nidq (usually 'raw_ephys_data')
    """
    priority = 90
    cpu = 2
    io_charge = 30  # this job reads raw ap files
    job_size = 'small'

    def __init__(self, *args, **kwargs):
        super(EphysPulses, self).__init__(*args, **kwargs)
        assert self.device_collection, "device_collection is a required argument"
        assert self.sync_collection, "sync_collection is a required argument"
        self.pname = [self.pname] if isinstance(self.pname, str) else self.pname
        assert isinstance(self.pname, list), 'pname task argument should be a list or a string'

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
                           [('*ap.cbin', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
                           [('*ap.ch', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
                           [('*ap.wiring.json', f'{self.device_collection}/{pname}', False) for pname in self.pname] +
                           [('_spikeglx_sync.times.npy', self.sync_collection, True),
                            ('_spikeglx_sync.polarities.npy', self.sync_collection, True),
                            ('_spikeglx_sync.channels.npy', self.sync_collection, True)],
            'output_files': [(f'_spikeglx_sync.times.{pname}.npy', f'{self.device_collection}/{pname}', True)
                             for pname in self.pname] +
                            [(f'_spikeglx_sync.polarities.{pname}.npy', f'{self.device_collection}/{pname}', True)
                             for pname in self.pname] +
                            [(f'_spikeglx_sync.channels.{pname}.npy', f'{self.device_collection}/{pname}', True)
                             for pname in self.pname] +
                            [('*sync.npy', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
                            [('*timestamps.npy', f'{self.device_collection}/{pname}', True) for pname in self.pname]
        }

        return signature

    def _run(self, overwrite=False):
        out_files = []
        for probe in self.pname:
            files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, probe))
            assert len(files) == 1  # will error if the session is split
            bin_file = files[0].get('ap', None)
            if not bin_file:
                return []
            _, out = extract_sync(self.session_path, ephys_files=files, overwrite=overwrite)
            out_files += out

        status, sync_files = sync_probes.sync(self.session_path, probe_names=self.pname)

        return out_files + sync_files
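    # Usage sketch (not part of the original module; probe names and collections are
    # illustrative):
    #
    #   task = EphysPulses(session_path, one=one, pname=['probe00', 'probe01'],
    #                      device_collection='raw_ephys_data', sync_collection='raw_ephys_data')
    #   task.run()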



class RawEphysQC(base_tasks.EphysTask):

    cpu = 2
    io_charge = 30  # this job reads raw ap files
    priority = 10  # a lot of jobs depend on this one
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*lf.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*lf.ch', f'{self.device_collection}/{self.pname}', False),
                            ('*lf.*bin', f'{self.device_collection}/{self.pname}', False)],
            'output_files': [('_iblqc_ephysChannels.apRMS.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysChannels.rawSpikeRates.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysChannels.labels.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysSpectralDensityLF.freqs.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysSpectralDensityLF.power.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysSpectralDensityAP.freqs.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysSpectralDensityAP.power.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysTimeRmsLF.rms.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysTimeRmsLF.timestamps.npy', f'{self.device_collection}/{self.pname}', True)]
        }
        return signature

    # TODO make sure this works with NP2 probes (at the moment not sure it will due to raiseError mapping)
    def _run(self, overwrite=False):
        eid = self.one.path2eid(self.session_path)
        probe = self.one.alyx.rest('insertions', 'list', session=eid, name=self.pname)

        # We expect there to be only one probe
        if len(probe) != 1:
            _logger.warning(f"{self.pname} for {eid} not found")  # Should we create it?
            self.status = -1
            return

        pid = probe[0]['id']
        qc_files = []
        _logger.info(f"\nRunning QC for probe insertion {self.pname}")
        try:
            eqc = ephysqc.EphysQC(pid, session_path=self.session_path, one=self.one)
            qc_files.extend(eqc.run(update=True, overwrite=overwrite))
            _logger.info("Creating LFP QC plots")
            plot_task = LfpPlots(pid, session_path=self.session_path, one=self.one)
            _ = plot_task.run()
            self.plot_tasks.append(plot_task)
            plot_task = BadChannelsAp(pid, session_path=self.session_path, one=self.one)
            _ = plot_task.run()
            self.plot_tasks.append(plot_task)

        except AssertionError:
            _logger.error(traceback.format_exc())
            self.status = -1

        return qc_files


class CellQCMixin:
    """
    This mixin class is used to compute the cell QC metrics and update the json field of the probe insertion.
    The compute_cell_qc method is static and can be used independently.
    """
    @staticmethod
    def compute_cell_qc(folder_alf_probe):
        """
        Computes the cell QC given an extracted probe alf path
        :param folder_alf_probe: folder containing the extracted alf objects for one probe
        :return: path to the clusters.metrics.pqt file, the QC dataframe and the drift output
        """
        # compute the straight qc
        _logger.info(f"Computing cluster qc for {folder_alf_probe}")
        spikes = alfio.load_object(folder_alf_probe, 'spikes')
        clusters = alfio.load_object(folder_alf_probe, 'clusters')
        df_units, drift = ephysqc.spike_sorting_metrics(
            spikes.times, spikes.clusters, spikes.amps, spikes.depths,
            cluster_ids=np.arange(clusters.channels.size))
        # if the ks2 labels file exists, load it and add the column
        file_labels = folder_alf_probe.joinpath('cluster_KSLabel.tsv')
        if file_labels.exists():
            ks2_labels = pd.read_csv(file_labels, sep='\t')
            ks2_labels.rename(columns={'KSLabel': 'ks2_label'}, inplace=True)
            df_units = pd.concat(
                [df_units, ks2_labels['ks2_label'].reindex(df_units.index)], axis=1)
        # save as parquet file
        df_units.to_parquet(folder_alf_probe.joinpath("clusters.metrics.pqt"))
        return folder_alf_probe.joinpath("clusters.metrics.pqt"), df_units, drift
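    # Usage sketch (not part of the original module; assumes `folder_alf_probe` is a Path
    # to an extracted probe folder such as <session>/alf/probe00/pykilosort):
    #
    #   qc_file, df_units, drift = CellQCMixin.compute_cell_qc(folder_alf_probe)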


    def _label_probe_qc(self, folder_probe, df_units, drift):
        """
        Labels the json field of the corresponding Alyx probe insertion
        :param folder_probe: folder containing the spike sorting output for one probe
        :param df_units: dataframe of unit QC metrics as returned by compute_cell_qc
        :param drift: drift output as returned by compute_cell_qc
        :return:
        """
        eid = self.one.path2eid(self.session_path, query_type='remote')
        pdict = self.one.alyx.rest('insertions', 'list', session=eid, name=self.pname, no_cache=True)
        if len(pdict) != 1:
            _logger.warning(f'No probe found for probe name: {self.pname}')
            return
        isok = df_units['label'] == 1
        qcdict = {'n_units': int(df_units.shape[0]),
                  'n_units_qc_pass': int(np.sum(isok)),
                  'firing_rate_max': np.max(df_units['firing_rate'][isok]),
                  'firing_rate_median': np.median(df_units['firing_rate'][isok]),
                  'amplitude_max_uV': np.max(df_units['amp_max'][isok]) * 1e6,
                  'amplitude_median_uV': np.median(df_units['amp_median'][isok]) * 1e6,
                  'drift_rms_um': rms(drift['drift_um']),
                  }
        file_wm = folder_probe.joinpath('_kilosort_whitening.matrix.npy')
        if file_wm.exists():
            wm = np.load(file_wm)
            qcdict['whitening_matrix_conditioning'] = np.linalg.cond(wm)
        # groom the qc dict (this function will eventually go directly into the json field update)
        for k in qcdict:
            if isinstance(qcdict[k], np.int64):
                qcdict[k] = int(qcdict[k])
            elif isinstance(qcdict[k], float):
                qcdict[k] = np.round(qcdict[k], 2)
        self.one.alyx.json_field_update("insertions", pdict[0]["id"], "json", qcdict)


class SpikeSorting(base_tasks.EphysTask, CellQCMixin):
    """
    iblsorter (pykilosort-derived) spike sorting pipeline
    """
    gpu = 1
    io_charge = 100  # this job reads raw ap files
    priority = 60
    job_size = 'large'
    force = True

    SHELL_SCRIPT = Path.home().joinpath(
        "Documents/PYTHON/iblscripts/deploy/serverpc/iblsorter/run_iblsorter.sh"
    )
    SPIKE_SORTER_NAME = 'iblsorter'
    PYKILOSORT_REPO = Path.home().joinpath('Documents/PYTHON/SPIKE_SORTING/ibl-sorter')

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.ch', f'{self.device_collection}/{self.pname}', False),
                            ('*sync.npy', f'{self.device_collection}/{self.pname}', True)],
            'output_files': [('spike_sorting_pykilosort.log', f'spike_sorters/pykilosort/{self.pname}', True),
                             ('_iblqc_ephysTimeRmsAP.rms.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysTimeRmsAP.timestamps.npy', f'{self.device_collection}/{self.pname}', True)]
        }
        return signature

    @staticmethod
    def _sample2v(ap_file):
        md = spikeglx.read_meta_data(ap_file.with_suffix(".meta"))
        s2v = spikeglx._conversion_sample2v_from_meta(md)
        return s2v["ap"][0]
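    # Minimal sketch of how the scaling factor above is typically applied (assumes `raw`
    # is an int16 array read from the ap binary; not part of the original module):
    #
    #   s2v = SpikeSorting._sample2v(ap_file)  # volts per raw sample unit, AP band
    #   raw_volts = raw.astype(np.float32) * s2v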


    @staticmethod
    def _fetch_iblsorter_version(repo_path):
        try:
            import iblsorter
            return f"iblsorter_{iblsorter.__version__}"
        except ImportError:
            _logger.info('IBL-sorter not in environment, trying to locate the repository')
            init_file = Path(repo_path).joinpath('ibl-sorter', '__init__.py')
            version = 'unknown'  # fallback so the return below never hits an unbound variable
            try:
                with open(init_file) as fid:
                    lines = fid.readlines()
                for line in lines:
                    if line.startswith("__version__ = "):
                        version = line.split('=')[-1].strip().replace('"', '').replace("'", '')
            except Exception:
                pass
            return f"iblsorter_{version}"

    @staticmethod
    def _fetch_iblsorter_run_version(log_file):
        """
        Parse the following line (2 formats depending on version) from the log files to get the version
        '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.2.1, output in gnagga^[[0m\n'
        '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.3.0^[[0m\n'
        """
        with open(log_file) as fid:
            line = fid.readline()
        version = re.search('version (.*), output', line)
        version = version or re.search('version (.*)', line)  # old versions mention the output folder, new ones don't
        version = re.sub(r'\^\[{2}[0-9]+m', '', version.group(1))  # removes the coloring tags
        return version
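    # Example of the parse above (sketch; the log line is taken from the docstring):
    #
    #   line = '15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.2.1, output in gnagga'
    #   re.search('version (.*), output', line).group(1)  # -> 'ibl_1.2.1'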


    def _run_iblsort(self, ap_file):
        """
        Runs the iblsorter spike sorting for one probe dataset.
        The raw spike sorting output goes into the session_path/spike_sorters/{self.SPIKE_SORTER_NAME}/probeXX folder
        (discontinued support for old spike sortings in the probe folder <1.5.5)
        :return: path of the folder containing the spike sorting output
        """
        self.version = self._fetch_iblsorter_version(self.PYKILOSORT_REPO)
        label = ap_file.parts[-2]  # this is usually the probe name
        sorter_dir = self.session_path.joinpath("spike_sorters", self.SPIKE_SORTER_NAME, label)
        self.FORCE_RERUN = False
        if not self.FORCE_RERUN:
            log_file = sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
            if log_file.exists():
                run_version = self._fetch_iblsorter_run_version(log_file)
                if packaging.version.parse(run_version) >= packaging.version.parse('1.7.0'):
                    _logger.info(f"Already ran: spike_sorting_{self.SPIKE_SORTER_NAME}.log"
                                 f" found in {sorter_dir}, skipping.")
                    return sorter_dir
                else:
                    self.FORCE_RERUN = True
        # get the scratch drive from the shell script
        with open(self.SHELL_SCRIPT) as fid:
            lines = fid.readlines()
        line = [line for line in lines if line.startswith("SCRATCH_DRIVE=")][0]
        m = re.search(r"\=(.*?)(\#|\n)", line)[0]
        scratch_drive = Path(m[1:-1].strip())
        assert scratch_drive.exists()
        spikesorter_dir = f"{self.version}_{'_'.join(list(self.session_path.parts[-3:]))}_{self.pname}"
        temp_dir = scratch_drive.joinpath(spikesorter_dir)
        _logger.info(f"job progress command: tail -f {temp_dir} *.log")
        temp_dir.mkdir(parents=True, exist_ok=True)
        check_nvidia_driver()
        try:
            # if iblsorter is in the environment, use the installed version within the task
            import iblsorter.ibl  # noqa
            iblsorter.ibl.run_spike_sorting_ibl(bin_file=ap_file, scratch_dir=temp_dir)
        except ImportError:
            command2run = f"{self.SHELL_SCRIPT} {ap_file} {temp_dir}"
            _logger.info(command2run)
            process = subprocess.Popen(
                command2run,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                executable="/bin/bash",
            )
            info, error = process.communicate()
            info_str = info.decode("utf-8").strip()
            _logger.info(info_str)
            if process.returncode != 0:
                error_str = error.decode("utf-8").strip()
                # try and get the kilosort log if any
                for log_file in temp_dir.rglob('*_kilosort.log'):
                    with open(log_file) as fid:
                        log = fid.read()
                        _logger.error(log)
                    break
                raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}")

        shutil.copytree(temp_dir.joinpath('output'), sorter_dir, dirs_exist_ok=True)
        shutil.rmtree(temp_dir, ignore_errors=True)

        return sorter_dir
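    # The shell script is expected to contain a line of the following form, from which the
    # scratch drive is parsed in _run_iblsort above (the path is illustrative):
    #
    #   SCRATCH_DRIVE=/mnt/scratch  # fast local drive used as the sorter working directory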


    def _run(self):
        """
        Multiple steps. For each probe:
        - runs the sorter (skips if it already ran)
        - synchronizes the spike sorting
        - outputs the probe description files
        - computes the waveforms
        :return: list of files to be registered on the database
        """
        efiles = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
        ap_files = [(ef.get("ap"), ef.get("label")) for ef in efiles if "ap" in ef.keys()]
        assert len(ap_files) == 1, f"Several bin files found for the same probe {ap_files}"
        ap_file, label = ap_files[0]
        out_files = []
        ks2_dir = self._run_iblsort(ap_file)  # runs the sorter, skips if it already ran
        probe_out_path = self.session_path.joinpath("alf", label, self.SPIKE_SORTER_NAME)
        shutil.rmtree(probe_out_path, ignore_errors=True)
        probe_out_path.mkdir(parents=True, exist_ok=True)
        ibllib.ephys.spikes.ks2_to_alf(
            ks2_dir,
            bin_path=ap_file.parent,
            out_path=probe_out_path,
            bin_file=ap_file,
            ampfactor=self._sample2v(ap_file),
        )
        logfile = ks2_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
        if logfile.exists():
            shutil.copyfile(logfile, probe_out_path.joinpath(f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log"))
        # Sync spike sorting with the main behaviour clock: the nidq for 3B+ and the main probe for 3A
        out, _ = ibllib.ephys.spikes.sync_spike_sorting(ap_file=ap_file, out_path=probe_out_path)
        out_files.extend(out)
        # Now compute the unit metrics
        self.compute_cell_qc(probe_out_path)
        # convert the sorter output into a tar file and also register it.
        # Make this in case spike sorting is in old raw_ephys_data folders, for new
        # sessions it should already exist
        tar_dir = self.session_path.joinpath('spike_sorters', self.SPIKE_SORTER_NAME, label)
        tar_dir.mkdir(parents=True, exist_ok=True)
        out = ibllib.ephys.spikes.ks2_to_tar(ks2_dir, tar_dir, force=self.FORCE_RERUN)
        out_files.extend(out)
        # run waveform extraction
        _logger.info("Running waveform extraction")
        spikes = alfio.load_object(probe_out_path, 'spikes', attribute=['samples', 'clusters'])
        clusters = alfio.load_object(probe_out_path, 'clusters', attribute=['channels'])
        channels = alfio.load_object(probe_out_path, 'channels')
        extract_wfs_cbin(
            cbin_file=ap_file,
            output_dir=probe_out_path,
            spike_samples=spikes['samples'],
            spike_clusters=spikes['clusters'],
            spike_channels=clusters['channels'][spikes['clusters']],
            h=None,  # TODO: the geometry needs to be set using the spikeglx object
            channel_labels=channels['labels'],
            max_wf=256,
            trough_offset=42,
            spike_length_samples=128,
            chunksize_samples=int(3000),
            n_jobs=None,
        )
        if self.one:
            eid = self.one.path2eid(self.session_path, query_type='remote')
            ins = self.one.alyx.rest('insertions', 'list', session=eid, name=label, query_type='remote')
            if len(ins) != 0:
                _logger.info("Creating SpikeSorting QC plots")
                plot_task = ApPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
                _ = plot_task.run()
                self.plot_tasks.append(plot_task)

                plot_task = SpikeSortingPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
                _ = plot_task.run(collection=str(probe_out_path.relative_to(self.session_path)))
                self.plot_tasks.append(plot_task)

                resolved = ins[0].get('json', {'temp': 0}).get('extended_qc', {'temp': 0}).get(
                    'alignment_resolved', False)
                if resolved:
                    chns = np.load(probe_out_path.joinpath('channels.localCoordinates.npy'))
                    out = get_aligned_channels(ins[0], chns, one=self.one, save_dir=probe_out_path)
                    out_files.extend(out)

        return out_files


class EphysCellsQc(base_tasks.EphysTask, CellQCMixin):
    priority = 90
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('spikes.times.npy', f'alf/{self.pname}*', True),
                            ('spikes.clusters.npy', f'alf/{self.pname}*', True),
                            ('spikes.amps.npy', f'alf/{self.pname}*', True),
                            ('spikes.depths.npy', f'alf/{self.pname}*', True),
                            ('clusters.channels.npy', f'alf/{self.pname}*', True)],
            'output_files': [('clusters.metrics.pqt', f'alf/{self.pname}*', True)]
        }
        return signature

    def _run(self):
        """
        Post spike-sorting quality control at the cluster level.
        Outputs a QC table in the clusters ALF object and labels corresponding probes in Alyx
        """
        files_spikes = Path(self.session_path).joinpath('alf', self.pname).rglob('spikes.times.npy')
        folder_probes = [f.parent for f in files_spikes]
        out_files = []
        for folder_probe in folder_probes:
            try:
                qc_file, df_units, drift = self.compute_cell_qc(folder_probe)
                out_files.append(qc_file)
                self._label_probe_qc(folder_probe, df_units, drift)
            except Exception:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        return out_files