Coverage for ibllib/pipes/ephys_alignment.py: 64%
215 statements
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-17 15:25 +0000
« prev ^ index » next coverage.py v7.7.0, created at 2025-03-17 15:25 +0000
1import scipy
2import numpy as np
3import ibllib.pipes.histology as histology
4import iblatlas.atlas as atlas
5TIP_SIZE_UM = 200
8def _cumulative_distance(xyz):
9 return np.cumsum(np.r_[0, np.sqrt(np.sum(np.diff(xyz, axis=0) ** 2, axis=1))]) 1abchf
12class EphysAlignment:
14 def __init__(self, xyz_picks, chn_depths=None, track_prev=None,
15 feature_prev=None, brain_atlas=None, speedy=False):
17 if not brain_atlas: 1abc
18 self.brain_atlas = atlas.AllenAtlas(25)
19 else:
20 self.brain_atlas = brain_atlas 1abc
22 self.xyz_track, self.track_extent = self.get_insertion_track(xyz_picks, speedy=speedy) 1abc
24 self.chn_depths = chn_depths 1abc
25 if np.any(track_prev): 1abc
26 self.track_init = track_prev 1abc
27 self.feature_init = feature_prev 1abc
28 else:
29 start_lims = 6000 / 1e6
30 self.track_init = np.array([-1 * start_lims, start_lims])
31 self.feature_init = np.array([-1 * start_lims, start_lims])
33 self.sampling_trk = np.arange(self.track_extent[0], 1abc
34 self.track_extent[-1] - 10 * 1e-6, 10 * 1e-6)
35 self.xyz_samples = histology.interpolate_along_track(self.xyz_track, 1abc
36 self.sampling_trk -
37 self.sampling_trk[0])
38 # ensure none of the track is outside the y or x lim of atlas
39 xlim = np.bitwise_and(self.xyz_samples[:, 0] > self.brain_atlas.bc.xlim[0], 1abc
40 self.xyz_samples[:, 0] < self.brain_atlas.bc.xlim[1])
41 ylim = np.bitwise_and(self.xyz_samples[:, 1] < self.brain_atlas.bc.ylim[0], 1abc
42 self.xyz_samples[:, 1] > self.brain_atlas.bc.ylim[1])
43 rem = np.bitwise_and(xlim, ylim) 1abc
44 self.xyz_samples = self.xyz_samples[rem] 1abc
46 self.region, self.region_label, self.region_colour, self.region_id\ 1abc
47 = self.get_histology_regions(self.xyz_samples, self.sampling_trk, self.brain_atlas)
49 def get_insertion_track(self, xyz_picks, speedy=False):
50 """
51 Extends probe trajectory from bottom of brain to upper bound of allen atlas
52 :param xyz_picks: points defining probe trajectory in 3D space (xyz)
53 :type xyz_picks: np.array((n, 3)) - n: no. of unique points
54 :return xyz_track: points defining extended trajectory in 3D space (xyz)
55 :type xyz_track: np.array((n+2, 3))
56 :return track_extent: cumulative distance between two extremes of xyz_track (bottom of
57 brain and top of atlas) offset by distance to probe tip
58 :type track_extent: np.array((2))
59 """
60 # Use the first and last quarter of xyz_picks to estimate the trajectory beyond xyz_picks
61 n_picks = np.max([4, round(xyz_picks.shape[0] / 4)]) 1abc
62 traj_entry = atlas.Trajectory.fit(xyz_picks[:n_picks, :]) 1abc
63 traj_exit = atlas.Trajectory.fit(xyz_picks[-1 * n_picks:, :]) 1abc
65 # Force the entry to be on the upper z lim of the atlas to account for cases where channels
66 # may be located above the surface of the brain
67 entry = (traj_entry.eval_z(self.brain_atlas.bc.zlim))[0, :] 1abc
68 if speedy: 1abc
69 exit = (traj_exit.eval_z(self.brain_atlas.bc.zlim))[1, :]
70 else:
71 exit = atlas.Insertion.get_brain_exit(traj_exit, self.brain_atlas) 1abc
72 # The exit is just below the bottom surfacce of the brain
73 exit[2] = exit[2] - 200 / 1e6 1abc
75 # Catch cases where the exit
76 if any(np.isnan(exit)): 1abc
77 exit = (traj_exit.eval_z(self.brain_atlas.bc.zlim))[1, :]
78 xyz_track = np.r_[exit[np.newaxis, :], xyz_picks, entry[np.newaxis, :]] 1abc
79 # Sort so that most ventral coordinate is first
80 xyz_track = xyz_track[np.argsort(xyz_track[:, 2]), :] 1abc
82 # Compute distance to first electrode from bottom coordinate
83 tip_distance = _cumulative_distance(xyz_track)[1] + TIP_SIZE_UM / 1e6 1abc
84 track_length = _cumulative_distance(xyz_track)[-1] 1abc
85 track_extent = np.array([0, track_length]) - tip_distance 1abc
87 return xyz_track, track_extent 1abc
89 def get_track_and_feature(self):
90 """
91 Return track, feature and xyz_track variables
92 """
93 return self.feature_init, self.track_init, self.xyz_track
95 @staticmethod
96 def feature2track(trk, feature, track):
97 """
98 Estimate new values of trk according to interpolated fit between feature and track space
99 :param trk: points in track space to convert feature space
100 :type trk: np.array
101 :param feature: reference coordinates in feature space (ephys plots)
102 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines
103 :param track: reference coordinates in track space (histology track)
104 :type track: np.array((n_lines + 2))
105 :return fcn(trk): interpolated values of trk
106 :type fcn(trk): np.array
107 """
109 fcn = scipy.interpolate.interp1d(feature, track, fill_value="extrapolate") 1bcdhfeg
110 return fcn(trk) 1bcdhfeg
112 @staticmethod
113 def track2feature(ft, feature, track):
114 """
115 Estimate new values of ft according to interpolated fit between track and feature space
116 :param ft: points in feature space to convert track space
117 :type ft: np.array
118 :param feature: reference coordinates in feature space (ephys plots)
119 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines
120 :param track: reference coordinates in track space (histology track)
121 :type track: np.array((n_lines + 2))
122 :return fcn(ft): interpolated values of ft
123 :type fcn(ft): np.array
124 """
125 fcn = scipy.interpolate.interp1d(track, feature, fill_value="extrapolate") 1dfe
126 return fcn(ft) 1dfe
128 @staticmethod
129 def feature2track_lin(trk, feature, track):
130 """
131 Estimate new values of trk according to linear fit between feature and track space, only
132 implemented if no. of reference points >= 3
133 :param trk: points in track space to convert feature space
134 :type trk: np.array
135 :param feature: reference coordinates in feature space (ephys plots)
136 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines
137 :param track: reference coordinates in track space (histology track)
138 :type track: np.array((n_lines + 2))
139 :return fcn(trk): linear fit values of trk
140 :type fcn(trk): np.array
141 """
142 if feature.size >= 5: 1d
143 fcn_lin = np.poly1d(np.polyfit(feature[1:-1], track[1:-1], 1)) 1d
144 lin_fit = fcn_lin(trk) 1d
145 else:
146 lin_fit = 0
147 return lin_fit 1d
149 @staticmethod
150 def adjust_extremes_uniform(feature, track):
151 """
152 Change the value of the first and last reference points (non user chosen points) such
153 that coordinates outside user picked regions are left unchanged
154 :param feature: reference coordinates in feature space (ephys plots)
155 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines
156 :param track: reference coordinates in track space (histology track)
157 :type track: np.array((n_lines + 2))
158 :return track: reference coordinates in track space with first and last value adjusted
159 :type track: np.array((n_lines + 2))
160 """
161 diff = np.diff(feature - track) 1fe
162 track[0] -= diff[0] 1fe
163 track[-1] += diff[-1] 1fe
164 return track 1fe
166 def adjust_extremes_linear(self, feature, track, extend_feature=1):
167 """
168 Change the value of the first and last reference points (non user chosen points) such
169 that coordinates outside user picked regions have a linear fit applied
170 :param feature: reference coordinates in feature space (ephys plots)
171 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines
172 :param track: reference coordinates in track space (histology track)
173 :type track: np.array((n_lines + 2))
174 :param extend_feature: amount to extend extreme coordinates before applying linear fit
175 :type extend_feature: float
176 :return feature: reference coordinates in feature space with first and last value adjusted
177 :type feature: np.array((n_lines + 2))
178 :return track: reference coordinates in track space with first and last value adjusted
179 :type track: np.array((n_lines + 2))
180 """
182 feature[0] = self.track_init[0] - extend_feature 1d
183 feature[-1] = self.track_init[-1] + extend_feature 1d
184 extend_track = self.feature2track_lin(feature[[0, -1]], feature, track) 1d
185 track[0] = extend_track[0] 1d
186 track[-1] = extend_track[-1] 1d
187 return feature, track 1d
189 def scale_histology_regions(self, feature, track, region=None, region_label=None):
190 """
191 Recompute locations of brain region boundaries using interpolated fit based on reference
192 lines
193 :param feature: reference coordinates in feature space (ephys plots)
194 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines
195 :param track: reference coordinates in track space (histology track)
196 :type track: np.array((n_lines + 2))
197 :return region: new coordinates of histology boundaries after applying interpolation
198 :type region: np.array((n_bound, 2)) n_bound: no. of histology boundaries
199 :return region_label: new coordinates of histology labels positions after applying
200 interpolation
201 :type region_label: np.array((n_bound)) of tuples (coordinate - float, label - str)
202 """
203 region = np.copy(region) if region is not None else np.copy(self.region) 1dfe
204 region_label = np.copy(region_label) if region_label is not None else np.copy(self.region_label) 1dfe
205 region = self.track2feature(region, feature, track) * 1e6 1dfe
206 region_label[:, 0] = (self.track2feature(np.float64(region_label[:, 0]), feature, 1dfe
207 track) * 1e6)
208 return region, region_label 1dfe
210 @staticmethod
211 def get_histology_regions(xyz_coords, depth_coords, brain_atlas=None, mapping=None):
212 """
213 Find all brain regions and their boundaries along the depth of probe or track
214 :param xyz_coords: 3D coordinates of points along probe or track
215 :type xyz_coords: np.array((n_points, 3)) n_points: no. of points
216 :param depth_coords: depth along probe or track where each xyz_coord is located
217 :type depth_coords: np.array((n_points))
218 :return region: coordinates bounding each brain region
219 :type region: np.array((n_bound, 2)) n_bound: no. of histology boundaries
220 :return region_label: label for each brain region and coordinate of where to place label
221 :type region_label: np.array((n_bound)) of tuples (coordinate - float, label - str)
222 :return region_colour: allen atlas rgb colour for each brain region along track
223 :type region_colour: np.array((n_bound, 3))
224 :return region_id: allen atlas id for each brain region along track
225 :type region_id: np.array((n_bound))
226 """
227 if not brain_atlas: 1abc
228 brain_atlas = atlas.AllenAtlas(25)
230 region_ids = brain_atlas.get_labels(xyz_coords, mapping=mapping) 1abc
231 region_info = brain_atlas.regions.get(region_ids) 1abc
232 boundaries = np.where(np.diff(region_info.id))[0] 1abc
233 region = np.empty((boundaries.size + 1, 2)) 1abc
234 region_label = np.empty((boundaries.size + 1, 2), dtype=object) 1abc
235 region_id = np.empty((boundaries.size + 1, 1), dtype=int) 1abc
236 region_colour = np.empty((boundaries.size + 1, 3), dtype=int) 1abc
237 for bound in np.arange(boundaries.size + 1): 1abc
238 if bound == 0: 1abc
239 _region = np.array([0, boundaries[bound]]) 1abc
240 elif bound == boundaries.size: 1abc
241 _region = np.array([boundaries[bound - 1], region_info.id.size - 1]) 1abc
242 else:
243 _region = np.array([boundaries[bound - 1], boundaries[bound]]) 1abc
244 _region_colour = region_info.rgb[_region[1]] 1abc
245 _region_label = region_info.acronym[_region[1]] 1abc
246 _region_id = region_info.id[_region[1]] 1abc
247 _region = depth_coords[_region] 1abc
248 _region_mean = np.mean(_region) 1abc
249 region[bound, :] = _region 1abc
250 region_colour[bound, :] = _region_colour 1abc
251 region_id[bound, :] = _region_id 1abc
252 region_label[bound, :] = (_region_mean, _region_label) 1abc
254 return region, region_label, region_colour, region_id 1abc
256 @staticmethod
257 def get_nearest_boundary(xyz_coords, allen, extent=100, steps=8, parent=True,
258 brain_atlas=None):
259 """
260 Finds distance to closest neighbouring brain region along trajectory. For each point in
261 xyz_coords computes the plane passing through point and perpendicular to trajectory and
262 finds all brain regions that lie in that plane up to a given distance extent from specified
263 point. Additionally, if requested, computes distance between the parents of regions.
264 :param xyz_coords: 3D coordinates of points along probe or track
265 :type xyz_coords: np.array((n_points, 3)) n_points: no. of points
266 :param allen: dataframe containing allen info. Loaded from allen_structure_tree in
267 ibllib/atlas
268 :type allen: pandas Dataframe
269 :param extent: extent of plane in each direction from origin in (um)
270 :type extent: float
271 :param steps: no. of steps to discretise plane into
272 :type steps: int
273 :param parent: Whether to also compute nearest distance between parents of regions
274 :type parent: bool
275 :return nearest_bound: dict containing results
276 :type nearest_bound: dict
277 """
278 if not brain_atlas:
279 brain_atlas = atlas.AllenAtlas(25)
281 vector = atlas.Insertion.from_track(xyz_coords, brain_atlas=brain_atlas).trajectory.vector
282 nearest_bound = dict()
283 nearest_bound['dist'] = np.zeros((xyz_coords.shape[0]))
284 nearest_bound['id'] = np.zeros((xyz_coords.shape[0]))
285 # nearest_bound['adj_id'] = np.zeros((xyz_coords.shape[0]))
286 nearest_bound['col'] = []
288 if parent:
289 nearest_bound['parent_dist'] = np.zeros((xyz_coords.shape[0]))
290 nearest_bound['parent_id'] = np.zeros((xyz_coords.shape[0]))
291 # nearest_bound['parent_adj_id'] = np.zeros((xyz_coords.shape[0]))
292 nearest_bound['parent_col'] = []
294 for iP, point in enumerate(xyz_coords):
295 d = np.dot(vector, point)
296 x_vals = np.r_[np.linspace(point[0] - extent / 1e6, point[0] + extent / 1e6, steps),
297 point[0]]
298 y_vals = np.r_[np.linspace(point[1] - extent / 1e6, point[1] + extent / 1e6, steps),
299 point[1]]
301 X, Y = np.meshgrid(x_vals, y_vals)
302 Z = (d - vector[0] * X - vector[1] * Y) / vector[2]
303 XYZ = np.c_[np.reshape(X, X.size), np.reshape(Y, Y.size), np.reshape(Z, Z.size)]
304 dist = np.sqrt(np.sum((XYZ - point) ** 2, axis=1))
306 try:
307 brain_id = brain_atlas.regions.get(brain_atlas.get_labels(XYZ))['id']
308 except Exception as err:
309 print(err)
310 continue
312 dist_sorted = np.argsort(dist)
313 brain_id_sorted = brain_id[dist_sorted]
314 nearest_bound['id'][iP] = brain_id_sorted[0]
315 nearest_bound['col'].append(allen['color_hex_triplet'][np.where(allen['id'] ==
316 brain_id_sorted[0])[0][0]])
317 bound_idx = np.where(brain_id_sorted != brain_id_sorted[0])[0]
318 if np.any(bound_idx):
319 nearest_bound['dist'][iP] = dist[dist_sorted[bound_idx[0]]] * 1e6
320 # nearest_bound['adj_id'][iP] = brain_id_sorted[bound_idx[0]]
321 else:
322 nearest_bound['dist'][iP] = np.max(dist) * 1e6
323 # nearest_bound['adj_id'][iP] = brain_id_sorted[0]
325 if parent:
326 # Now compute for the parents
327 brain_parent = np.array([allen['parent_structure_id'][np.where(allen['id'] == br)
328 [0][0]] for br in brain_id_sorted])
329 brain_parent[np.isnan(brain_parent)] = 0
331 nearest_bound['parent_id'][iP] = brain_parent[0]
332 nearest_bound['parent_col'].append(allen['color_hex_triplet']
333 [np.where(allen['id'] ==
334 brain_parent[0])[0][0]])
336 parent_idx = np.where(brain_parent != brain_parent[0])[0]
337 if np.any(parent_idx):
338 nearest_bound['parent_dist'][iP] = dist[dist_sorted[parent_idx[0]]] * 1e6
339 # nearest_bound['parent_adj_id'][iP] = brain_parent[parent_idx[0]]
340 else:
341 nearest_bound['parent_dist'][iP] = np.max(dist) * 1e6
342 # nearest_bound['parent_adj_id'][iP] = brain_parent[0]
344 return nearest_bound
346 @staticmethod
347 def arrange_into_regions(depth_coords, region_ids, distance, region_colours):
348 """
349 Arrange output from get_nearest_boundary into a form that can be plot using pyqtgraph or
350 matplotlib
351 :param depth_coords: depth along probe or track where each point is located
352 :type depth_coords: np.array((n_points))
353 :param region_ids: brain region id at each depth along probe
354 :type regions_ids: np.array((n_points))
355 :param distance: distance to nearest boundary in plane at each point
356 :type distance: np.array((n_points))
357 :param region_colours: allen atlas hex colour for each region id
358 :type region_colours: list of strings len(n_points)
359 :return all_x: dist values for each region along probe track
360 :type all_x: list of np.array
361 :return all_y: depth values for each region along probe track
362 :type all_y: list of np.array
363 :return all_colour: colour assigned to each region along probe track
364 :type all_colour: list of str
365 """
367 boundaries = np.where(np.diff(region_ids))[0]
368 bound = np.r_[0, boundaries + 1, region_ids.shape[0]]
369 all_y = []
370 all_x = []
371 all_colour = []
372 for iB in np.arange(len(bound) - 1):
373 y = depth_coords[bound[iB]:(bound[iB + 1])]
374 y = np.r_[y[0], y, y[-1]]
375 x = distance[bound[iB]:(bound[iB + 1])]
376 x = np.r_[0, x, 0]
377 all_y.append(y)
378 all_x.append(x)
379 col = region_colours[bound[iB]]
380 if not isinstance(col, str):
381 col = '#FFFFFF'
382 else:
383 col = '#' + col
384 all_colour.append(col)
386 return all_x, all_y, all_colour
388 def get_scale_factor(self, region, region_orig=None):
389 """
390 Find how much each brain region has been scaled following interpolation
391 :param region: scaled histology boundaries
392 :type region: np.array((n_bound, 2)) n_bound: no. of histology boundaries
393 :return scaled_region: regions that have unique scaling applied
394 :type scaled_region: np.array((n_scale, 2)) n_scale: no. of uniquely scaled regions
395 :return scale_factor: scale factor applied to each scaled region
396 :type scale_factor: np.array((n_scale))
397 """
399 region_orig = region_orig if region_orig is not None else self.region 1dfe
400 scale = [] 1dfe
401 for iR, (reg, reg_orig) in enumerate(zip(region, region_orig * 1e6)): 1dfe
402 scale = np.r_[scale, (reg[1] - reg[0]) / (reg_orig[1] - reg_orig[0])] 1dfe
403 boundaries = np.where(np.diff(np.around(scale, 3)))[0] 1dfe
404 if boundaries.size == 0: 1dfe
405 scaled_region = np.array([[region[0][0], region[-1][1]]]) 1f
406 scale_factor = np.unique(scale) 1f
407 else:
408 scaled_region = np.empty((boundaries.size + 1, 2)) 1de
409 scale_factor = [] 1de
410 for bound in np.arange(boundaries.size + 1): 1de
411 if bound == 0: 1de
412 _scaled_region = np.array([region[0][0], 1de
413 region[boundaries[bound]][1]])
414 _scale_factor = scale[0] 1de
415 elif bound == boundaries.size: 1de
416 _scaled_region = np.array([region[boundaries[bound - 1]][1], 1de
417 region[-1][1]])
418 _scale_factor = scale[-1] 1de
419 else:
420 _scaled_region = np.array([region[boundaries[bound - 1]][1], 1de
421 region[boundaries[bound]][1]])
422 _scale_factor = scale[boundaries[bound]] 1de
423 scaled_region[bound, :] = _scaled_region 1de
424 scale_factor = np.r_[scale_factor, _scale_factor] 1de
425 return scaled_region, scale_factor 1dfe
427 def get_channel_locations(self, feature, track, depths=None):
428 """
429 Gets 3d coordinates from a depth along the electrophysiology feature. 2 steps
430 1) interpolate from the electrophys features depths space to the probe depth space
431 2) interpolate from the probe depth space to the true 3D coordinates
432 if depths is not provided, defaults to channels local coordinates depths
433 """
434 if depths is None: 1bchfg
435 depths = self.chn_depths / 1e6 1bc
436 # nb using scipy here so we can change to cubic spline if needed
437 channel_depths_track = self.feature2track(depths, feature, track) - self.track_extent[0] 1bchfg
438 xyz_channels = histology.interpolate_along_track(self.xyz_track, channel_depths_track) 1bchfg
439 return xyz_channels 1bchfg
441 def get_brain_locations(self, xyz_channels):
442 """
443 Finds the brain regions from 3D coordinates of electrode locations
444 :param xyz_channels: 3D coordinates of electrodes on probe
445 :type xyz_channels: np.array((n_elec, 3)) n_elec: no. of electrodes (384)
446 :return brain_regions: brain region object for each electrode
447 :type dict
448 """
449 brain_regions = self.brain_atlas.regions.get(self.brain_atlas.get_labels(xyz_channels)) 1bcg
450 return brain_regions 1bcg
452 def get_perp_vector(self, feature, track):
453 """
454 Finds the perpendicular vector along the trajectory at the depth of reference lines
455 :param feature: reference coordinates in feature space (ephys plots)
456 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines
457 :param track: reference coordinates in track space (histology track)
458 :type track: np.array((n_line+2))
459 :return slice_lines: coordinates of perpendicular lines
460 :type slice_lines: np.array((n_lines, 2))
461 """
463 slice_lines = []
464 for line in feature[1:-1]:
465 depths = np.array([line, line + 10 / 1e6])
466 xyz = self.get_channel_locations(feature, track, depths)
468 extent = 500e-6
469 vector = np.diff(xyz, axis=0)[0]
470 point = xyz[0, :]
471 vector_perp = np.array([1, 0, -1 * vector[0] / vector[2]])
472 xyz_per = np.r_[[point + (-1 * extent * vector_perp)],
473 [point + (extent * vector_perp)]]
474 slice_lines.append(xyz_per)
476 return slice_lines