#!/usr/bin/env python3 # # Copyright (c) 2022, The OpenThread Authors. # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # 3. Neither the name of the copyright holder nor the # names of its contributors may be used to endorse or promote products # derived from this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. # import argparse from concurrent import futures import enum import fcntl import grpc import logging import os import signal import socket import subprocess import tempfile import threading import time import pcap_codec from proto import sniffer_pb2 from proto import sniffer_pb2_grpc import sniffer_transport class CaptureState(enum.Flag): NONE = 0 THREAD = enum.auto() ETHERNET = enum.auto() class SnifferServicer(sniffer_pb2_grpc.Sniffer): """ Class representing the Sniffing node, whose main task is listening. """ logger = logging.getLogger('sniffer.SnifferServicer') RECV_BUFFER_SIZE = 4096 TIMEOUT = 0.1 def _reset(self): self._state = CaptureState.NONE self._pcap = None self._denied_nodeids = None self._transport = None self._thread = None self._thread_alive.clear() self._file_sync_done.clear() self._tshark_proc = None def __init__(self, max_nodes_num): self._max_nodes_num = max_nodes_num self._thread_alive = threading.Event() self._file_sync_done = threading.Event() self._nodeids_mutex = threading.Lock() # for `self._denied_nodeids` self._reset() def Start(self, request, context): """ Start sniffing. """ self.logger.debug('call Start') # Validate and change the state if self._state != CaptureState.NONE: return sniffer_pb2.StartResponse(status=sniffer_pb2.OPERATION_ERROR) self._state = CaptureState.THREAD # Create a temporary named pipe tempdir = tempfile.mkdtemp() fifo_name = os.path.join(tempdir, 'pcap.fifo') os.mkfifo(fifo_name) cmd = ['tshark', '-i', fifo_name] if request.includeEthernet: self._state |= CaptureState.ETHERNET cmd += ['-i', 'docker0'] cmd += ['-w', '-', '-q', 'not ip and not tcp and not arp and not ether proto 0x8899'] self.logger.debug('Running command: %s', ' '.join(cmd)) self._tshark_proc = subprocess.Popen(cmd, stdout=subprocess.PIPE) self._set_nonblocking(self._tshark_proc.stdout.fileno()) # Construct pcap codec after initiating tshark to avoid blocking self._pcap = pcap_codec.PcapCodec(request.channel, fifo_name) # Sniffer all nodes in default, i.e. there is no RF enclosure self._denied_nodeids = set() # Create transport transport_factory = sniffer_transport.SnifferTransportFactory() self._transport = transport_factory.create_transport() # Start the sniffer main loop thread self._thread = threading.Thread(target=self._sniffer_main_loop) self._thread.setDaemon(True) self._transport.open() self._thread_alive.set() self._thread.start() return sniffer_pb2.StartResponse(status=sniffer_pb2.OK) def _sniffer_main_loop(self): """ Sniffer main loop. """ while self._thread_alive.is_set(): try: data, nodeid = self._transport.recv(self.RECV_BUFFER_SIZE, self.TIMEOUT) except socket.timeout: continue with self._nodeids_mutex: denied_nodeids = self._denied_nodeids # Equivalent to RF enclosure if nodeid not in denied_nodeids: self._pcap.append(data) def TransferPcapng(self, request, context): """ Transfer the capture file. """ # Validate the state if self._state == CaptureState.NONE: return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OPERATION_ERROR) # Synchronize the capture file while True: content = self._tshark_proc.stdout.read() if content is None: # Currently no captured packets time.sleep(self.TIMEOUT) elif content == b'': # Reach EOF when tshark terminates break else: # Forward the captured packets yield sniffer_pb2.TransferPcapngResponse(content=content) self._file_sync_done.set() def _set_nonblocking(self, fd): flags = fcntl.fcntl(fd, fcntl.F_GETFL) if flags < 0: raise RuntimeError('fcntl(F_GETFL) failed') flags |= os.O_NONBLOCK if fcntl.fcntl(fd, fcntl.F_SETFL, flags) < 0: raise RuntimeError('fcntl(F_SETFL) failed') def FilterNodes(self, request, context): """ Only sniffer the specified nodes. """ self.logger.debug('call FilterNodes') # Validate the state if not (self._state & CaptureState.THREAD): return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OPERATION_ERROR) denied_nodeids = set(request.nodeids) # Validate the node IDs for nodeid in denied_nodeids: if not 1 <= nodeid <= self._max_nodes_num: return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.VALUE_ERROR) with self._nodeids_mutex: self._denied_nodeids = denied_nodeids return sniffer_pb2.FilterNodesResponse(status=sniffer_pb2.OK) def Stop(self, request, context): """ Stop sniffing, and return the pcap bytes. """ self.logger.debug('call Stop') # Validate and change the state if not (self._state & CaptureState.THREAD): return sniffer_pb2.StopResponse(status=sniffer_pb2.OPERATION_ERROR) self._state = CaptureState.NONE self._thread_alive.clear() self._thread.join() self._transport.close() self._pcap.close() self._tshark_proc.terminate() self._file_sync_done.wait() # `self._tshark_proc` becomes None after the next statement self._tshark_proc.wait() self._reset() return sniffer_pb2.StopResponse(status=sniffer_pb2.OK) def serve(address_port, max_nodes_num): # One worker is used for `Start`, `FilterNodes` and `Stop` # The other worker is used for `TransferPcapng`, which will be kept running by the client in a background thread server = grpc.server(futures.ThreadPoolExecutor(max_workers=2)) sniffer_pb2_grpc.add_SnifferServicer_to_server(SnifferServicer(max_nodes_num), server) # add_secure_port requires a web domain server.add_insecure_port(address_port) logging.info('server starts on %s', address_port) server.start() def exit_handler(signum, context): server.stop(1) signal.signal(signal.SIGINT, exit_handler) signal.signal(signal.SIGTERM, exit_handler) server.wait_for_termination() def run_sniffer(): logging.basicConfig(level=logging.INFO) parser = argparse.ArgumentParser() parser.add_argument('--grpc-server', dest='grpc_server', type=str, required=True, help='the address of the sniffer server') parser.add_argument('--max-nodes-num', dest='max_nodes_num', type=int, required=True, help='the maximum number of nodes') args = parser.parse_args() serve(args.grpc_server, args.max_nodes_num) if __name__ == '__main__': run_sniffer()