# Copyright 2024 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""CLI to describe a yaml sensor definition."""

import argparse
from collections.abc import Sequence
from dataclasses import dataclass
import importlib.resources
import logging
from pathlib import Path
import subprocess
import sys

import jsonschema  # type: ignore
import jsonschema.exceptions  # type: ignore
from pw_sensor.validator import Validator
import yaml

logging.basicConfig(level=logging.DEBUG)
_LOG = logging.getLogger("sensor-describe")

# Schema that the merged (superset) descriptor must satisfy before it is
# emitted. JSON is parsed with the YAML loader here.
_OUTPUT_SCHEMA = yaml.safe_load(
    importlib.resources.read_text("pw_sensor", "resolved_schema.json")
)


@dataclass
class Args:
    """Strongly typed wrapper around the arguments provided"""

    # Directories in which to search for dependency files (-I).
    include_paths: Sequence[Path]
    # Descriptor files to validate and merge (positional arguments).
    descriptor_paths: Sequence[Path]
    # Generator command line, or None to print the merged YAML directly.
    generator_command: str | None
    # Destination for the generator's stdout; only consulted when a generator
    # command is given.
    output_file: Path | None
    # Logging level derived from the number of -v flags.
    log_level: int = logging.WARNING


def get_args() -> Args:
    """
    Setup the argument parser, parse the args, and return a dataclass with the
    arguments.

    Verbosity maps to a logging level: no ``-v`` is WARNING, one ``-v`` is
    INFO, and two or more is DEBUG.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--include-path",
        "-I",
        action="append",
        type=lambda p: Path(p).resolve(),
        required=True,
        help="Directories in which to search for dependency files",
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="count",
        default=0,
        help="Increase verbosity level (can be used multiple times)",
    )
    parser.add_argument(
        "--generator",
        "-g",
        type=str,
        help="Generator command to run along with any flags. Data will be "
        "passed into the generator as YAML through stdin",
    )
    parser.add_argument(
        "-o",
        dest="output",
        type=Path,
        help="Write output to file instead of standard out",
    )
    parser.add_argument(
        "descriptors",
        nargs="*",
        type=lambda p: Path(p).resolve(),
        help="One or more files to validate",
    )

    args = parser.parse_args()
    if args.verbose == 0:
        log_level = logging.WARNING
    elif args.verbose == 1:
        log_level = logging.INFO
    else:
        log_level = logging.DEBUG
    return Args(
        include_paths=args.include_path,
        descriptor_paths=args.descriptors,
        generator_command=args.generator,
        output_file=args.output,
        log_level=log_level,
    )


def _merge_section(
    superset: dict,
    content: dict,
    section: str,
    source: Path,
    *,
    allow_identical: bool = True,
) -> None:
    """Merge ``content[section]`` into ``superset[section]`` in place.

    Args:
      superset: The accumulated descriptor; must already contain ``section``.
      content: A validated descriptor containing ``section``.
      section: Name of the section to merge (e.g. "channels").
      source: File that ``content`` was loaded from; used in error messages.
      allow_identical: When True, re-defining an entry with an equal spec is
        accepted. When False, any re-definition is an error.

    Raises:
      RuntimeError: An entry is already present and the re-definition is not
        permitted.
    """
    merged = superset[section]
    for item_id, item_spec in content[section].items():
        existing = merged.get(item_id)
        # Raise explicitly instead of asserting: asserts are stripped under
        # `python -O`, which would let conflicts be silently overwritten.
        if existing is not None and not (
            allow_identical and existing == item_spec
        ):
            raise RuntimeError(
                f"Conflicting definition of '{item_id}' in '{section}' "
                f"while merging '{source}'"
            )
        merged[item_id] = item_spec


def main() -> None:
    """
    Main entry point to the CLI.

    After parsing the arguments (include paths, verbosity, generator, output
    file and descriptor files), each descriptor file is loaded, validated and
    merged into a single descriptor. The merged descriptor is checked against
    the output schema and then either piped as YAML into the generator command
    or, when no generator is given, printed to standard out.

    Raises:
      RuntimeError: A descriptor path is not a file, two descriptors define
        conflicting entries, or the merged descriptor fails schema validation.
      SystemExit: The generator command exited with a non-zero status.
    """
    args = get_args()
    _LOG.setLevel(level=args.log_level)

    validator = Validator(
        include_paths=args.include_paths, log_level=args.log_level
    )
    superset: dict = {
        "attributes": {},
        "channels": {},
        "triggers": {},
        "units": {},
        "sensors": {},
    }
    for descriptor_file in args.descriptor_paths:
        _LOG.info("Loading '%s'", descriptor_file)
        if not descriptor_file.is_file():
            raise RuntimeError(f"'{descriptor_file}' is not a file")
        with open(descriptor_file, mode="r", encoding="utf-8") as stream:
            content = yaml.safe_load(stream=stream)
            _LOG.debug("Validating:\n%s", yaml.safe_dump(content, indent=2))
            content = validator.validate(content)
            _LOG.debug("Result:\n%s", yaml.safe_dump(content, indent=2))

        # A sensor may only be defined once across all descriptor files.
        _merge_section(
            superset,
            content,
            "sensors",
            descriptor_file,
            allow_identical=False,
        )
        # The remaining sections may repeat as long as the specs are equal.
        for section in ("channels", "attributes", "triggers", "units"):
            _merge_section(superset, content, section, descriptor_file)

    _LOG.debug("Final descriptor:\n%s", yaml.safe_dump(superset, indent=2))
    _LOG.info("Validating...")
    try:
        jsonschema.validate(instance=superset, schema=_OUTPUT_SCHEMA)
    except jsonschema.exceptions.ValidationError as e:
        raise RuntimeError(
            "ERROR: Malformed merged output:\n"
            f"{yaml.safe_dump(superset, indent=2)}"
        ) from e
    content_string = yaml.safe_dump(superset)

    if not args.generator_command:
        # No generator: emit the merged YAML on standard out. The output file
        # is only used for generator output.
        print(content_string)
        return

    cmd = args.generator_command.split(sep=" ")
    _LOG.info("Running generator %s", cmd)

    with subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    ) as process:
        # Feed stdin through communicate() rather than stdin.write(): it
        # writes and drains stdout/stderr concurrently and closes stdin, so a
        # large descriptor cannot deadlock against a full pipe buffer.
        out, err = process.communicate(input=content_string.encode("utf-8"))

    if out:
        if args.output_file:
            args.output_file.parent.mkdir(parents=True, exist_ok=True)
            with open(args.output_file, mode="w", encoding="utf-8") as o:
                o.write(out.decode("utf-8"))
        else:
            print(out.decode("utf-8"))
    if err:
        _LOG.error(err.decode("utf-8"))
    if process.returncode != 0:
        sys.exit(-1)


if __name__ == '__main__':
    main()