Coverage for ibllib/qc/base.py: 92%
138 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-10-11 11:13 +0100
« prev ^ index » next coverage.py v7.3.2, created at 2023-10-11 11:13 +0100
1import logging
2from abc import abstractmethod
3from pathlib import Path
4from itertools import chain
6import numpy as np
8from one.api import ONE
9from one.alf.spec import is_session_path, is_uuid_string
11"""dict: custom sign off categories"""
12SIGN_OFF_CATEGORIES = {'neuropixel': ['raw', 'spike_sorting', 'alignment']}
14"""dict: Map for comparing QC outcomes"""
15CRITERIA = {'CRITICAL': 4,
16 'FAIL': 3,
17 'WARNING': 2,
18 'PASS': 1,
19 'NOT_SET': 0
20 }
23class QC:
24 """A base class for data quality control"""
26 def __init__(self, endpoint_id, one=None, log=None, endpoint='sessions'):
27 """
28 :param endpoint_id: Eid for endpoint. If using sessions can also be a session path
29 :param log: A logging.Logger instance, if None the 'ibllib' logger is used
30 :param one: An ONE instance for fetching and setting the QC on Alyx
31 :param endpoint: The endpoint name to apply qc to. Default is 'sessions'
32 """
33 self.one = one or ONE() 1bghiejflrdnpqoastuc
34 self.log = log or logging.getLogger(__name__) 1bghiejflrdnpqoastuc
35 if endpoint == 'sessions': 1bghiejflrdnpqoastuc
36 self.endpoint = endpoint 1blrdnpqoastuc
37 self._set_eid_or_path(endpoint_id) 1blrdnpqoastuc
38 self.json = False 1blrdnpqoastuc
39 else:
40 self.endpoint = endpoint 1bghiejfa
41 self._confirm_endpoint_id(endpoint_id) 1bghiejfa
42 self.json = True 1bghiejfa
44 # Ensure outcome attribute matches Alyx record
45 updatable = self.eid and self.one and not self.one.offline 1bghiejflrdnpqoastuc
46 self._outcome = self.update('NOT_SET', namespace='') if updatable else 'NOT_SET' 1bghiejflrdnpqoastuc
47 self.log.debug(f'Current QC status is {self.outcome}') 1bghiejflrdnpqoastuc
49 @abstractmethod
50 def run(self):
51 """Run the QC tests and return the outcome
52 :return: One of "CRITICAL", "FAIL", "WARNING" or "PASS"
53 """
54 pass
56 @abstractmethod
57 def load_data(self):
58 """Load the data required to compute the QC
59 Subclasses may implement this for loading raw data
60 """
61 pass
63 @property
64 def outcome(self):
65 return self._outcome 1bghiejflmrdnpqoastukc
67 @outcome.setter
68 def outcome(self, value):
69 value = value.upper() # Ensure outcome is uppercase 1bldnpqoc
70 if value not in CRITERIA: 1bldnpqoc
71 raise ValueError('Invalid outcome; must be one of ' + ', '.join(CRITERIA.keys())) 1l
72 if CRITERIA[self._outcome] < CRITERIA[value]: 1bldnpqoc
73 self._outcome = value 1l
75 @staticmethod
76 def overall_outcome(outcomes: iter, agg=max) -> str:
77 """
78 Given an iterable of QC outcomes, returns the overall (i.e. worst) outcome.
80 Example:
81 QC.overall_outcome(['PASS', 'NOT_SET', None, 'FAIL']) # Returns 'FAIL'
83 :param outcomes: An iterable of QC outcomes
84 :param agg: outcome code aggregate function, default is max (i.e. worst)
85 :return: The overall outcome string
86 """
87 outcomes = filter(lambda x: x or (isinstance(x, float) and not np.isnan(x)), outcomes) 1yzABCdnoDac
88 code = agg(CRITERIA.get(x, 0) if isinstance(x, str) else x for x in outcomes) 1yzABCdnoDac
89 return next(k for k, v in CRITERIA.items() if v == code) 1yzABCdnoDac
91 @staticmethod
92 def code_to_outcome(code: int) -> str:
93 """
94 Given an outcome id, returns the corresponding string.
96 Example:
97 QC.overall_outcome(['PASS', 'NOT_SET', None, 'FAIL']) # Returns 'FAIL'
99 :param code: The outcome id
100 :return: The overall outcome string
101 """
102 return next(k for k, v in CRITERIA.items() if v == code) 1E
104 def _set_eid_or_path(self, session_path_or_eid):
105 """Parse a given eID or session path
106 If a session UUID is given, resolves and stores the local path and vice versa
107 :param session_path_or_eid: A session eid or path
108 :return:
109 """
110 self.eid = None 1bxlrdnpqoastuc
111 if is_uuid_string(str(session_path_or_eid)): 1bxlrdnpqoastuc
112 self.eid = session_path_or_eid 1bxl
113 # Try to set session_path if data is found locally
114 self.session_path = self.one.eid2path(self.eid) 1bxl
115 elif is_session_path(session_path_or_eid): 1bxrdnpqoastuc
116 self.session_path = Path(session_path_or_eid) 1bxrdnpqoastuc
117 if self.one is not None: 1bxrdnpqoastuc
118 self.eid = self.one.path2eid(self.session_path) 1bxrdnpqoastuc
119 if not self.eid: 1bxrdnpqoastuc
120 self.log.warning('Failed to determine eID from session path') 1brnpqostuc
121 else:
122 self.log.error('Cannot run QC: an experiment uuid or session path is required') 1x
123 raise ValueError("'session' must be a valid session path or uuid") 1x
125 def _confirm_endpoint_id(self, endpoint_id):
126 # Have as read for now since 'list' isn't working
127 target_obj = self.one.alyx.get(f'/{self.endpoint}/{endpoint_id}', clobber=True) or None 1bghiejfa
128 if target_obj: 1bghiejfa
129 self.eid = endpoint_id 1bghiejfa
130 json_field = target_obj.get('json') 1bghiejfa
131 if not json_field: 1bghiejfa
132 self.one.alyx.json_field_update(endpoint=self.endpoint, uuid=self.eid,
133 field_name='json', data={'qc': 'NOT_SET',
134 'extended_qc': {}})
135 elif not json_field.get('qc', None): 1ghiejfa
136 self.one.alyx.json_field_update(endpoint=self.endpoint, uuid=self.eid, 1ghi
137 field_name='json', data={'qc': 'NOT_SET',
138 'extended_qc': {}})
139 else:
140 self.log.error('Cannot run QC: endpoint id is not recognised')
141 raise ValueError("'endpoint_id' must be a valid uuid")
143 def update(self, outcome=None, namespace='experimenter', override=False):
144 """Update the qc field in Alyx
145 Updates the 'qc' field in Alyx if the new QC outcome is worse than the current value.
146 :param outcome: A string; one of "CRITICAL", "FAIL", "WARNING", "PASS" or "NOT_SET"
147 :param namespace: The extended QC key specifying the type of QC associated with the outcome
148 :param override: If True the QC field is updated even if new value is better than previous
149 :return: The current QC outcome str on Alyx
151 Example:
152 qc = QC('path/to/session')
153 qc.update('PASS') # Update current QC field to 'PASS' if not set
154 """
155 assert self.one, "instance of one should be provided" 1bghiejflmdakc
156 if self.one.offline: 1bghiejflmdakc
157 self.log.warning('Running on OneOffline instance, unable to update remote QC')
158 return
159 outcome = outcome or self.outcome 1bghiejflmdakc
160 outcome = outcome.upper() # Ensure outcome is uppercase 1bghiejflmdakc
161 if outcome not in CRITERIA: 1bghiejflmdakc
162 raise ValueError('Invalid outcome; must be one of ' + ', '.join(CRITERIA.keys())) 1m
163 assert self.eid, 'Unable to update Alyx; eID not set' 1bghiejflmdakc
164 if namespace: # Record in extended qc 1bghiejflmdakc
165 self.update_extended_qc({namespace: outcome}) 1efmdakc
166 details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True) 1bghiejflmdakc
167 current_status = (details['json'] if self.json else details)['qc'] 1bghiejflmdakc
169 if CRITERIA[current_status] < CRITERIA[outcome] or override: 1bghiejflmdakc
170 r = self.one.alyx.json_field_update(endpoint=self.endpoint, uuid=self.eid, 1efmdakc
171 field_name='json', data={'qc': outcome}) \
172 if self.json else self.one.alyx.rest(self.endpoint, 'partial_update', id=self.eid,
173 data={'qc': outcome})
175 current_status = r['qc'].upper() 1efmdakc
176 assert current_status == outcome, 'Failed to update session QC' 1efmdakc
177 self.log.info(f'QC field successfully updated to {outcome} for {self.endpoint[:-1]} ' 1efmdakc
178 f'{self.eid}')
179 self._outcome = current_status 1bghiejflmdakc
180 return self.outcome 1bghiejflmdakc
182 def update_extended_qc(self, data):
183 """Update the extended_qc field in Alyx
184 Subclasses should chain a call to this.
185 :param data: a dict of qc tests and their outcomes, typically a value between 0. and 1.
186 :return: the updated extended_qc field
187 """
188 assert self.eid, 'Unable to update Alyx; eID not set' 1ghiejfwmdakc
189 assert self.one, "instance of one should be provided" 1ghiejfwmdakc
190 if self.one.offline: 1ghiejfwmdakc
191 self.log.warning('Running on OneOffline instance, unable to update remote QC')
192 return
194 # Ensure None instead of NaNs
195 for k, v in data.items(): 1ghiejfwmdakc
196 if v is not None and not isinstance(v, str): 1ghiejfwmdakc
197 if isinstance(v, tuple): 1ghiejfwdakc
198 data[k] = tuple(None if not isinstance(i, str) and np.isnan(i) else i for i in v) 1dac
199 else:
200 data[k] = None if np.isnan(v).all() else v 1ghiejfwdakc
202 details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True) 1ghiejfwmdakc
203 if self.json: 1ghiejfwmdakc
204 extended_qc = details['json']['extended_qc'] or {} 1ghiejfa
205 extended_qc.update(data) 1ghiejfa
206 extended_qc_dict = {'extended_qc': extended_qc} 1ghiejfa
207 out = self.one.alyx.json_field_update( 1ghiejfa
208 endpoint=self.endpoint, uuid=self.eid, field_name='json', data=extended_qc_dict)
209 else:
210 extended_qc = details['extended_qc'] or {} 1wmdakc
211 extended_qc.update(data) 1wmdakc
212 out = self.one.alyx.json_field_update( 1wmdakc
213 endpoint=self.endpoint, uuid=self.eid, field_name='extended_qc', data=extended_qc)
215 self.log.info(f'Extended QC field successfully updated for {self.endpoint[:-1]} ' 1ghiejfwmdakc
216 f'{self.eid}')
217 return out 1ghiejfwmdakc
219 def compute_outcome_from_extended_qc(self) -> str:
220 """
221 Returns the session outcome computed from aggregating the extended QC
222 """
223 details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
224 extended_qc = details['json']['extended_qc'] if self.json else details['extended_qc']
225 return self.overall_outcome(v for k, v in extended_qc or {} if k[0] != '_')
228def sign_off_dict(exp_dec, sign_off_categories=None):
229 """
230 Creates a dict containing 'sign off' keys for each device and task protocol in the provided
231 experiment description.
233 Parameters
234 ----------
235 exp_dec : dict
236 A loaded experiment description file.
237 sign_off_categories : dict of list
238 A dictionary of custom JSON keys for a given device in the acquisition description file.
240 Returns
241 -------
242 dict of dict
243 The sign off dictionary with the main key 'sign_off_checklist' containing keys for each
244 device and task protocol.
245 """
246 # Note this assumes devices each contain a dict of dicts
247 # e.g. {'devices': {'DAQ_1': {'device_1': {}, 'device_2': {}},}
248 sign_off_categories = sign_off_categories or SIGN_OFF_CATEGORIES 1vdac
249 sign_off_keys = set() 1vdac
250 for k, v in exp_dec.get('devices', {}).items(): 1vdac
251 assert isinstance(v, dict) and v 1vdac
252 if len(v.keys()) == 1 and next(iter(v.keys())) == k: 1vdac
253 if k in sign_off_categories: 1vdac
254 for subkey in sign_off_categories[k]: 1v
255 sign_off_keys.add(f'{k}_{subkey}') 1v
256 else:
257 sign_off_keys.add(k) 1vdac
258 else:
259 for kk in v.keys(): 1vdac
260 if k in sign_off_categories: 1vdac
261 for subkey in sign_off_categories[k]: 1va
262 sign_off_keys.add(f'{k}_{subkey}_{kk}') 1va
263 else:
264 sign_off_keys.add(f'{k}_{kk}') 1vdac
266 # Add keys for each protocol
267 for i, v in enumerate(chain(*map(dict.keys, exp_dec.get('tasks', [])))): 1vdac
268 sign_off_keys.add(f'{v}_{i:02}') 1vdac
270 return {'sign_off_checklist': dict.fromkeys(map(lambda x: f'_{x}', sign_off_keys))} 1vdac