| |
| |
| |
| |
| |
| |
| import warnings |
|
|
| from pyasn1 import error |
| from pyasn1.codec.ber import encoder |
| from pyasn1.type import univ |
| from pyasn1.type import useful |
|
|
| __all__ = ['Encoder', 'encode'] |
|
|
|
|
| class BooleanEncoder(encoder.IntegerEncoder): |
| def encodeValue(self, value, asn1Spec, encodeFun, **options): |
| if value == 0: |
| substrate = (0,) |
| else: |
| substrate = (255,) |
| return substrate, False, False |
|
|
|
|
| class RealEncoder(encoder.RealEncoder): |
| def _chooseEncBase(self, value): |
| m, b, e = value |
| return self._dropFloatingPoint(m, b, e) |
|
|
|
|
| |
|
|
| class TimeEncoderMixIn(object): |
| Z_CHAR = ord('Z') |
| PLUS_CHAR = ord('+') |
| MINUS_CHAR = ord('-') |
| COMMA_CHAR = ord(',') |
| DOT_CHAR = ord('.') |
| ZERO_CHAR = ord('0') |
|
|
| MIN_LENGTH = 12 |
| MAX_LENGTH = 19 |
|
|
| def encodeValue(self, value, asn1Spec, encodeFun, **options): |
| |
| |
| |
| |
| |
| |
|
|
| if asn1Spec is not None: |
| value = asn1Spec.clone(value) |
|
|
| numbers = value.asNumbers() |
|
|
| if self.PLUS_CHAR in numbers or self.MINUS_CHAR in numbers: |
| raise error.PyAsn1Error('Must be UTC time: %r' % value) |
|
|
| if numbers[-1] != self.Z_CHAR: |
| raise error.PyAsn1Error('Missing "Z" time zone specifier: %r' % value) |
|
|
| if self.COMMA_CHAR in numbers: |
| raise error.PyAsn1Error('Comma in fractions disallowed: %r' % value) |
|
|
| if self.DOT_CHAR in numbers: |
|
|
| isModified = False |
|
|
| numbers = list(numbers) |
|
|
| searchIndex = min(numbers.index(self.DOT_CHAR) + 4, len(numbers) - 1) |
|
|
| while numbers[searchIndex] != self.DOT_CHAR: |
| if numbers[searchIndex] == self.ZERO_CHAR: |
| del numbers[searchIndex] |
| isModified = True |
|
|
| searchIndex -= 1 |
|
|
| searchIndex += 1 |
|
|
| if searchIndex < len(numbers): |
| if numbers[searchIndex] == self.Z_CHAR: |
| |
| del numbers[searchIndex - 1] |
| isModified = True |
|
|
| if isModified: |
| value = value.clone(numbers) |
|
|
| if not self.MIN_LENGTH < len(numbers) < self.MAX_LENGTH: |
| raise error.PyAsn1Error('Length constraint violated: %r' % value) |
|
|
| options.update(maxChunkSize=1000) |
|
|
| return encoder.OctetStringEncoder.encodeValue( |
| self, value, asn1Spec, encodeFun, **options |
| ) |
|
|
|
|
| class GeneralizedTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder): |
| MIN_LENGTH = 12 |
| MAX_LENGTH = 20 |
|
|
|
|
| class UTCTimeEncoder(TimeEncoderMixIn, encoder.OctetStringEncoder): |
| MIN_LENGTH = 10 |
| MAX_LENGTH = 14 |
|
|
|
|
| class SetOfEncoder(encoder.SequenceOfEncoder): |
| def encodeValue(self, value, asn1Spec, encodeFun, **options): |
| chunks = self._encodeComponents( |
| value, asn1Spec, encodeFun, **options) |
|
|
| |
| if len(chunks) > 1: |
| zero = b'\x00' |
| maxLen = max(map(len, chunks)) |
| paddedChunks = [ |
| (x.ljust(maxLen, zero), x) for x in chunks |
| ] |
| paddedChunks.sort(key=lambda x: x[0]) |
|
|
| chunks = [x[1] for x in paddedChunks] |
|
|
| return b''.join(chunks), True, True |
|
|
|
|
| class SequenceOfEncoder(encoder.SequenceOfEncoder): |
| def encodeValue(self, value, asn1Spec, encodeFun, **options): |
|
|
| if options.get('ifNotEmpty', False) and not len(value): |
| return b'', True, True |
|
|
| chunks = self._encodeComponents( |
| value, asn1Spec, encodeFun, **options) |
|
|
| return b''.join(chunks), True, True |
|
|
|
|
| class SetEncoder(encoder.SequenceEncoder): |
| @staticmethod |
| def _componentSortKey(componentAndType): |
| """Sort SET components by tag |
| |
| Sort regardless of the Choice value (static sort) |
| """ |
| component, asn1Spec = componentAndType |
|
|
| if asn1Spec is None: |
| asn1Spec = component |
|
|
| if asn1Spec.typeId == univ.Choice.typeId and not asn1Spec.tagSet: |
| if asn1Spec.tagSet: |
| return asn1Spec.tagSet |
| else: |
| return asn1Spec.componentType.minTagSet |
| else: |
| return asn1Spec.tagSet |
|
|
| def encodeValue(self, value, asn1Spec, encodeFun, **options): |
|
|
| substrate = b'' |
|
|
| comps = [] |
| compsMap = {} |
|
|
| if asn1Spec is None: |
| |
| inconsistency = value.isInconsistent |
| if inconsistency: |
| raise error.PyAsn1Error( |
| f"ASN.1 object {value.__class__.__name__} is inconsistent") |
|
|
| namedTypes = value.componentType |
|
|
| for idx, component in enumerate(value.values()): |
| if namedTypes: |
| namedType = namedTypes[idx] |
|
|
| if namedType.isOptional and not component.isValue: |
| continue |
|
|
| if namedType.isDefaulted and component == namedType.asn1Object: |
| continue |
|
|
| compsMap[id(component)] = namedType |
|
|
| else: |
| compsMap[id(component)] = None |
|
|
| comps.append((component, asn1Spec)) |
|
|
| else: |
| |
| for idx, namedType in enumerate(asn1Spec.componentType.namedTypes): |
|
|
| try: |
| component = value[namedType.name] |
|
|
| except KeyError: |
| raise error.PyAsn1Error('Component name "%s" not found in %r' % (namedType.name, value)) |
|
|
| if namedType.isOptional and namedType.name not in value: |
| continue |
|
|
| if namedType.isDefaulted and component == namedType.asn1Object: |
| continue |
|
|
| compsMap[id(component)] = namedType |
| comps.append((component, asn1Spec[idx])) |
|
|
| for comp, compType in sorted(comps, key=self._componentSortKey): |
| namedType = compsMap[id(comp)] |
|
|
| if namedType: |
| options.update(ifNotEmpty=namedType.isOptional) |
|
|
| chunk = encodeFun(comp, compType, **options) |
|
|
| |
| if namedType and namedType.openType: |
| wrapType = namedType.asn1Object |
| if wrapType.tagSet and not wrapType.isSameTypeWith(comp): |
| chunk = encodeFun(chunk, wrapType, **options) |
|
|
| substrate += chunk |
|
|
| return substrate, True, True |
|
|
|
|
| class SequenceEncoder(encoder.SequenceEncoder): |
| omitEmptyOptionals = True |
|
|
|
|
| TAG_MAP = encoder.TAG_MAP.copy() |
|
|
| TAG_MAP.update({ |
| univ.Boolean.tagSet: BooleanEncoder(), |
| univ.Real.tagSet: RealEncoder(), |
| useful.GeneralizedTime.tagSet: GeneralizedTimeEncoder(), |
| useful.UTCTime.tagSet: UTCTimeEncoder(), |
| |
| univ.SetOf.tagSet: SetOfEncoder(), |
| univ.Sequence.typeId: SequenceEncoder() |
| }) |
|
|
| TYPE_MAP = encoder.TYPE_MAP.copy() |
|
|
| TYPE_MAP.update({ |
| univ.Boolean.typeId: BooleanEncoder(), |
| univ.Real.typeId: RealEncoder(), |
| useful.GeneralizedTime.typeId: GeneralizedTimeEncoder(), |
| useful.UTCTime.typeId: UTCTimeEncoder(), |
| |
| univ.Set.typeId: SetEncoder(), |
| univ.SetOf.typeId: SetOfEncoder(), |
| univ.Sequence.typeId: SequenceEncoder(), |
| univ.SequenceOf.typeId: SequenceOfEncoder() |
| }) |
|
|
|
|
| class SingleItemEncoder(encoder.SingleItemEncoder): |
| fixedDefLengthMode = False |
| fixedChunkSize = 1000 |
|
|
| TAG_MAP = TAG_MAP |
| TYPE_MAP = TYPE_MAP |
|
|
|
|
| class Encoder(encoder.Encoder): |
| SINGLE_ITEM_ENCODER = SingleItemEncoder |
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| encode = Encoder() |
|
|
| |
|
|
| def __getattr__(attr: str): |
| if newAttr := {"tagMap": "TAG_MAP", "typeMap": "TYPE_MAP"}.get(attr): |
| warnings.warn(f"{attr} is deprecated. Please use {newAttr} instead.", DeprecationWarning) |
| return globals()[newAttr] |
| raise AttributeError(attr) |
|
|