Source code for zlogging._aux

# -*- coding: utf-8 -*-
"""Auxiliary functions."""

import collections
import decimal
import itertools
import math
import textwrap
from typing import TYPE_CHECKING, cast, overload

from typing_inspect import get_args, get_origin, is_generic_type, is_typevar

if TYPE_CHECKING:
    from collections import OrderedDict
    from decimal import Decimal
    from io import BufferedReader as BinaryFile
    from typing import Optional, Type, TypeVar, Union

    from typing_extensions import Literal

    from zlogging._typing import ExpandedTyping
    from zlogging.model import Model
    from zlogging.types import _VariadicType

__all__ = ['readline', 'decimal_toascii', 'float_toascii', 'unicode_escape', 'expand_typing']


@overload
def readline(file: 'BinaryFile', seperator: 'bytes' = ..., maxsplit: 'int' = ...,  # type: ignore[misc]
             decode: 'Literal[False]' = ...) -> 'list[bytes]': ...  # pylint: disable=redefined-outer-name

@overload
def readline(file: 'BinaryFile', seperator: 'bytes' = ..., maxsplit: 'int' = ...,
             decode: 'Literal[True]' = ...) -> 'list[str]': ...  # pylint: disable=redefined-outer-name

[docs]def readline(file: 'BinaryFile', separator: 'bytes' = b'\x09', # type: ignore[misc] maxsplit: 'int' = -1, decode: 'bool' = False) -> 'Union[list[str], list[bytes]]': # pylint: disable=redefined-outer-name """Wrapper for :meth:`file.readline` function. Args: file: Log file object opened in binary mode. separator: Data separator. maxsplit: Maximum number of splits to do; see :meth:`bytes.split` and :meth:`str.split` for more information. decode: If decide the buffered string with ``ascii`` encoding. Returns: The splitted line as a :obj:`list` of :obj:`bytes`, or as :obj:`str` if ``decode`` if set to ``True``. """ line = file.readline().strip() if decode: return line.decode('ascii').split(separator.decode('ascii'), maxsplit=maxsplit) return line.split(separator, maxsplit)
[docs]def decimal_toascii(data: 'Decimal', infinite: 'Optional[str]' = None) -> 'str': """Convert :obj:`decimal.Decimal` to ASCII. Args: data: A :obj:`decimal.Decimal` object. infinite: The ASCII representation of infinite numbers (``NaN`` and infinity). Returns: The converted ASCII string. Example: When converting a :obj:`decimal.Decimal` object, for example: .. code-block:: python >>> d = decimal.Decimal('-123.123456789') the function will preserve only **6 digits** of its fractional part, i.e.: .. code-block:: python >>> decimal_toascii(d) '-123.123456' Note: Infinite numbers, i.e. ``NaN`` and infinity (``inf``), will be converted as the value specified in ``infinite``, in default the string representation of the number itself, i.e.: * ``NaN`` -> ``'NaN'`` * Infinity -> ``'Infinity'`` """ if data.is_infinite(): if infinite is None: return str(data) return infinite tpl = data.as_tuple() # type: decimal.DecimalTuple exp = tpl.exponent if exp >= 0: return '%s%s%s.000000' % ('-' if tpl.sign else '', # pylint: disable=consider-using-f-string ''.join(map(str, tpl.digits)), '0' * exp) expabs = abs(exp) dgtlen = len(tpl.digits) if expabs >= dgtlen: diff = expabs - dgtlen if diff == 0: diff = 1 return '%s%s.%s%s' % ('-' if tpl.sign else '', # pylint: disable=consider-using-f-string '0' * diff, ''.join(map(str, tpl.digits[:6])), '0' * (6 - dgtlen)) buf_int = '' buf_flt = '' for index, digit in enumerate(reversed(tpl.digits), start=1): if index <= expabs: buf_flt = str(digit) + buf_flt else: buf_int = str(digit) + buf_int return '%s%s.%s%s' % ('-' if tpl.sign else '', # pylint: disable=consider-using-f-string buf_int, buf_flt[:6], '0' * (6 - len(buf_flt)))
[docs]def float_toascii(data: 'float', infinite: 'Optional[str]' = None) -> 'str': """Convert :obj:`float` to ASCII. Args: data: A :obj:`float` number. infinite: The ASCII representation of infinite numbers (``NaN`` and infinity). Returns: The converted ASCII string. Example: When converting a :obj:`float` number, for example: .. code-block:: python >>> f = -123.123456789 the function will preserve only **6 digits** of its fractional part, i.e.: .. code-block:: python >>> float_toascii(f) '-123.123456' Note: Infinite numbers, i.e. ``NaN`` and infinity (``inf``), will be converted as the value specified in ``infinite``, in default the string representation of the number itself, i.e.: * ``NaN`` -> ``'nan'`` * Infinity -> ``'inf'`` """ if not math.isfinite(data): if infinite is None: return str(data) return infinite int_part, flt_part = str(data).split('.') return '%s.%s%s' % (int_part, # pylint: disable=consider-using-f-string flt_part[:6], '0' * (6 - len(flt_part)))
[docs]def unicode_escape(string: 'bytes') -> 'str': """Conterprocess of :meth:`bytes.decode('unicode_escape') <bytes.decode>`. Args: string: The bytestring to be escaped. Returns: The escaped bytestring as an encoded string Example: >>> b'\\x09'.decode('unicode_escape') '\\\\t' >>> unicode_escape(b'\\t') '\\\\x09' """ return ''.join(map(lambda s: '\\x%s' % s, textwrap.wrap(string.hex(), 2))) # pylint: disable=consider-using-f-string
[docs]def expand_typing(cls: 'Union[Model, Type[Model], _VariadicType, Type[_VariadicType]]', exc: 'Optional[Type[ValueError]]' = None) -> 'ExpandedTyping': """Expand typing annotations. Args: cls: a variadic class which supports :pep:`484` style attribute typing annotations exc: exception to be used in case of inconsistent values for ``unset_field``, ``empty_field`` and ``set_separator`` Returns: The returned dictionary contains the following directives: * ``fields``: a mapping proxy of field names and their corresponding data types, i.e. an instance of a :class:`~zlogging.types.BaseType` subclass * ``record_fields``: a mapping proxy for fields of ``record`` data type, i.e. an instance of :class:`~zlogging.types.RecordType` * ``unset_fields``: placeholder for unset field * ``empty_fields``: placeholder for empty field * ``set_separator``: separator for ``set``/``vector`` fields Warns: BroDeprecationWarning: Use of ``bro_*`` prefixed typing annotations. Raises: :exc:`ValueError`: In case of inconsistent values for ``unset_field``, ``empty_field`` and ``set_separator``. Example: Define a custom log data model from :class:`~zlogging.model.Model` using the prefines Bro/Zeek data types, or subclasses of :class:`~zlogging.types.BaseType`: .. code-block:: python class MyLog(Model): field_one = StringType() field_two = SetType(element_type=PortType) Or you may use type annotations as :pep:`484` introduced when declaring data models. All available type hints can be found in :mod:`zlogging.typing`: .. code-block:: python class MyLog(Model): field_one: zeek_string field_two: zeek_set[zeek_port] However, when mixing annotations and direct assignments, annotations will take proceedings, i.e. the function shall process first typing annotations then ``cls`` attribute assignments. Should there be any conflicts, the ``exc`` will be raised. Note: Fields of :class:`zlogging.types.RecordType` type will be expanded as plain fields of the ``cls``, i.e. for the variadic class as below: .. code-block:: python class MyLog(Model): record = RecrodType(one=StringType(), two=VectorType(element_type=CountType())) will have the following fields: * ``record.one`` -> ``string`` data type * ``record.two`` -> ``vector[count]`` data type """ from zlogging.types import (BaseType, _GenericType, # pylint: disable=import-outside-toplevel _SimpleType, _VariadicType) if exc is None: exc = ValueError inited = False unset_field = b'-' empty_field = b'(empty)' set_separator = b',' def register(name: str, field: 'Union[_SimpleType, _GenericType]') -> None: """Field registry.""" existed = fields.get(name) if existed is not None and field.zeek_type != existed.zeek_type: raise exc(f'inconsistent data type of {name!r} field: {field!r} and {existed!r}') # type: ignore[misc] fields[name] = field fields = collections.OrderedDict() # type: OrderedDict[str, Union[_SimpleType, _GenericType]] record_fields = collections.OrderedDict() # type: OrderedDict[str, _VariadicType] for name, attr in itertools.chain(getattr(cls, '__annotations__', {}).items(), cls.__dict__.items()): # type instances if isinstance(attr, BaseType): if isinstance(attr, _VariadicType): for elm_name, elm_field in attr.element_mapping.items(): register(f'{name}.{elm_name}', elm_field) record_fields[name] = attr else: register(name, attr) # type: ignore[arg-type] # uninitialised type classes elif isinstance(attr, type) and issubclass(attr, BaseType): attr = attr() # simple typing types elif is_typevar(attr): if TYPE_CHECKING: attr = cast('TypeVar', attr) bound = attr.__bound__ if bound and issubclass(bound, _SimpleType): attr = bound() else: continue # generic typing types elif is_generic_type(attr) and issubclass(attr, _GenericType): origin = get_origin(attr) parameter = get_args(attr)[0] # uninitialised type classes if isinstance(parameter, type) and issubclass(parameter, _SimpleType): element_type = parameter() # simple typing types elif is_typevar(parameter): if TYPE_CHECKING: parameter = cast('TypeVar', parameter) bound = parameter.__bound__ if bound and issubclass(bound, _SimpleType): element_type = bound() else: element_type = bound # type: ignore[assignment] else: element_type = parameter # type: ignore[assignment] attr = origin(element_type=element_type)\ else: continue if not inited: unset_field = attr.unset_field empty_field = attr.empty_field set_separator = attr.set_separator inited = True continue if unset_field != attr.unset_field: raise exc(f"inconsistent value of 'unset_field': {unset_field!r} and {attr.unset_field!r}") if empty_field != attr.empty_field: raise exc(f"inconsistent value of 'empty_field': {empty_field!r} and {attr.empty_field!r}") if set_separator != attr.set_separator: raise exc("inconsistent value of 'set_separator': {set_separator!r} and {attr.set_separator!r}") return { '_inited': inited, 'fields': fields, 'record_fields': record_fields, 'unset_field': unset_field, 'empty_field': empty_field, 'set_separator': set_separator, }