Coverage for ibllib/pipes/ephys_alignment.py: 64%
215 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 13:38 +0100
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 13:38 +0100
1import scipy
2import numpy as np
3import ibllib.pipes.histology as histology
4import iblatlas.atlas as atlas
5TIP_SIZE_UM = 200
8def _cumulative_distance(xyz):
9 return np.cumsum(np.r_[0, np.sqrt(np.sum(np.diff(xyz, axis=0) ** 2, axis=1))]) 1abge
12class EphysAlignment:
14 def __init__(self, xyz_picks, chn_depths=None, track_prev=None,
15 feature_prev=None, brain_atlas=None, speedy=False):
17 if not brain_atlas: 1ab
18 self.brain_atlas = atlas.AllenAtlas(25)
19 else:
20 self.brain_atlas = brain_atlas 1ab
22 self.xyz_track, self.track_extent = self.get_insertion_track(xyz_picks, speedy=speedy) 1ab
24 self.chn_depths = chn_depths 1ab
25 if np.any(track_prev): 1ab
26 self.track_init = track_prev 1ab
27 self.feature_init = feature_prev 1ab
28 else:
29 start_lims = 6000 / 1e6
30 self.track_init = np.array([-1 * start_lims, start_lims])
31 self.feature_init = np.array([-1 * start_lims, start_lims])
33 self.sampling_trk = np.arange(self.track_extent[0], 1ab
34 self.track_extent[-1] - 10 * 1e-6, 10 * 1e-6)
35 self.xyz_samples = histology.interpolate_along_track(self.xyz_track, 1ab
36 self.sampling_trk -
37 self.sampling_trk[0])
38 # ensure none of the track is outside the y or x lim of atlas
39 xlim = np.bitwise_and(self.xyz_samples[:, 0] > self.brain_atlas.bc.xlim[0], 1ab
40 self.xyz_samples[:, 0] < self.brain_atlas.bc.xlim[1])
41 ylim = np.bitwise_and(self.xyz_samples[:, 1] < self.brain_atlas.bc.ylim[0], 1ab
42 self.xyz_samples[:, 1] > self.brain_atlas.bc.ylim[1])
43 rem = np.bitwise_and(xlim, ylim) 1ab
44 self.xyz_samples = self.xyz_samples[rem] 1ab
46 self.region, self.region_label, self.region_colour, self.region_id\ 1ab
47 = self.get_histology_regions(self.xyz_samples, self.sampling_trk, self.brain_atlas)
49 def get_insertion_track(self, xyz_picks, speedy=False):
50 """
51 Extends probe trajectory from bottom of brain to upper bound of allen atlas
52 :param xyz_picks: points defining probe trajectory in 3D space (xyz)
53 :type xyz_picks: np.array((n, 3)) - n: no. of unique points
54 :return xyz_track: points defining extended trajectory in 3D space (xyz)
55 :type xyz_track: np.array((n+2, 3))
56 :return track_extent: cumulative distance between two extremes of xyz_track (bottom of
57 brain and top of atlas) offset by distance to probe tip
58 :type track_extent: np.array((2))
59 """
60 # Use the first and last quarter of xyz_picks to estimate the trajectory beyond xyz_picks
61 n_picks = np.max([4, round(xyz_picks.shape[0] / 4)]) 1ab
62 traj_entry = atlas.Trajectory.fit(xyz_picks[:n_picks, :]) 1ab
63 traj_exit = atlas.Trajectory.fit(xyz_picks[-1 * n_picks:, :]) 1ab
65 # Force the entry to be on the upper z lim of the atlas to account for cases where channels
66 # may be located above the surface of the brain
67 entry = (traj_entry.eval_z(self.brain_atlas.bc.zlim))[0, :] 1ab
68 if speedy: 1ab
69 exit = (traj_exit.eval_z(self.brain_atlas.bc.zlim))[1, :]
70 else:
71 exit = atlas.Insertion.get_brain_exit(traj_exit, self.brain_atlas) 1ab
72 # The exit is just below the bottom surfacce of the brain
73 exit[2] = exit[2] - 200 / 1e6 1ab
75 # Catch cases where the exit
76 if any(np.isnan(exit)): 1ab
77 exit = (traj_exit.eval_z(self.brain_atlas.bc.zlim))[1, :]
78 xyz_track = np.r_[exit[np.newaxis, :], xyz_picks, entry[np.newaxis, :]] 1ab
79 # Sort so that most ventral coordinate is first
80 xyz_track = xyz_track[np.argsort(xyz_track[:, 2]), :] 1ab
82 # Compute distance to first electrode from bottom coordinate
83 tip_distance = _cumulative_distance(xyz_track)[1] + TIP_SIZE_UM / 1e6 1ab
84 track_length = _cumulative_distance(xyz_track)[-1] 1ab
85 track_extent = np.array([0, track_length]) - tip_distance 1ab
87 return xyz_track, track_extent 1ab
89 def get_track_and_feature(self):
90 """
91 Return track, feature and xyz_track variables
92 """
93 return self.feature_init, self.track_init, self.xyz_track
95 @staticmethod
96 def feature2track(trk, feature, track):
97 """
98 Estimate new values of trk according to interpolated fit between feature and track space
99 :param trk: points in track space to convert feature space
100 :type trk: np.array
101 :param feature: reference coordinates in feature space (ephys plots)
102 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines
103 :param track: reference coordinates in track space (histology track)
104 :type track: np.array((n_lines + 2))
105 :return fcn(trk): interpolated values of trk
106 :type fcn(trk): np.array
107 """
109 fcn = scipy.interpolate.interp1d(feature, track, fill_value="extrapolate") 1bcgedf
110 return fcn(trk) 1bcgedf
112 @staticmethod
113 def track2feature(ft, feature, track):
114 """
115 Estimate new values of ft according to interpolated fit between track and feature space
116 :param ft: points in feature space to convert track space
117 :type ft: np.array
118 :param feature: reference coordinates in feature space (ephys plots)
119 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines
120 :param track: reference coordinates in track space (histology track)
121 :type track: np.array((n_lines + 2))
122 :return fcn(ft): interpolated values of ft
123 :type fcn(ft): np.array
124 """
125 fcn = scipy.interpolate.interp1d(track, feature, fill_value="extrapolate") 1ced
126 return fcn(ft) 1ced
128 @staticmethod
129 def feature2track_lin(trk, feature, track):
130 """
131 Estimate new values of trk according to linear fit between feature and track space, only
132 implemented if no. of reference points >= 3
133 :param trk: points in track space to convert feature space
134 :type trk: np.array
135 :param feature: reference coordinates in feature space (ephys plots)
136 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines
137 :param track: reference coordinates in track space (histology track)
138 :type track: np.array((n_lines + 2))
139 :return fcn(trk): linear fit values of trk
140 :type fcn(trk): np.array
141 """
142 if feature.size >= 5: 1c
143 fcn_lin = np.poly1d(np.polyfit(feature[1:-1], track[1:-1], 1)) 1c
144 lin_fit = fcn_lin(trk) 1c
145 else:
146 lin_fit = 0
147 return lin_fit 1c
149 @staticmethod
150 def adjust_extremes_uniform(feature, track):
151 """
152 Change the value of the first and last reference points (non user chosen points) such
153 that coordinates outside user picked regions are left unchanged
154 :param feature: reference coordinates in feature space (ephys plots)
155 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines
156 :param track: reference coordinates in track space (histology track)
157 :type track: np.array((n_lines + 2))
158 :return track: reference coordinates in track space with first and last value adjusted
159 :type track: np.array((n_lines + 2))
160 """
161 diff = np.diff(feature - track) 1ed
162 track[0] -= diff[0] 1ed
163 track[-1] += diff[-1] 1ed
164 return track 1ed
166 def adjust_extremes_linear(self, feature, track, extend_feature=1):
167 """
168 Change the value of the first and last reference points (non user chosen points) such
169 that coordinates outside user picked regions have a linear fit applied
170 :param feature: reference coordinates in feature space (ephys plots)
171 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines
172 :param track: reference coordinates in track space (histology track)
173 :type track: np.array((n_lines + 2))
174 :param extend_feature: amount to extend extreme coordinates before applying linear fit
175 :type extend_feature: float
176 :return feature: reference coordinates in feature space with first and last value adjusted
177 :type feature: np.array((n_lines + 2))
178 :return track: reference coordinates in track space with first and last value adjusted
179 :type track: np.array((n_lines + 2))
180 """
182 feature[0] = self.track_init[0] - extend_feature 1c
183 feature[-1] = self.track_init[-1] + extend_feature 1c
184 extend_track = self.feature2track_lin(feature[[0, -1]], feature, track) 1c
185 track[0] = extend_track[0] 1c
186 track[-1] = extend_track[-1] 1c
187 return feature, track 1c
189 def scale_histology_regions(self, feature, track, region=None, region_label=None):
190 """
191 Recompute locations of brain region boundaries using interpolated fit based on reference
192 lines
193 :param feature: reference coordinates in feature space (ephys plots)
194 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines
195 :param track: reference coordinates in track space (histology track)
196 :type track: np.array((n_lines + 2))
197 :return region: new coordinates of histology boundaries after applying interpolation
198 :type region: np.array((n_bound, 2)) n_bound: no. of histology boundaries
199 :return region_label: new coordinates of histology labels positions after applying
200 interpolation
201 :type region_label: np.array((n_bound)) of tuples (coordinate - float, label - str)
202 """
203 region = np.copy(region) if region is not None else np.copy(self.region) 1ced
204 region_label = np.copy(region_label) if region_label is not None else np.copy(self.region_label) 1ced
205 region = self.track2feature(region, feature, track) * 1e6 1ced
206 region_label[:, 0] = (self.track2feature(np.float64(region_label[:, 0]), feature, 1ced
207 track) * 1e6)
208 return region, region_label 1ced
210 @staticmethod
211 def get_histology_regions(xyz_coords, depth_coords, brain_atlas=None, mapping=None):
212 """
213 Find all brain regions and their boundaries along the depth of probe or track
214 :param xyz_coords: 3D coordinates of points along probe or track
215 :type xyz_coords: np.array((n_points, 3)) n_points: no. of points
216 :param depth_coords: depth along probe or track where each xyz_coord is located
217 :type depth_coords: np.array((n_points))
218 :return region: coordinates bounding each brain region
219 :type region: np.array((n_bound, 2)) n_bound: no. of histology boundaries
220 :return region_label: label for each brain region and coordinate of where to place label
221 :type region_label: np.array((n_bound)) of tuples (coordinate - float, label - str)
222 :return region_colour: allen atlas rgb colour for each brain region along track
223 :type region_colour: np.array((n_bound, 3))
224 :return region_id: allen atlas id for each brain region along track
225 :type region_id: np.array((n_bound))
226 """
227 if not brain_atlas: 1ab
228 brain_atlas = atlas.AllenAtlas(25)
230 region_ids = brain_atlas.get_labels(xyz_coords, mapping=mapping) 1ab
231 region_info = brain_atlas.regions.get(region_ids) 1ab
232 boundaries = np.where(np.diff(region_info.id))[0] 1ab
233 region = np.empty((boundaries.size + 1, 2)) 1ab
234 region_label = np.empty((boundaries.size + 1, 2), dtype=object) 1ab
235 region_id = np.empty((boundaries.size + 1, 1), dtype=int) 1ab
236 region_colour = np.empty((boundaries.size + 1, 3), dtype=int) 1ab
237 for bound in np.arange(boundaries.size + 1): 1ab
238 if bound == 0: 1ab
239 _region = np.array([0, boundaries[bound]]) 1ab
240 elif bound == boundaries.size: 1ab
241 _region = np.array([boundaries[bound - 1], region_info.id.size - 1]) 1ab
242 else:
243 _region = np.array([boundaries[bound - 1], boundaries[bound]]) 1ab
244 _region_colour = region_info.rgb[_region[1]] 1ab
245 _region_label = region_info.acronym[_region[1]] 1ab
246 _region_id = region_info.id[_region[1]] 1ab
247 _region = depth_coords[_region] 1ab
248 _region_mean = np.mean(_region) 1ab
249 region[bound, :] = _region 1ab
250 region_colour[bound, :] = _region_colour 1ab
251 region_id[bound, :] = _region_id 1ab
252 region_label[bound, :] = (_region_mean, _region_label) 1ab
254 return region, region_label, region_colour, region_id 1ab
256 @staticmethod
257 def get_nearest_boundary(xyz_coords, allen, extent=100, steps=8, parent=True,
258 brain_atlas=None):
259 """
260 Finds distance to closest neighbouring brain region along trajectory. For each point in
261 xyz_coords computes the plane passing through point and perpendicular to trajectory and
262 finds all brain regions that lie in that plane up to a given distance extent from specified
263 point. Additionally, if requested, computes distance between the parents of regions.
264 :param xyz_coords: 3D coordinates of points along probe or track
265 :type xyz_coords: np.array((n_points, 3)) n_points: no. of points
266 :param allen: dataframe containing allen info. Loaded from allen_structure_tree in
267 ibllib/atlas
268 :type allen: pandas Dataframe
269 :param extent: extent of plane in each direction from origin in (um)
270 :type extent: float
271 :param steps: no. of steps to discretise plane into
272 :type steps: int
273 :param parent: Whether to also compute nearest distance between parents of regions
274 :type parent: bool
275 :return nearest_bound: dict containing results
276 :type nearest_bound: dict
277 """
278 if not brain_atlas:
279 brain_atlas = atlas.AllenAtlas(25)
281 vector = atlas.Insertion.from_track(xyz_coords, brain_atlas=brain_atlas).trajectory.vector
282 nearest_bound = dict()
283 nearest_bound['dist'] = np.zeros((xyz_coords.shape[0]))
284 nearest_bound['id'] = np.zeros((xyz_coords.shape[0]))
285 # nearest_bound['adj_id'] = np.zeros((xyz_coords.shape[0]))
286 nearest_bound['col'] = []
288 if parent:
289 nearest_bound['parent_dist'] = np.zeros((xyz_coords.shape[0]))
290 nearest_bound['parent_id'] = np.zeros((xyz_coords.shape[0]))
291 # nearest_bound['parent_adj_id'] = np.zeros((xyz_coords.shape[0]))
292 nearest_bound['parent_col'] = []
294 for iP, point in enumerate(xyz_coords):
295 d = np.dot(vector, point)
296 x_vals = np.r_[np.linspace(point[0] - extent / 1e6, point[0] + extent / 1e6, steps),
297 point[0]]
298 y_vals = np.r_[np.linspace(point[1] - extent / 1e6, point[1] + extent / 1e6, steps),
299 point[1]]
301 X, Y = np.meshgrid(x_vals, y_vals)
302 Z = (d - vector[0] * X - vector[1] * Y) / vector[2]
303 XYZ = np.c_[np.reshape(X, X.size), np.reshape(Y, Y.size), np.reshape(Z, Z.size)]
304 dist = np.sqrt(np.sum((XYZ - point) ** 2, axis=1))
306 try:
307 brain_id = brain_atlas.regions.get(brain_atlas.get_labels(XYZ))['id']
308 except Exception as err:
309 print(err)
310 continue
312 dist_sorted = np.argsort(dist)
313 brain_id_sorted = brain_id[dist_sorted]
314 nearest_bound['id'][iP] = brain_id_sorted[0]
315 nearest_bound['col'].append(allen['color_hex_triplet'][np.where(allen['id'] ==
316 brain_id_sorted[0])[0][0]])
317 bound_idx = np.where(brain_id_sorted != brain_id_sorted[0])[0]
318 if np.any(bound_idx):
319 nearest_bound['dist'][iP] = dist[dist_sorted[bound_idx[0]]] * 1e6
320 # nearest_bound['adj_id'][iP] = brain_id_sorted[bound_idx[0]]
321 else:
322 nearest_bound['dist'][iP] = np.max(dist) * 1e6
323 # nearest_bound['adj_id'][iP] = brain_id_sorted[0]
325 if parent:
326 # Now compute for the parents
327 brain_parent = np.array([allen['parent_structure_id'][np.where(allen['id'] == br)
328 [0][0]] for br in brain_id_sorted])
329 brain_parent[np.isnan(brain_parent)] = 0
331 nearest_bound['parent_id'][iP] = brain_parent[0]
332 nearest_bound['parent_col'].append(allen['color_hex_triplet']
333 [np.where(allen['id'] ==
334 brain_parent[0])[0][0]])
336 parent_idx = np.where(brain_parent != brain_parent[0])[0]
337 if np.any(parent_idx):
338 nearest_bound['parent_dist'][iP] = dist[dist_sorted[parent_idx[0]]] * 1e6
339 # nearest_bound['parent_adj_id'][iP] = brain_parent[parent_idx[0]]
340 else:
341 nearest_bound['parent_dist'][iP] = np.max(dist) * 1e6
342 # nearest_bound['parent_adj_id'][iP] = brain_parent[0]
344 return nearest_bound
346 @staticmethod
347 def arrange_into_regions(depth_coords, region_ids, distance, region_colours):
348 """
349 Arrange output from get_nearest_boundary into a form that can be plot using pyqtgraph or
350 matplotlib
351 :param depth_coords: depth along probe or track where each point is located
352 :type depth_coords: np.array((n_points))
353 :param region_ids: brain region id at each depth along probe
354 :type regions_ids: np.array((n_points))
355 :param distance: distance to nearest boundary in plane at each point
356 :type distance: np.array((n_points))
357 :param region_colours: allen atlas hex colour for each region id
358 :type region_colours: list of strings len(n_points)
359 :return all_x: dist values for each region along probe track
360 :type all_x: list of np.array
361 :return all_y: depth values for each region along probe track
362 :type all_y: list of np.array
363 :return all_colour: colour assigned to each region along probe track
364 :type all_colour: list of str
365 """
367 boundaries = np.where(np.diff(region_ids))[0]
368 bound = np.r_[0, boundaries + 1, region_ids.shape[0]]
369 all_y = []
370 all_x = []
371 all_colour = []
372 for iB in np.arange(len(bound) - 1):
373 y = depth_coords[bound[iB]:(bound[iB + 1])]
374 y = np.r_[y[0], y, y[-1]]
375 x = distance[bound[iB]:(bound[iB + 1])]
376 x = np.r_[0, x, 0]
377 all_y.append(y)
378 all_x.append(x)
379 col = region_colours[bound[iB]]
380 if not isinstance(col, str):
381 col = '#FFFFFF'
382 else:
383 col = '#' + col
384 all_colour.append(col)
386 return all_x, all_y, all_colour
388 def get_scale_factor(self, region, region_orig=None):
389 """
390 Find how much each brain region has been scaled following interpolation
391 :param region: scaled histology boundaries
392 :type region: np.array((n_bound, 2)) n_bound: no. of histology boundaries
393 :return scaled_region: regions that have unique scaling applied
394 :type scaled_region: np.array((n_scale, 2)) n_scale: no. of uniquely scaled regions
395 :return scale_factor: scale factor applied to each scaled region
396 :type scale_factor: np.array((n_scale))
397 """
399 region_orig = region_orig if region_orig is not None else self.region 1ced
400 scale = [] 1ced
401 for iR, (reg, reg_orig) in enumerate(zip(region, region_orig * 1e6)): 1ced
402 scale = np.r_[scale, (reg[1] - reg[0]) / (reg_orig[1] - reg_orig[0])] 1ced
403 boundaries = np.where(np.diff(np.around(scale, 3)))[0] 1ced
404 if boundaries.size == 0: 1ced
405 scaled_region = np.array([[region[0][0], region[-1][1]]]) 1e
406 scale_factor = np.unique(scale) 1e
407 else:
408 scaled_region = np.empty((boundaries.size + 1, 2)) 1cd
409 scale_factor = [] 1cd
410 for bound in np.arange(boundaries.size + 1): 1cd
411 if bound == 0: 1cd
412 _scaled_region = np.array([region[0][0], 1cd
413 region[boundaries[bound]][1]])
414 _scale_factor = scale[0] 1cd
415 elif bound == boundaries.size: 1cd
416 _scaled_region = np.array([region[boundaries[bound - 1]][1], 1cd
417 region[-1][1]])
418 _scale_factor = scale[-1] 1cd
419 else:
420 _scaled_region = np.array([region[boundaries[bound - 1]][1], 1cd
421 region[boundaries[bound]][1]])
422 _scale_factor = scale[boundaries[bound]] 1cd
423 scaled_region[bound, :] = _scaled_region 1cd
424 scale_factor = np.r_[scale_factor, _scale_factor] 1cd
425 return scaled_region, scale_factor 1ced
427 def get_channel_locations(self, feature, track, depths=None):
428 """
429 Gets 3d coordinates from a depth along the electrophysiology feature. 2 steps
430 1) interpolate from the electrophys features depths space to the probe depth space
431 2) interpolate from the probe depth space to the true 3D coordinates
432 if depths is not provided, defaults to channels local coordinates depths
433 """
434 if depths is None: 1bgef
435 depths = self.chn_depths / 1e6 1b
436 # nb using scipy here so we can change to cubic spline if needed
437 channel_depths_track = self.feature2track(depths, feature, track) - self.track_extent[0] 1bgef
438 xyz_channels = histology.interpolate_along_track(self.xyz_track, channel_depths_track) 1bgef
439 return xyz_channels 1bgef
441 def get_brain_locations(self, xyz_channels):
442 """
443 Finds the brain regions from 3D coordinates of electrode locations
444 :param xyz_channels: 3D coordinates of electrodes on probe
445 :type xyz_channels: np.array((n_elec, 3)) n_elec: no. of electrodes (384)
446 :return brain_regions: brain region object for each electrode
447 :type dict
448 """
449 brain_regions = self.brain_atlas.regions.get(self.brain_atlas.get_labels(xyz_channels)) 1bf
450 return brain_regions 1bf
452 def get_perp_vector(self, feature, track):
453 """
454 Finds the perpendicular vector along the trajectory at the depth of reference lines
455 :param feature: reference coordinates in feature space (ephys plots)
456 :type feature: np.array((n_lines + 2)) n_lines: no. of user reference lines
457 :param track: reference coordinates in track space (histology track)
458 :type track: np.array((n_line+2))
459 :return slice_lines: coordinates of perpendicular lines
460 :type slice_lines: np.array((n_lines, 2))
461 """
463 slice_lines = []
464 for line in feature[1:-1]:
465 depths = np.array([line, line + 10 / 1e6])
466 xyz = self.get_channel_locations(feature, track, depths)
468 extent = 500e-6
469 vector = np.diff(xyz, axis=0)[0]
470 point = xyz[0, :]
471 vector_perp = np.array([1, 0, -1 * vector[0] / vector[2]])
472 xyz_per = np.r_[[point + (-1 * extent * vector_perp)],
473 [point + (extent * vector_perp)]]
474 slice_lines.append(xyz_per)
476 return slice_lines