import fhirspec_pb2
from typing import Collection, Mapping, Optional

# LINT.IfChange(fhir_resource_type_mapping)
RESOURCE_TYPE_STRING_TO_HC_INT_MAPPING = {
    "Immunization": 1,
    "AllergyIntolerance": 2,
    "Observation": 3,
    "Condition": 4,
    "Procedure": 5,
    "Medication": 6,
    "MedicationRequest": 7,
    "MedicationStatement": 8,
    "Patient": 9,
    "Practitioner": 10,
    "PractitionerRole": 11,
    "Encounter": 12,
    "Location": 13,
    "Organization": 14,
}
# LINT.ThenChange(/framework/java/android/health/connect/datatypes/FhirResource.java)

HC_SUPPORTED_RESOURCE_SET = set(RESOURCE_TYPE_STRING_TO_HC_INT_MAPPING.keys())

FHIR_VERSION_R4 = "4.0.1"


class FhirSpecExtractor:
    """Extractor for getting information for HC FHIR validation from official FHIR spec json files.

    Typical usage example:
        extractor = FhirSpecExtractor(profile_resources_json, {"Immunization", "Observation"})
        fhir_spec_message = extractor.generate_r4_fhir_spec_proto_message()
    """

    def __init__(self, profile_resources_json: Mapping, resource_names: set[str]):
        """Extracts StructureDefinitions for the requested resources from the provided fhir spec.

        Args:
            profile_resources_json: The contents of the profile-resources.json fhir spec file, which
              is in the structure of a https://hl7.org/fhir/Bundle.html, parsed to dict. The
              Bundle.entry will contain the list of https://hl7.org/fhir/StructureDefinition.html
              that we are interested in.
            resource_names: The set of FHIR resources to extract FHIR spec information for.

        Raises:
            ValueError: If a requested resource is not present in the spec, if it's not supported by
              Health Connect, or if any spec values are not as expected.
        """
        if not resource_names.issubset(HC_SUPPORTED_RESOURCE_SET):
            raise ValueError("Provided resource set was not a subset of supported resources")

        # A mapping from the resource name to the list of field definitions, which are in the
        # structure of https://hl7.org/fhir/ElementDefinition.html
        self._resource_to_element_definitions = (
            self._extract_element_definitions_by_resource_from_spec(
                profile_resources_json, resource_names))

    def generate_r4_fhir_spec_proto_message(self) -> fhirspec_pb2.FhirResourceSpec:
        """Generates a FhirResourceSpec message from the fhir json spec.

        Returns:
            The FhirResourceSpec message, with an entry for each requested resource.
        """
        # TODO: b/360091651 - Extract additional information such as field types, cardinality and
        # structure of each type. Note that the field "Observation.component.referenceRange" will
        # need special handling. It doesn't have a type, but a contentReference to
        # "Observation.referenceRange" and should use that type's structure.

        r4_resource_spec = fhirspec_pb2.FhirResourceSpec()

        for resource, element_definitions in self._resource_to_element_definitions.items():
            resource_type_int = RESOURCE_TYPE_STRING_TO_HC_INT_MAPPING[resource]

            resource_data_type_config = (
                self._generate_fhir_data_type_config_from_element_definitions(element_definitions))

            r4_resource_spec.resource_type_to_config[
                resource_type_int].CopyFrom(resource_data_type_config)

        return r4_resource_spec

    def _extract_element_definitions_by_resource_from_spec(
            self, profile_resources_json: Mapping, resource_names: set[str]) -> Mapping:
        """Collects the snapshot ElementDefinitions of every requested resource.

        Args:
            profile_resources_json: The parsed Bundle; its "entry" list is scanned.
            resource_names: The resource names whose StructureDefinitions should be extracted.

        Returns:
            A dict from resource name to that resource's list of snapshot ElementDefinitions.

        Raises:
            ValueError: If a matching StructureDefinition has an unexpected "fhirVersion", "kind"
              or "type", or if not every requested resource was found.
        """
        resource_to_element_definitions = {}
        # For each StructureDefinition that matches a resource in resource_names, we extract
        # the list of ElementDefinitions. Each ElementDefinition contains the spec for a path /
        # field of the resource.
        for entry in profile_resources_json["entry"]:
            full_url = entry["fullUrl"]
            if not full_url.startswith("http://hl7.org/fhir/StructureDefinition/"):
                continue

            resource_name = full_url.split("/")[-1]
            if resource_name not in resource_names:
                continue

            resource_structure_definition = entry["resource"]

            # Do some assertions on expected values
            if resource_structure_definition["fhirVersion"] != FHIR_VERSION_R4:
                raise ValueError("Unexpected fhir version found")
            if resource_structure_definition["kind"] != "resource":
                raise ValueError("Unexpected kind field in structure definition")
            if resource_structure_definition["type"] != resource_name:
                raise ValueError("Unexpected resource type in structure definition")

            # We select the list of elements in "snapshot" (as opposed to "differential"), as we
            # want the full definition of fields, including fields from any base definitions.
            resource_to_element_definitions[resource_name] = (
                resource_structure_definition["snapshot"]["element"])

        if set(resource_to_element_definitions.keys()) != resource_names:
            raise ValueError("Did not find resource definitions for all requested resources.")

        return resource_to_element_definitions

    def _generate_fhir_data_type_config_from_element_definitions(
            self, element_definitions: Collection[Mapping]) -> fhirspec_pb2.FhirDataTypeConfig:
        """Builds the FhirDataTypeConfig for one resource from its ElementDefinitions.

        Only top level fields (ids of the form "<Resource>.<field>") are recorded. The element
        for the resource itself and elements nested deeper are skipped.

        Args:
            element_definitions: The snapshot ElementDefinitions of a single resource.

        Returns:
            A FhirDataTypeConfig with the allowed fields, the sorted required fields and the
            multi type field configs.

        Raises:
            ValueError: If an element's "id" differs from its "path", if a field name would be
              added twice, or if a field's type / cardinality is not as expected.
        """
        required_fields = set()

        multi_type_configs = []

        field_configs_by_name = {}
        # Manually add resourceType field, as this is not present in the spec
        field_configs_by_name["resourceType"] = fhirspec_pb2.FhirFieldConfig(
            is_array=False,
            r4_type=fhirspec_pb2.R4FhirType.R4_FHIR_TYPE_STRING,
            kind=fhirspec_pb2.Kind.KIND_PRIMITIVE_TYPE
        )

        for element in element_definitions:
            field_id = element["id"]
            if field_id != element["path"]:
                raise ValueError("Expected id and path field to be the same")
            field_parts = field_id.split(".")
            field_parts_length = len(field_parts)

            # str.split always returns at least one part, so the length is 1, 2 or more.
            if field_parts_length == 1:
                # This is the path to the element itself. For example for the Observation
                # resource, there will be an ElementDefinition with id "Observation"
                continue

            if field_parts_length > 2:
                # This means the field is part of a BackBoneElement. For an example see the
                # https://hl7.org/fhir/Immunization.html "reaction" field.
                # BackBoneElements need to be handled separately, as those fields don't have a
                # type defined, but have the BackBoneElement definition instead.
                # Note that the following field contains a double backbone element, which we
                # need to consider: "MedicationRequest.dispenseRequest.initialFill",

                # For now we are just recording the top level allowed field, which has its own
                # element definition, so is covered by the two-part case below.
                continue

            # This is a "regular" nested field, e.g. Immunization.status, so we extract the
            # field configs
            field_name = field_parts[1]
            field_configs_to_add, multi_type_config = (
                self._generate_field_configs_and_multi_type_config_from_field_element(
                    element, field_name))
            for name in field_configs_to_add:
                if name in field_configs_by_name:
                    raise ValueError("Field name already exists")

            field_configs_by_name.update(field_configs_to_add)
            if self.field_name_is_multi_type_field(field_name):
                # Requiredness of a multi type field is carried by its MultiTypeFieldConfig.
                multi_type_configs.append(multi_type_config)
            elif self._field_is_required(element):
                required_fields.add(field_name)

        return fhirspec_pb2.FhirDataTypeConfig(
            allowed_field_names_to_config=field_configs_by_name,
            # Sort the list of required fields alphabetically, as the output of this script is
            # part of the build, which needs to be deterministic. The required_fields come from a
            # set, which does not have ordering guarantees.
            required_fields=sorted(required_fields),
            multi_type_fields=multi_type_configs
        )

    def _generate_field_configs_and_multi_type_config_from_field_element(
            self, element_definition, field_name
    ) -> tuple[Mapping[str, fhirspec_pb2.FhirFieldConfig],
               Optional[fhirspec_pb2.MultiTypeFieldConfig]]:
        """Generates the field config(s) for a single top level field.

        Args:
            element_definition: The ElementDefinition of the field.
            field_name: The field name, i.e. the second part of the element id.

        Returns:
            A tuple of (field configs by field name, multi type config). The multi type config is
            None unless the field is a multi type ("[x]") field.

        Raises:
            ValueError: If a multi type field is an array, if a regular field does not have
              exactly one type, or if the cardinality or type code is not as expected.
        """
        field_is_array = self._field_is_array(element_definition)

        field_configs_by_name = {}

        multi_type_config = None

        # If the field is a multi type field, it means one of several types can be set. An example
        # is the field Immunization.occurrence, which has types "string" and "dateTime" and
        # therefore means the fields "occurrenceString" and "occurrenceDateTime" are allowed. We
        # therefore expand the field name with each defined type.
        if self.field_name_is_multi_type_field(field_name):
            if field_is_array:
                raise ValueError(
                    "Unexpected cardinality for type choice field. Did not expect array.")

            multi_type_fields = []
            for data_type in element_definition["type"]:
                field_with_type = self._get_multi_type_name_for_type(field_name, data_type["code"])
                type_enum, kind_enum = self._get_type_and_kind_enum_from_type(data_type["code"])
                field_configs_by_name[field_with_type] = fhirspec_pb2.FhirFieldConfig(
                    is_array=False,
                    r4_type=type_enum,
                    kind=kind_enum
                )
                multi_type_fields.append(field_with_type)

            multi_type_config = fhirspec_pb2.MultiTypeFieldConfig(
                name=field_name,
                typed_field_names=multi_type_fields,
                is_required=self._field_is_required(element_definition)
            )

        else:
            if len(element_definition["type"]) != 1:
                raise ValueError("Expected exactly one type")
            type_code = element_definition["type"][0]["code"]
            type_enum, kind_enum = self._get_type_and_kind_enum_from_type(type_code)
            field_configs_by_name[field_name] = fhirspec_pb2.FhirFieldConfig(
                is_array=field_is_array,
                r4_type=type_enum,
                kind=kind_enum
            )

        return field_configs_by_name, multi_type_config

    def field_name_is_multi_type_field(self, field_name) -> bool:
        """Returns true if the field is a oneof / type choice field, which can contain one of
        several data types.

        This is the case if the field name ends with "[x]" and means that one of several types can
        be set.
        """

        return field_name.endswith("[x]")

    def _get_multi_type_name_for_type(self, field_name, type_code) -> str:
        """Returns the one of field name for a specific type.

        For example for the field name "occurrence[x]" and type "dateTime" this will return
        "occurrenceDateTime".
        """

        # Drop the trailing "[x]" and append the type code with its first letter capitalised.
        return field_name[:-3] + type_code[0].upper() + type_code[1:]

    def _field_is_required(self, element_definition) -> bool:
        """Returns true if the field is required

        FHIR fields can have the following cardinalities:
        - 0..1, meaning the field is optional
        - 1..1, meaning the field is required
        - 0..*, meaning the field is an optional array
        - 1..*, meaning the field is a required array

        Raises:
            ValueError: If the "min" value of the element definition is neither 0 nor 1.
        """

        min_cardinality = element_definition["min"]

        if min_cardinality not in (0, 1):
            # "min" is a number in the spec json, so it is formatted rather than concatenated;
            # concatenating it to a str would raise a TypeError instead of this ValueError.
            raise ValueError(f"Unexpected min cardinality value: {min_cardinality}")

        return min_cardinality == 1

    def _field_is_array(self, element_definition) -> bool:
        """Returns true if the field should be an array

        FHIR fields can have the following cardinalities:
        - 0..1, meaning the field is optional
        - 1..1, meaning the field is required
        - 0..*, meaning the field is an optional array
        - 1..*, meaning the field is a required array

        Raises:
            ValueError: If the "max" value of the element definition is neither "1" nor "*".
        """

        max_cardinality = element_definition["max"]

        if max_cardinality == "1":
            return False
        if max_cardinality == "*":
            return True

        raise ValueError(f"Unexpected max cardinality value: {max_cardinality}")

    def _get_type_and_kind_enum_from_type(self, type_code: str):
        """Returns the (R4FhirType, Kind) enum values for the given FHIR type code.

        Raises:
            ValueError: If the type code contains unexpected characters.
        """
        # "id" fields usually have a type containing the following type code and extension
        # https://hl7.org/fhir/extensions/StructureDefinition-structuredefinition-fhir-type.html
        if type_code == "http://hl7.org/fhirpath/System.String":
            return (fhirspec_pb2.R4FhirType.R4_FHIR_TYPE_SYSTEM_STRING,
                    fhirspec_pb2.Kind.KIND_PRIMITIVE_TYPE)

        data_type = fhirspec_pb2.R4FhirType.Value(
            self._convert_type_string_to_enum_string(type_code))
        if self._is_primitive_type(type_code):
            kind = fhirspec_pb2.Kind.KIND_PRIMITIVE_TYPE
        else:
            kind = fhirspec_pb2.Kind.KIND_COMPLEX_TYPE

        return data_type, kind

    def _convert_type_string_to_enum_string(self, type_string: str) -> str:
        """Converts a FHIR type code to the name of the matching R4FhirType enum value.

        Primitive type codes are converted from camelCase to upper snake case, for example
        "dateTime" becomes "R4_FHIR_TYPE_DATE_TIME". All non-primitive types map to
        "R4_FHIR_TYPE_COMPLEX".

        Raises:
            ValueError: If the type string contains non-alphabetic characters or is empty.
        """
        if not type_string.isalpha():
            raise ValueError("Unexpected characters found in type_string: " + type_string)

        # TODO: b/361775175 - Extract all fhir types individually instead of combining non-primitive
        # types to COMPLEX enum value.
        if not self._is_primitive_type(type_string):
            return "R4_FHIR_TYPE_COMPLEX"

        # Insert an underscore before every non-lowercase character after the first one.
        snake_case_type_string = type_string[0].upper() + "".join(
            [c if c.islower() else "_" + c for c in type_string[1:]])

        return "R4_FHIR_TYPE_" + snake_case_type_string.upper()

    def _is_primitive_type(self, type_string: str) -> bool:
        """Returns true if the type name starts with a lowercase letter and is not "xhtml"."""
        # See https://hl7.org/fhir/R4/datatypes.html for possible types.
        # TODO: b/361775175 - Read this from the type definitions file instead of inferring from the
        # name
        return type_string[0].islower() and type_string != "xhtml"