Coverage for ibllib/pipes/ephys_alignment.py: 64%

215 statements  

« prev     ^ index     » next       coverage.py v7.5.4, created at 2024-07-08 17:16 +0100

1import scipy 

2import numpy as np 

3import ibllib.pipes.histology as histology 

4import iblatlas.atlas as atlas 

5TIP_SIZE_UM = 200 

6 

7 

8def _cumulative_distance(xyz): 

9 return np.cumsum(np.r_[0, np.sqrt(np.sum(np.diff(xyz, axis=0) ** 2, axis=1))]) 1abchf

10 

11 

12class EphysAlignment: 

13 

14 def __init__(self, xyz_picks, chn_depths=None, track_prev=None, 

15 feature_prev=None, brain_atlas=None, speedy=False): 

16 

17 if not brain_atlas: 1abc

18 self.brain_atlas = atlas.AllenAtlas(25) 

19 else: 

20 self.brain_atlas = brain_atlas 1abc

21 

22 self.xyz_track, self.track_extent = self.get_insertion_track(xyz_picks, speedy=speedy) 1abc

23 

24 self.chn_depths = chn_depths 1abc

25 if np.any(track_prev): 1abc

26 self.track_init = track_prev 1abc

27 self.feature_init = feature_prev 1abc

28 else: 

29 start_lims = 6000 / 1e6 

30 self.track_init = np.array([-1 * start_lims, start_lims]) 

31 self.feature_init = np.array([-1 * start_lims, start_lims]) 

32 

33 self.sampling_trk = np.arange(self.track_extent[0], 1abc

34 self.track_extent[-1] - 10 * 1e-6, 10 * 1e-6) 

35 self.xyz_samples = histology.interpolate_along_track(self.xyz_track, 1abc

36 self.sampling_trk - 

37 self.sampling_trk[0]) 

38 # ensure none of the track is outside the y or x lim of atlas 

39 xlim = np.bitwise_and(self.xyz_samples[:, 0] > self.brain_atlas.bc.xlim[0], 1abc

40 self.xyz_samples[:, 0] < self.brain_atlas.bc.xlim[1]) 

41 ylim = np.bitwise_and(self.xyz_samples[:, 1] < self.brain_atlas.bc.ylim[0], 1abc

42 self.xyz_samples[:, 1] > self.brain_atlas.bc.ylim[1]) 

43 rem = np.bitwise_and(xlim, ylim) 1abc

44 self.xyz_samples = self.xyz_samples[rem] 1abc

45 

46 self.region, self.region_label, self.region_colour, self.region_id\ 1abc

47 = self.get_histology_regions(self.xyz_samples, self.sampling_trk, self.brain_atlas) 

48 

49 def get_insertion_track(self, xyz_picks, speedy=False): 

50 """ 

51 Extends probe trajectory from bottom of brain to upper bound of allen atlas 

52 :param xyz_picks: points defining probe trajectory in 3D space (xyz) 

53 :type xyz_picks: np.array((n, 3)) - n: no. of unique points 

54 :return xyz_track: points defining extended trajectory in 3D space (xyz) 

55 :type xyz_track: np.array((n+2, 3)) 

56 :return track_extent: cumulative distance between two extremes of xyz_track (bottom of 

57 brain and top of atlas) offset by distance to probe tip 

58 :type track_extent: np.array((2)) 

59 """ 

60 # Use the first and last quarter of xyz_picks to estimate the trajectory beyond xyz_picks 

61 n_picks = np.max([4, round(xyz_picks.shape[0] / 4)]) 1abc

62 traj_entry = atlas.Trajectory.fit(xyz_picks[:n_picks, :]) 1abc

63 traj_exit = atlas.Trajectory.fit(xyz_picks[-1 * n_picks:, :]) 1abc

64 

65 # Force the entry to be on the upper z lim of the atlas to account for cases where channels 

66 # may be located above the surface of the brain 

67 entry = (traj_entry.eval_z(self.brain_atlas.bc.zlim))[0, :] 1abc

68 if speedy: 1abc

69 exit = (traj_exit.eval_z(self.brain_atlas.bc.zlim))[1, :] 

70 else: 

71 exit = atlas.Insertion.get_brain_exit(traj_exit, self.brain_atlas) 1abc

72 # The exit is just below the bottom surfacce of the brain 

73 exit[2] = exit[2] - 200 / 1e6 1abc

74 

75 # Catch cases where the exit 

76 if any(np.isnan(exit)): 1abc

77 exit = (traj_exit.eval_z(self.brain_atlas.bc.zlim))[1, :] 

78 xyz_track = np.r_[exit[np.newaxis, :], xyz_picks, entry[np.newaxis, :]] 1abc

79 # Sort so that most ventral coordinate is first 

80 xyz_track = xyz_track[np.argsort(xyz_track[:, 2]), :] 1abc

81 

82 # Compute distance to first electrode from bottom coordinate 

83 tip_distance = _cumulative_distance(xyz_track)[1] + TIP_SIZE_UM / 1e6 1abc

84 track_length = _cumulative_distance(xyz_track)[-1] 1abc

85 track_extent = np.array([0, track_length]) - tip_distance 1abc

86 

87 return xyz_track, track_extent 1abc

88 

89 def get_track_and_feature(self): 

90 """ 

91 Return track, feature and xyz_track variables 

92 """ 

93 return self.feature_init, self.track_init, self.xyz_track 

94 

95 @staticmethod 

96 def feature2track(trk, feature, track): 

97 """ 

98 Estimate new values of trk according to interpolated fit between feature and track space 

99 :param trk: points in track space to convert feature space 

100 :type trk: np.array 

101 :param feature: reference coordinates in feature space (ephys plots) 

102 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines 

103 :param track: reference coordinates in track space (histology track) 

104 :type track: np.array((n_lines + 2)) 

105 :return fcn(trk): interpolated values of trk 

106 :type fcn(trk): np.array 

107 """ 

108 

109 fcn = scipy.interpolate.interp1d(feature, track, fill_value="extrapolate") 1bcdhfeg

110 return fcn(trk) 1bcdhfeg

111 

112 @staticmethod 

113 def track2feature(ft, feature, track): 

114 """ 

115 Estimate new values of ft according to interpolated fit between track and feature space 

116 :param ft: points in feature space to convert track space 

117 :type ft: np.array 

118 :param feature: reference coordinates in feature space (ephys plots) 

119 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines 

120 :param track: reference coordinates in track space (histology track) 

121 :type track: np.array((n_lines + 2)) 

122 :return fcn(ft): interpolated values of ft 

123 :type fcn(ft): np.array 

124 """ 

125 fcn = scipy.interpolate.interp1d(track, feature, fill_value="extrapolate") 1dfe

126 return fcn(ft) 1dfe

127 

128 @staticmethod 

129 def feature2track_lin(trk, feature, track): 

130 """ 

131 Estimate new values of trk according to linear fit between feature and track space, only 

132 implemented if no. of reference points >= 3 

133 :param trk: points in track space to convert feature space 

134 :type trk: np.array 

135 :param feature: reference coordinates in feature space (ephys plots) 

136 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines 

137 :param track: reference coordinates in track space (histology track) 

138 :type track: np.array((n_lines + 2)) 

139 :return fcn(trk): linear fit values of trk 

140 :type fcn(trk): np.array 

141 """ 

142 if feature.size >= 5: 1d

143 fcn_lin = np.poly1d(np.polyfit(feature[1:-1], track[1:-1], 1)) 1d

144 lin_fit = fcn_lin(trk) 1d

145 else: 

146 lin_fit = 0 

147 return lin_fit 1d

148 

149 @staticmethod 

150 def adjust_extremes_uniform(feature, track): 

151 """ 

152 Change the value of the first and last reference points (non user chosen points) such 

153 that coordinates outside user picked regions are left unchanged 

154 :param feature: reference coordinates in feature space (ephys plots) 

155 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines 

156 :param track: reference coordinates in track space (histology track) 

157 :type track: np.array((n_lines + 2)) 

158 :return track: reference coordinates in track space with first and last value adjusted 

159 :type track: np.array((n_lines + 2)) 

160 """ 

161 diff = np.diff(feature - track) 1fe

162 track[0] -= diff[0] 1fe

163 track[-1] += diff[-1] 1fe

164 return track 1fe

165 

166 def adjust_extremes_linear(self, feature, track, extend_feature=1): 

167 """ 

168 Change the value of the first and last reference points (non user chosen points) such 

169 that coordinates outside user picked regions have a linear fit applied 

170 :param feature: reference coordinates in feature space (ephys plots) 

171 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines 

172 :param track: reference coordinates in track space (histology track) 

173 :type track: np.array((n_lines + 2)) 

174 :param extend_feature: amount to extend extreme coordinates before applying linear fit 

175 :type extend_feature: float 

176 :return feature: reference coordinates in feature space with first and last value adjusted 

177 :type feature: np.array((n_lines + 2)) 

178 :return track: reference coordinates in track space with first and last value adjusted 

179 :type track: np.array((n_lines + 2)) 

180 """ 

181 

182 feature[0] = self.track_init[0] - extend_feature 1d

183 feature[-1] = self.track_init[-1] + extend_feature 1d

184 extend_track = self.feature2track_lin(feature[[0, -1]], feature, track) 1d

185 track[0] = extend_track[0] 1d

186 track[-1] = extend_track[-1] 1d

187 return feature, track 1d

188 

189 def scale_histology_regions(self, feature, track, region=None, region_label=None): 

190 """ 

191 Recompute locations of brain region boundaries using interpolated fit based on reference 

192 lines 

193 :param feature: reference coordinates in feature space (ephys plots) 

194 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines 

195 :param track: reference coordinates in track space (histology track) 

196 :type track: np.array((n_lines + 2)) 

197 :return region: new coordinates of histology boundaries after applying interpolation 

198 :type region: np.array((n_bound, 2)) n_bound: no. of histology boundaries 

199 :return region_label: new coordinates of histology labels positions after applying 

200 interpolation 

201 :type region_label: np.array((n_bound)) of tuples (coordinate - float, label - str) 

202 """ 

203 region = np.copy(region) if region is not None else np.copy(self.region) 1dfe

204 region_label = np.copy(region_label) if region_label is not None else np.copy(self.region_label) 1dfe

205 region = self.track2feature(region, feature, track) * 1e6 1dfe

206 region_label[:, 0] = (self.track2feature(np.float64(region_label[:, 0]), feature, 1dfe

207 track) * 1e6) 

208 return region, region_label 1dfe

209 

210 @staticmethod 

211 def get_histology_regions(xyz_coords, depth_coords, brain_atlas=None, mapping=None): 

212 """ 

213 Find all brain regions and their boundaries along the depth of probe or track 

214 :param xyz_coords: 3D coordinates of points along probe or track 

215 :type xyz_coords: np.array((n_points, 3)) n_points: no. of points 

216 :param depth_coords: depth along probe or track where each xyz_coord is located 

217 :type depth_coords: np.array((n_points)) 

218 :return region: coordinates bounding each brain region 

219 :type region: np.array((n_bound, 2)) n_bound: no. of histology boundaries 

220 :return region_label: label for each brain region and coordinate of where to place label 

221 :type region_label: np.array((n_bound)) of tuples (coordinate - float, label - str) 

222 :return region_colour: allen atlas rgb colour for each brain region along track 

223 :type region_colour: np.array((n_bound, 3)) 

224 :return region_id: allen atlas id for each brain region along track 

225 :type region_id: np.array((n_bound)) 

226 """ 

227 if not brain_atlas: 1abc

228 brain_atlas = atlas.AllenAtlas(25) 

229 

230 region_ids = brain_atlas.get_labels(xyz_coords, mapping=mapping) 1abc

231 region_info = brain_atlas.regions.get(region_ids) 1abc

232 boundaries = np.where(np.diff(region_info.id))[0] 1abc

233 region = np.empty((boundaries.size + 1, 2)) 1abc

234 region_label = np.empty((boundaries.size + 1, 2), dtype=object) 1abc

235 region_id = np.empty((boundaries.size + 1, 1), dtype=int) 1abc

236 region_colour = np.empty((boundaries.size + 1, 3), dtype=int) 1abc

237 for bound in np.arange(boundaries.size + 1): 1abc

238 if bound == 0: 1abc

239 _region = np.array([0, boundaries[bound]]) 1abc

240 elif bound == boundaries.size: 1abc

241 _region = np.array([boundaries[bound - 1], region_info.id.size - 1]) 1abc

242 else: 

243 _region = np.array([boundaries[bound - 1], boundaries[bound]]) 1abc

244 _region_colour = region_info.rgb[_region[1]] 1abc

245 _region_label = region_info.acronym[_region[1]] 1abc

246 _region_id = region_info.id[_region[1]] 1abc

247 _region = depth_coords[_region] 1abc

248 _region_mean = np.mean(_region) 1abc

249 region[bound, :] = _region 1abc

250 region_colour[bound, :] = _region_colour 1abc

251 region_id[bound, :] = _region_id 1abc

252 region_label[bound, :] = (_region_mean, _region_label) 1abc

253 

254 return region, region_label, region_colour, region_id 1abc

255 

256 @staticmethod 

257 def get_nearest_boundary(xyz_coords, allen, extent=100, steps=8, parent=True, 

258 brain_atlas=None): 

259 """ 

260 Finds distance to closest neighbouring brain region along trajectory. For each point in 

261 xyz_coords computes the plane passing through point and perpendicular to trajectory and 

262 finds all brain regions that lie in that plane up to a given distance extent from specified 

263 point. Additionally, if requested, computes distance between the parents of regions. 

264 :param xyz_coords: 3D coordinates of points along probe or track 

265 :type xyz_coords: np.array((n_points, 3)) n_points: no. of points 

266 :param allen: dataframe containing allen info. Loaded from allen_structure_tree in 

267 ibllib/atlas 

268 :type allen: pandas Dataframe 

269 :param extent: extent of plane in each direction from origin in (um) 

270 :type extent: float 

271 :param steps: no. of steps to discretise plane into 

272 :type steps: int 

273 :param parent: Whether to also compute nearest distance between parents of regions 

274 :type parent: bool 

275 :return nearest_bound: dict containing results 

276 :type nearest_bound: dict 

277 """ 

278 if not brain_atlas: 

279 brain_atlas = atlas.AllenAtlas(25) 

280 

281 vector = atlas.Insertion.from_track(xyz_coords, brain_atlas=brain_atlas).trajectory.vector 

282 nearest_bound = dict() 

283 nearest_bound['dist'] = np.zeros((xyz_coords.shape[0])) 

284 nearest_bound['id'] = np.zeros((xyz_coords.shape[0])) 

285 # nearest_bound['adj_id'] = np.zeros((xyz_coords.shape[0])) 

286 nearest_bound['col'] = [] 

287 

288 if parent: 

289 nearest_bound['parent_dist'] = np.zeros((xyz_coords.shape[0])) 

290 nearest_bound['parent_id'] = np.zeros((xyz_coords.shape[0])) 

291 # nearest_bound['parent_adj_id'] = np.zeros((xyz_coords.shape[0])) 

292 nearest_bound['parent_col'] = [] 

293 

294 for iP, point in enumerate(xyz_coords): 

295 d = np.dot(vector, point) 

296 x_vals = np.r_[np.linspace(point[0] - extent / 1e6, point[0] + extent / 1e6, steps), 

297 point[0]] 

298 y_vals = np.r_[np.linspace(point[1] - extent / 1e6, point[1] + extent / 1e6, steps), 

299 point[1]] 

300 

301 X, Y = np.meshgrid(x_vals, y_vals) 

302 Z = (d - vector[0] * X - vector[1] * Y) / vector[2] 

303 XYZ = np.c_[np.reshape(X, X.size), np.reshape(Y, Y.size), np.reshape(Z, Z.size)] 

304 dist = np.sqrt(np.sum((XYZ - point) ** 2, axis=1)) 

305 

306 try: 

307 brain_id = brain_atlas.regions.get(brain_atlas.get_labels(XYZ))['id'] 

308 except Exception as err: 

309 print(err) 

310 continue 

311 

312 dist_sorted = np.argsort(dist) 

313 brain_id_sorted = brain_id[dist_sorted] 

314 nearest_bound['id'][iP] = brain_id_sorted[0] 

315 nearest_bound['col'].append(allen['color_hex_triplet'][np.where(allen['id'] == 

316 brain_id_sorted[0])[0][0]]) 

317 bound_idx = np.where(brain_id_sorted != brain_id_sorted[0])[0] 

318 if np.any(bound_idx): 

319 nearest_bound['dist'][iP] = dist[dist_sorted[bound_idx[0]]] * 1e6 

320 # nearest_bound['adj_id'][iP] = brain_id_sorted[bound_idx[0]] 

321 else: 

322 nearest_bound['dist'][iP] = np.max(dist) * 1e6 

323 # nearest_bound['adj_id'][iP] = brain_id_sorted[0] 

324 

325 if parent: 

326 # Now compute for the parents 

327 brain_parent = np.array([allen['parent_structure_id'][np.where(allen['id'] == br) 

328 [0][0]] for br in brain_id_sorted]) 

329 brain_parent[np.isnan(brain_parent)] = 0 

330 

331 nearest_bound['parent_id'][iP] = brain_parent[0] 

332 nearest_bound['parent_col'].append(allen['color_hex_triplet'] 

333 [np.where(allen['id'] == 

334 brain_parent[0])[0][0]]) 

335 

336 parent_idx = np.where(brain_parent != brain_parent[0])[0] 

337 if np.any(parent_idx): 

338 nearest_bound['parent_dist'][iP] = dist[dist_sorted[parent_idx[0]]] * 1e6 

339 # nearest_bound['parent_adj_id'][iP] = brain_parent[parent_idx[0]] 

340 else: 

341 nearest_bound['parent_dist'][iP] = np.max(dist) * 1e6 

342 # nearest_bound['parent_adj_id'][iP] = brain_parent[0] 

343 

344 return nearest_bound 

345 

346 @staticmethod 

347 def arrange_into_regions(depth_coords, region_ids, distance, region_colours): 

348 """ 

349 Arrange output from get_nearest_boundary into a form that can be plot using pyqtgraph or 

350 matplotlib 

351 :param depth_coords: depth along probe or track where each point is located 

352 :type depth_coords: np.array((n_points)) 

353 :param region_ids: brain region id at each depth along probe 

354 :type regions_ids: np.array((n_points)) 

355 :param distance: distance to nearest boundary in plane at each point 

356 :type distance: np.array((n_points)) 

357 :param region_colours: allen atlas hex colour for each region id 

358 :type region_colours: list of strings len(n_points) 

359 :return all_x: dist values for each region along probe track 

360 :type all_x: list of np.array 

361 :return all_y: depth values for each region along probe track 

362 :type all_y: list of np.array 

363 :return all_colour: colour assigned to each region along probe track 

364 :type all_colour: list of str 

365 """ 

366 

367 boundaries = np.where(np.diff(region_ids))[0] 

368 bound = np.r_[0, boundaries + 1, region_ids.shape[0]] 

369 all_y = [] 

370 all_x = [] 

371 all_colour = [] 

372 for iB in np.arange(len(bound) - 1): 

373 y = depth_coords[bound[iB]:(bound[iB + 1])] 

374 y = np.r_[y[0], y, y[-1]] 

375 x = distance[bound[iB]:(bound[iB + 1])] 

376 x = np.r_[0, x, 0] 

377 all_y.append(y) 

378 all_x.append(x) 

379 col = region_colours[bound[iB]] 

380 if not isinstance(col, str): 

381 col = '#FFFFFF' 

382 else: 

383 col = '#' + col 

384 all_colour.append(col) 

385 

386 return all_x, all_y, all_colour 

387 

388 def get_scale_factor(self, region, region_orig=None): 

389 """ 

390 Find how much each brain region has been scaled following interpolation 

391 :param region: scaled histology boundaries 

392 :type region: np.array((n_bound, 2)) n_bound: no. of histology boundaries 

393 :return scaled_region: regions that have unique scaling applied 

394 :type scaled_region: np.array((n_scale, 2)) n_scale: no. of uniquely scaled regions 

395 :return scale_factor: scale factor applied to each scaled region 

396 :type scale_factor: np.array((n_scale)) 

397 """ 

398 

399 region_orig = region_orig if region_orig is not None else self.region 1dfe

400 scale = [] 1dfe

401 for iR, (reg, reg_orig) in enumerate(zip(region, region_orig * 1e6)): 1dfe

402 scale = np.r_[scale, (reg[1] - reg[0]) / (reg_orig[1] - reg_orig[0])] 1dfe

403 boundaries = np.where(np.diff(np.around(scale, 3)))[0] 1dfe

404 if boundaries.size == 0: 1dfe

405 scaled_region = np.array([[region[0][0], region[-1][1]]]) 1f

406 scale_factor = np.unique(scale) 1f

407 else: 

408 scaled_region = np.empty((boundaries.size + 1, 2)) 1de

409 scale_factor = [] 1de

410 for bound in np.arange(boundaries.size + 1): 1de

411 if bound == 0: 1de

412 _scaled_region = np.array([region[0][0], 1de

413 region[boundaries[bound]][1]]) 

414 _scale_factor = scale[0] 1de

415 elif bound == boundaries.size: 1de

416 _scaled_region = np.array([region[boundaries[bound - 1]][1], 1de

417 region[-1][1]]) 

418 _scale_factor = scale[-1] 1de

419 else: 

420 _scaled_region = np.array([region[boundaries[bound - 1]][1], 1de

421 region[boundaries[bound]][1]]) 

422 _scale_factor = scale[boundaries[bound]] 1de

423 scaled_region[bound, :] = _scaled_region 1de

424 scale_factor = np.r_[scale_factor, _scale_factor] 1de

425 return scaled_region, scale_factor 1dfe

426 

427 def get_channel_locations(self, feature, track, depths=None): 

428 """ 

429 Gets 3d coordinates from a depth along the electrophysiology feature. 2 steps 

430 1) interpolate from the electrophys features depths space to the probe depth space 

431 2) interpolate from the probe depth space to the true 3D coordinates 

432 if depths is not provided, defaults to channels local coordinates depths 

433 """ 

434 if depths is None: 1bchfg

435 depths = self.chn_depths / 1e6 1bc

436 # nb using scipy here so we can change to cubic spline if needed 

437 channel_depths_track = self.feature2track(depths, feature, track) - self.track_extent[0] 1bchfg

438 xyz_channels = histology.interpolate_along_track(self.xyz_track, channel_depths_track) 1bchfg

439 return xyz_channels 1bchfg

440 

441 def get_brain_locations(self, xyz_channels): 

442 """ 

443 Finds the brain regions from 3D coordinates of electrode locations 

444 :param xyz_channels: 3D coordinates of electrodes on probe 

445 :type xyz_channels: np.array((n_elec, 3)) n_elec: no. of electrodes (384) 

446 :return brain_regions: brain region object for each electrode 

447 :type dict 

448 """ 

449 brain_regions = self.brain_atlas.regions.get(self.brain_atlas.get_labels(xyz_channels)) 1bcg

450 return brain_regions 1bcg

451 

452 def get_perp_vector(self, feature, track): 

453 """ 

454 Finds the perpendicular vector along the trajectory at the depth of reference lines 

455 :param feature: reference coordinates in feature space (ephys plots) 

456 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines 

457 :param track: reference coordinates in track space (histology track) 

458 :type track: np.array((n_line+2)) 

459 :return slice_lines: coordinates of perpendicular lines 

460 :type slice_lines: np.array((n_lines, 2)) 

461 """ 

462 

463 slice_lines = [] 

464 for line in feature[1:-1]: 

465 depths = np.array([line, line + 10 / 1e6]) 

466 xyz = self.get_channel_locations(feature, track, depths) 

467 

468 extent = 500e-6 

469 vector = np.diff(xyz, axis=0)[0] 

470 point = xyz[0, :] 

471 vector_perp = np.array([1, 0, -1 * vector[0] / vector[2]]) 

472 xyz_per = np.r_[[point + (-1 * extent * vector_perp)], 

473 [point + (extent * vector_perp)]] 

474 slice_lines.append(xyz_per) 

475 

476 return slice_lines