Coverage for ibllib/pipes/ephys_tasks.py: 57%

456 statements  

coverage.py v7.3.2, created at 2023-10-11 11:13 +0100

import logging
import traceback
from pathlib import Path
import subprocess
import re
import shutil

import packaging.version
import numpy as np
import pandas as pd
import spikeglx
import neuropixel
from neurodsp.utils import rms
import one.alf.io as alfio

from ibllib.misc import check_nvidia_driver
from ibllib.pipes import base_tasks
from ibllib.pipes.sync_tasks import SyncPulses
from ibllib.ephys import ephysqc, spikes
from ibllib.qc.alignment_qc import get_aligned_channels
from ibllib.plots.figures import LfpPlots, ApPlots, BadChannelsAp
from ibllib.plots.figures import SpikeSorting as SpikeSortingPlots
from ibllib.io.extractors.ephys_fpga import extract_sync
from ibllib.ephys.spikes import sync_probes


_logger = logging.getLogger("ibllib")


class EphysRegisterRaw(base_tasks.DynamicTask):
    """
    Creates the probe insertions and uploads the probe description file; also compresses the nidq files and uploads them.
    """

    priority = 100
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [],
            'output_files': [('probes.description.json', 'alf', True)]
        }
        return signature

    def _run(self):

        out_files = spikes.probes_description(self.session_path, self.one)

        return out_files
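
# A minimal usage sketch for the task above, not part of the pipeline module.
# The session path is hypothetical and the ONE instance is assumed to be set up;
# run() is inherited from the pipeline Task base class and wraps _run() with
# logging and status handling.
#
#     from pathlib import Path
#     from one.api import ONE
#     task = EphysRegisterRaw(Path('/data/Subjects/ZM_1150/2023-01-01/001'), one=ONE())
#     status = task.run()  # 0 on success; registered files end up in task.outputs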


class EphysSyncRegisterRaw(base_tasks.DynamicTask):
    """
    Task to rename, compress and register raw DAQ data in .bin format collected using a NIDAQ
    """

    priority = 90
    cpu = 2
    io_charge = 30  # this job reads raw ap files
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*.*bin', self.sync_collection, True),
                            ('*.meta', self.sync_collection, True),
                            ('*.wiring.json', self.sync_collection, True)],
            'output_files': [('*nidq.cbin', self.sync_collection, True),
                             ('*nidq.ch', self.sync_collection, True),
                             ('*nidq.meta', self.sync_collection, True),
                             ('*nidq.wiring.json', self.sync_collection, True)]
        }
        return signature

    def _run(self):

        out_files = []

        # Detect the wiring file
        wiring_file = next(self.session_path.joinpath(self.sync_collection).glob('*.wiring.json'), None)
        if wiring_file is not None:
            out_files.append(wiring_file)

        # Search for .bin files in the sync_collection folder
        files = list(self.session_path.joinpath(self.sync_collection).glob('*nidq.*bin'))
        bin_file = files[0] if len(files) == 1 else None

        # If we don't have a .bin/.cbin file anymore, see if we can still find the .ch and .meta files
        if bin_file is None:
            for ext in ['ch', 'meta']:
                files = list(self.session_path.joinpath(self.sync_collection).glob(f'*nidq.{ext}'))
                ext_file = files[0] if len(files) == 1 else None
                if ext_file is not None:
                    out_files.append(ext_file)

            return out_files if len(out_files) > 0 else None

        # If we do find the .bin file, compress the files (only if they haven't already been compressed)
        sr = spikeglx.Reader(bin_file)
        if sr.is_mtscomp:
            sr.close()
            cbin_file = bin_file
            assert cbin_file.suffix == '.cbin'
        else:
            cbin_file = sr.compress_file()
            sr.close()
            bin_file.unlink()

        meta_file = cbin_file.with_suffix('.meta')
        ch_file = cbin_file.with_suffix('.ch')

        out_files.append(cbin_file)
        out_files.append(ch_file)
        out_files.append(meta_file)

        return out_files
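
# A hedged sketch of the mtscomp round-trip performed above: spikeglx.Reader
# reports whether a file is already compressed, and compress_file() writes the
# .cbin/.ch pair alongside the original. The path below is hypothetical.
#
#     import spikeglx
#     sr = spikeglx.Reader('/data/raw_ephys_data/_spikeglx_ephysData_g0_t0.nidq.bin')
#     if not sr.is_mtscomp:
#         cbin_file = sr.compress_file()  # returns the new .cbin path
#     sr.close()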


class EphysCompressNP1(base_tasks.EphysTask):
    priority = 90
    cpu = 2
    io_charge = 100  # this job reads raw ap files
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*lf.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*lf.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*wiring.json', f'{self.device_collection}/{self.pname}', False)],
            'output_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                             ('*ap.cbin', f'{self.device_collection}/{self.pname}', True),
                             ('*ap.ch', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.meta', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.cbin', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.ch', f'{self.device_collection}/{self.pname}', True),
                             ('*wiring.json', f'{self.device_collection}/{self.pname}', False)]
        }
        return signature

    def _run(self):

        out_files = []

        # Detect and upload the wiring file
        wiring_file = next(self.session_path.joinpath(self.device_collection, self.pname).glob('*wiring.json'), None)
        if wiring_file is not None:
            out_files.append(wiring_file)

        ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
        ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="ch")
        ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="meta")

        for ef in ephys_files:
            for typ in ["ap", "lf"]:
                bin_file = ef.get(typ)
                if not bin_file:
                    continue
                if bin_file.suffix.find("bin") == 1:
                    with spikeglx.Reader(bin_file) as sr:
                        if sr.is_mtscomp:
                            out_files.append(bin_file)
                        else:
                            _logger.info(f"Compressing binary file {bin_file}")
                            cbin_file = sr.compress_file()
                            sr.close()
                            bin_file.unlink()
                            out_files.append(cbin_file)
                            out_files.append(cbin_file.with_suffix('.ch'))
                else:
                    out_files.append(bin_file)

        return out_files
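
# For orientation, a sketch of what spikeglx.glob_ephys_files returns for a
# single-probe folder: a list of dict-like records keyed by stream type. The
# keys follow the ef.get(...) usage above; the paths are hypothetical.
#
#     [{'label': 'probe00',
#       'path': Path('.../raw_ephys_data/probe00'),
#       'ap': Path('.../probe00/_spikeglx_ephysData_g0_t0.imec0.ap.cbin'),
#       'lf': Path('.../probe00/_spikeglx_ephysData_g0_t0.imec0.lf.cbin')}]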


class EphysCompressNP21(base_tasks.EphysTask):
    priority = 90
    cpu = 2
    io_charge = 100  # this job reads raw ap files
    job_size = 'large'

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*wiring.json', f'{self.device_collection}/{self.pname}', False)],
            'output_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                             ('*ap.cbin', f'{self.device_collection}/{self.pname}', True),
                             ('*ap.ch', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.meta', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.cbin', f'{self.device_collection}/{self.pname}', True),
                             ('*lf.ch', f'{self.device_collection}/{self.pname}', True),
                             ('*wiring.json', f'{self.device_collection}/{self.pname}', False)]
        }
        return signature

    def _run(self):

        out_files = []
        # Detect wiring files
        wiring_file = next(self.session_path.joinpath(self.device_collection, self.pname).glob('*wiring.json'), None)
        if wiring_file is not None:
            out_files.append(wiring_file)

        ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
        if len(ephys_files) > 0:
            bin_file = ephys_files[0].get('ap', None)

        # This is the case where no ap.bin/.cbin file exists
        if len(ephys_files) == 0 or not bin_file:
            ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="ch")
            ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="meta")
            for ef in ephys_files:
                for typ in ["ap", "lf"]:
                    bin_file = ef.get(typ)
                    if bin_file:
                        out_files.append(bin_file)

            return out_files

        # If the ap.bin / .cbin file does exist, instantiate the NP2Converter
        np_conv = neuropixel.NP2Converter(bin_file, compress=True)
        np_conv_status = np_conv.process()
        np_conv_files = np_conv.get_processed_files_NP21()
        np_conv.sr.close()

        # Status = 1 - successfully completed
        if np_conv_status == 1:
            out_files += np_conv_files
            return out_files
        # Status = 0 - already processed
        elif np_conv_status == 0:
            ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
            ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="ch")
            ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname), ext="meta")
            for ef in ephys_files:
                for typ in ["ap", "lf"]:
                    bin_file = ef.get(typ)
                    if bin_file and bin_file.suffix != '.bin':
                        out_files.append(bin_file)

            return out_files

        else:
            return
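
# A hedged recap of the NP2Converter contract as used by the NP2.1/NP2.4 tasks:
# process() returns 1 when the conversion/compression ran, 0 when the files were
# already processed, and any other status is treated as a failure (the tasks
# return None). The ap_file path is hypothetical.
#
#     np_conv = neuropixel.NP2Converter(ap_file, compress=True)
#     status = np_conv.process()            # 1: done now, 0: done previously
#     files = np_conv.get_processed_files_NP21()
#     np_conv.sr.close()                    # the converter keeps a Reader open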


class EphysCompressNP24(base_tasks.EphysTask):
    """
    Compresses NP2.4 data by splitting it into N binary files, corresponding to N shanks
    :param pname: a probe name string
    :param device_collection: the collection containing the probes (usually 'raw_ephys_data')
    :param nshanks: number of shanks used (usually 4 but it may be less depending on electrode map), optional
    """

    priority = 90
    cpu = 2
    io_charge = 100  # this job reads raw ap files
    job_size = 'large'

    def __init__(self, session_path, *args, pname=None, device_collection='raw_ephys_data', nshanks=None, **kwargs):
        assert pname, "pname is a required argument"
        if nshanks is None:
            meta_file = next(session_path.joinpath(device_collection, pname).glob('*ap.meta'))
            nshanks = spikeglx._get_nshanks_from_meta(spikeglx.read_meta_data(meta_file))
        assert nshanks > 1
        super(EphysCompressNP24, self).__init__(
            session_path, *args, pname=pname, device_collection=device_collection, nshanks=nshanks, **kwargs)

    @property
    def signature(self):

        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.*bin', f'{self.device_collection}/{self.pname}', True),
                            ('*wiring.json', f'{self.device_collection}/{self.pname}', False)],
            'output_files': [('*ap.meta', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*ap.cbin', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*ap.ch', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*lf.meta', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*lf.cbin', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*lf.ch', f'{self.device_collection}/{self.pname}{pext}', True) for pext in self.pextra] +
                            [('*wiring.json', f'{self.device_collection}/{self.pname}{pext}', False) for pext in self.pextra]
        }
        return signature

    def _run(self, delete_original=True):

        # Do we need the ability to register the files once they have already been processed and the original deleted?

        files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
        assert len(files) == 1
        bin_file = files[0].get('ap', None)

        np_conv = neuropixel.NP2Converter(bin_file, post_check=True, compress=True, delete_original=delete_original)
        np_conv_status = np_conv.process()
        out_files = np_conv.get_processed_files_NP24()
        np_conv.sr.close()

        if np_conv_status == 1:
            wiring_file = next(self.session_path.joinpath(self.device_collection, self.pname).glob('*wiring.json'), None)
            if wiring_file is not None:
                # copy the wiring file to each sub-probe directory and add it to the output files
                for pext in self.pextra:
                    new_wiring_file = self.session_path.joinpath(self.device_collection, f'{self.pname}{pext}', wiring_file.name)
                    shutil.copyfile(wiring_file, new_wiring_file)
                    out_files.append(new_wiring_file)
            return out_files
        elif np_conv_status == 0:
            for pext in self.pextra:
                ephys_files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, f'{self.pname}{pext}'))
                ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection,
                                                                                    f'{self.pname}{pext}'), ext="ch")
                ephys_files += spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection,
                                                                                    f'{self.pname}{pext}'), ext="meta")
                for ef in ephys_files:
                    for typ in ["ap", "lf"]:
                        bin_file = ef.get(typ)
                        if bin_file and bin_file.suffix != '.bin':
                            out_files.append(bin_file)

                wiring_file = next(self.session_path.joinpath(self.device_collection,
                                                              f'{self.pname}{pext}').glob('*wiring.json'), None)
                if wiring_file is None:
                    # See if we have the original wiring file
                    orig_wiring_file = next(self.session_path.joinpath(self.device_collection,
                                                                       self.pname).glob('*wiring.json'), None)
                    if orig_wiring_file is not None:
                        # copy the wiring file to the sub-probe directory and add it to the output files
                        new_wiring_file = self.session_path.joinpath(self.device_collection, f'{self.pname}{pext}',
                                                                     orig_wiring_file.name)
                        shutil.copyfile(orig_wiring_file, new_wiring_file)
                        out_files.append(new_wiring_file)
                else:
                    out_files.append(wiring_file)

            return out_files
        else:
            return
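
# A minimal usage sketch (hypothetical session path; when nshanks is not given it
# is read from the ap.meta file, as in __init__ above). The sub-probe folders are
# derived from self.pextra, e.g. probe00a, probe00b, ... for a 4-shank probe.
#
#     task = EphysCompressNP24(session_path, pname='probe00')
#     status = task.run()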


class EphysSyncPulses(SyncPulses):

    priority = 90
    cpu = 2
    io_charge = 30  # this job reads raw ap files
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*nidq.cbin', self.sync_collection, True),
                            ('*nidq.ch', self.sync_collection, False),
                            ('*nidq.meta', self.sync_collection, True),
                            ('*nidq.wiring.json', self.sync_collection, True)],
            'output_files': [('_spikeglx_sync.times.npy', self.sync_collection, True),
                             ('_spikeglx_sync.polarities.npy', self.sync_collection, True),
                             ('_spikeglx_sync.channels.npy', self.sync_collection, True)]
        }

        return signature


class EphysPulses(base_tasks.EphysTask):
    """
    Extract pulses from raw electrophysiology data into numpy arrays.
    Perform the probe synchronisation with the nidq (3B) or the main probe (3A).
    First the job extracts the sync pulses from all probes, then it performs the
    synchronisation with the nidq.

    :param pname: a list of probe names or a single probe name string
    :param device_collection: the collection containing the probes (usually 'raw_ephys_data')
    :param sync_collection: the collection containing the synchronisation device - nidq (usually 'raw_ephys_data')
    """
    priority = 90
    cpu = 2
    io_charge = 30  # this job reads raw ap files
    job_size = 'small'

    def __init__(self, *args, **kwargs):
        super(EphysPulses, self).__init__(*args, **kwargs)
        assert self.device_collection, "device_collection is a required argument"
        assert self.sync_collection, "sync_collection is a required argument"
        self.pname = [self.pname] if isinstance(self.pname, str) else self.pname
        assert isinstance(self.pname, list), 'pname task argument should be a list or a string'

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
                           [('*ap.cbin', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
                           [('*ap.ch', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
                           [('*ap.wiring.json', f'{self.device_collection}/{pname}', False) for pname in self.pname] +
                           [('_spikeglx_sync.times.npy', self.sync_collection, True),
                            ('_spikeglx_sync.polarities.npy', self.sync_collection, True),
                            ('_spikeglx_sync.channels.npy', self.sync_collection, True)],
            'output_files': [(f'_spikeglx_sync.times.{pname}.npy', f'{self.device_collection}/{pname}', True)
                             for pname in self.pname] +
                            [(f'_spikeglx_sync.polarities.{pname}.npy', f'{self.device_collection}/{pname}', True)
                             for pname in self.pname] +
                            [(f'_spikeglx_sync.channels.{pname}.npy', f'{self.device_collection}/{pname}', True)
                             for pname in self.pname] +
                            [('*sync.npy', f'{self.device_collection}/{pname}', True) for pname in self.pname] +
                            [('*timestamps.npy', f'{self.device_collection}/{pname}', True) for pname in self.pname]
        }

        return signature

    def _run(self, overwrite=False):

        out_files = []
        for probe in self.pname:
            files = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, probe))
            assert len(files) == 1  # will error if the session is split
            bin_file = files[0].get('ap', None)
            if not bin_file:
                return []
            _, out = extract_sync(self.session_path, ephys_files=files, overwrite=overwrite)
            out_files += out

        status, sync_files = sync_probes.sync(self.session_path, probe_names=self.pname)

        return out_files + sync_files
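
# A minimal usage sketch (hypothetical collections; pname may be a single string
# or a list of probe names, as normalised in __init__ above):
#
#     task = EphysPulses(session_path, pname=['probe00', 'probe01'],
#                        device_collection='raw_ephys_data',
#                        sync_collection='raw_ephys_data')
#     status = task.run()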


class RawEphysQC(base_tasks.EphysTask):

    cpu = 2
    io_charge = 30  # this job reads raw ap files
    priority = 10  # a lot of jobs depend on this one
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*lf.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*lf.ch', f'{self.device_collection}/{self.pname}', False),
                            ('*lf.*bin', f'{self.device_collection}/{self.pname}', False)],
            'output_files': [('_iblqc_ephysChannels.apRMS.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysChannels.rawSpikeRates.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysChannels.labels.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysSpectralDensityLF.freqs.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysSpectralDensityLF.power.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysSpectralDensityAP.freqs.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysSpectralDensityAP.power.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysTimeRmsLF.rms.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysTimeRmsLF.timestamps.npy', f'{self.device_collection}/{self.pname}', True)]
        }
        return signature

    # TODO make sure this works with NP2 probes (at the moment not sure it will due to raiseError mapping)
    def _run(self, overwrite=False):

        eid = self.one.path2eid(self.session_path)
        probe = self.one.alyx.rest('insertions', 'list', session=eid, name=self.pname)

        # We expect there to be only one probe
        if len(probe) != 1:
            _logger.warning(f"{self.pname} for {eid} not found")  # Should we create it?
            self.status = -1
            return

        pid = probe[0]['id']
        qc_files = []
        _logger.info(f"\nRunning QC for probe insertion {self.pname}")
        try:
            eqc = ephysqc.EphysQC(pid, session_path=self.session_path, one=self.one)
            qc_files.extend(eqc.run(update=True, overwrite=overwrite))
            _logger.info("Creating LFP QC plots")
            plot_task = LfpPlots(pid, session_path=self.session_path, one=self.one)
            _ = plot_task.run()
            self.plot_tasks.append(plot_task)
            plot_task = BadChannelsAp(pid, session_path=self.session_path, one=self.one)
            _ = plot_task.run()
            self.plot_tasks.append(plot_task)

        except AssertionError:
            _logger.error(traceback.format_exc())
            self.status = -1

        return qc_files


class SpikeSorting(base_tasks.EphysTask):
    """
    Pykilosort 2.5 pipeline
    """
    gpu = 1
    io_charge = 100  # this job reads raw ap files
    priority = 60
    job_size = 'large'
    force = True

    SHELL_SCRIPT = Path.home().joinpath(
        "Documents/PYTHON/iblscripts/deploy/serverpc/kilosort2/run_pykilosort.sh"
    )
    SPIKE_SORTER_NAME = 'pykilosort'
    PYKILOSORT_REPO = Path.home().joinpath('Documents/PYTHON/SPIKE_SORTING/pykilosort')

    @property
    def signature(self):
        signature = {
            'input_files': [('*ap.meta', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.cbin', f'{self.device_collection}/{self.pname}', True),
                            ('*ap.ch', f'{self.device_collection}/{self.pname}', True),
                            ('*sync.npy', f'{self.device_collection}/{self.pname}', True)],
            'output_files': [('spike_sorting_pykilosort.log', f'spike_sorters/pykilosort/{self.pname}', True),
                             ('_iblqc_ephysTimeRmsAP.rms.npy', f'{self.device_collection}/{self.pname}', True),
                             ('_iblqc_ephysTimeRmsAP.timestamps.npy', f'{self.device_collection}/{self.pname}', True)]
        }
        return signature

    @staticmethod
    def _sample2v(ap_file):
        md = spikeglx.read_meta_data(ap_file.with_suffix(".meta"))
        s2v = spikeglx._conversion_sample2v_from_meta(md)
        return s2v["ap"][0]

    @staticmethod
    def _fetch_pykilosort_version(repo_path):
        init_file = Path(repo_path).joinpath('pykilosort', '__init__.py')
        version = SpikeSorting._fetch_ks2_commit_hash(repo_path)  # default
        try:
            with open(init_file) as fid:
                lines = fid.readlines()
                for line in lines:
                    if line.startswith("__version__ = "):
                        version = line.split('=')[-1].strip().replace('"', '').replace("'", '')
        except Exception:
            pass
        return f"pykilosort_{version}"

    @staticmethod
    def _fetch_pykilosort_run_version(log_file):
        """
        Parse the following line (2 formats depending on the version) from the log files to get the version
        '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.2.1, output in gnagga^[[0m\n'
        '\x1b[0m15:39:37.919 [I] ibl:90 Starting Pykilosort version ibl_1.3.0^[[0m\n'
        """
        with open(log_file) as fid:
            line = fid.readline()
            version = re.search('version (.*), output', line)
            version = version or re.search('version (.*)', line)  # the old format includes an output path, the new one does not
            version = re.sub('\\^[[0-9]+m', '', version.group(1))  # removes the coloring tags
        return f"pykilosort_{version}"


    @staticmethod
    def _fetch_ks2_commit_hash(repo_path):
        command2run = f"git --git-dir {repo_path}/.git rev-parse --verify HEAD"
        process = subprocess.Popen(
            command2run, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
        )
        info, error = process.communicate()
        if process.returncode != 0:
            _logger.error(
                f"Can't fetch the pykilosort commit hash, will still attempt to run\n"
                f"Error: {error.decode('utf-8')}"
            )
            return ""
        return info.decode("utf-8").strip()

    def _run_pykilosort(self, ap_file):
        """
        Runs the pykilosort spike sorting for one probe dataset.
        The raw spike sorting output goes in the session_path/spike_sorters/{self.SPIKE_SORTER_NAME}/probeXX folder
        (discontinued support for old spike sortings in the probe folder <1.5.5).
        :return: path of the folder containing the spike sorting output
        """
        self.version = self._fetch_pykilosort_version(self.PYKILOSORT_REPO)
        label = ap_file.parts[-2]  # this is usually the probe name
        sorter_dir = self.session_path.joinpath("spike_sorters", self.SPIKE_SORTER_NAME, label)
        self.FORCE_RERUN = False
        if not self.FORCE_RERUN:
            log_file = sorter_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
            if log_file.exists():
                run_version = self._fetch_pykilosort_run_version(log_file)
                if packaging.version.parse(run_version) > packaging.version.parse('pykilosort_ibl_1.1.0'):
                    _logger.info(f"Already ran: spike_sorting_{self.SPIKE_SORTER_NAME}.log"
                                 f" found in {sorter_dir}, skipping.")
                    return sorter_dir
                else:
                    self.FORCE_RERUN = True
        # get the scratch drive from the shell script
        with open(self.SHELL_SCRIPT) as fid:
            lines = fid.readlines()
        line = [line for line in lines if line.startswith("SCRATCH_DRIVE=")][0]
        m = re.search(r"\=(.*?)(\#|\n)", line)[0]
        scratch_drive = Path(m[1:-1].strip())
        assert scratch_drive.exists()
        # clean up and create the directory, this also checks write permissions
        # temp_dir has the following shape: pykilosort/ZM_3003_2020-07-29_001_probe00
        # first make sure the tmp dir is clean
        shutil.rmtree(scratch_drive.joinpath(self.SPIKE_SORTER_NAME), ignore_errors=True)
        temp_dir = scratch_drive.joinpath(
            self.SPIKE_SORTER_NAME, "_".join(list(self.session_path.parts[-3:]) + [label])
        )
        if temp_dir.exists():  # hmmm this has to be decided, we may want to restart?
            # But failed sessions may then clog the scratch dir and have users run out of space
            shutil.rmtree(temp_dir, ignore_errors=True)
        log_file = temp_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
        _logger.info(f"job progress command: tail -f {temp_dir} *.log")
        temp_dir.mkdir(parents=True, exist_ok=True)
        check_nvidia_driver()
        command2run = f"{self.SHELL_SCRIPT} {ap_file} {temp_dir}"
        _logger.info(command2run)
        process = subprocess.Popen(
            command2run,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            executable="/bin/bash",
        )
        info, error = process.communicate()
        info_str = info.decode("utf-8").strip()
        _logger.info(info_str)
        if process.returncode != 0:
            error_str = error.decode("utf-8").strip()
            # try and get the kilosort log if any
            for log_file in temp_dir.rglob('*_kilosort.log'):
                with open(log_file) as fid:
                    log = fid.read()
                    _logger.error(log)
                break
            raise RuntimeError(f"{self.SPIKE_SORTER_NAME} {info_str}, {error_str}")

        shutil.copytree(temp_dir.joinpath('output'), sorter_dir, dirs_exist_ok=True)
        shutil.rmtree(temp_dir, ignore_errors=True)

        return sorter_dir
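
    # For reference, a sketch of the shell script contract assumed above: the
    # script declares the scratch location on a line such as (value hypothetical)
    #
    #     SCRATCH_DRIVE=/mnt/h0  # parsed by the SCRATCH_DRIVE= regex above
    #
    # and is invoked as: run_pykilosort.sh <ap_file> <temp_dir>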


    def _run(self):
        """
        Multiple steps. For each probe:
        - runs pykilosort (skips if it already ran)
        - synchronises the spike sorting
        - outputs the probe description files
        :return: list of files to be registered on the database
        """
        efiles = spikeglx.glob_ephys_files(self.session_path.joinpath(self.device_collection, self.pname))
        ap_files = [(ef.get("ap"), ef.get("label")) for ef in efiles if "ap" in ef.keys()]
        out_files = []
        for ap_file, label in ap_files:
            try:
                # if the file is part of a sequence, handle the run accordingly
                sequence_file = ap_file.parent.joinpath(ap_file.stem.replace('ap', 'sequence.json'))
                # temporary: just skip for now
                if sequence_file.exists():
                    continue
                ks2_dir = self._run_pykilosort(ap_file)  # runs the sorter, skips if it already ran
                probe_out_path = self.session_path.joinpath("alf", label, self.SPIKE_SORTER_NAME)
                shutil.rmtree(probe_out_path, ignore_errors=True)
                probe_out_path.mkdir(parents=True, exist_ok=True)
                spikes.ks2_to_alf(
                    ks2_dir,
                    bin_path=ap_file.parent,
                    out_path=probe_out_path,
                    bin_file=ap_file,
                    ampfactor=self._sample2v(ap_file),
                )
                logfile = ks2_dir.joinpath(f"spike_sorting_{self.SPIKE_SORTER_NAME}.log")
                if logfile.exists():
                    shutil.copyfile(logfile, probe_out_path.joinpath(f"_ibl_log.info_{self.SPIKE_SORTER_NAME}.log"))
                # For now leave the syncing here
                out, _ = spikes.sync_spike_sorting(ap_file=ap_file, out_path=probe_out_path)
                out_files.extend(out)
                # convert the ks2 output into a tar file and also register it.
                # Do this in case the spike sorting is in old raw_ephys_data folders; for new
                # sessions it should already exist
                tar_dir = self.session_path.joinpath(
                    'spike_sorters', self.SPIKE_SORTER_NAME, label)
                tar_dir.mkdir(parents=True, exist_ok=True)
                out = spikes.ks2_to_tar(ks2_dir, tar_dir, force=self.FORCE_RERUN)
                out_files.extend(out)

                if self.one:
                    eid = self.one.path2eid(self.session_path, query_type='remote')
                    ins = self.one.alyx.rest('insertions', 'list', session=eid, name=label, query_type='remote')
                    if len(ins) != 0:
                        _logger.info("Creating SpikeSorting QC plots")
                        plot_task = ApPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
                        _ = plot_task.run()
                        self.plot_tasks.append(plot_task)

                        plot_task = SpikeSortingPlots(ins[0]['id'], session_path=self.session_path, one=self.one)
                        _ = plot_task.run(collection=str(probe_out_path.relative_to(self.session_path)))
                        self.plot_tasks.append(plot_task)

                        resolved = ins[0].get('json', {'temp': 0}).get('extended_qc', {'temp': 0}). \
                            get('alignment_resolved', False)
                        if resolved:
                            chns = np.load(probe_out_path.joinpath('channels.localCoordinates.npy'))
                            out = get_aligned_channels(ins[0], chns, one=self.one, save_dir=probe_out_path)
                            out_files.extend(out)

            except BaseException:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue

        return out_files


class EphysCellsQc(base_tasks.EphysTask):
    priority = 90
    job_size = 'small'

    @property
    def signature(self):
        signature = {
            'input_files': [('spikes.times.npy', f'alf/{self.pname}*', True),
                            ('spikes.clusters.npy', f'alf/{self.pname}*', True),
                            ('spikes.amps.npy', f'alf/{self.pname}*', True),
                            ('spikes.depths.npy', f'alf/{self.pname}*', True),
                            ('clusters.channels.npy', f'alf/{self.pname}*', True)],
            'output_files': [('clusters.metrics.pqt', f'alf/{self.pname}*', True)]
        }
        return signature

    def _compute_cell_qc(self, folder_probe):
        """
        Computes the cell QC given an extracted probe alf path
        :param folder_probe: the probe alf folder
        :return: path to the clusters.metrics.pqt file, the units dataframe and the drift
        """
        # compute the straight qc
        _logger.info(f"Computing cluster qc for {folder_probe}")
        spikes = alfio.load_object(folder_probe, 'spikes')
        clusters = alfio.load_object(folder_probe, 'clusters')
        df_units, drift = ephysqc.spike_sorting_metrics(
            spikes.times, spikes.clusters, spikes.amps, spikes.depths,
            cluster_ids=np.arange(clusters.channels.size))
        # if the ks2 labels file exists, load it and add the column
        file_labels = folder_probe.joinpath('cluster_KSLabel.tsv')
        if file_labels.exists():
            ks2_labels = pd.read_csv(file_labels, sep='\t')
            ks2_labels.rename(columns={'KSLabel': 'ks2_label'}, inplace=True)
            df_units = pd.concat(
                [df_units, ks2_labels['ks2_label'].reindex(df_units.index)], axis=1)
        # save as a parquet file
        df_units.to_parquet(folder_probe.joinpath("clusters.metrics.pqt"))
        return folder_probe.joinpath("clusters.metrics.pqt"), df_units, drift
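
    # A hedged sketch of the metrics computation above: spike_sorting_metrics
    # returns one row per cluster plus a drift estimate. The columns below are
    # the ones consumed by _label_probe_qc; they are illustrative, not exhaustive.
    #
    #     df_units, drift = ephysqc.spike_sorting_metrics(t, c, amps, depths,
    #                                                     cluster_ids=ids)
    #     df_units[['label', 'firing_rate', 'amp_max', 'amp_median']]
    #     drift['drift_um']  # time series feeding the rms drift estimate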


    def _label_probe_qc(self, folder_probe, df_units, drift):
        """
        Labels the json field of the corresponding Alyx probe insertion
        :param folder_probe: the probe alf folder
        :param df_units: dataframe of unit metrics from _compute_cell_qc
        :param drift: drift output from ephysqc.spike_sorting_metrics
        :return:
        """
        eid = self.one.path2eid(self.session_path, query_type='remote')
        pdict = self.one.alyx.rest('insertions', 'list', session=eid, name=self.pname, no_cache=True)
        if len(pdict) != 1:
            _logger.warning(f'No probe found for probe name: {self.pname}')
            return
        isok = df_units['label'] == 1
        qcdict = {'n_units': int(df_units.shape[0]),
                  'n_units_qc_pass': int(np.sum(isok)),
                  'firing_rate_max': np.max(df_units['firing_rate'][isok]),
                  'firing_rate_median': np.median(df_units['firing_rate'][isok]),
                  'amplitude_max_uV': np.max(df_units['amp_max'][isok]) * 1e6,
                  'amplitude_median_uV': np.median(df_units['amp_median'][isok]) * 1e6,
                  'drift_rms_um': rms(drift['drift_um']),
                  }
        file_wm = folder_probe.joinpath('_kilosort_whitening.matrix.npy')
        if file_wm.exists():
            wm = np.load(file_wm)
            qcdict['whitening_matrix_conditioning'] = np.linalg.cond(wm)
        # groom the qc dict (this function will eventually go directly into the json field update)
        for k in qcdict:
            if isinstance(qcdict[k], np.int64):
                qcdict[k] = int(qcdict[k])
            elif isinstance(qcdict[k], float):
                qcdict[k] = np.round(qcdict[k], 2)
        self.one.alyx.json_field_update("insertions", pdict[0]["id"], "json", qcdict)

    def _run(self):
        """
        Post spike-sorting quality control at the cluster level.
        Outputs a QC table in the clusters ALF object and labels the corresponding probes in Alyx
        """
        files_spikes = Path(self.session_path).joinpath('alf', self.pname).rglob('spikes.times.npy')
        folder_probes = [f.parent for f in files_spikes]
        out_files = []
        for folder_probe in folder_probes:
            try:
                qc_file, df_units, drift = self._compute_cell_qc(folder_probe)
                out_files.append(qc_file)
                self._label_probe_qc(folder_probe, df_units, drift)
            except Exception:
                _logger.error(traceback.format_exc())
                self.status = -1
                continue
        return out_files

802 return out_files