# xref: /aosp_15_r20/external/pigweed/pw_sensor/py/pw_sensor/sensor_desc.py (revision 61c4878ac05f98d0ceed94b57d316916de578985)
# Copyright 2024 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""CLI to describe a yaml sensor definition."""
15
import argparse
from collections.abc import Sequence
from dataclasses import dataclass
import importlib.resources
import logging
from pathlib import Path
import shlex
import subprocess
import sys

import jsonschema  # type: ignore
import jsonschema.exceptions  # type: ignore
from pw_sensor.validator import Validator
import yaml
29
# Configure logging once at import time; main() narrows the level per -v flags.
logging.basicConfig(level=logging.DEBUG)
_LOG = logging.getLogger("sensor-describe")

# JSON schema used to validate the merged (resolved) descriptor superset.
# Uses the importlib.resources.files() API; the positional
# read_text(package, resource) form is the deprecated legacy API.
_OUTPUT_SCHEMA = yaml.safe_load(
    importlib.resources.files("pw_sensor")
    .joinpath("resolved_schema.json")
    .read_text(encoding="utf-8")
)
36
37
@dataclass
class Args:
    """Strongly typed wrapper around the arguments provided"""

    # Directories searched for dependency files (-I/--include-path).
    include_paths: Sequence[Path]
    # Descriptor YAML files to validate and merge (positional args).
    descriptor_paths: Sequence[Path]
    # Optional generator command line; merged YAML is piped to it on stdin.
    generator_command: str | None
    # Optional file to receive the output instead of standard out (-o).
    output_file: Path | None
    # Logging level derived from the -v/--verbose count.
    log_level: int = logging.WARNING
47
48
def get_args() -> Args:
    """Build the CLI argument parser, parse argv, and wrap the result.

    Returns:
        An Args instance carrying the parsed command-line options.
    """

    def _resolved_path(raw: str) -> Path:
        # Resolve to an absolute path so later file checks are unambiguous.
        return Path(raw).resolve()

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--include-path",
        "-I",
        action="append",
        type=_resolved_path,
        required=True,
        help="Directories in which to search for dependency files",
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="count",
        default=0,
        help="Increase verbosity level (can be used multiple times)",
    )
    parser.add_argument(
        "--generator",
        "-g",
        type=str,
        help="Generator command to run along with any flags. Data will be "
        "passed into the generator as YAML through stdin",
    )
    parser.add_argument(
        "-o",
        dest="output",
        type=Path,
        help="Write output to file instead of standard out",
    )
    parser.add_argument(
        "descriptors",
        nargs="*",
        type=_resolved_path,
        help="One or more files to validate",
    )

    parsed = parser.parse_args()
    # Map -v count to a level: 0 -> WARNING, 1 -> INFO, 2 or more -> DEBUG.
    verbosity_levels = (logging.WARNING, logging.INFO, logging.DEBUG)
    return Args(
        include_paths=parsed.include_path,
        descriptor_paths=parsed.descriptors,
        generator_command=parsed.generator,
        output_file=parsed.output,
        log_level=verbosity_levels[min(parsed.verbose, 2)],
    )
104
105
def _merge_category(
    superset: dict,
    content: dict,
    category: str,
    *,
    allow_duplicates: bool,
) -> None:
    """Merge one category of a validated descriptor into the superset.

    Args:
        superset: The accumulated merged descriptor being built up.
        content: One validated descriptor file's contents.
        category: Key to merge ("sensors", "channels", "attributes", ...).
        allow_duplicates: When True, a re-definition is tolerated if it is
            byte-for-byte identical to the existing one; when False, any
            re-definition of an existing ID is an error.

    Raises:
        RuntimeError: On an ID collision (or a mismatched duplicate). A real
            exception is used instead of `assert` so the check survives
            `python -O`.
    """
    for item_id, item_spec in content[category].items():
        existing = superset[category].get(item_id)
        if existing is not None and (not allow_duplicates or existing != item_spec):
            raise RuntimeError(
                f"Conflicting definitions for {category} ID '{item_id}'"
            )
        superset[category][item_id] = item_spec


def _run_generator(
    command: str, content_string: str, output_file: Path | None
) -> None:
    """Run the generator command, feeding it the merged descriptor on stdin.

    Args:
        command: The generator command line (program plus flags).
        content_string: The merged descriptor serialized as YAML.
        output_file: Destination for the generator's stdout; None prints to
            standard out.
    """
    # shlex.split honors shell-style quoting, so a quoted argument containing
    # spaces is kept as a single token (a plain str.split(" ") would break it).
    cmd = shlex.split(command)
    _LOG.info("Running generator %s", cmd)

    # shell=False with an argv list; stdin is fed via `input=` which also
    # closes the pipe so the child sees EOF.
    result = subprocess.run(
        cmd,
        input=content_string.encode("utf-8"),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        check=False,
    )

    if result.stdout:
        if output_file:
            output_file.parent.mkdir(parents=True, exist_ok=True)
            with open(output_file, mode="w", encoding="utf-8") as o:
                o.write(result.stdout.decode("utf-8"))
        else:
            print(result.stdout.decode("utf-8"))
    if result.stderr:
        _LOG.error(result.stderr.decode("utf-8"))
    if result.returncode != 0:
        sys.exit(-1)


def main() -> None:
    """
    Main entry point to the CLI. After parsing the arguments for the below
    parameters, the utility will validate the descriptor files and pass the
    output to the generator:
    - include paths
    - verbosity
    - generator
    - descriptor files

    Raises:
        RuntimeError: If a descriptor path is not a file, if two descriptors
            define conflicting entries, or if the merged result fails schema
            validation.
    """
    args = get_args()
    _LOG.setLevel(level=args.log_level)

    validator = Validator(
        include_paths=args.include_paths, log_level=args.log_level
    )
    superset: dict = {
        "attributes": {},
        "channels": {},
        "triggers": {},
        "units": {},
        "sensors": {},
    }
    for descriptor_file in args.descriptor_paths:
        _LOG.info("Loading '%s'", descriptor_file)
        if not descriptor_file.is_file():
            raise RuntimeError(f"'{descriptor_file}' is not a file")
        with open(descriptor_file, mode="r", encoding="utf-8") as stream:
            content = yaml.safe_load(stream=stream)
            _LOG.debug("Validating:\n%s", yaml.safe_dump(content, indent=2))
            content = validator.validate(content)
            _LOG.debug("Result:\n%s", yaml.safe_dump(content, indent=2))
        # Sensor IDs must be globally unique; the shared categories may be
        # re-defined by multiple files as long as the definitions match.
        _merge_category(superset, content, "sensors", allow_duplicates=False)
        for category in ("channels", "attributes", "triggers", "units"):
            _merge_category(superset, content, category, allow_duplicates=True)

    _LOG.debug("Final descriptor:\n%s", yaml.safe_dump(superset, indent=2))
    _LOG.info("Validating...")
    try:
        jsonschema.validate(instance=superset, schema=_OUTPUT_SCHEMA)
    except jsonschema.exceptions.ValidationError as e:
        raise RuntimeError(
            "ERROR: Malformed merged output:\n"
            f"{yaml.safe_dump(superset, indent=2)}"
        ) from e
    content_string = yaml.safe_dump(superset)

    if args.generator_command:
        _run_generator(args.generator_command, content_string, args.output_file)
    else:
        print(content_string)
201
202
# Allow invoking this module directly as a script.
if __name__ == '__main__':
    main()
205