Coverage for ibllib/qc/base.py: 92%

138 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-10-11 11:13 +0100

1import logging 

2from abc import abstractmethod 

3from pathlib import Path 

4from itertools import chain 

5 

6import numpy as np 

7 

8from one.api import ONE 

9from one.alf.spec import is_session_path, is_uuid_string 

10 

# Custom sign off categories: maps a device name to the list of sign off sub-categories
# that are used for that device when building the sign off checklist.
SIGN_OFF_CATEGORIES = {
    'neuropixel': ['raw', 'spike_sorting', 'alignment'],
}

# Map for comparing QC outcomes: the larger the code, the worse the outcome.
CRITERIA = {
    'CRITICAL': 4,
    'FAIL': 3,
    'WARNING': 2,
    'PASS': 1,
    'NOT_SET': 0,
}

21 

22 

class QC:
    """A base class for data quality control

    Holds the QC outcome of a single Alyx record (a session by default) and provides methods
    for writing that outcome, and the associated extended QC values, to Alyx through a ONE
    instance.

    NB: this class does not derive from abc.ABC, therefore the abstractmethod decorators below
    only document intent; they do not prevent the class from being instantiated.
    """

    def __init__(self, endpoint_id, one=None, log=None, endpoint='sessions'):
        """
        :param endpoint_id: Eid for endpoint. If using sessions can also be a session path
        :param one: An ONE instance for fetching and setting the QC on Alyx; if None a new
         ONE instance is created
        :param log: A logging.Logger instance, if None the logger named after this module is used
        :param endpoint: The endpoint name to apply qc to. Default is 'sessions'
        :raises ValueError: The endpoint_id could not be resolved for the given endpoint
        """
        self.one = one or ONE()
        self.log = log or logging.getLogger(__name__)
        self.endpoint = endpoint
        # For 'sessions' the qc values are read from the record itself; for any other endpoint
        # they are read from (and written to) the record's 'json' field
        self.json = endpoint != 'sessions'
        if self.json:
            self._confirm_endpoint_id(endpoint_id)
        else:
            self._set_eid_or_path(endpoint_id)

        # Ensure outcome attribute matches Alyx record
        updatable = self.eid and self.one and not self.one.offline
        self._outcome = self.update('NOT_SET', namespace='') if updatable else 'NOT_SET'
        self.log.debug(f'Current QC status is {self.outcome}')

    @abstractmethod
    def run(self):
        """Run the QC tests and return the outcome

        :return: One of "CRITICAL", "FAIL", "WARNING" or "PASS"
        """
        pass

    @abstractmethod
    def load_data(self):
        """Load the data required to compute the QC

        Subclasses may implement this for loading raw data
        """
        pass

    @property
    def outcome(self):
        """str: The current QC outcome; one of the keys of CRITERIA"""
        return self._outcome

    @outcome.setter
    def outcome(self, value):
        """Set the outcome, but only if the new value is worse than the current one

        :param value: An outcome string (case-insensitive); one of the keys of CRITERIA
        :raises ValueError: The value is not a key of CRITERIA
        """
        value = value.upper()  # Ensure outcome is uppercase
        if value not in CRITERIA:
            raise ValueError('Invalid outcome; must be one of ' + ', '.join(CRITERIA.keys()))
        if CRITERIA[self._outcome] < CRITERIA[value]:
            self._outcome = value

    @staticmethod
    def overall_outcome(outcomes: iter, agg=max) -> str:
        """
        Given an iterable of QC outcomes, returns the overall (i.e. worst) outcome.

        Outcomes may be strings or numerical outcome codes. Strings that are not keys of
        CRITERIA are given the code 0 (i.e. 'NOT_SET'). NaNs and falsy values such as None are
        ignored, with the exception of a float zero, which is kept as an outcome code.

        Example:
          QC.overall_outcome(['PASS', 'NOT_SET', None, 'FAIL'])  # Returns 'FAIL'

        :param outcomes: An iterable of QC outcomes
        :param agg: outcome code aggregate function, default is max (i.e. worst)
        :return: The overall outcome string; 'NOT_SET' if no outcomes remain after filtering
        """
        def is_set(value):
            # NaN is truthy, so it can not be filtered out by a simple truth test
            if isinstance(value, (float, np.floating)):
                return not np.isnan(value)
            return bool(value)

        codes = [CRITERIA.get(x, 0) if isinstance(x, str) else x for x in outcomes if is_set(x)]
        if not codes:
            # Nothing to aggregate (e.g. max of an empty sequence would raise)
            return 'NOT_SET'
        code = agg(codes)
        return next(k for k, v in CRITERIA.items() if v == code)

    @staticmethod
    def code_to_outcome(code: int) -> str:
        """
        Given an outcome id, returns the corresponding string.

        Example:
          QC.code_to_outcome(3)  # Returns 'FAIL'

        :param code: The outcome id
        :return: The outcome string
        :raises StopIteration: The code does not match any value of CRITERIA
        """
        return next(k for k, v in CRITERIA.items() if v == code)

    def _set_eid_or_path(self, session_path_or_eid):
        """Parse a given eID or session path

        If a session UUID is given, resolves and stores the local path and vice versa

        :param session_path_or_eid: A session eid or path
        :raises ValueError: The input is neither a UUID string nor a session path
        """
        self.eid = None
        if is_uuid_string(str(session_path_or_eid)):
            self.eid = session_path_or_eid
            # Try to set session_path if data is found locally
            self.session_path = self.one.eid2path(self.eid)
        elif is_session_path(session_path_or_eid):
            self.session_path = Path(session_path_or_eid)
            if self.one is not None:
                self.eid = self.one.path2eid(self.session_path)
                if not self.eid:
                    self.log.warning('Failed to determine eID from session path')
        else:
            self.log.error('Cannot run QC: an experiment uuid or session path is required')
            raise ValueError("'session' must be a valid session path or uuid")

    def _confirm_endpoint_id(self, endpoint_id):
        """Check that the endpoint record exists on Alyx and initialise its QC json field

        :param endpoint_id: The id of a record of the current endpoint
        :raises ValueError: No record was returned for the given id
        """
        # Have as read for now since 'list' isn't working
        target_obj = self.one.alyx.get(f'/{self.endpoint}/{endpoint_id}', clobber=True) or None
        if not target_obj:
            self.log.error('Cannot run QC: endpoint id is not recognised')
            raise ValueError("'endpoint_id' must be a valid uuid")

        self.eid = endpoint_id
        json_field = target_obj.get('json')
        # Both an empty json field and a json field without a 'qc' value are initialised
        if not json_field or not json_field.get('qc', None):
            self.one.alyx.json_field_update(endpoint=self.endpoint, uuid=self.eid,
                                            field_name='json', data={'qc': 'NOT_SET',
                                                                     'extended_qc': {}})

    def update(self, outcome=None, namespace='experimenter', override=False):
        """Update the qc field in Alyx

        Updates the 'qc' field in Alyx if the new QC outcome is worse than the current value.

        :param outcome: A string; one of "CRITICAL", "FAIL", "WARNING", "PASS" or "NOT_SET".
         If None the current value of the outcome property is used
        :param namespace: The extended QC key specifying the type of QC associated with the
         outcome; if empty the outcome is not recorded in the extended QC
        :param override: If True the QC field is updated even if new value is better than previous
        :return: The current QC outcome str on Alyx, or None if the ONE instance is offline
        :raises ValueError: The outcome is not a key of CRITERIA

        Example:
          qc = QC('path/to/session')
          qc.update('PASS')  # Update current QC field to 'PASS' if not set
        """
        assert self.one, "instance of one should be provided"
        if self.one.offline:
            self.log.warning('Running on OneOffline instance, unable to update remote QC')
            return
        outcome = (outcome or self.outcome).upper()  # Ensure outcome is uppercase
        if outcome not in CRITERIA:
            raise ValueError('Invalid outcome; must be one of ' + ', '.join(CRITERIA.keys()))
        assert self.eid, 'Unable to update Alyx; eID not set'
        if namespace:  # Record in extended qc
            self.update_extended_qc({namespace: outcome})
        details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
        current_status = (details['json'] if self.json else details)['qc']

        if CRITERIA[current_status] < CRITERIA[outcome] or override:
            if self.json:
                r = self.one.alyx.json_field_update(endpoint=self.endpoint, uuid=self.eid,
                                                    field_name='json', data={'qc': outcome})
            else:
                r = self.one.alyx.rest(self.endpoint, 'partial_update', id=self.eid,
                                       data={'qc': outcome})

            current_status = r['qc'].upper()
            assert current_status == outcome, 'Failed to update session QC'
            self.log.info(f'QC field successfully updated to {outcome} for {self.endpoint[:-1]} '
                          f'{self.eid}')
        self._outcome = current_status
        return self.outcome

    def update_extended_qc(self, data):
        """Update the extended_qc field in Alyx

        Subclasses should chain a call to this.

        :param data: a dict of qc tests and their outcomes, typically a value between 0. and 1.
         The dict itself is not modified
        :return: the updated extended_qc field, or None if the ONE instance is offline
        """
        assert self.eid, 'Unable to update Alyx; eID not set'
        assert self.one, "instance of one should be provided"
        if self.one.offline:
            self.log.warning('Running on OneOffline instance, unable to update remote QC')
            return

        # Ensure None instead of NaNs. A sanitised copy is built so that the dict passed in
        # by the caller is left untouched
        sanitised = {}
        for k, v in data.items():
            if v is not None and not isinstance(v, str):
                if isinstance(v, tuple):
                    v = tuple(None if not isinstance(i, str) and np.isnan(i) else i for i in v)
                elif np.isnan(v).all():
                    v = None
            sanitised[k] = v

        details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
        if self.json:
            # The extended QC is nested within the record's json field
            extended_qc = details['json']['extended_qc'] or {}
            extended_qc.update(sanitised)
            out = self.one.alyx.json_field_update(
                endpoint=self.endpoint, uuid=self.eid, field_name='json',
                data={'extended_qc': extended_qc})
        else:
            extended_qc = details['extended_qc'] or {}
            extended_qc.update(sanitised)
            out = self.one.alyx.json_field_update(
                endpoint=self.endpoint, uuid=self.eid, field_name='extended_qc',
                data=extended_qc)

        self.log.info(f'Extended QC field successfully updated for {self.endpoint[:-1]} '
                      f'{self.eid}')
        return out

    def compute_outcome_from_extended_qc(self) -> str:
        """
        Returns the session outcome computed from aggregating the extended QC

        Extended QC keys that start with an underscore are not included in the aggregate.

        :return: The overall outcome string of the remaining extended QC values
        """
        details = self.one.alyx.get(f'/{self.endpoint}/{self.eid}', clobber=True)
        extended_qc = details['json']['extended_qc'] if self.json else details['extended_qc']
        # NB: iterate over the key/value pairs; iterating the dict itself yields only the keys
        return self.overall_outcome(
            v for k, v in (extended_qc or {}).items() if not k.startswith('_'))

226 

227 

def sign_off_dict(exp_dec, sign_off_categories=None):
    """
    Build the 'sign off' checklist for the devices and task protocols of an experiment
    description.

    Parameters
    ----------
    exp_dec : dict
        A loaded experiment description file.
    sign_off_categories : dict of list
        A dictionary of custom JSON keys for a given device in the acquisition description file.
        If None (or empty), SIGN_OFF_CATEGORIES is used.

    Returns
    -------
    dict of dict
        The sign off dictionary with the main key 'sign_off_checklist' containing keys for each
        device and task protocol. Each key is prefixed with an underscore and maps to None.
    """
    categories = sign_off_categories or SIGN_OFF_CATEGORIES
    checklist_keys = set()

    # Note this assumes devices each contain a dict of dicts
    # e.g. {'devices': {'DAQ_1': {'device_1': {}, 'device_2': {}},}
    for device, sub_devices in exp_dec.get('devices', {}).items():
        assert isinstance(sub_devices, dict) and sub_devices
        # A device with custom categories contributes one key prefix per category
        if device in categories:
            prefixes = [f'{device}_{category}' for category in categories[device]]
        else:
            prefixes = [device]

        names = list(sub_devices)
        if names == [device]:
            # A lone sub-device named after the device itself adds no suffix
            checklist_keys.update(prefixes)
        else:
            checklist_keys.update(
                f'{prefix}_{name}' for name in names for prefix in prefixes)

    # Add keys for each protocol, numbered in the order in which they are listed
    protocols = chain.from_iterable(map(dict.keys, exp_dec.get('tasks', [])))
    checklist_keys.update(f'{protocol}_{i:02}' for i, protocol in enumerate(protocols))

    return {'sign_off_checklist': {f'_{key}': None for key in checklist_keys}}

269 

270 return {'sign_off_checklist': dict.fromkeys(map(lambda x: f'_{x}', sign_off_keys))} 1vdac