## This file is part of Scapy ## Copyright (C) 2007, 2008, 2009 Arnaud Ebalard ## 2015, 2016, 2017 Maxence Tury ## This program is published under a GPLv2 license """ TLS handshake fields & logic. This module covers the handshake TLS subprotocol, except for the key exchange mechanisms which are addressed with keyexchange.py. """ from __future__ import absolute_import import math from scapy.error import log_runtime, warning from scapy.fields import * from scapy.compat import * from scapy.packet import Packet, Raw, Padding from scapy.utils import repr_hex from scapy.layers.x509 import OCSP_Response from scapy.layers.tls.cert import Cert, PrivKey, PubKey from scapy.layers.tls.basefields import (_tls_version, _TLSVersionField, _TLSClientVersionField) from scapy.layers.tls.extensions import (_ExtensionsLenField, _ExtensionsField, _cert_status_type, TLS_Ext_SupportedVersions) from scapy.layers.tls.keyexchange import (_TLSSignature, _TLSServerParamsField, _TLSSignatureField, ServerRSAParams, SigAndHashAlgsField, _tls_hash_sig, SigAndHashAlgsLenField) from scapy.layers.tls.keyexchange_tls13 import TicketField from scapy.layers.tls.session import (_GenericTLSSessionInheritance, readConnState, writeConnState) from scapy.layers.tls.crypto.compression import (_tls_compression_algs, _tls_compression_algs_cls, Comp_NULL, _GenericComp, _GenericCompMetaclass) from scapy.layers.tls.crypto.suites import (_tls_cipher_suites, _tls_cipher_suites_cls, _GenericCipherSuite, _GenericCipherSuiteMetaclass) ############################################################################### ### Generic TLS Handshake message ### ############################################################################### _tls_handshake_type = { 0: "hello_request", 1: "client_hello", 2: "server_hello", 3: "hello_verify_request", 4: "session_ticket", 6: "hello_retry_request", 8: "encrypted_extensions", 11: "certificate", 12: "server_key_exchange", 13: "certificate_request", 14: "server_hello_done", 15: 
"certificate_verify", 16: "client_key_exchange", 20: "finished", 21: "certificate_url", 22: "certificate_status", 23: "supplemental_data", 24: "key_update" } class _TLSHandshake(_GenericTLSSessionInheritance): """ Inherited by other Handshake classes to get post_build(). Also used as a fallback for unknown TLS Handshake packets. """ name = "TLS Handshake Generic message" fields_desc = [ ByteEnumField("msgtype", None, _tls_handshake_type), ThreeBytesField("msglen", None), StrLenField("msg", "", length_from=lambda pkt: pkt.msglen) ] def post_build(self, p, pay): l = len(p) if self.msglen is None: l2 = l - 4 p = struct.pack("!I", (orb(p[0]) << 24) | l2) + p[4:] return p + pay def guess_payload_class(self, p): return conf.padding_layer def tls_session_update(self, msg_str): """ Covers both post_build- and post_dissection- context updates. """ self.tls_session.handshake_messages.append(msg_str) self.tls_session.handshake_messages_parsed.append(self) ############################################################################### ### HelloRequest ### ############################################################################### class TLSHelloRequest(_TLSHandshake): name = "TLS Handshake - Hello Request" fields_desc = [ ByteEnumField("msgtype", 0, _tls_handshake_type), ThreeBytesField("msglen", None) ] def tls_session_update(self, msg_str): """ Message should not be added to the list of handshake messages that will be hashed in the finished and certificate verify messages. """ return ############################################################################### ### ClientHello fields ### ############################################################################### class _GMTUnixTimeField(UTCTimeField): """ "The current time and date in standard UNIX 32-bit format (seconds since the midnight starting Jan 1, 1970, GMT, ignoring leap seconds)." 
""" def i2h(self, pkt, x): if x is not None: return x return 0 class _TLSRandomBytesField(StrFixedLenField): def i2repr(self, pkt, x): if x is None: return repr(x) return repr_hex(self.i2h(pkt,x)) class _SessionIDField(StrLenField): """ opaque SessionID<0..32>; section 7.4.1.2 of RFC 4346 """ pass class _CipherSuitesField(StrLenField): __slots__ = ["itemfmt", "itemsize", "i2s", "s2i"] islist = 1 def __init__(self, name, default, dico, length_from=None, itemfmt="!H"): StrLenField.__init__(self, name, default, length_from=length_from) self.itemfmt = itemfmt self.itemsize = struct.calcsize(itemfmt) i2s = self.i2s = {} s2i = self.s2i = {} for k in six.iterkeys(dico): i2s[k] = dico[k] s2i[dico[k]] = k def any2i_one(self, pkt, x): if (isinstance(x, _GenericCipherSuite) or isinstance(x, _GenericCipherSuiteMetaclass)): x = x.val if isinstance(x, bytes): x = self.s2i[x] return x def i2repr_one(self, pkt, x): fmt = "0x%%0%dx" % self.itemsize return self.i2s.get(x, fmt % x) def any2i(self, pkt, x): if x is None: return None if not isinstance(x, list): x = [x] return [self.any2i_one(pkt, z) for z in x] def i2repr(self, pkt, x): if x is None: return "None" l = [self.i2repr_one(pkt, z) for z in x] if len(l) == 1: l = l[0] else: l = "[%s]" % ", ".join(l) return l def i2m(self, pkt, val): if val is None: val = [] return b"".join(struct.pack(self.itemfmt, x) for x in val) def m2i(self, pkt, m): res = [] itemlen = struct.calcsize(self.itemfmt) while m: res.append(struct.unpack(self.itemfmt, m[:itemlen])[0]) m = m[itemlen:] return res def i2len(self, pkt, i): if i is None: return 0 return len(i)*self.itemsize class _CompressionMethodsField(_CipherSuitesField): def any2i_one(self, pkt, x): if (isinstance(x, _GenericComp) or isinstance(x, _GenericCompMetaclass)): x = x.val if isinstance(x, str): x = self.s2i[x] return x ############################################################################### ### ClientHello ### 
###############################################################################

class TLSClientHello(_TLSHandshake):
    """
    TLS ClientHello, with abilities to handle extensions.

    The Random structure follows the RFC 5246: while it is 32-byte long,
    many implementations use the first 4 bytes as a gmt_unix_time, and then
    the remaining 28 bytes should be completely random. This was designed in
    order to (sort of) mitigate broken RNGs. If you prefer to show the full
    32 random bytes without any GMT time, just comment in/out the lines below.
    """
    name = "TLS Handshake - Client Hello"
    fields_desc = [ ByteEnumField("msgtype", 1, _tls_handshake_type),
                    ThreeBytesField("msglen", None),
                    _TLSClientVersionField("version", None, _tls_version),

                    #_TLSRandomBytesField("random_bytes", None, 32),
                    _GMTUnixTimeField("gmt_unix_time", None),
                    _TLSRandomBytesField("random_bytes", None, 28),

                    FieldLenField("sidlen", None, fmt="B", length_of="sid"),
                    _SessionIDField("sid", "",
                                    length_from=lambda pkt:pkt.sidlen),

                    FieldLenField("cipherslen", None, fmt="!H",
                                  length_of="ciphers"),
                    _CipherSuitesField("ciphers", None,
                                       _tls_cipher_suites, itemfmt="!H",
                                       length_from=lambda pkt: pkt.cipherslen),

                    FieldLenField("complen", None, fmt="B", length_of="comp"),
                    _CompressionMethodsField("comp", [0],
                                             _tls_compression_algs,
                                             itemfmt="B",
                                             length_from=
                                                 lambda pkt: pkt.complen),

                    _ExtensionsLenField("extlen", None, length_of="ext"),
                    _ExtensionsField("ext", None,
                                     length_from=lambda pkt: (pkt.msglen -
                                                              (pkt.sidlen or 0) -
                                                              (pkt.cipherslen or 0) -
                                                              (pkt.complen or 0) -
                                                              40)) ]

    def post_build(self, p, pay):
        """
        Fill in defaults that cannot be expressed as field defaults:
        fresh random bytes, and a usual cipher suite list (plus matching
        extensions) when the user provided none.
        """
        if self.random_bytes is None:
            # random_bytes occupies offsets 10..38 of the built message.
            p = p[:10] + randstring(28) + p[10+28:]

        # if no ciphersuites were provided, we add a few usual, supported
        # ciphersuites along with the appropriate extensions
        if self.ciphers is None:
            sidlen = self.sidlen
            if sidlen is None:
                # sidlen was auto-computed from 'sid' during the build: read
                # it back from the wire (byte 38) so that the cipher list is
                # located correctly even with a non-empty session id.
                sidlen = orb(p[38])
            cipherstart = 39 + sidlen
            s = b"001ac02bc023c02fc027009e0067009c003cc009c0130033002f000a"
            # Replaces the 2-byte (empty) cipherslen with length + suites.
            p = p[:cipherstart] + bytes_hex(s) + p[cipherstart+2:]
            if self.ext is None:
                ext_len = b'\x00\x2c'
                ext_reneg = b'\xff\x01\x00\x01\x00'
                ext_sn = b'\x00\x00\x00\x0f\x00\r\x00\x00\nsecdev.org'
                ext_sigalg = b'\x00\r\x00\x08\x00\x06\x04\x03\x04\x01\x02\x01'
                ext_supgroups = b'\x00\n\x00\x04\x00\x02\x00\x17'
                p += ext_len + ext_reneg + ext_sn + ext_sigalg + ext_supgroups

        return super(TLSClientHello, self).post_build(p, pay)

    def tls_session_update(self, msg_str):
        """
        Either for parsing or building, we store the client_random
        along with the raw string representing this handshake message.
        """
        super(TLSClientHello, self).tls_session_update(msg_str)

        self.tls_session.advertised_tls_version = self.version
        # Take the random bytes from the wire format, as post_build() may
        # have generated them.
        self.random_bytes = msg_str[10:38]
        self.tls_session.client_random = (struct.pack('!I',
                                                      self.gmt_unix_time) +
                                          self.random_bytes)
        if self.ext:
            for e in self.ext:
                if isinstance(e, TLS_Ext_SupportedVersions):
                    if self.tls_session.tls13_early_secret is None:
                        # this is not recomputed if there was a TLS 1.3 HRR
                        self.tls_session.compute_tls13_early_secrets()
                    break


###############################################################################
### ServerHello                                                             ###
###############################################################################

class TLSServerHello(TLSClientHello):
    """
    TLS ServerHello, with abilities to handle extensions.

    The Random structure follows the RFC 5246: while it is 32-byte long,
    many implementations use the first 4 bytes as a gmt_unix_time, and then
    the remaining 28 bytes should be completely random. This was designed in
    order to (sort of) mitigate broken RNGs. If you prefer to show the full
    32 random bytes without any GMT time, just comment in/out the lines below.
    """
    name = "TLS Handshake - Server Hello"
    fields_desc = [ ByteEnumField("msgtype", 2, _tls_handshake_type),
                    ThreeBytesField("msglen", None),
                    _TLSVersionField("version", None, _tls_version),

                    #_TLSRandomBytesField("random_bytes", None, 32),
                    _GMTUnixTimeField("gmt_unix_time", None),
                    _TLSRandomBytesField("random_bytes", None, 28),

                    FieldLenField("sidlen", None, length_of="sid", fmt="B"),
                    _SessionIDField("sid", "",
                                    length_from = lambda pkt: pkt.sidlen),

                    EnumField("cipher", None, _tls_cipher_suites),
                    _CompressionMethodsField("comp", [0],
                                             _tls_compression_algs,
                                             itemfmt="B",
                                             length_from=lambda pkt: 1),

                    _ExtensionsLenField("extlen", None, length_of="ext"),
                    _ExtensionsField("ext", None,
                                     length_from=lambda pkt: (pkt.msglen -
                                                              (pkt.sidlen or 0) -
                                                              38)) ]
                                                              #40)) ]

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        """
        Select TLS13ServerHello when the version found at bytes 4..6 of the
        raw message is 0x0304 or greater than 0x7f00, TLSServerHello
        otherwise.
        """
        if _pkt and len(_pkt) >= 6:
            version = struct.unpack("!H", _pkt[4:6])[0]
            if version == 0x0304 or version > 0x7f00:
                return TLS13ServerHello
        return TLSServerHello

    def post_build(self, p, pay):
        """Generate random bytes if needed, then fill in msglen."""
        if self.random_bytes is None:
            p = p[:10] + randstring(28) + p[10+28:]
        # super(TLSClientHello, ...) deliberately skips the ClientHello
        # specific post_build() and reaches _TLSHandshake.post_build().
        return super(TLSClientHello, self).post_build(p, pay)

    def tls_session_update(self, msg_str):
        """
        Either for parsing or building, we store the server_random
        along with the raw string representing this handshake message.
        We also store the session_id, the cipher suite (if recognized),
        the compression method, and finally we instantiate the pending write
        and read connection states. Usually they get updated later on in the
        negotiation when we learn the session keys, and eventually they
        are committed once a ChangeCipherSpec has been sent/received.
        """
        # Skips TLSClientHello.tls_session_update() on purpose.
        super(TLSClientHello, self).tls_session_update(msg_str)

        self.tls_session.tls_version = self.version
        self.random_bytes = msg_str[10:38]
        self.tls_session.server_random = (struct.pack('!I',
                                                      self.gmt_unix_time) +
                                          self.random_bytes)
        self.tls_session.sid = self.sid

        cs_cls = None
        if self.cipher:
            cs_val = self.cipher
            if cs_val not in _tls_cipher_suites_cls:
                warning("Unknown cipher suite %d from ServerHello" % cs_val)
                # we do not try to set a default nor stop the execution
            else:
                cs_cls = _tls_cipher_suites_cls[cs_val]

        comp_cls = Comp_NULL
        if self.comp:
            comp_val = self.comp[0]
            if comp_val not in _tls_compression_algs_cls:
                err = "Unknown compression alg %d from ServerHello" % comp_val
                warning(err)
                # Fall back to entry 0 of the compression table.
                comp_val = 0
            comp_cls = _tls_compression_algs_cls[comp_val]

        connection_end = self.tls_session.connection_end
        self.tls_session.pwcs = writeConnState(ciphersuite=cs_cls,
                                               compression_alg=comp_cls,
                                               connection_end=connection_end,
                                               tls_version=self.version)
        self.tls_session.prcs = readConnState(ciphersuite=cs_cls,
                                              compression_alg=comp_cls,
                                              connection_end=connection_end,
                                              tls_version=self.version)


class TLS13ServerHello(TLSClientHello):
    """ TLS 1.3 ServerHello """
    name = "TLS 1.3 Handshake - Server Hello"
    fields_desc = [ ByteEnumField("msgtype", 2, _tls_handshake_type),
                    ThreeBytesField("msglen", None),
                    _TLSVersionField("version", None, _tls_version),
                    _TLSRandomBytesField("random_bytes", None, 32),
                    EnumField("cipher", None, _tls_cipher_suites),
                    _ExtensionsLenField("extlen", None, length_of="ext"),
                    _ExtensionsField("ext", None,
                                     length_from=lambda pkt: (pkt.msglen -
                                                              38)) ]

    def tls_session_update(self, msg_str):
        """
        Either for parsing or building, we store the server_random along with
        the raw string representing this handshake message. We also store the
        cipher suite (if recognized), and finally we instantiate the write and
        read connection states.
        """
        # Skips TLSClientHello.tls_session_update() on purpose.
        super(TLSClientHello, self).tls_session_update(msg_str)

        s = self.tls_session
        s.tls_version = self.version
        s.server_random = self.random_bytes

        cs_cls = None
        if self.cipher:
            cs_val = self.cipher
            if cs_val not in _tls_cipher_suites_cls:
                warning("Unknown cipher suite %d from ServerHello" % cs_val)
                # we do not try to set a default nor stop the execution
            else:
                cs_cls = _tls_cipher_suites_cls[cs_val]

        connection_end = s.connection_end
        s.pwcs = writeConnState(ciphersuite=cs_cls,
                                connection_end=connection_end,
                                tls_version=self.version)
        s.triggered_pwcs_commit = True
        s.prcs = readConnState(ciphersuite=cs_cls,
                               connection_end=connection_end,
                               tls_version=self.version)
        s.triggered_prcs_commit = True

        if self.tls_session.tls13_early_secret is None:
            # In case the connState was not pre-initialized, we could not
            # compute the early secrets at the ClientHello, so we do it here.
            self.tls_session.compute_tls13_early_secrets()
        s.compute_tls13_handshake_secrets()


###############################################################################
### HelloRetryRequest                                                       ###
###############################################################################

class TLSHelloRetryRequest(_TLSHandshake):
    name = "TLS 1.3 Handshake - Hello Retry Request"
    fields_desc = [ ByteEnumField("msgtype", 6, _tls_handshake_type),
                    ThreeBytesField("msglen", None),
                    _TLSVersionField("version", None, _tls_version),
                    _ExtensionsLenField("extlen", None, length_of="ext"),
                    _ExtensionsField("ext", None,
                                     length_from=lambda pkt: pkt.msglen - 4) ]


###############################################################################
### EncryptedExtensions                                                     ###
###############################################################################

class TLSEncryptedExtensions(_TLSHandshake):
    name = "TLS 1.3 Handshake - Encrypted Extensions"
    fields_desc = [ ByteEnumField("msgtype", 8, _tls_handshake_type),
                    ThreeBytesField("msglen", None),
                    _ExtensionsLenField("extlen", None, length_of="ext"),
                    _ExtensionsField("ext", None,
                                     length_from=lambda pkt: pkt.msglen - 2) ]


###############################################################################
### Certificate                                                             ###
###############################################################################

#XXX It might be appropriate to rewrite this mess with basic 3-byte FieldLenField.

def _asn1_cert_i2m_one(i):
    """
    Serialize one certificate entry for the Certificate message fields.

    'i' may be raw bytes/str (returned as bytes, unchanged), a Cert (DER
    prefixed with its 3-byte length), or a (length, cert) tuple where cert is
    a Cert or raw DER (prefixed with the given 3-byte length).
    """
    if isinstance(i, (bytes, str)):
        # raw() keeps bytes as is and converts a native Python 3 str.
        return raw(i)
    if isinstance(i, Cert):
        der = i.der
        return struct.pack("!I", len(der))[1:4] + der

    (l, s) = i
    if isinstance(s, Cert):
        s = s.der
    # Keep the low 3 bytes of the 4-byte big-endian length.
    return struct.pack("!I", l)[1:4] + s


class _ASN1CertLenField(FieldLenField):
    """
    This is mostly a 3-byte FieldLenField.
    """
    def __init__(self, name, default, length_of=None, adjust=lambda pkt, x: x):
        self.length_of = length_of
        self.adjust = adjust
        Field.__init__(self, name, default, fmt="!I")

    def i2m(self, pkt, x):
        """Compute the length from the 'length_of' field when x is None."""
        if x is None:
            if self.length_of is not None:
                fld,fval = pkt.getfield_and_val(self.length_of)
                f = fld.i2len(pkt, fval)
                x = self.adjust(pkt, f)
        return x

    def addfield(self, pkt, s, val):
        # Packed on 4 bytes, of which only the low 3 are emitted.
        return s + struct.pack(self.fmt, self.i2m(pkt,val))[1:4]

    def getfield(self, pkt, s):
        # Left-pad the 3 wire bytes so that they unpack as a 4-byte integer.
        return s[3:], self.m2i(pkt, struct.unpack(self.fmt, b"\x00" + s[:3])[0])


class _ASN1CertListField(StrLenField):
    islist = 1

    def i2len(self, pkt, i):
        if i is None:
            return 0
        return len(self.i2m(pkt, i))

    def getfield(self, pkt, s):
        """
        Extract Certs in a loop.
        XXX We should provide safeguards when trying to parse a Cert.

        Returns (remaining bytes, list of (length, Cert) tuples).
        """
        l = None
        if self.length_from is not None:
            l = self.length_from(pkt)

        lst = []
        ret = b""
        m = s
        if l is not None:
            m, ret = s[:l], s[l:]
        while m:
            clen = struct.unpack("!I", b'\x00' + m[:3])[0]
            lst.append((clen, Cert(m[3:3 + clen])))
            m = m[3 + clen:]
        return m + ret, lst

    def i2m(self, pkt, i):
        """
        Serialize the certificate list. Accepts None, raw bytes/str, a single
        Cert, or an iterable of entries handled by _asn1_cert_i2m_one().
        """
        if i is None:
            return b""
        if isinstance(i, (bytes, str)):
            return raw(i)
        if isinstance(i, Cert):
            i = [i]
        return b"".join(_asn1_cert_i2m_one(x) for x in i)

    def any2i(self, pkt, x):
        return x


class _ASN1CertField(StrLenField):
    def i2len(self, pkt, i):
        if i is None:
            return 0
        return len(self.i2m(pkt, i))

    def getfield(self, pkt, s):
        """
        Extract a single length-prefixed certificate.
        Returns (remaining bytes, (length, Cert)).
        """
        l = None
        if self.length_from is not None:
            l = self.length_from(pkt)
        ret = b""
        m = s
        if l is not None:
            m, ret = s[:l], s[l:]
        clen = struct.unpack("!I", b'\x00' + m[:3])[0]
        len_cert = (clen, Cert(m[3:3 + clen]))
        m = m[3 + clen:]
        return m + ret, len_cert

    def i2m(self, pkt, i):
        """Serialize one certificate entry (see _asn1_cert_i2m_one())."""
        if i is None:
            return b""
        return _asn1_cert_i2m_one(i)

    def any2i(self, pkt, x):
        return x


class TLSCertificate(_TLSHandshake):
    """
    XXX We do not support RFC 5081, i.e. OpenPGP certificates.
    """
    name = "TLS Handshake - Certificate"
    fields_desc = [ ByteEnumField("msgtype", 11, _tls_handshake_type),
                    ThreeBytesField("msglen", None),
                    _ASN1CertLenField("certslen", None, length_of="certs"),
                    _ASN1CertListField("certs", [],
                                      length_from = lambda pkt: pkt.certslen) ]

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        """Use TLS13Certificate when the session version is >= 0x0304."""
        if _pkt:
            tls_session = kargs.get("tls_session", None)
            if tls_session and (tls_session.tls_version or 0) >= 0x0304:
                return TLS13Certificate
        return TLSCertificate

    def post_dissection_tls_session_update(self, msg_str):
        """
        Store the received chain in the session: as server_certs when we are
        the client, as client_certs otherwise.
        """
        self.tls_session_update(msg_str)
        connection_end = self.tls_session.connection_end
        if connection_end == "client":
            self.tls_session.server_certs = [x[1] for x in self.certs]
        else:
            self.tls_session.client_certs = [x[1] for x in self.certs]


class _ASN1CertAndExt(_GenericTLSSessionInheritance):
    name = "Certificate and Extensions"
    fields_desc = [ _ASN1CertField("cert", ""),
                    FieldLenField("extlen", None, length_of="ext"),
                    _ExtensionsField("ext", [],
                                     length_from=lambda pkt: pkt.extlen) ]

    def extract_padding(self, s):
        # Everything left is handed back to the enclosing list field.
        return b"", s


class _ASN1CertAndExtListField(PacketListField):
    def m2i(self, pkt, m):
        # Propagate the TLS session to every list item.
        return self.cls(m, tls_session=pkt.tls_session)


class TLS13Certificate(_TLSHandshake):
    name = "TLS 1.3 Handshake - Certificate"
    fields_desc = [ ByteEnumField("msgtype", 11, _tls_handshake_type),
                    ThreeBytesField("msglen", None),
                    FieldLenField("cert_req_ctxt_len", None, fmt="B",
                                  length_of="cert_req_ctxt"),
                    StrLenField("cert_req_ctxt", "",
                                length_from=lambda pkt: pkt.cert_req_ctxt_len),
                    _ASN1CertLenField("certslen", None, length_of="certs"),
                    _ASN1CertAndExtListField("certs", [], _ASN1CertAndExt,
                                      length_from=lambda pkt: pkt.certslen) ]

    def post_dissection_tls_session_update(self, msg_str):
        """
        Store the received chain (if any) in the session: as server_certs
        when we are the client, as client_certs otherwise.
        """
        self.tls_session_update(msg_str)
        connection_end = self.tls_session.connection_end
        if connection_end == "client":
            if self.certs:
                sc = [x.cert[1] for x in self.certs]
                self.tls_session.server_certs = sc
        else:
            if self.certs:
                cc = [x.cert[1] for x in self.certs]
                self.tls_session.client_certs = cc


###############################################################################
### ServerKeyExchange                                                       ###
###############################################################################

class TLSServerKeyExchange(_TLSHandshake):
    name = "TLS Handshake - Server Key Exchange"
    fields_desc = [ ByteEnumField("msgtype", 12, _tls_handshake_type),
                    ThreeBytesField("msglen", None),
                    _TLSServerParamsField("params", None,
                        length_from=lambda pkt: pkt.msglen),
                    _TLSSignatureField("sig", None,
                        length_from=lambda pkt: pkt.msglen - len(pkt.params)) ]

    def build(self, *args, **kargs):
        """
        We overload build() method in order to provide a valid default value
        for params based on TLS session if not provided. This cannot be done by
        overriding i2m() because the method is called on a copy of the packet.

        The 'params' field is built according to key_exchange.server_kx_msg_cls
        which should have been set after receiving a cipher suite in a
        previous ServerHello. Usual cases are:
        - None: for RSA encryption or fixed FF/ECDH. This should never happen,
          as no ServerKeyExchange should be generated in the first place.
        - ServerDHParams: for ephemeral FFDH. In that case, the parameter to
          server_kx_msg_cls does not matter.
        - ServerECDH*Params: for ephemeral ECDH. There are actually three
          classes, which are dispatched by _tls_server_ecdh_cls_guess on
          the first byte retrieved. The default here is b"\03", which
          corresponds to ServerECDHNamedCurveParams (implicit curves).

        When the Server*DHParams are built via .fill_missing(), the session
        server_kx_privkey will be updated accordingly.
        """
        fval = self.getfieldval("params")
        if fval is None:
            s = self.tls_session
            if s.pwcs:
                if s.pwcs.key_exchange.export:
                    cls = ServerRSAParams(tls_session=s)
                else:
                    cls = s.pwcs.key_exchange.server_kx_msg_cls(b"\x03")
                    cls = cls(tls_session=s)
                try:
                    cls.fill_missing()
                except Exception:
                    # Best effort: keep the params as they are when they
                    # cannot be completed. 'Exception' (rather than a bare
                    # except) lets KeyboardInterrupt/SystemExit propagate.
                    pass
            else:
                cls = Raw()
            self.params = cls

        fval = self.getfieldval("sig")
        if fval is None:
            s = self.tls_session
            if s.pwcs:
                if not s.pwcs.key_exchange.anonymous:
                    p = self.params
                    if p is None:
                        p = b""
                    # Signed data: both randoms followed by the parameters.
                    m = s.client_random + s.server_random + raw(p)
                    cls = _TLSSignature(tls_session=s)
                    cls._update_sig(m, s.server_key)
                else:
                    cls = Raw()
            else:
                cls = Raw()
            self.sig = cls

        return _TLSHandshake.build(self, *args, **kargs)

    def post_dissection(self, pkt):
        """
        While previously dissecting Server*DHParams, the session
        server_kx_pubkey should have been updated.

        XXX Add a 'fixed_dh' OR condition to the 'anonymous' test.
        """
        s = self.tls_session
        if s.prcs and s.prcs.key_exchange.no_ske:
            pkt_info = pkt.firstlayer().summary()
            log_runtime.info("TLS: useless ServerKeyExchange [%s]", pkt_info)
        if (s.prcs and
            not s.prcs.key_exchange.anonymous and
            s.client_random and s.server_random and
            s.server_certs and len(s.server_certs) > 0):
            m = s.client_random + s.server_random + raw(self.params)
            sig_test = self.sig._verify_sig(m, s.server_certs[0])
            if not sig_test:
                pkt_info = pkt.firstlayer().summary()
                log_runtime.info("TLS: invalid ServerKeyExchange signature [%s]", pkt_info)


###############################################################################
### CertificateRequest                                                      ###
###############################################################################

_tls_client_certificate_types =  {  1: "rsa_sign",
                                    2: "dss_sign",
                                    3: "rsa_fixed_dh",
                                    4: "dss_fixed_dh",
                                    5: "rsa_ephemeral_dh_RESERVED",
                                    6: "dss_ephemeral_dh_RESERVED",
                                   20: "fortezza_dms_RESERVED",
                                   64: "ecdsa_sign",
                                   65: "rsa_fixed_ecdh",
                                   66: "ecdsa_fixed_ecdh" }


class _CertTypesField(_CipherSuitesField):
    pass


class _CertAuthoritiesField(StrLenField):
    """
    XXX Rework this with proper ASN.1 parsing.
    """
    islist = 1

    def getfield(self, pkt, s):
        l = self.length_from(pkt)
        return s[l:], self.m2i(pkt, s[:l])

    def m2i(self, pkt, m):
        """
        Split 'm' into (length, name) tuples, each name being preceded by a
        2-byte length. A final entry shorter than announced is kept as is.
        """
        res = []
        while len(m) > 1:
            l = struct.unpack("!H", m[:2])[0]
            if len(m) < l + 2:
                # Truncated entry: keep what is left and stop.
                res.append((l, m[2:]))
                break
            dn = m[2:2+l]
            res.append((l, dn))
            m = m[2+l:]
        return res

    def i2m(self, pkt, i):
        """Serialize (length, name) tuples; None yields an empty string."""
        if i is None:
            # Consistent with i2len(), which reports 0 for None.
            return b""
        return b"".join(struct.pack("!H", l) + dn for (l, dn) in i)

    def addfield(self, pkt, s, val):
        return s + self.i2m(pkt, val)

    def i2len(self, pkt, val):
        if val is None:
            return 0
        else:
            return len(self.i2m(pkt, val))


class TLSCertificateRequest(_TLSHandshake):
    name = "TLS Handshake - Certificate Request"
    fields_desc = [ ByteEnumField("msgtype", 13, _tls_handshake_type),
                    ThreeBytesField("msglen", None),
                    FieldLenField("ctypeslen", None, fmt="B",
                                  length_of="ctypes"),
                    _CertTypesField("ctypes", [1, 64],
                                    _tls_client_certificate_types,
                                    itemfmt="!B",
                                    length_from=lambda pkt: pkt.ctypeslen),
                    SigAndHashAlgsLenField("sig_algs_len", None,
                                           length_of="sig_algs"),
                    SigAndHashAlgsField("sig_algs", [0x0403, 0x0401, 0x0201],
                                EnumField("hash_sig", None, _tls_hash_sig),
                                length_from=lambda pkt: pkt.sig_algs_len),
                    FieldLenField("certauthlen", None, fmt="!H",
                                  length_of="certauth"),
                    _CertAuthoritiesField("certauth", [],
                                 length_from=lambda pkt: pkt.certauthlen) ]


###############################################################################
### ServerHelloDone                                                         ###
###############################################################################

class TLSServerHelloDone(_TLSHandshake):
    name = "TLS Handshake - Server Hello Done"
    fields_desc = [ ByteEnumField("msgtype", 14, _tls_handshake_type),
                    ThreeBytesField("msglen", None) ]


###############################################################################
### CertificateVerify                                                       ###
###############################################################################

class TLSCertificateVerify(_TLSHandshake):
    name = "TLS Handshake - Certificate Verify"
    fields_desc = [ ByteEnumField("msgtype", 15, _tls_handshake_type),
                    ThreeBytesField("msglen", None),
                    _TLSSignatureField("sig", None,
                                 length_from=lambda pkt: pkt.msglen) ]

    def build(self, *args, **kargs):
        """
        Compute the signature over the handshake messages seen so far when
        'sig' was not provided, then build the message.
        """
        sig = self.getfieldval("sig")
        if sig is None:
            s = self.tls_session
            m = b"".join(s.handshake_messages)
            # An unset version is treated as "older than TLS 1.3".
            if (s.tls_version or 0) >= 0x0304:
                # The context string must be bytes, as it is concatenated
                # with bytes below (same as in post_dissection()).
                if s.connection_end == "client":
                    context_string = b"TLS 1.3, client CertificateVerify"
                elif s.connection_end == "server":
                    context_string = b"TLS 1.3, server CertificateVerify"
                m = b"\x20"*64 + context_string + b"\x00" + s.wcs.hash.digest(m)
            self.sig = _TLSSignature(tls_session=s)
            if s.connection_end == "client":
                self.sig._update_sig(m, s.client_key)
            elif s.connection_end == "server":
                # should be TLS 1.3 only
                self.sig._update_sig(m, s.server_key)
        return _TLSHandshake.build(self, *args, **kargs)

    def post_dissection(self, pkt):
        """
        Verify the received signature against the first certificate of the
        peer (when known) and log an invalid signature.
        """
        s = self.tls_session
        m = b"".join(s.handshake_messages)
        if (s.tls_version or 0) >= 0x0304:
            # We verify what the *peer* signed, hence the swapped roles.
            if s.connection_end == "client":
                context_string = b"TLS 1.3, server CertificateVerify"
            elif s.connection_end == "server":
                context_string = b"TLS 1.3, client CertificateVerify"
            m = b"\x20"*64 + context_string + b"\x00" + s.rcs.hash.digest(m)

        if s.connection_end == "server":
            if s.client_certs and len(s.client_certs) > 0:
                sig_test = self.sig._verify_sig(m, s.client_certs[0])
                if not sig_test:
                    pkt_info = pkt.firstlayer().summary()
                    log_runtime.info("TLS: invalid CertificateVerify signature [%s]", pkt_info)
        elif s.connection_end == "client":
            # should be TLS 1.3 only
            if s.server_certs and len(s.server_certs) > 0:
                sig_test = self.sig._verify_sig(m, s.server_certs[0])
                if not sig_test:
                    pkt_info = pkt.firstlayer().summary()
                    log_runtime.info("TLS: invalid CertificateVerify signature [%s]", pkt_info)


###############################################################################
### ClientKeyExchange                                                       ###
###############################################################################

class _TLSCKExchKeysField(PacketField):
    __slots__ = ["length_from"]
    holds_packet = 1

    def __init__(self, name, length_from=None, remain=0):
        self.length_from = length_from
        PacketField.__init__(self, name, None, None, remain=remain)

    def m2i(self, pkt, m):
        """
        The client_kx_msg may be either None, EncryptedPreMasterSecret
        (for RSA encryption key exchange), ClientDiffieHellmanPublic,
        or ClientECDiffieHellmanPublic. When either one of them gets
        dissected, the session context is updated accordingly.
        """
        l = self.length_from(pkt)
        tbd, rem = m[:l], m[l:]

        s = pkt.tls_session
        cls = None

        if s.prcs and s.prcs.key_exchange:
            cls = s.prcs.key_exchange.client_kx_msg_cls

        if cls is None:
            # No known key exchange: keep the bytes undissected.
            return Raw(tbd)/Padding(rem)

        return cls(tbd, tls_session=s)/Padding(rem)


class TLSClientKeyExchange(_TLSHandshake):
    """
    This class mostly works like TLSServerKeyExchange and its 'params' field.
    """
    name = "TLS Handshake - Client Key Exchange"
    fields_desc = [ ByteEnumField("msgtype", 16, _tls_handshake_type),
                    ThreeBytesField("msglen", None),
                    _TLSCKExchKeysField("exchkeys",
                                        length_from = lambda pkt: pkt.msglen) ]

    def build(self, *args, **kargs):
        """
        Provide a default 'exchkeys' from the session's key exchange when
        none was given, then build the message.
        """
        fval = self.getfieldval("exchkeys")
        if fval is None:
            s = self.tls_session
            if s.prcs:
                cls = s.prcs.key_exchange.client_kx_msg_cls
                cls = cls(tls_session=s)
            else:
                cls = Raw()
            self.exchkeys = cls
        return _TLSHandshake.build(self, *args, **kargs)


###############################################################################
### Finished                                                                ###
###############################################################################

class _VerifyDataField(StrLenField):
    def getfield(self, pkt, s):
        """
        Split off the verify_data: 36 bytes when the session version is
        0x0300, the read hash length from 0x0304 on, 12 bytes otherwise.
        """
        version = pkt.tls_session.tls_version
        if version == 0x0300:
            sep = 36
        elif (version or 0) >= 0x0304:
            sep = pkt.tls_session.rcs.hash.hash_len
        else:
            sep = 12
        return s[sep:], s[:sep]


class TLSFinished(_TLSHandshake):
    name = "TLS Handshake - Finished"
    fields_desc = [ ByteEnumField("msgtype", 20, _tls_handshake_type),
                    ThreeBytesField("msglen", None),
                    _VerifyDataField("vdata", None) ]

    def build(self, *args, **kargs):
        """Compute 'vdata' from the session when it was not provided."""
        fval = self.getfieldval("vdata")
        if fval is None:
            s = self.tls_session
            handshake_msg = b"".join(s.handshake_messages)
            con_end = s.connection_end
            # An unset version is treated as "older than TLS 1.3".
            if (s.tls_version or 0) < 0x0304:
                ms = s.master_secret
                self.vdata = s.wcs.prf.compute_verify_data(con_end, "write",
                                                           handshake_msg, ms)
            else:
                self.vdata = s.compute_tls13_verify_data(con_end, "write")
        return _TLSHandshake.build(self, *args, **kargs)

    def post_dissection(self, pkt):
        """
        Recompute the expected verify_data and log a mismatch. Nothing is
        checked when the session is frozen, or before TLS 1.3 when the master
        secret is unknown.
        """
        s = self.tls_session
        if s.frozen:
            return

        version = s.tls_version or 0
        con_end = s.connection_end
        if version < 0x0304:
            if s.master_secret is None:
                return
            handshake_msg = b"".join(s.handshake_messages)
            verify_data = s.rcs.prf.compute_verify_data(con_end, "read",
                                                        handshake_msg,
                                                        s.master_secret)
        else:
            verify_data = s.compute_tls13_verify_data(con_end, "read")

        if self.vdata != verify_data:
            pkt_info = pkt.firstlayer().summary()
            log_runtime.info("TLS: invalid Finished received [%s]", pkt_info)

    def post_build_tls_session_update(self, msg_str):
        """
        Record the message; from TLS 1.3 on, also prepare the next write
        state and derive the secrets that depend on our role.
        """
        self.tls_session_update(msg_str)
        s = self.tls_session
        if (s.tls_version or 0) >= 0x0304:
            s.pwcs = writeConnState(ciphersuite=type(s.wcs.ciphersuite),
                                    connection_end=s.connection_end,
                                    tls_version=s.tls_version)
            s.triggered_pwcs_commit = True
            if s.connection_end == "server":
                s.compute_tls13_traffic_secrets()
            elif s.connection_end == "client":
                s.compute_tls13_traffic_secrets_end()
                s.compute_tls13_resumption_secret()

    def post_dissection_tls_session_update(self, msg_str):
        """
        Record the message; from TLS 1.3 on, also prepare the next read
        state and derive the secrets that depend on our role.
        """
        self.tls_session_update(msg_str)
        s = self.tls_session
        if (s.tls_version or 0) >= 0x0304:
            s.prcs = readConnState(ciphersuite=type(s.rcs.ciphersuite),
                                   connection_end=s.connection_end,
                                   tls_version=s.tls_version)
            s.triggered_prcs_commit = True
            if s.connection_end == "client":
                s.compute_tls13_traffic_secrets()
            elif s.connection_end == "server":
                s.compute_tls13_traffic_secrets_end()
                s.compute_tls13_resumption_secret()


## Additional handshake messages

###############################################################################
### HelloVerifyRequest                                                      ###
###############################################################################

class TLSHelloVerifyRequest(_TLSHandshake):
    """
    Defined for DTLS, see RFC 6347.

    The body modelled here is a one-byte-format cookie length ("cookielen")
    followed by the cookie itself.
    """
    name = "TLS Handshake - Hello Verify Request"
    fields_desc = [
        # 3 is the "hello_verify_request" entry of _tls_handshake_type and
        # the key under which this class is registered in _tls_handshake_cls.
        ByteEnumField("msgtype", 3, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        FieldLenField("cookielen", None, fmt="B", length_of="cookie"),
        StrLenField("cookie", "", length_from=lambda pkt: pkt.cookielen),
    ]


###############################################################################
### CertificateURL                                                          ###
###############################################################################

# Values of the "certchaintype" field of TLSCertificateURL.
_tls_cert_chain_types = {0: "individual_certs",
                         1: "pkipath"}


class URLAndOptionalHash(Packet):
    """
    One entry of the "uah" list of a TLSCertificateURL message: a
    length-prefixed URL followed by an optional hash.

    When building, "hash_present" is computed as ceil(len(hash) / 20).
    When dissecting, 20 * hash_present bytes are read as "hash".
    """
    name = "URLAndOptionHash structure for TLSCertificateURL"
    fields_desc = [
        FieldLenField("urllen", None, length_of="url"),
        StrLenField("url", "", length_from=lambda pkt: pkt.urllen),
        FieldLenField("hash_present", None, fmt="B", length_of="hash",
                      adjust=lambda pkt, x: int(math.ceil(x / 20.))),
        StrLenField("hash", "",
                    length_from=lambda pkt: 20 * pkt.hash_present),
    ]

    def guess_payload_class(self, p):
        # Remaining bytes belong to the enclosing list, not to this entry.
        return Padding


class TLSCertificateURL(_TLSHandshake):
    """
    Defined in RFC 4366. PkiPath structure of section 8 is not implemented yet.
    """
    name = "TLS Handshake - Certificate URL"
    fields_desc = [
        ByteEnumField("msgtype", 21, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        ByteEnumField("certchaintype", None, _tls_cert_chain_types),
        FieldLenField("uahlen", None, length_of="uah"),
        PacketListField("uah", [], URLAndOptionalHash,
                        length_from=lambda pkt: pkt.uahlen),
    ]


###############################################################################
### CertificateStatus                                                       ###
###############################################################################

class ThreeBytesLenField(FieldLenField):
    """
    FieldLenField stored on three bytes.

    The value is handled with the 4-byte '!I' format; only the last three
    bytes of the packed value are written, and a zero byte is prepended to
    the three bytes read before unpacking.
    """

    def __init__(self, name, default, length_of=None,
                 adjust=lambda pkt, x: x):
        FieldLenField.__init__(self, name, default, length_of=length_of,
                               fmt='!I', adjust=adjust)

    def i2repr(self, pkt, x):
        """Return the string representation of the field value."""
        if x is None:
            # Always hand back a string, like the non-None path below.
            return "0"
        return repr(self.i2h(pkt, x))

    def addfield(self, pkt, s, val):
        """Append the three low-order bytes of the packed value to s."""
        packed = struct.pack(self.fmt, self.i2m(pkt, val))
        return s + packed[1:4]

    def getfield(self, pkt, s):
        """Consume three bytes from s; return (remaining bytes, value)."""
        value = struct.unpack(self.fmt, b"\x00" + s[:3])[0]
        return s[3:], self.m2i(pkt, value)


# Maps the "status_type" of a TLSCertificateStatus to the class used to parse
# its "response" field.
_cert_status_cls = {1: OCSP_Response}


class _StatusField(PacketField):
    """
    PacketField whose dissection class depends on the "status_type" field of
    the enclosing packet.
    """

    def m2i(self, pkt, m):
        # Use the class registered for this status type, or fall back to the
        # class the field was declared with.
        cls = _cert_status_cls.get(pkt.status_type, self.cls)
        return cls(m)


class TLSCertificateStatus(_TLSHandshake):
    """
    Certificate Status handshake message: a status type followed by a
    three-byte length and the response it describes.
    """
    name = "TLS Handshake - Certificate Status"
    fields_desc = [
        ByteEnumField("msgtype", 22, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        ByteEnumField("status_type", 1, _cert_status_type),
        ThreeBytesLenField("responselen", None, length_of="response"),
        _StatusField("response", None, Raw),
    ]


###############################################################################
### SupplementalData                                                        ###
###############################################################################

class SupDataEntry(Packet):
    """
    Generic supplemental data entry: a type, a length and opaque data.
    """
    name = "Supplemental Data Entry - Generic"
    fields_desc = [
        ShortField("sdtype", None),
        FieldLenField("len", None, length_of="data"),
        StrLenField("data", "", length_from=lambda pkt: pkt.len),
    ]

    def guess_payload_class(self, p):
        # Remaining bytes belong to the enclosing list, not to this entry.
        return Padding


class UserMappingData(Packet):
    """
    One user mapping element: a version byte, a length and opaque data.
    """
    name = "User Mapping Data"
    fields_desc = [
        ByteField("version", None),
        FieldLenField("len", None, length_of="data"),
        StrLenField("data", "", length_from=lambda pkt: pkt.len),
    ]

    def guess_payload_class(self, p):
        # Remaining bytes belong to the enclosing list, not to this element.
        return Padding


class SupDataEntryUM(Packet):
    """
    Supplemental data entry carrying a list of UserMappingData.

    Both length fields are derived from "data": "dlen" is its length and
    "len" is that length plus 2.
    """
    name = "Supplemental Data Entry - User Mapping"
    fields_desc = [
        ShortField("sdtype", None),
        FieldLenField("len", None, length_of="data",
                      adjust=lambda pkt, x: x + 2),
        FieldLenField("dlen", None, length_of="data"),
        PacketListField("data", [], UserMappingData,
                        length_from=lambda pkt: pkt.dlen),
    ]

    def guess_payload_class(self, p):
        # Remaining bytes belong to the enclosing list, not to this entry.
        return Padding


class TLSSupplementalData(_TLSHandshake):
    """
    Supplemental Data handshake message: a three-byte length followed by a
    list of SupDataEntry.
    """
    name = "TLS Handshake - Supplemental Data"
    fields_desc = [
        ByteEnumField("msgtype", 23, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        ThreeBytesLenField("sdatalen", None, length_of="sdata"),
        PacketListField("sdata", [], SupDataEntry,
                        length_from=lambda pkt: pkt.sdatalen),
    ]


###############################################################################
### NewSessionTicket                                                        ###
###############################################################################

class TLSNewSessionTicket(_TLSHandshake):
    """
    XXX When knowing the right secret, we should be able to read the ticket.
    """
    name = "TLS Handshake - New Session Ticket"
    fields_desc = [
        ByteEnumField("msgtype", 4, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        IntField("lifetime", 0xffffffff),
        FieldLenField("ticketlen", None, length_of="ticket"),
        StrLenField("ticket", "", length_from=lambda pkt: pkt.ticketlen),
    ]

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        """
        Select TLS13NewSessionTicket when the "tls_session" keyword argument
        carries a version of at least 0x0304, TLSNewSessionTicket otherwise.
        """
        s = kargs.get("tls_session", None)
        # Guard against tls_version being None: comparing None with an int
        # raises TypeError on Python 3.
        if s and s.tls_version and s.tls_version >= 0x0304:
            return TLS13NewSessionTicket
        return TLSNewSessionTicket

    def post_dissection_tls_session_update(self, msg_str):
        """
        Record the message in the session and, when the session's
        connection_end is "client", store the received ticket on it.
        """
        self.tls_session_update(msg_str)
        if self.tls_session.connection_end == "client":
            self.tls_session.client_session_ticket = self.ticket


class TLS13NewSessionTicket(_TLSHandshake):
    """
    Uncomment the TicketField line for parsing a RFC 5077 ticket.
    """
    name = "TLS Handshake - New Session Ticket"
    fields_desc = [
        ByteEnumField("msgtype", 4, _tls_handshake_type),
        ThreeBytesField("msglen", None),
        IntField("ticket_lifetime", 0xffffffff),
        IntField("ticket_age_add", 0),
        FieldLenField("ticketlen", None, length_of="ticket"),
        #TicketField("ticket", "",
        StrLenField("ticket", "", length_from=lambda pkt: pkt.ticketlen),
        _ExtensionsLenField("extlen", None, length_of="ext"),
        # The extensions take what is left of the message after the ticket;
        # 12 presumably accounts for the fixed-size fields that precede
        # "ext" (TODO confirm against the field widths).
        _ExtensionsField("ext", None,
                         length_from=lambda pkt: (pkt.msglen -
                                                  (pkt.ticketlen or 0) -
                                                  12)),
    ]

    def post_dissection_tls_session_update(self, msg_str):
        """
        Record the message in the session and, when the session's
        connection_end is "client", store the received ticket on it.
        """
        self.tls_session_update(msg_str)
        if self.tls_session.connection_end == "client":
            self.tls_session.client_session_ticket = self.ticket


###############################################################################
### All handshake messages defined in this module                           ###
###############################################################################

# Handshake "msgtype" value -> class implementing that message.
_tls_handshake_cls = {
    0: TLSHelloRequest,
    1: TLSClientHello,
    2: TLSServerHello,
    3: TLSHelloVerifyRequest,
    4: TLSNewSessionTicket,
    6: TLSHelloRetryRequest,
    8: TLSEncryptedExtensions,
    11: TLSCertificate,
    12: TLSServerKeyExchange,
    13: TLSCertificateRequest,
    14: TLSServerHelloDone,
    15: TLSCertificateVerify,
    16: TLSClientKeyExchange,
    20: TLSFinished,
    21: TLSCertificateURL,
    22: TLSCertificateStatus,
    23: TLSSupplementalData,
}