xref: /aosp_15_r20/external/scapy/test/tls/travis_test_client.py (revision 7dc08ffc4802948ccbc861daaf1e81c405c2c4bd)
1*7dc08ffcSJunyu Lai#!/usr/bin/env python
2*7dc08ffcSJunyu Lai
3*7dc08ffcSJunyu Lai## This file is part of Scapy
4*7dc08ffcSJunyu Lai## This program is published under a GPLv2 license
5*7dc08ffcSJunyu Lai
6*7dc08ffcSJunyu Lai"""
7*7dc08ffcSJunyu LaiTLS client used in unit tests.
8*7dc08ffcSJunyu Lai
9*7dc08ffcSJunyu LaiStart our TLS client, send our send_data, and terminate session with an Alert.
10*7dc08ffcSJunyu LaiOptional cipher_cuite_code and version may be provided as hexadecimal strings
11*7dc08ffcSJunyu Lai(e.g. c09e for TLS_DHE_RSA_WITH_AES_128_CCM and 0303 for TLS 1.2).
12*7dc08ffcSJunyu LaiReception of the exact send_data on the server is to be checked externally.
13*7dc08ffcSJunyu Lai"""
14*7dc08ffcSJunyu Lai
15*7dc08ffcSJunyu Laiimport sys, os, time
16*7dc08ffcSJunyu Laiimport multiprocessing
17*7dc08ffcSJunyu Lai
18*7dc08ffcSJunyu Laibasedir = os.path.abspath(os.path.join(os.path.dirname(__file__),"../../"))
19*7dc08ffcSJunyu Laisys.path=[basedir]+sys.path
20*7dc08ffcSJunyu Lai
21*7dc08ffcSJunyu Laifrom scapy.layers.tls.automaton_cli import TLSClientAutomaton
22*7dc08ffcSJunyu Laifrom scapy.layers.tls.handshake import TLSClientHello
23*7dc08ffcSJunyu Lai
24*7dc08ffcSJunyu Lai
25*7dc08ffcSJunyu Laisend_data = cipher_suite_code = version = None
26*7dc08ffcSJunyu Lai
27*7dc08ffcSJunyu Laidef run_tls_test_client(send_data=None, cipher_suite_code=None, version=None):
28*7dc08ffcSJunyu Lai    if version == "0002":
29*7dc08ffcSJunyu Lai        t = TLSClientAutomaton(data=[send_data, b"stop_server", b"quit"], version="sslv2")
30*7dc08ffcSJunyu Lai    else:
31*7dc08ffcSJunyu Lai        ch = TLSClientHello(version=int(version, 16), ciphers=int(cipher_suite_code, 16))
32*7dc08ffcSJunyu Lai        t = TLSClientAutomaton(client_hello=ch, data=[send_data, b"stop_server", b"quit"], debug=1)
33*7dc08ffcSJunyu Lai    t.run()
34*7dc08ffcSJunyu Lai
35*7dc08ffcSJunyu Laifrom travis_test_server import run_tls_test_server
36*7dc08ffcSJunyu Lai
37*7dc08ffcSJunyu Laidef test_tls_client(suite, version, q):
38*7dc08ffcSJunyu Lai    msg = ("TestC_%s_data" % suite).encode()
39*7dc08ffcSJunyu Lai    # Run server
40*7dc08ffcSJunyu Lai    q_ = multiprocessing.Manager().Queue()
41*7dc08ffcSJunyu Lai    th_ = multiprocessing.Process(target=run_tls_test_server, args=(msg, q_))
42*7dc08ffcSJunyu Lai    th_.start()
43*7dc08ffcSJunyu Lai    # Synchronise threads
44*7dc08ffcSJunyu Lai    assert q_.get() is True
45*7dc08ffcSJunyu Lai    time.sleep(1)
46*7dc08ffcSJunyu Lai    # Run client
47*7dc08ffcSJunyu Lai    run_tls_test_client(msg, suite, version)
48*7dc08ffcSJunyu Lai    # Wait for server
49*7dc08ffcSJunyu Lai    th_.join(60)
50*7dc08ffcSJunyu Lai    if th_.is_alive():
51*7dc08ffcSJunyu Lai        th_.terminate()
52*7dc08ffcSJunyu Lai        raise RuntimeError("Test timed out")
53*7dc08ffcSJunyu Lai    # Return values
54*7dc08ffcSJunyu Lai    q.put(q_.get())
55*7dc08ffcSJunyu Lai    q.put(th_.exitcode)
56