#!/usr/bin/env python3 # # Copyright (C) 2022 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # A tool that can feed data into GNSS HAL over virtio-console # This tool can be invoked as gnss_replay.py -s -i # where is the host-side of a UNIX domain socket that backs # virtio-console GNSS vport and is a log file generated by # GnssLogger or equivalent tooling import argparse import sys import socket import time class DataStore(object): def __init__(self, prefix, lines): self.idx = 0 self.lines = [] for line in lines: if line.startswith(prefix): self.lines.append(line.strip('\n')) def next(self): line = '' if len(self.lines) > 0: line = self.lines[self.idx] self.idx = (self.idx + 1) % len(self.lines) return line def __str__(self): return str(self.lines) def load(input): with open(input) as f: lines = f.readlines() return {'locations' : DataStore('Fix', lines), 'measurements' : DataStore('Raw', lines)} class CommandLoop(object): def __init__(self, socket): self.socket = socket self.commands = {} def command(self, action, reaction): self.commands[action] = reaction def loop(self): while True: inp = self.socket.recv(128) reaction = self.commands.get(inp, None) if reaction: response = reaction() + '\n\n\n\n' self.socket.send(str.encode(response)) else: print("Unknown command '%s'" % inp) self.socket.send(b'\n\n\n\n') def main(): parser = argparse.ArgumentParser(description="GNSS mock host agent") parser.add_argument('--socket', '-s', action='store', required=True, help='The UNIX socket to interact with') parser.add_argument('--input', '-i', action='store', required=True, help='The file with fix information to send over') args = parser.parse_args() sk = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET) sk.connect(args.socket) data = load(args.input) loop = CommandLoop(sk) loop.command(b'CMD_GET_LOCATION', lambda: data['locations'].next()) loop.command(b'CMD_GET_RAWMEASUREMENT', lambda: data['measurements'].next()) loop.loop() if __name__ == "__main__": main()