1# 2# This file is part of pyasn1 software. 3# 4# Copyright (c) 2005-2019, Ilya Etingof <[email protected]> 5# License: http://snmplabs.com/pyasn1/license.html 6# 7from pyasn1 import debug 8from pyasn1 import error 9from pyasn1.codec.ber import eoo 10from pyasn1.compat.integer import from_bytes 11from pyasn1.compat.octets import oct2int, octs2ints, ints2octs, null 12from pyasn1.type import base 13from pyasn1.type import char 14from pyasn1.type import tag 15from pyasn1.type import tagmap 16from pyasn1.type import univ 17from pyasn1.type import useful 18 19__all__ = ['decode'] 20 21LOG = debug.registerLoggee(__name__, flags=debug.DEBUG_DECODER) 22 23noValue = base.noValue 24 25 26class AbstractDecoder(object): 27 protoComponent = None 28 29 def valueDecoder(self, substrate, asn1Spec, 30 tagSet=None, length=None, state=None, 31 decodeFun=None, substrateFun=None, 32 **options): 33 raise error.PyAsn1Error('Decoder not implemented for %s' % (tagSet,)) 34 35 def indefLenValueDecoder(self, substrate, asn1Spec, 36 tagSet=None, length=None, state=None, 37 decodeFun=None, substrateFun=None, 38 **options): 39 raise error.PyAsn1Error('Indefinite length mode decoder not implemented for %s' % (tagSet,)) 40 41 42class AbstractSimpleDecoder(AbstractDecoder): 43 @staticmethod 44 def substrateCollector(asn1Object, substrate, length): 45 return substrate[:length], substrate[length:] 46 47 def _createComponent(self, asn1Spec, tagSet, value, **options): 48 if options.get('native'): 49 return value 50 elif asn1Spec is None: 51 return self.protoComponent.clone(value, tagSet=tagSet) 52 elif value is noValue: 53 return asn1Spec 54 else: 55 return asn1Spec.clone(value) 56 57 58class ExplicitTagDecoder(AbstractSimpleDecoder): 59 protoComponent = univ.Any('') 60 61 def valueDecoder(self, substrate, asn1Spec, 62 tagSet=None, length=None, state=None, 63 decodeFun=None, substrateFun=None, 64 **options): 65 if substrateFun: 66 return substrateFun( 67 self._createComponent(asn1Spec, tagSet, '', **options), 68 substrate, length 69 ) 70 71 head, tail = substrate[:length], substrate[length:] 72 73 value, _ = decodeFun(head, asn1Spec, tagSet, length, **options) 74 75 if LOG: 76 LOG('explicit tag container carries %d octets of trailing payload ' 77 '(will be lost!): %s' % (len(_), debug.hexdump(_))) 78 79 return value, tail 80 81 def indefLenValueDecoder(self, substrate, asn1Spec, 82 tagSet=None, length=None, state=None, 83 decodeFun=None, substrateFun=None, 84 **options): 85 if substrateFun: 86 return substrateFun( 87 self._createComponent(asn1Spec, tagSet, '', **options), 88 substrate, length 89 ) 90 91 value, substrate = decodeFun(substrate, asn1Spec, tagSet, length, **options) 92 93 eooMarker, substrate = decodeFun(substrate, allowEoo=True, **options) 94 95 if eooMarker is eoo.endOfOctets: 96 return value, substrate 97 else: 98 raise error.PyAsn1Error('Missing end-of-octets terminator') 99 100 101explicitTagDecoder = ExplicitTagDecoder() 102 103 104class IntegerDecoder(AbstractSimpleDecoder): 105 protoComponent = univ.Integer(0) 106 107 def valueDecoder(self, substrate, asn1Spec, 108 tagSet=None, length=None, state=None, 109 decodeFun=None, substrateFun=None, 110 **options): 111 112 if tagSet[0].tagFormat != tag.tagFormatSimple: 113 raise error.PyAsn1Error('Simple tag format expected') 114 115 head, tail = substrate[:length], substrate[length:] 116 117 if not head: 118 return self._createComponent(asn1Spec, tagSet, 0, **options), tail 119 120 value = from_bytes(head, signed=True) 121 122 return self._createComponent(asn1Spec, tagSet, value, **options), tail 123 124 125class BooleanDecoder(IntegerDecoder): 126 protoComponent = univ.Boolean(0) 127 128 def _createComponent(self, asn1Spec, tagSet, value, **options): 129 return IntegerDecoder._createComponent( 130 self, asn1Spec, tagSet, value and 1 or 0, **options) 131 132 133class BitStringDecoder(AbstractSimpleDecoder): 134 protoComponent = univ.BitString(()) 135 supportConstructedForm = True 136 137 def valueDecoder(self, substrate, asn1Spec, 138 tagSet=None, length=None, state=None, 139 decodeFun=None, substrateFun=None, 140 **options): 141 head, tail = substrate[:length], substrate[length:] 142 143 if substrateFun: 144 return substrateFun(self._createComponent( 145 asn1Spec, tagSet, noValue, **options), substrate, length) 146 147 if not head: 148 raise error.PyAsn1Error('Empty BIT STRING substrate') 149 150 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check? 151 152 trailingBits = oct2int(head[0]) 153 if trailingBits > 7: 154 raise error.PyAsn1Error( 155 'Trailing bits overflow %s' % trailingBits 156 ) 157 158 value = self.protoComponent.fromOctetString( 159 head[1:], internalFormat=True, padding=trailingBits) 160 161 return self._createComponent(asn1Spec, tagSet, value, **options), tail 162 163 if not self.supportConstructedForm: 164 raise error.PyAsn1Error('Constructed encoding form prohibited ' 165 'at %s' % self.__class__.__name__) 166 167 if LOG: 168 LOG('assembling constructed serialization') 169 170 # All inner fragments are of the same type, treat them as octet string 171 substrateFun = self.substrateCollector 172 173 bitString = self.protoComponent.fromOctetString(null, internalFormat=True) 174 175 while head: 176 component, head = decodeFun(head, self.protoComponent, 177 substrateFun=substrateFun, **options) 178 179 trailingBits = oct2int(component[0]) 180 if trailingBits > 7: 181 raise error.PyAsn1Error( 182 'Trailing bits overflow %s' % trailingBits 183 ) 184 185 bitString = self.protoComponent.fromOctetString( 186 component[1:], internalFormat=True, 187 prepend=bitString, padding=trailingBits 188 ) 189 190 return self._createComponent(asn1Spec, tagSet, bitString, **options), tail 191 192 def indefLenValueDecoder(self, substrate, asn1Spec, 193 tagSet=None, length=None, state=None, 194 decodeFun=None, substrateFun=None, 195 **options): 196 197 if substrateFun: 198 return substrateFun(self._createComponent(asn1Spec, tagSet, noValue, **options), substrate, length) 199 200 # All inner fragments are of the same type, treat them as octet string 201 substrateFun = self.substrateCollector 202 203 bitString = self.protoComponent.fromOctetString(null, internalFormat=True) 204 205 while substrate: 206 component, substrate = decodeFun(substrate, self.protoComponent, 207 substrateFun=substrateFun, 208 allowEoo=True, **options) 209 if component is eoo.endOfOctets: 210 break 211 212 trailingBits = oct2int(component[0]) 213 if trailingBits > 7: 214 raise error.PyAsn1Error( 215 'Trailing bits overflow %s' % trailingBits 216 ) 217 218 bitString = self.protoComponent.fromOctetString( 219 component[1:], internalFormat=True, 220 prepend=bitString, padding=trailingBits 221 ) 222 223 else: 224 raise error.SubstrateUnderrunError('No EOO seen before substrate ends') 225 226 return self._createComponent(asn1Spec, tagSet, bitString, **options), substrate 227 228 229class OctetStringDecoder(AbstractSimpleDecoder): 230 protoComponent = univ.OctetString('') 231 supportConstructedForm = True 232 233 def valueDecoder(self, substrate, asn1Spec, 234 tagSet=None, length=None, state=None, 235 decodeFun=None, substrateFun=None, 236 **options): 237 head, tail = substrate[:length], substrate[length:] 238 239 if substrateFun: 240 return substrateFun(self._createComponent(asn1Spec, tagSet, noValue, **options), 241 substrate, length) 242 243 if tagSet[0].tagFormat == tag.tagFormatSimple: # XXX what tag to check? 244 return self._createComponent(asn1Spec, tagSet, head, **options), tail 245 246 if not self.supportConstructedForm: 247 raise error.PyAsn1Error('Constructed encoding form prohibited at %s' % self.__class__.__name__) 248 249 if LOG: 250 LOG('assembling constructed serialization') 251 252 # All inner fragments are of the same type, treat them as octet string 253 substrateFun = self.substrateCollector 254 255 header = null 256 257 while head: 258 component, head = decodeFun(head, self.protoComponent, 259 substrateFun=substrateFun, 260 **options) 261 header += component 262 263 return self._createComponent(asn1Spec, tagSet, header, **options), tail 264 265 def indefLenValueDecoder(self, substrate, asn1Spec, 266 tagSet=None, length=None, state=None, 267 decodeFun=None, substrateFun=None, 268 **options): 269 if substrateFun and substrateFun is not self.substrateCollector: 270 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 271 return substrateFun(asn1Object, substrate, length) 272 273 # All inner fragments are of the same type, treat them as octet string 274 substrateFun = self.substrateCollector 275 276 header = null 277 278 while substrate: 279 component, substrate = decodeFun(substrate, 280 self.protoComponent, 281 substrateFun=substrateFun, 282 allowEoo=True, **options) 283 if component is eoo.endOfOctets: 284 break 285 286 header += component 287 288 else: 289 raise error.SubstrateUnderrunError( 290 'No EOO seen before substrate ends' 291 ) 292 293 return self._createComponent(asn1Spec, tagSet, header, **options), substrate 294 295 296class NullDecoder(AbstractSimpleDecoder): 297 protoComponent = univ.Null('') 298 299 def valueDecoder(self, substrate, asn1Spec, 300 tagSet=None, length=None, state=None, 301 decodeFun=None, substrateFun=None, 302 **options): 303 304 if tagSet[0].tagFormat != tag.tagFormatSimple: 305 raise error.PyAsn1Error('Simple tag format expected') 306 307 head, tail = substrate[:length], substrate[length:] 308 309 component = self._createComponent(asn1Spec, tagSet, '', **options) 310 311 if head: 312 raise error.PyAsn1Error('Unexpected %d-octet substrate for Null' % length) 313 314 return component, tail 315 316 317class ObjectIdentifierDecoder(AbstractSimpleDecoder): 318 protoComponent = univ.ObjectIdentifier(()) 319 320 def valueDecoder(self, substrate, asn1Spec, 321 tagSet=None, length=None, state=None, 322 decodeFun=None, substrateFun=None, 323 **options): 324 if tagSet[0].tagFormat != tag.tagFormatSimple: 325 raise error.PyAsn1Error('Simple tag format expected') 326 327 head, tail = substrate[:length], substrate[length:] 328 if not head: 329 raise error.PyAsn1Error('Empty substrate') 330 331 head = octs2ints(head) 332 333 oid = () 334 index = 0 335 substrateLen = len(head) 336 while index < substrateLen: 337 subId = head[index] 338 index += 1 339 if subId < 128: 340 oid += (subId,) 341 elif subId > 128: 342 # Construct subid from a number of octets 343 nextSubId = subId 344 subId = 0 345 while nextSubId >= 128: 346 subId = (subId << 7) + (nextSubId & 0x7F) 347 if index >= substrateLen: 348 raise error.SubstrateUnderrunError( 349 'Short substrate for sub-OID past %s' % (oid,) 350 ) 351 nextSubId = head[index] 352 index += 1 353 oid += ((subId << 7) + nextSubId,) 354 elif subId == 128: 355 # ASN.1 spec forbids leading zeros (0x80) in OID 356 # encoding, tolerating it opens a vulnerability. See 357 # https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf 358 # page 7 359 raise error.PyAsn1Error('Invalid octet 0x80 in OID encoding') 360 361 # Decode two leading arcs 362 if 0 <= oid[0] <= 39: 363 oid = (0,) + oid 364 elif 40 <= oid[0] <= 79: 365 oid = (1, oid[0] - 40) + oid[1:] 366 elif oid[0] >= 80: 367 oid = (2, oid[0] - 80) + oid[1:] 368 else: 369 raise error.PyAsn1Error('Malformed first OID octet: %s' % head[0]) 370 371 return self._createComponent(asn1Spec, tagSet, oid, **options), tail 372 373 374class RealDecoder(AbstractSimpleDecoder): 375 protoComponent = univ.Real() 376 377 def valueDecoder(self, substrate, asn1Spec, 378 tagSet=None, length=None, state=None, 379 decodeFun=None, substrateFun=None, 380 **options): 381 if tagSet[0].tagFormat != tag.tagFormatSimple: 382 raise error.PyAsn1Error('Simple tag format expected') 383 384 head, tail = substrate[:length], substrate[length:] 385 386 if not head: 387 return self._createComponent(asn1Spec, tagSet, 0.0, **options), tail 388 389 fo = oct2int(head[0]) 390 head = head[1:] 391 if fo & 0x80: # binary encoding 392 if not head: 393 raise error.PyAsn1Error("Incomplete floating-point value") 394 395 if LOG: 396 LOG('decoding binary encoded REAL') 397 398 n = (fo & 0x03) + 1 399 400 if n == 4: 401 n = oct2int(head[0]) 402 head = head[1:] 403 404 eo, head = head[:n], head[n:] 405 406 if not eo or not head: 407 raise error.PyAsn1Error('Real exponent screwed') 408 409 e = oct2int(eo[0]) & 0x80 and -1 or 0 410 411 while eo: # exponent 412 e <<= 8 413 e |= oct2int(eo[0]) 414 eo = eo[1:] 415 416 b = fo >> 4 & 0x03 # base bits 417 418 if b > 2: 419 raise error.PyAsn1Error('Illegal Real base') 420 421 if b == 1: # encbase = 8 422 e *= 3 423 424 elif b == 2: # encbase = 16 425 e *= 4 426 p = 0 427 428 while head: # value 429 p <<= 8 430 p |= oct2int(head[0]) 431 head = head[1:] 432 433 if fo & 0x40: # sign bit 434 p = -p 435 436 sf = fo >> 2 & 0x03 # scale bits 437 p *= 2 ** sf 438 value = (p, 2, e) 439 440 elif fo & 0x40: # infinite value 441 if LOG: 442 LOG('decoding infinite REAL') 443 444 value = fo & 0x01 and '-inf' or 'inf' 445 446 elif fo & 0xc0 == 0: # character encoding 447 if not head: 448 raise error.PyAsn1Error("Incomplete floating-point value") 449 450 if LOG: 451 LOG('decoding character encoded REAL') 452 453 try: 454 if fo & 0x3 == 0x1: # NR1 455 value = (int(head), 10, 0) 456 457 elif fo & 0x3 == 0x2: # NR2 458 value = float(head) 459 460 elif fo & 0x3 == 0x3: # NR3 461 value = float(head) 462 463 else: 464 raise error.SubstrateUnderrunError( 465 'Unknown NR (tag %s)' % fo 466 ) 467 468 except ValueError: 469 raise error.SubstrateUnderrunError( 470 'Bad character Real syntax' 471 ) 472 473 else: 474 raise error.SubstrateUnderrunError( 475 'Unknown encoding (tag %s)' % fo 476 ) 477 478 return self._createComponent(asn1Spec, tagSet, value, **options), tail 479 480 481class AbstractConstructedDecoder(AbstractDecoder): 482 protoComponent = None 483 484 485class UniversalConstructedTypeDecoder(AbstractConstructedDecoder): 486 protoRecordComponent = None 487 protoSequenceComponent = None 488 489 def _getComponentTagMap(self, asn1Object, idx): 490 raise NotImplementedError() 491 492 def _getComponentPositionByType(self, asn1Object, tagSet, idx): 493 raise NotImplementedError() 494 495 def _decodeComponents(self, substrate, tagSet=None, decodeFun=None, **options): 496 components = [] 497 componentTypes = set() 498 499 while substrate: 500 component, substrate = decodeFun(substrate, **options) 501 if component is eoo.endOfOctets: 502 break 503 504 components.append(component) 505 componentTypes.add(component.tagSet) 506 507 # Now we have to guess is it SEQUENCE/SET or SEQUENCE OF/SET OF 508 # The heuristics is: 509 # * 1+ components of different types -> likely SEQUENCE/SET 510 # * otherwise -> likely SEQUENCE OF/SET OF 511 if len(componentTypes) > 1: 512 protoComponent = self.protoRecordComponent 513 514 else: 515 protoComponent = self.protoSequenceComponent 516 517 asn1Object = protoComponent.clone( 518 # construct tagSet from base tag from prototype ASN.1 object 519 # and additional tags recovered from the substrate 520 tagSet=tag.TagSet(protoComponent.tagSet.baseTag, *tagSet.superTags) 521 ) 522 523 if LOG: 524 LOG('guessed %r container type (pass `asn1Spec` to guide the ' 525 'decoder)' % asn1Object) 526 527 for idx, component in enumerate(components): 528 asn1Object.setComponentByPosition( 529 idx, component, 530 verifyConstraints=False, 531 matchTags=False, matchConstraints=False 532 ) 533 534 return asn1Object, substrate 535 536 def valueDecoder(self, substrate, asn1Spec, 537 tagSet=None, length=None, state=None, 538 decodeFun=None, substrateFun=None, 539 **options): 540 if tagSet[0].tagFormat != tag.tagFormatConstructed: 541 raise error.PyAsn1Error('Constructed tag format expected') 542 543 head, tail = substrate[:length], substrate[length:] 544 545 if substrateFun is not None: 546 if asn1Spec is not None: 547 asn1Object = asn1Spec.clone() 548 549 elif self.protoComponent is not None: 550 asn1Object = self.protoComponent.clone(tagSet=tagSet) 551 552 else: 553 asn1Object = self.protoRecordComponent, self.protoSequenceComponent 554 555 return substrateFun(asn1Object, substrate, length) 556 557 if asn1Spec is None: 558 asn1Object, trailing = self._decodeComponents( 559 head, tagSet=tagSet, decodeFun=decodeFun, **options 560 ) 561 562 if trailing: 563 if LOG: 564 LOG('Unused trailing %d octets encountered: %s' % ( 565 len(trailing), debug.hexdump(trailing))) 566 567 return asn1Object, tail 568 569 asn1Object = asn1Spec.clone() 570 asn1Object.clear() 571 572 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId): 573 574 namedTypes = asn1Spec.componentType 575 576 isSetType = asn1Spec.typeId == univ.Set.typeId 577 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault 578 579 if LOG: 580 LOG('decoding %sdeterministic %s type %r chosen by type ID' % ( 581 not isDeterministic and 'non-' or '', isSetType and 'SET' or '', 582 asn1Spec)) 583 584 seenIndices = set() 585 idx = 0 586 while head: 587 if not namedTypes: 588 componentType = None 589 590 elif isSetType: 591 componentType = namedTypes.tagMapUnique 592 593 else: 594 try: 595 if isDeterministic: 596 componentType = namedTypes[idx].asn1Object 597 598 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 599 componentType = namedTypes.getTagMapNearPosition(idx) 600 601 else: 602 componentType = namedTypes[idx].asn1Object 603 604 except IndexError: 605 raise error.PyAsn1Error( 606 'Excessive components decoded at %r' % (asn1Spec,) 607 ) 608 609 component, head = decodeFun(head, componentType, **options) 610 611 if not isDeterministic and namedTypes: 612 if isSetType: 613 idx = namedTypes.getPositionByType(component.effectiveTagSet) 614 615 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 616 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx) 617 618 asn1Object.setComponentByPosition( 619 idx, component, 620 verifyConstraints=False, 621 matchTags=False, matchConstraints=False 622 ) 623 624 seenIndices.add(idx) 625 idx += 1 626 627 if LOG: 628 LOG('seen component indices %s' % seenIndices) 629 630 if namedTypes: 631 if not namedTypes.requiredComponents.issubset(seenIndices): 632 raise error.PyAsn1Error( 633 'ASN.1 object %s has uninitialized ' 634 'components' % asn1Object.__class__.__name__) 635 636 if namedTypes.hasOpenTypes: 637 638 openTypes = options.get('openTypes', {}) 639 640 if LOG: 641 LOG('using open types map: %r' % openTypes) 642 643 if openTypes or options.get('decodeOpenTypes', False): 644 645 for idx, namedType in enumerate(namedTypes.namedTypes): 646 if not namedType.openType: 647 continue 648 649 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue: 650 continue 651 652 governingValue = asn1Object.getComponentByName( 653 namedType.openType.name 654 ) 655 656 try: 657 openType = openTypes[governingValue] 658 659 except KeyError: 660 661 try: 662 openType = namedType.openType[governingValue] 663 664 except KeyError: 665 if LOG: 666 LOG('failed to resolve open type by governing ' 667 'value %r' % (governingValue,)) 668 continue 669 670 if LOG: 671 LOG('resolved open type %r by governing ' 672 'value %r' % (openType, governingValue)) 673 674 containerValue = asn1Object.getComponentByPosition(idx) 675 676 if containerValue.typeId in ( 677 univ.SetOf.typeId, univ.SequenceOf.typeId): 678 679 for pos, containerElement in enumerate( 680 containerValue): 681 682 component, rest = decodeFun( 683 containerValue[pos].asOctets(), 684 asn1Spec=openType, **options 685 ) 686 687 containerValue[pos] = component 688 689 else: 690 component, rest = decodeFun( 691 asn1Object.getComponentByPosition(idx).asOctets(), 692 asn1Spec=openType, **options 693 ) 694 695 asn1Object.setComponentByPosition(idx, component) 696 697 else: 698 inconsistency = asn1Object.isInconsistent 699 if inconsistency: 700 raise inconsistency 701 702 else: 703 asn1Object = asn1Spec.clone() 704 asn1Object.clear() 705 706 componentType = asn1Spec.componentType 707 708 if LOG: 709 LOG('decoding type %r chosen by given `asn1Spec`' % componentType) 710 711 idx = 0 712 713 while head: 714 component, head = decodeFun(head, componentType, **options) 715 asn1Object.setComponentByPosition( 716 idx, component, 717 verifyConstraints=False, 718 matchTags=False, matchConstraints=False 719 ) 720 721 idx += 1 722 723 return asn1Object, tail 724 725 def indefLenValueDecoder(self, substrate, asn1Spec, 726 tagSet=None, length=None, state=None, 727 decodeFun=None, substrateFun=None, 728 **options): 729 if tagSet[0].tagFormat != tag.tagFormatConstructed: 730 raise error.PyAsn1Error('Constructed tag format expected') 731 732 if substrateFun is not None: 733 if asn1Spec is not None: 734 asn1Object = asn1Spec.clone() 735 736 elif self.protoComponent is not None: 737 asn1Object = self.protoComponent.clone(tagSet=tagSet) 738 739 else: 740 asn1Object = self.protoRecordComponent, self.protoSequenceComponent 741 742 return substrateFun(asn1Object, substrate, length) 743 744 if asn1Spec is None: 745 return self._decodeComponents( 746 substrate, tagSet=tagSet, decodeFun=decodeFun, 747 **dict(options, allowEoo=True) 748 ) 749 750 asn1Object = asn1Spec.clone() 751 asn1Object.clear() 752 753 if asn1Spec.typeId in (univ.Sequence.typeId, univ.Set.typeId): 754 755 namedTypes = asn1Object.componentType 756 757 isSetType = asn1Object.typeId == univ.Set.typeId 758 isDeterministic = not isSetType and not namedTypes.hasOptionalOrDefault 759 760 if LOG: 761 LOG('decoding %sdeterministic %s type %r chosen by type ID' % ( 762 not isDeterministic and 'non-' or '', isSetType and 'SET' or '', 763 asn1Spec)) 764 765 seenIndices = set() 766 idx = 0 767 while substrate: 768 if len(namedTypes) <= idx: 769 asn1Spec = None 770 771 elif isSetType: 772 asn1Spec = namedTypes.tagMapUnique 773 774 else: 775 try: 776 if isDeterministic: 777 asn1Spec = namedTypes[idx].asn1Object 778 779 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 780 asn1Spec = namedTypes.getTagMapNearPosition(idx) 781 782 else: 783 asn1Spec = namedTypes[idx].asn1Object 784 785 except IndexError: 786 raise error.PyAsn1Error( 787 'Excessive components decoded at %r' % (asn1Object,) 788 ) 789 790 component, substrate = decodeFun(substrate, asn1Spec, allowEoo=True, **options) 791 if component is eoo.endOfOctets: 792 break 793 794 if not isDeterministic and namedTypes: 795 if isSetType: 796 idx = namedTypes.getPositionByType(component.effectiveTagSet) 797 elif namedTypes[idx].isOptional or namedTypes[idx].isDefaulted: 798 idx = namedTypes.getPositionNearType(component.effectiveTagSet, idx) 799 800 asn1Object.setComponentByPosition( 801 idx, component, 802 verifyConstraints=False, 803 matchTags=False, matchConstraints=False 804 ) 805 806 seenIndices.add(idx) 807 idx += 1 808 809 else: 810 raise error.SubstrateUnderrunError( 811 'No EOO seen before substrate ends' 812 ) 813 814 if LOG: 815 LOG('seen component indices %s' % seenIndices) 816 817 if namedTypes: 818 if not namedTypes.requiredComponents.issubset(seenIndices): 819 raise error.PyAsn1Error('ASN.1 object %s has uninitialized components' % asn1Object.__class__.__name__) 820 821 if namedTypes.hasOpenTypes: 822 823 openTypes = options.get('openTypes', {}) 824 825 if LOG: 826 LOG('using open types map: %r' % openTypes) 827 828 if openTypes or options.get('decodeOpenTypes', False): 829 830 for idx, namedType in enumerate(namedTypes.namedTypes): 831 if not namedType.openType: 832 continue 833 834 if namedType.isOptional and not asn1Object.getComponentByPosition(idx).isValue: 835 continue 836 837 governingValue = asn1Object.getComponentByName( 838 namedType.openType.name 839 ) 840 841 try: 842 openType = openTypes[governingValue] 843 844 except KeyError: 845 846 try: 847 openType = namedType.openType[governingValue] 848 849 except KeyError: 850 if LOG: 851 LOG('failed to resolve open type by governing ' 852 'value %r' % (governingValue,)) 853 continue 854 855 if LOG: 856 LOG('resolved open type %r by governing ' 857 'value %r' % (openType, governingValue)) 858 859 containerValue = asn1Object.getComponentByPosition(idx) 860 861 if containerValue.typeId in ( 862 univ.SetOf.typeId, univ.SequenceOf.typeId): 863 864 for pos, containerElement in enumerate( 865 containerValue): 866 867 component, rest = decodeFun( 868 containerValue[pos].asOctets(), 869 asn1Spec=openType, **dict(options, allowEoo=True) 870 ) 871 872 containerValue[pos] = component 873 874 else: 875 component, rest = decodeFun( 876 asn1Object.getComponentByPosition(idx).asOctets(), 877 asn1Spec=openType, **dict(options, allowEoo=True) 878 ) 879 880 if component is not eoo.endOfOctets: 881 asn1Object.setComponentByPosition(idx, component) 882 883 else: 884 inconsistency = asn1Object.isInconsistent 885 if inconsistency: 886 raise inconsistency 887 888 else: 889 asn1Object = asn1Spec.clone() 890 asn1Object.clear() 891 892 componentType = asn1Spec.componentType 893 894 if LOG: 895 LOG('decoding type %r chosen by given `asn1Spec`' % componentType) 896 897 idx = 0 898 899 while substrate: 900 component, substrate = decodeFun(substrate, componentType, allowEoo=True, **options) 901 902 if component is eoo.endOfOctets: 903 break 904 905 asn1Object.setComponentByPosition( 906 idx, component, 907 verifyConstraints=False, 908 matchTags=False, matchConstraints=False 909 ) 910 911 idx += 1 912 913 else: 914 raise error.SubstrateUnderrunError( 915 'No EOO seen before substrate ends' 916 ) 917 918 return asn1Object, substrate 919 920 921class SequenceOrSequenceOfDecoder(UniversalConstructedTypeDecoder): 922 protoRecordComponent = univ.Sequence() 923 protoSequenceComponent = univ.SequenceOf() 924 925 926class SequenceDecoder(SequenceOrSequenceOfDecoder): 927 protoComponent = univ.Sequence() 928 929 930class SequenceOfDecoder(SequenceOrSequenceOfDecoder): 931 protoComponent = univ.SequenceOf() 932 933 934class SetOrSetOfDecoder(UniversalConstructedTypeDecoder): 935 protoRecordComponent = univ.Set() 936 protoSequenceComponent = univ.SetOf() 937 938 939class SetDecoder(SetOrSetOfDecoder): 940 protoComponent = univ.Set() 941 942 943 944class SetOfDecoder(SetOrSetOfDecoder): 945 protoComponent = univ.SetOf() 946 947 948class ChoiceDecoder(AbstractConstructedDecoder): 949 protoComponent = univ.Choice() 950 951 def valueDecoder(self, substrate, asn1Spec, 952 tagSet=None, length=None, state=None, 953 decodeFun=None, substrateFun=None, 954 **options): 955 head, tail = substrate[:length], substrate[length:] 956 957 if asn1Spec is None: 958 asn1Object = self.protoComponent.clone(tagSet=tagSet) 959 960 else: 961 asn1Object = asn1Spec.clone() 962 963 if substrateFun: 964 return substrateFun(asn1Object, substrate, length) 965 966 if asn1Object.tagSet == tagSet: 967 if LOG: 968 LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,)) 969 970 component, head = decodeFun( 971 head, asn1Object.componentTagMap, **options 972 ) 973 974 else: 975 if LOG: 976 LOG('decoding %s as untagged CHOICE' % (tagSet,)) 977 978 component, head = decodeFun( 979 head, asn1Object.componentTagMap, 980 tagSet, length, state, **options 981 ) 982 983 effectiveTagSet = component.effectiveTagSet 984 985 if LOG: 986 LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet)) 987 988 asn1Object.setComponentByType( 989 effectiveTagSet, component, 990 verifyConstraints=False, 991 matchTags=False, matchConstraints=False, 992 innerFlag=False 993 ) 994 995 return asn1Object, tail 996 997 def indefLenValueDecoder(self, substrate, asn1Spec, 998 tagSet=None, length=None, state=None, 999 decodeFun=None, substrateFun=None, 1000 **options): 1001 if asn1Spec is None: 1002 asn1Object = self.protoComponent.clone(tagSet=tagSet) 1003 else: 1004 asn1Object = asn1Spec.clone() 1005 1006 if substrateFun: 1007 return substrateFun(asn1Object, substrate, length) 1008 1009 if asn1Object.tagSet == tagSet: 1010 if LOG: 1011 LOG('decoding %s as explicitly tagged CHOICE' % (tagSet,)) 1012 1013 component, substrate = decodeFun( 1014 substrate, asn1Object.componentType.tagMapUnique, **options 1015 ) 1016 1017 # eat up EOO marker 1018 eooMarker, substrate = decodeFun( 1019 substrate, allowEoo=True, **options 1020 ) 1021 1022 if eooMarker is not eoo.endOfOctets: 1023 raise error.PyAsn1Error('No EOO seen before substrate ends') 1024 1025 else: 1026 if LOG: 1027 LOG('decoding %s as untagged CHOICE' % (tagSet,)) 1028 1029 component, substrate = decodeFun( 1030 substrate, asn1Object.componentType.tagMapUnique, 1031 tagSet, length, state, **options 1032 ) 1033 1034 effectiveTagSet = component.effectiveTagSet 1035 1036 if LOG: 1037 LOG('decoded component %s, effective tag set %s' % (component, effectiveTagSet)) 1038 1039 asn1Object.setComponentByType( 1040 effectiveTagSet, component, 1041 verifyConstraints=False, 1042 matchTags=False, matchConstraints=False, 1043 innerFlag=False 1044 ) 1045 1046 return asn1Object, substrate 1047 1048 1049class AnyDecoder(AbstractSimpleDecoder): 1050 protoComponent = univ.Any() 1051 1052 def valueDecoder(self, substrate, asn1Spec, 1053 tagSet=None, length=None, state=None, 1054 decodeFun=None, substrateFun=None, 1055 **options): 1056 if asn1Spec is None: 1057 isUntagged = True 1058 1059 elif asn1Spec.__class__ is tagmap.TagMap: 1060 isUntagged = tagSet not in asn1Spec.tagMap 1061 1062 else: 1063 isUntagged = tagSet != asn1Spec.tagSet 1064 1065 if isUntagged: 1066 fullSubstrate = options['fullSubstrate'] 1067 1068 # untagged Any container, recover inner header substrate 1069 length += len(fullSubstrate) - len(substrate) 1070 substrate = fullSubstrate 1071 1072 if LOG: 1073 LOG('decoding as untagged ANY, substrate %s' % debug.hexdump(substrate)) 1074 1075 if substrateFun: 1076 return substrateFun(self._createComponent(asn1Spec, tagSet, noValue, **options), 1077 substrate, length) 1078 1079 head, tail = substrate[:length], substrate[length:] 1080 1081 return self._createComponent(asn1Spec, tagSet, head, **options), tail 1082 1083 def indefLenValueDecoder(self, substrate, asn1Spec, 1084 tagSet=None, length=None, state=None, 1085 decodeFun=None, substrateFun=None, 1086 **options): 1087 if asn1Spec is None: 1088 isTagged = False 1089 1090 elif asn1Spec.__class__ is tagmap.TagMap: 1091 isTagged = tagSet in asn1Spec.tagMap 1092 1093 else: 1094 isTagged = tagSet == asn1Spec.tagSet 1095 1096 if isTagged: 1097 # tagged Any type -- consume header substrate 1098 header = null 1099 1100 if LOG: 1101 LOG('decoding as tagged ANY') 1102 1103 else: 1104 fullSubstrate = options['fullSubstrate'] 1105 1106 # untagged Any, recover header substrate 1107 header = fullSubstrate[:-len(substrate)] 1108 1109 if LOG: 1110 LOG('decoding as untagged ANY, header substrate %s' % debug.hexdump(header)) 1111 1112 # Any components do not inherit initial tag 1113 asn1Spec = self.protoComponent 1114 1115 if substrateFun and substrateFun is not self.substrateCollector: 1116 asn1Object = self._createComponent(asn1Spec, tagSet, noValue, **options) 1117 return substrateFun(asn1Object, header + substrate, length + len(header)) 1118 1119 if LOG: 1120 LOG('assembling constructed serialization') 1121 1122 # All inner fragments are of the same type, treat them as octet string 1123 substrateFun = self.substrateCollector 1124 1125 while substrate: 1126 component, substrate = decodeFun(substrate, asn1Spec, 1127 substrateFun=substrateFun, 1128 allowEoo=True, **options) 1129 if component is eoo.endOfOctets: 1130 break 1131 1132 header += component 1133 1134 else: 1135 raise error.SubstrateUnderrunError( 1136 'No EOO seen before substrate ends' 1137 ) 1138 1139 if substrateFun: 1140 return header, substrate 1141 1142 else: 1143 return self._createComponent(asn1Spec, tagSet, header, **options), substrate 1144 1145 1146# character string types 1147class UTF8StringDecoder(OctetStringDecoder): 1148 protoComponent = char.UTF8String() 1149 1150 1151class NumericStringDecoder(OctetStringDecoder): 1152 protoComponent = char.NumericString() 1153 1154 1155class PrintableStringDecoder(OctetStringDecoder): 1156 protoComponent = char.PrintableString() 1157 1158 1159class TeletexStringDecoder(OctetStringDecoder): 1160 protoComponent = char.TeletexString() 1161 1162 1163class VideotexStringDecoder(OctetStringDecoder): 1164 protoComponent = char.VideotexString() 1165 1166 1167class IA5StringDecoder(OctetStringDecoder): 1168 protoComponent = char.IA5String() 1169 1170 1171class GraphicStringDecoder(OctetStringDecoder): 1172 protoComponent = char.GraphicString() 1173 1174 1175class VisibleStringDecoder(OctetStringDecoder): 1176 protoComponent = char.VisibleString() 1177 1178 1179class GeneralStringDecoder(OctetStringDecoder): 1180 protoComponent = char.GeneralString() 1181 1182 1183class UniversalStringDecoder(OctetStringDecoder): 1184 protoComponent = char.UniversalString() 1185 1186 1187class BMPStringDecoder(OctetStringDecoder): 1188 protoComponent = char.BMPString() 1189 1190 1191# "useful" types 1192class ObjectDescriptorDecoder(OctetStringDecoder): 1193 protoComponent = useful.ObjectDescriptor() 1194 1195 1196class GeneralizedTimeDecoder(OctetStringDecoder): 1197 protoComponent = useful.GeneralizedTime() 1198 1199 1200class UTCTimeDecoder(OctetStringDecoder): 1201 protoComponent = useful.UTCTime() 1202 1203 1204tagMap = { 1205 univ.Integer.tagSet: IntegerDecoder(), 1206 univ.Boolean.tagSet: BooleanDecoder(), 1207 univ.BitString.tagSet: BitStringDecoder(), 1208 univ.OctetString.tagSet: OctetStringDecoder(), 1209 univ.Null.tagSet: NullDecoder(), 1210 univ.ObjectIdentifier.tagSet: ObjectIdentifierDecoder(), 1211 univ.Enumerated.tagSet: IntegerDecoder(), 1212 univ.Real.tagSet: RealDecoder(), 1213 univ.Sequence.tagSet: SequenceOrSequenceOfDecoder(), # conflicts with SequenceOf 1214 univ.Set.tagSet: SetOrSetOfDecoder(), # conflicts with SetOf 1215 univ.Choice.tagSet: ChoiceDecoder(), # conflicts with Any 1216 # character string types 1217 char.UTF8String.tagSet: UTF8StringDecoder(), 1218 char.NumericString.tagSet: NumericStringDecoder(), 1219 char.PrintableString.tagSet: PrintableStringDecoder(), 1220 char.TeletexString.tagSet: TeletexStringDecoder(), 1221 char.VideotexString.tagSet: VideotexStringDecoder(), 1222 char.IA5String.tagSet: IA5StringDecoder(), 1223 char.GraphicString.tagSet: GraphicStringDecoder(), 1224 char.VisibleString.tagSet: VisibleStringDecoder(), 1225 char.GeneralString.tagSet: GeneralStringDecoder(), 1226 char.UniversalString.tagSet: UniversalStringDecoder(), 1227 char.BMPString.tagSet: BMPStringDecoder(), 1228 # useful types 1229 useful.ObjectDescriptor.tagSet: ObjectDescriptorDecoder(), 1230 useful.GeneralizedTime.tagSet: GeneralizedTimeDecoder(), 1231 useful.UTCTime.tagSet: UTCTimeDecoder() 1232} 1233 1234# Type-to-codec map for ambiguous ASN.1 types 1235typeMap = { 1236 univ.Set.typeId: SetDecoder(), 1237 univ.SetOf.typeId: SetOfDecoder(), 1238 univ.Sequence.typeId: SequenceDecoder(), 1239 univ.SequenceOf.typeId: SequenceOfDecoder(), 1240 univ.Choice.typeId: ChoiceDecoder(), 1241 univ.Any.typeId: AnyDecoder() 1242} 1243 1244# Put in non-ambiguous types for faster codec lookup 1245for typeDecoder in tagMap.values(): 1246 if typeDecoder.protoComponent is not None: 1247 typeId = typeDecoder.protoComponent.__class__.typeId 1248 if typeId is not None and typeId not in typeMap: 1249 typeMap[typeId] = typeDecoder 1250 1251 1252(stDecodeTag, 1253 stDecodeLength, 1254 stGetValueDecoder, 1255 stGetValueDecoderByAsn1Spec, 1256 stGetValueDecoderByTag, 1257 stTryAsExplicitTag, 1258 stDecodeValue, 1259 stDumpRawValue, 1260 stErrorCondition, 1261 stStop) = [x for x in range(10)] 1262 1263 1264class Decoder(object): 1265 defaultErrorState = stErrorCondition 1266 #defaultErrorState = stDumpRawValue 1267 defaultRawDecoder = AnyDecoder() 1268 supportIndefLength = True 1269 1270 # noinspection PyDefaultArgument 1271 def __init__(self, tagMap, typeMap={}): 1272 self.__tagMap = tagMap 1273 self.__typeMap = typeMap 1274 # Tag & TagSet objects caches 1275 self.__tagCache = {} 1276 self.__tagSetCache = {} 1277 self.__eooSentinel = ints2octs((0, 0)) 1278 1279 def __call__(self, substrate, asn1Spec=None, 1280 tagSet=None, length=None, state=stDecodeTag, 1281 decodeFun=None, substrateFun=None, 1282 **options): 1283 1284 if LOG: 1285 LOG('decoder called at scope %s with state %d, working with up to %d octets of substrate: %s' % (debug.scope, state, len(substrate), debug.hexdump(substrate))) 1286 1287 allowEoo = options.pop('allowEoo', False) 1288 1289 # Look for end-of-octets sentinel 1290 if allowEoo and self.supportIndefLength: 1291 if substrate[:2] == self.__eooSentinel: 1292 if LOG: 1293 LOG('end-of-octets sentinel found') 1294 return eoo.endOfOctets, substrate[2:] 1295 1296 value = noValue 1297 1298 tagMap = self.__tagMap 1299 typeMap = self.__typeMap 1300 tagCache = self.__tagCache 1301 tagSetCache = self.__tagSetCache 1302 1303 fullSubstrate = substrate 1304 1305 while state is not stStop: 1306 1307 if state is stDecodeTag: 1308 if not substrate: 1309 raise error.SubstrateUnderrunError( 1310 'Short octet stream on tag decoding' 1311 ) 1312 1313 # Decode tag 1314 isShortTag = True 1315 firstOctet = substrate[0] 1316 substrate = substrate[1:] 1317 1318 try: 1319 lastTag = tagCache[firstOctet] 1320 1321 except KeyError: 1322 integerTag = oct2int(firstOctet) 1323 tagClass = integerTag & 0xC0 1324 tagFormat = integerTag & 0x20 1325 tagId = integerTag & 0x1F 1326 1327 if tagId == 0x1F: 1328 isShortTag = False 1329 lengthOctetIdx = 0 1330 tagId = 0 1331 1332 try: 1333 while True: 1334 integerTag = oct2int(substrate[lengthOctetIdx]) 1335 lengthOctetIdx += 1 1336 tagId <<= 7 1337 tagId |= (integerTag & 0x7F) 1338 if not integerTag & 0x80: 1339 break 1340 1341 substrate = substrate[lengthOctetIdx:] 1342 1343 except IndexError: 1344 raise error.SubstrateUnderrunError( 1345 'Short octet stream on long tag decoding' 1346 ) 1347 1348 lastTag = tag.Tag( 1349 tagClass=tagClass, tagFormat=tagFormat, tagId=tagId 1350 ) 1351 1352 if isShortTag: 1353 # cache short tags 1354 tagCache[firstOctet] = lastTag 1355 1356 if tagSet is None: 1357 if isShortTag: 1358 try: 1359 tagSet = tagSetCache[firstOctet] 1360 1361 except KeyError: 1362 # base tag not recovered 1363 tagSet = tag.TagSet((), lastTag) 1364 tagSetCache[firstOctet] = tagSet 1365 else: 1366 tagSet = tag.TagSet((), lastTag) 1367 1368 else: 1369 tagSet = lastTag + tagSet 1370 1371 state = stDecodeLength 1372 1373 if LOG: 1374 LOG('tag decoded into %s, decoding length' % tagSet) 1375 1376 if state is stDecodeLength: 1377 # Decode length 1378 if not substrate: 1379 raise error.SubstrateUnderrunError( 1380 'Short octet stream on length decoding' 1381 ) 1382 1383 firstOctet = oct2int(substrate[0]) 1384 1385 if firstOctet < 128: 1386 size = 1 1387 length = firstOctet 1388 1389 elif firstOctet > 128: 1390 size = firstOctet & 0x7F 1391 # encoded in size bytes 1392 encodedLength = octs2ints(substrate[1:size + 1]) 1393 # missing check on maximum size, which shouldn't be a 1394 # problem, we can handle more than is possible 1395 if len(encodedLength) != size: 1396 raise error.SubstrateUnderrunError( 1397 '%s<%s at %s' % (size, len(encodedLength), tagSet) 1398 ) 1399 1400 length = 0 1401 for lengthOctet in encodedLength: 1402 length <<= 8 1403 length |= lengthOctet 1404 size += 1 1405 1406 else: 1407 size = 1 1408 length = -1 1409 1410 substrate = substrate[size:] 1411 1412 if length == -1: 1413 if not self.supportIndefLength: 1414 raise error.PyAsn1Error('Indefinite length encoding not supported by this codec') 1415 1416 else: 1417 if len(substrate) < length: 1418 raise error.SubstrateUnderrunError('%d-octet short' % (length - len(substrate))) 1419 1420 state = stGetValueDecoder 1421 1422 if LOG: 1423 LOG('value length decoded into %d, payload substrate is: %s' % (length, debug.hexdump(length == -1 and substrate or substrate[:length]))) 1424 1425 if state is stGetValueDecoder: 1426 if asn1Spec is None: 1427 state = stGetValueDecoderByTag 1428 1429 else: 1430 state = stGetValueDecoderByAsn1Spec 1431 # 1432 # There're two ways of creating subtypes in ASN.1 what influences 1433 # decoder operation. These methods are: 1434 # 1) Either base types used in or no IMPLICIT tagging has been 1435 # applied on subtyping. 1436 # 2) Subtype syntax drops base type information (by means of 1437 # IMPLICIT tagging. 1438 # The first case allows for complete tag recovery from substrate 1439 # while the second one requires original ASN.1 type spec for 1440 # decoding. 1441 # 1442 # In either case a set of tags (tagSet) is coming from substrate 1443 # in an incremental, tag-by-tag fashion (this is the case of 1444 # EXPLICIT tag which is most basic). Outermost tag comes first 1445 # from the wire. 1446 # 1447 if state is stGetValueDecoderByTag: 1448 try: 1449 concreteDecoder = tagMap[tagSet] 1450 1451 except KeyError: 1452 concreteDecoder = None 1453 1454 if concreteDecoder: 1455 state = stDecodeValue 1456 1457 else: 1458 try: 1459 concreteDecoder = tagMap[tagSet[:1]] 1460 1461 except KeyError: 1462 concreteDecoder = None 1463 1464 if concreteDecoder: 1465 state = stDecodeValue 1466 else: 1467 state = stTryAsExplicitTag 1468 1469 if LOG: 1470 LOG('codec %s chosen by a built-in type, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag')) 1471 debug.scope.push(concreteDecoder is None and '?' or concreteDecoder.protoComponent.__class__.__name__) 1472 1473 if state is stGetValueDecoderByAsn1Spec: 1474 1475 if asn1Spec.__class__ is tagmap.TagMap: 1476 try: 1477 chosenSpec = asn1Spec[tagSet] 1478 1479 except KeyError: 1480 chosenSpec = None 1481 1482 if LOG: 1483 LOG('candidate ASN.1 spec is a map of:') 1484 1485 for firstOctet, v in asn1Spec.presentTypes.items(): 1486 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__)) 1487 1488 if asn1Spec.skipTypes: 1489 LOG('but neither of: ') 1490 for firstOctet, v in asn1Spec.skipTypes.items(): 1491 LOG(' %s -> %s' % (firstOctet, v.__class__.__name__)) 1492 LOG('new candidate ASN.1 spec is %s, chosen by %s' % (chosenSpec is None and '<none>' or chosenSpec.prettyPrintType(), tagSet)) 1493 1494 elif tagSet == asn1Spec.tagSet or tagSet in asn1Spec.tagMap: 1495 chosenSpec = asn1Spec 1496 if LOG: 1497 LOG('candidate ASN.1 spec is %s' % asn1Spec.__class__.__name__) 1498 1499 else: 1500 chosenSpec = None 1501 1502 if chosenSpec is not None: 1503 try: 1504 # ambiguous type or just faster codec lookup 1505 concreteDecoder = typeMap[chosenSpec.typeId] 1506 1507 if LOG: 1508 LOG('value decoder chosen for an ambiguous type by type ID %s' % (chosenSpec.typeId,)) 1509 1510 except KeyError: 1511 # use base type for codec lookup to recover untagged types 1512 baseTagSet = tag.TagSet(chosenSpec.tagSet.baseTag, chosenSpec.tagSet.baseTag) 1513 try: 1514 # base type or tagged subtype 1515 concreteDecoder = tagMap[baseTagSet] 1516 1517 if LOG: 1518 LOG('value decoder chosen by base %s' % (baseTagSet,)) 1519 1520 except KeyError: 1521 concreteDecoder = None 1522 1523 if concreteDecoder: 1524 asn1Spec = chosenSpec 1525 state = stDecodeValue 1526 1527 else: 1528 state = stTryAsExplicitTag 1529 1530 else: 1531 concreteDecoder = None 1532 state = stTryAsExplicitTag 1533 1534 if LOG: 1535 LOG('codec %s chosen by ASN.1 spec, decoding %s' % (state is stDecodeValue and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as explicit tag')) 1536 debug.scope.push(chosenSpec is None and '?' or chosenSpec.__class__.__name__) 1537 1538 if state is stDecodeValue: 1539 if not options.get('recursiveFlag', True) and not substrateFun: # deprecate this 1540 substrateFun = lambda a, b, c: (a, b[:c]) 1541 1542 options.update(fullSubstrate=fullSubstrate) 1543 1544 if length == -1: # indef length 1545 value, substrate = concreteDecoder.indefLenValueDecoder( 1546 substrate, asn1Spec, 1547 tagSet, length, stGetValueDecoder, 1548 self, substrateFun, 1549 **options 1550 ) 1551 1552 else: 1553 value, substrate = concreteDecoder.valueDecoder( 1554 substrate, asn1Spec, 1555 tagSet, length, stGetValueDecoder, 1556 self, substrateFun, 1557 **options 1558 ) 1559 1560 if LOG: 1561 LOG('codec %s yields type %s, value:\n%s\n...remaining substrate is: %s' % (concreteDecoder.__class__.__name__, value.__class__.__name__, isinstance(value, base.Asn1Item) and value.prettyPrint() or value, substrate and debug.hexdump(substrate) or '<none>')) 1562 1563 state = stStop 1564 break 1565 1566 if state is stTryAsExplicitTag: 1567 if (tagSet and 1568 tagSet[0].tagFormat == tag.tagFormatConstructed and 1569 tagSet[0].tagClass != tag.tagClassUniversal): 1570 # Assume explicit tagging 1571 concreteDecoder = explicitTagDecoder 1572 state = stDecodeValue 1573 1574 else: 1575 concreteDecoder = None 1576 state = self.defaultErrorState 1577 1578 if LOG: 1579 LOG('codec %s chosen, decoding %s' % (concreteDecoder and concreteDecoder.__class__.__name__ or "<none>", state is stDecodeValue and 'value' or 'as failure')) 1580 1581 if state is stDumpRawValue: 1582 concreteDecoder = self.defaultRawDecoder 1583 1584 if LOG: 1585 LOG('codec %s chosen, decoding value' % concreteDecoder.__class__.__name__) 1586 1587 state = stDecodeValue 1588 1589 if state is stErrorCondition: 1590 raise error.PyAsn1Error( 1591 '%s not in asn1Spec: %r' % (tagSet, asn1Spec) 1592 ) 1593 1594 if LOG: 1595 debug.scope.pop() 1596 LOG('decoder left scope %s, call completed' % debug.scope) 1597 1598 return value, substrate 1599 1600 1601#: Turns BER octet stream into an ASN.1 object. 1602#: 1603#: Takes BER octet-stream and decode it into an ASN.1 object 1604#: (e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative) which 1605#: may be a scalar or an arbitrary nested structure. 1606#: 1607#: Parameters 1608#: ---------- 1609#: substrate: :py:class:`bytes` (Python 3) or :py:class:`str` (Python 2) 1610#: BER octet-stream 1611#: 1612#: Keyword Args 1613#: ------------ 1614#: asn1Spec: any pyasn1 type object e.g. :py:class:`~pyasn1.type.base.PyAsn1Item` derivative 1615#: A pyasn1 type object to act as a template guiding the decoder. Depending on the ASN.1 structure 1616#: being decoded, *asn1Spec* may or may not be required. Most common reason for 1617#: it to require is that ASN.1 structure is encoded in *IMPLICIT* tagging mode. 1618#: 1619#: Returns 1620#: ------- 1621#: : :py:class:`tuple` 1622#: A tuple of pyasn1 object recovered from BER substrate (:py:class:`~pyasn1.type.base.PyAsn1Item` derivative) 1623#: and the unprocessed trailing portion of the *substrate* (may be empty) 1624#: 1625#: Raises 1626#: ------ 1627#: ~pyasn1.error.PyAsn1Error, ~pyasn1.error.SubstrateUnderrunError 1628#: On decoding errors 1629#: 1630#: Examples 1631#: -------- 1632#: Decode BER serialisation without ASN.1 schema 1633#: 1634#: .. code-block:: pycon 1635#: 1636#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03') 1637#: >>> str(s) 1638#: SequenceOf: 1639#: 1 2 3 1640#: 1641#: Decode BER serialisation with ASN.1 schema 1642#: 1643#: .. code-block:: pycon 1644#: 1645#: >>> seq = SequenceOf(componentType=Integer()) 1646#: >>> s, _ = decode(b'0\t\x02\x01\x01\x02\x01\x02\x02\x01\x03', asn1Spec=seq) 1647#: >>> str(s) 1648#: SequenceOf: 1649#: 1 2 3 1650#: 1651decode = Decoder(tagMap, typeMap) 1652 1653# XXX 1654# non-recursive decoding; return position rather than substrate 1655