#!/usr/bin/env python

## This file is part of Scapy
## This program is published under a GPLv2 license

"""
TLS client used in unit tests.

Start our TLS client, send our send_data, and terminate session with an Alert.
Optional cipher_suite_code and version may be provided as hexadecimal strings
(e.g. c09e for TLS_DHE_RSA_WITH_AES_128_CCM and 0303 for TLS 1.2).
Reception of the exact send_data on the server is to be checked externally.
"""

import multiprocessing
import os
import sys
import time

# Make the in-tree scapy package (two directories up) win over any installed
# copy before the scapy imports below are resolved.
basedir = os.path.abspath(os.path.join(os.path.dirname(__file__),"../../"))
sys.path=[basedir]+sys.path

from scapy.layers.tls.automaton_cli import TLSClientAutomaton
from scapy.layers.tls.handshake import TLSClientHello


send_data = cipher_suite_code = version = None

def run_tls_test_client(send_data=None, cipher_suite_code=None, version=None):
    """
    Run a TLS client automaton which sends send_data to the test server.

    send_data is the payload handed to the automaton, followed by the
    b"stop_server" and b"quit" messages (presumably control messages for the
    test server and the automaton; confirm against their implementations).

    cipher_suite_code and version are hexadecimal strings (e.g. "c09e" and
    "0303"). When version is "0002" the automaton is started with
    version="sslv2" and cipher_suite_code is not used. Otherwise both values
    are parsed with int(..., 16), so despite the None defaults they must be
    provided on that path (int(None, 16) raises TypeError).
    """
    data = [send_data, b"stop_server", b"quit"]
    if version == "0002":
        t = TLSClientAutomaton(data=data, version="sslv2")
    else:
        ch = TLSClientHello(version=int(version, 16), ciphers=int(cipher_suite_code, 16))
        t = TLSClientAutomaton(client_hello=ch, data=data, debug=1)
    t.run()

from travis_test_server import run_tls_test_server

def test_tls_client(suite, version, q):
    """
    Run the test server in a child process and our client against it.

    suite and version are the hexadecimal strings forwarded to
    run_tls_test_client(). q is the caller's queue: on success it receives
    first the value the server put on its own queue, then the exit code of
    the server process.

    Raises AssertionError if the server's first queue message is not True,
    and RuntimeError("Test timed out") if the server process is still alive
    60 seconds after the client has finished. Whenever an exception leaves
    this function, a still-running server process is terminated first.
    """
    msg = ("TestC_%s_data" % suite).encode()
    # Run server
    q_ = multiprocessing.Manager().Queue()
    th_ = multiprocessing.Process(target=run_tls_test_server, args=(msg, q_))
    th_.start()
    try:
        # Synchronise threads
        # The queue read must not live inside an assert statement: under
        # "python -O" it would be stripped, and the result read at the end
        # of this function would then pick up this readiness flag instead.
        ready = q_.get()
        if ready is not True:
            raise AssertionError("Test server did not signal readiness")
        time.sleep(1)
        # Run client
        run_tls_test_client(msg, suite, version)
        # Wait for server
        th_.join(60)
        if th_.is_alive():
            raise RuntimeError("Test timed out")
    finally:
        # Covers the timeout above as well as any failure of the handshake
        # or of the client: never leave the server process running.
        if th_.is_alive():
            th_.terminate()
    # Return values
    q.put(q_.get())
    q.put(th_.exitcode)