1import fhirspec_pb2
2from typing import Collection, Mapping
3
# Maps each FHIR resource type name supported by Health Connect to its integer identifier. The
# LINT directives below tie this table to the referenced Java file so the two are changed together.
# LINT.IfChange(fhir_resource_type_mapping)
RESOURCE_TYPE_STRING_TO_HC_INT_MAPPING = {
    "Immunization": 1,
    "AllergyIntolerance": 2,
    "Observation": 3,
    "Condition": 4,
    "Procedure": 5,
    "Medication": 6,
    "MedicationRequest": 7,
    "MedicationStatement": 8,
    "Patient": 9,
    "Practitioner": 10,
    "PractitionerRole": 11,
    "Encounter": 12,
    "Location": 13,
    "Organization": 14,
}
# LINT.ThenChange(/framework/java/android/health/connect/datatypes/FhirResource.java)

# The names of all resource types listed in the mapping above (iterating a dict yields its keys).
HC_SUPPORTED_RESOURCE_SET = set(RESOURCE_TYPE_STRING_TO_HC_INT_MAPPING)

# The "fhirVersion" value every extracted StructureDefinition is expected to declare.
FHIR_VERSION_R4 = "4.0.1"
26
27
class FhirSpecExtractor:
    """Extractor for getting information for HC FHIR validation from official FHIR spec json files.

    Typical usage example:
        extractor = FhirSpecExtractor(profile_resources_json, {"Immunization", "Observation"})
        fhir_spec_message = extractor.generate_r4_fhir_spec_proto_message()
    """

    def __init__(self, profile_resources_json: Mapping, resource_names: set[str]):
        """Extracts StructureDefinitions for the requested resources from the provided fhir spec.

        Args:
            profile_resources_json: The contents of the profile-resources.json fhir spec file, which
            is in the structure of a https://hl7.org/fhir/Bundle.html, parsed to dict. The
            Bundle.entry will contain the list of https://hl7.org/fhir/StructureDefinition.html that
            we are interested in.
            resource_names: The set of FHIR resources to extract FHIR spec information for.

        Raises:
            ValueError: If a requested resource is not present in the spec, if it's not supported by
            Health Connect, or if any spec values are not as expected.
        """
        if not resource_names.issubset(HC_SUPPORTED_RESOURCE_SET):
            raise ValueError("Provided resource set was not a subset of supported resources")

        # A mapping from the resource name to the list of field definitions, which are in the
        # structure of https://hl7.org/fhir/ElementDefinition.html
        self._resource_to_element_definitions = (
            self._extract_element_definitions_by_resource_from_spec(
                profile_resources_json, resource_names))

    def generate_r4_fhir_spec_proto_message(self) -> fhirspec_pb2.FhirResourceSpec:
        """Generates a FhirResourceSpec message from the fhir json spec.

        Returns:
            The FhirResourceSpec message, with an entry for each requested resource.
        """
        # TODO: b/360091651 - Extract additional information such as field types, cardinality and
        #  structure of each type. Note that the field "Observation.component.referenceRange" will
        #  need special handling. It doesn't have a type, but a contentReference to
        #  "Observation.referenceRange" and should use that type's structure.

        r4_resource_spec = fhirspec_pb2.FhirResourceSpec()

        for resource, element_definitions in self._resource_to_element_definitions.items():
            # The proto map is keyed by the Health Connect integer id, not the FHIR resource name.
            resource_type_int = RESOURCE_TYPE_STRING_TO_HC_INT_MAPPING[resource]

            resource_data_type_config = (
                self._generate_fhir_data_type_config_from_element_definitions(element_definitions))

            r4_resource_spec.resource_type_to_config[
                resource_type_int].CopyFrom(resource_data_type_config)

        return r4_resource_spec

    def _extract_element_definitions_by_resource_from_spec(
            self, profile_resources_json: Mapping, resource_names: set[str]) -> Mapping:
        """Collects the snapshot ElementDefinitions of each requested resource from the spec.

        Args:
            profile_resources_json: The parsed spec bundle; its "entry" list is scanned.
            resource_names: The resource names to look for.

        Returns:
            A dict from resource name to that resource's list of ElementDefinitions.

        Raises:
            ValueError: If a matching StructureDefinition has an unexpected "fhirVersion", "kind"
            or "type", or if the resources found do not exactly match resource_names.
        """
        structure_definition_url_prefix = "http://hl7.org/fhir/StructureDefinition/"
        resource_to_element_definitions = {}
        # For each StructureDefinition that matches a resource in resource_names, we extract
        # the list of ElementDefinitions. Each ElementDefinition contains the spec for a path /
        # field of the resource.
        for entry in profile_resources_json["entry"]:
            full_url = entry["fullUrl"]
            if not full_url.startswith(structure_definition_url_prefix):
                continue

            # The resource name is the last path segment of the StructureDefinition url.
            resource_name = full_url.split("/")[-1]
            if resource_name not in resource_names:
                continue

            resource_structure_definition = entry["resource"]

            # Do some assertions on expected values
            if resource_structure_definition["fhirVersion"] != FHIR_VERSION_R4:
                raise ValueError("Unexpected fhir version found")
            if resource_structure_definition["kind"] != "resource":
                raise ValueError("Unexpected kind field in structure definition")
            if resource_structure_definition["type"] != resource_name:
                raise ValueError("Unexpected resource type in structure definition")

            # We select the list of elements in "snapshot" (as opposed to "differential"), as we
            # want the full definition of fields, including fields from any base definitions.
            resource_to_element_definitions[resource_name] = (
                resource_structure_definition["snapshot"]["element"])

        if set(resource_to_element_definitions) != resource_names:
            raise ValueError("Did not find resource definitions for all requested resources.")

        return resource_to_element_definitions

    def _generate_fhir_data_type_config_from_element_definitions(
            self, element_definitions: Collection[Mapping]) -> fhirspec_pb2.FhirDataTypeConfig:
        """Builds the FhirDataTypeConfig for one resource from its ElementDefinitions.

        Only top level fields (ids of the form "<Resource>.<field>") are recorded. A "resourceType"
        field is always added, as it is not present in the spec.

        Args:
            element_definitions: The ElementDefinitions of a single resource.

        Returns:
            A FhirDataTypeConfig with the allowed fields, the required (non multi type) fields in
            sorted order, and the multi type field configs.

        Raises:
            ValueError: If an element's "id" and "path" differ, if a field name would be recorded
            twice, or if a field definition has unexpected values.
        """
        required_fields = set()

        multi_type_configs = []

        # Manually add resourceType field, as this is not present in the spec
        field_configs_by_name = {
            "resourceType": fhirspec_pb2.FhirFieldConfig(
                is_array=False,
                r4_type=fhirspec_pb2.R4FhirType.R4_FHIR_TYPE_STRING,
                kind=fhirspec_pb2.Kind.KIND_PRIMITIVE_TYPE
            ),
        }

        for element in element_definitions:
            field_id = element["id"]
            if field_id != element["path"]:
                raise ValueError("Expected id and path field to be the same")
            field_parts = field_id.split(".")

            # Only ids with exactly two parts are "regular" top level fields, e.g.
            # Immunization.status. Everything else is skipped:
            # - One part is the path to the element itself. For example for the Observation
            #   resource, there will be an ElementDefinition with id "Observation".
            # - More than two parts means the field is part of a BackBoneElement. For an example
            #   see the https://hl7.org/fhir/Immunization.html "reaction" field.
            #   BackBoneElements need to be handled separately, as those fields don't have a type
            #   defined, but have the BackBoneElement definition instead. Note that the following
            #   field contains a double backbone element, which we need to consider:
            #   "MedicationRequest.dispenseRequest.initialFill".
            #   For now we are just recording the top level allowed field, which has its own
            #   element definition with a two part id.
            if len(field_parts) != 2:
                continue

            field_name = field_parts[1]
            field_configs_to_add, multi_type_config = (
                self._generate_field_configs_and_multi_type_config_from_field_element(
                    element, field_name))

            if not field_configs_by_name.keys().isdisjoint(field_configs_to_add):
                raise ValueError("Field name already exists")
            field_configs_by_name.update(field_configs_to_add)

            # A multi type config is only returned for multi type ("[x]") fields. Their required
            # state is stored on the config itself, so they are not added to required_fields.
            if multi_type_config is not None:
                multi_type_configs.append(multi_type_config)
            elif self._field_is_required(element):
                required_fields.add(field_name)

        return fhirspec_pb2.FhirDataTypeConfig(
            allowed_field_names_to_config=field_configs_by_name,
            # Sort the list of required fields alphabetically, as the output of this script is part
            # of the build, which needs to be deterministic. The required_fields come from a set,
            # which does not have ordering guarantees.
            required_fields=sorted(required_fields),
            multi_type_fields=multi_type_configs
        )

    def _generate_field_configs_and_multi_type_config_from_field_element(
            self, element_definition: Mapping, field_name: str
    ) -> ("tuple[Mapping[str, fhirspec_pb2.FhirFieldConfig], "
          "fhirspec_pb2.MultiTypeFieldConfig | None]"):
        """Generates the field config(s) for a single top level field definition.

        Args:
            element_definition: The ElementDefinition of the field.
            field_name: The name of the field without the resource prefix, e.g. "status" or
            "occurrence[x]".

        Returns:
            A tuple of:
            - a dict from allowed field name to its FhirFieldConfig. A multi type field contributes
              one entry per type, any other field contributes exactly one entry.
            - the MultiTypeFieldConfig if the field is a multi type field, otherwise None.

        Raises:
            ValueError: If a multi type field is an array, if a non multi type field does not have
            exactly one type, or if the cardinality or type values are not as expected.
        """
        field_is_array = self._field_is_array(element_definition)

        field_configs_by_name = {}

        if not self.field_name_is_multi_type_field(field_name):
            if len(element_definition["type"]) != 1:
                raise ValueError("Expected exactly one type")
            type_code = element_definition["type"][0]["code"]
            type_enum, kind_enum = self._get_type_and_kind_enum_from_type(type_code)
            field_configs_by_name[field_name] = fhirspec_pb2.FhirFieldConfig(
                is_array=field_is_array,
                r4_type=type_enum,
                kind=kind_enum
            )
            return field_configs_by_name, None

        # If the field is a multi type field, it means one of several types can be set. An example
        # is the field Immunization.occurrence, which has types "string" and "dateTime" and
        # therefore means the fields "occurrenceString" and "occurrenceDateTime" are allowed. We
        # therefore expand the field name with each defined type.
        if field_is_array:
            raise ValueError(
                "Unexpected cardinality for type choice field. Did not expect array.")

        typed_field_names = []
        for data_type in element_definition["type"]:
            type_code = data_type["code"]
            field_with_type = self._get_multi_type_name_for_type(field_name, type_code)
            type_enum, kind_enum = self._get_type_and_kind_enum_from_type(type_code)
            field_configs_by_name[field_with_type] = fhirspec_pb2.FhirFieldConfig(
                is_array=False,
                r4_type=type_enum,
                kind=kind_enum
            )
            typed_field_names.append(field_with_type)

        multi_type_config = fhirspec_pb2.MultiTypeFieldConfig(
            name=field_name,
            typed_field_names=typed_field_names,
            is_required=self._field_is_required(element_definition)
        )

        return field_configs_by_name, multi_type_config

    def field_name_is_multi_type_field(self, field_name: str) -> bool:
        """Returns true if the field is a oneof / type choice field, which can contain one of
        several data types.

        This is the case if the field name ends with "[x]" and means that one of several types can
        be set.
        """

        return field_name.endswith("[x]")

    def _get_multi_type_name_for_type(self, field_name: str, type_code: str) -> str:
        """Returns the one of field name for a specific type.

        For example for the field name "occurrence[x]" and type "dateTime" this will return
        "occurrenceDateTime".
        """

        # Drop the trailing "[x]" and append the type code with its first character upper cased.
        return field_name[:-3] + type_code[0].upper() + type_code[1:]

    def _field_is_required(self, element_definition: Mapping) -> bool:
        """Returns true if the field is required

        FHIR fields can have the following cardinalities:
        - 0..1, meaning the field is optional
        - 1..1, meaning the field is required
        - 0..*, meaning the field is an optional array
        - 1..*, meaning the field is a required array

        Raises:
            ValueError: If the "min" value of the element definition is neither 0 nor 1.
        """

        min_cardinality = element_definition["min"]

        if min_cardinality not in (0, 1):
            # The value is usually an int, so it is formatted rather than concatenated, which would
            # raise a TypeError instead of the intended ValueError.
            raise ValueError(f"Unexpected min cardinality value: {min_cardinality}")

        return min_cardinality == 1

    def _field_is_array(self, element_definition: Mapping) -> bool:
        """Returns true if the field should be an array

        FHIR fields can have the following cardinalities:
        - 0..1, meaning the field is optional
        - 1..1, meaning the field is required
        - 0..*, meaning the field is an optional array
        - 1..*, meaning the field is a required array

        Raises:
            ValueError: If the "max" value of the element definition is neither "1" nor "*".
        """

        max_cardinality = element_definition["max"]

        if max_cardinality == "1":
            return False
        if max_cardinality == "*":
            return True
        raise ValueError(f"Unexpected max cardinality value: {max_cardinality}")

    def _get_type_and_kind_enum_from_type(self, type_code: str):
        """Returns the (R4FhirType, Kind) enum values for the type code of a field definition.

        Raises:
            ValueError: If the type code contains unexpected characters. The enum lookup by name
            may also fail if the derived name is not defined in R4FhirType.
        """
        # "id" fields usually have a type containing the following type code and extension
        # https://hl7.org/fhir/extensions/StructureDefinition-structuredefinition-fhir-type.html
        if type_code == "http://hl7.org/fhirpath/System.String":
            return (fhirspec_pb2.R4FhirType.R4_FHIR_TYPE_SYSTEM_STRING,
                    fhirspec_pb2.Kind.KIND_PRIMITIVE_TYPE)

        # The name conversion validates the type code, so it has to run before the kind check.
        data_type = fhirspec_pb2.R4FhirType.Value(
            self._convert_type_string_to_enum_string(type_code))
        kind = (fhirspec_pb2.Kind.KIND_PRIMITIVE_TYPE if self._is_primitive_type(type_code)
                else fhirspec_pb2.Kind.KIND_COMPLEX_TYPE)

        return data_type, kind

    def _convert_type_string_to_enum_string(self, type_string: str) -> str:
        """Returns the R4FhirType enum value name for a FHIR type string.

        Non primitive types all map to "R4_FHIR_TYPE_COMPLEX". Primitive types are converted from
        camel case to upper snake case, for example "dateTime" becomes "R4_FHIR_TYPE_DATE_TIME".

        Raises:
            ValueError: If type_string is empty or contains non alphabetic characters.
        """
        if not type_string.isalpha():
            raise ValueError("Unexpected characters found in type_string: " + type_string)

        # TODO: b/361775175 - Extract all fhir types individually instead of combining non-primitive
        #  types to COMPLEX enum value.
        if not self._is_primitive_type(type_string):
            return "R4_FHIR_TYPE_COMPLEX"

        # Insert an underscore in front of every character after the first that is not lower case,
        # then upper case the whole string.
        snake_case_type_string = type_string[0] + "".join(
            c if c.islower() else "_" + c for c in type_string[1:])

        return "R4_FHIR_TYPE_" + snake_case_type_string.upper()

    def _is_primitive_type(self, type_string: str) -> bool:
        """Returns true if the type name starts with a lower case character and is not "xhtml"."""
        # See https://hl7.org/fhir/R4/datatypes.html for possible types.
        # TODO: b/361775175 - Read this from the type definitions file instead of inferring from the
        #  name
        return type_string[0].islower() and type_string != "xhtml"