Source code for zlogging.model

# -*- coding: utf-8 -*-
"""Bro/Zeek log data model."""

import abc
import collections
import types
from typing import TYPE_CHECKING

from zlogging._aux import expand_typing
from zlogging._exc import ModelFormatError, ModelTypeError, ModelValueError

__all__ = [
    'Model', 'new_model',
]

if TYPE_CHECKING:
    from collections import OrderedDict
    from typing import Any, Optional, Type, Union

    from zlogging.types import _GenericType, _SimpleType, _VariadicType


[docs]class Model(metaclass=abc.ABCMeta): """Log data model. Args: *args: Arbitrary positional arguments. **kwargs: Arbitrary keyword arguments. Warns: BroDeprecationWarning: Use of ``bro_*`` type annotations. Raises: :exc:`ModelValueError`: In case of inconsistency between field data types, or values of ``unset_field``, ``empty_field`` and ``set_separator``. :exc:`ModelTypeError`: Wrong parameters when initialisation. Note: Customise the :meth:`Model.__post_init__ <zlogging.model.Model.__post_init__>` method in your subclassed data model to implement your own ideas. Example: Define a custom log data model using the prefines Bro/Zeek data types, or subclasses of :class:`~zlogging.types.BaseType`: .. code-block:: python class MyLog(Model): field_one = StringType() field_two = SetType(element_type=PortType) Or you may use type annotations as :pep:`484` introduced when declaring data models. All available type hints can be found in :mod:`zlogging.typing`: .. code-block:: python class MyLog(Model): field_one: zeek_string field_two: zeek_set[zeek_port] However, when mixing annotations and direct assignments, annotations will take proceedings, i.e. the :class:`Model` class shall process first annotations then assignments. Should there be any conflicts, ``ModelError`` will be raised. See Also: See :func:`~zlogging._aux.expand_typing` for more information about processing the fields. """ #: Fields of the data model. __fields__: 'OrderedDict[str, Union[_SimpleType, _GenericType]]' #: Fields of ``record`` data type in the data model. __record_fields__: 'OrderedDict[str, _VariadicType]' #: Placeholder for empty field. __empty_field__: 'bytes' #: Placeholder for unset field. __unset_field__: 'bytes' #: Separator for set/vector fields. __set_separator__: 'bytes' @property def fields(self) -> 'OrderedDict[str, Union[_SimpleType, _GenericType]]': """Fields of the data model.""" return self.__fields__ @property def unset_field(self) -> 'bytes': """Placeholder for empty field.""" return self.__unset_field__ @property def empty_field(self) -> 'bytes': """Placeholder for unset field.""" return self.__empty_field__ @property def set_separator(self) -> 'bytes': """Separator for set/vector fields.""" return self.__set_separator__ def __new__(cls, *args: 'Any', **kwargs: 'Any') -> 'Model': # pylint: disable=unused-argument expanded = expand_typing(cls, ModelValueError) cls.__fields__ = expanded['fields'] cls.__record_fields__ = expanded['record_fields'] cls.__doc__ = 'Initialise ``%s`` data model.' % cls.__name__ # pylint: disable=consider-using-f-string cls.__unset_field__ = expanded['unset_field'] cls.__empty_field__ = expanded['empty_field'] cls.__set_separator__ = expanded['set_separator'] return super().__new__(cls) def __init__(self, *args: 'Any', **kwargs: 'Any') -> 'None': init_args = collections.OrderedDict() field_names = list(self.__fields__) if len(args) > len(field_names): raise ModelTypeError('__init__() takes %d positional arguments but %d were given' % (len(field_names), len(args))) # pylint: disable=line-too-long,consider-using-f-string for index, arg in enumerate(args): name = field_names[index] init_args[name] = self.__fields__[name](arg) for arg, val in kwargs.items(): if arg in init_args: raise ModelTypeError('__init__() got multiple values for argument %r' % arg) # pylint: disable=consider-using-f-string if arg not in field_names: if arg in self.__record_fields__ and isinstance(val, dict): for arg_nam, arg_val in val.items(): name = '%s.%s' % (arg, arg_nam) # pylint: disable=consider-using-f-string if name not in self.__fields__: raise ModelTypeError('__init__() got an unexpected keyword argument %r' % name) # pylint: disable=consider-using-f-string init_args[name] = self.__fields__[name](arg_val) continue raise ModelTypeError('__init__() got an unexpected keyword argument %r' % arg) # pylint: disable=consider-using-f-string init_args[arg] = self.__fields__[arg](val) diff = [] # type: list[str] for field in field_names: if field in init_args: continue diff.append(field) if diff: length = len(diff) if length == 1: diff_args = repr(diff[0]) elif length == 2: diff_args = '%r and %r' % tuple(diff) # pylint: disable=consider-using-f-string else: diff_args = '%s, and %r' % (', '.join(map(repr, diff[:-1])), diff[-1]) # pylint: disable=consider-using-f-string raise ModelTypeError('missing %d required positional arguments: %s' % (length, diff_args)) # pylint: disable=consider-using-f-string for key, val in init_args.items(): setattr(self, key, val) self.__post_init__()
[docs] def __post_init__(self) -> 'None': """Post-processing customisation."""
def __str__(self) -> 'str': fields = [] # type: list[str] for field in self.__fields__: value = getattr(self, field) fields.append('%s=%s' % (field, value)) # pylint: disable=consider-using-f-string return '%s(%s)' % (type(self).__name__, ', '.join(fields)) # pylint: disable=consider-using-f-string def __repr__(self) -> 'str': fields = [] # type: list[str] for field in self.__fields__: value = getattr(self, field) fields.append('%s=%s' % (field, value)) # pylint: disable=consider-using-f-string return '%s(%s)' % (type(self).__name__, ', '.join(fields)) # pylint: disable=consider-using-f-string
[docs] def __call__(self, format: 'str') -> 'Any': # pylint: disable=redefined-builtin """Serialise data model with given format. Args: format: Serialisation format. Returns: The serialised data. Raises: :exc:`ModelFormatError`: If ``format`` is not supproted, i.e. :meth:`Mode.to{format}` does not exist. """ func = 'to%s' % format # pylint: disable=consider-using-f-string if hasattr(self, func): return getattr(self, func)() raise ModelFormatError('unsupported format: %s' % format) # pylint: disable=consider-using-f-string
[docs] def tojson(self) -> 'OrderedDict[str, Any]': """Serialise data model as JSON log format. Returns: An :obj:`OrderedDict` mapping each field and serialised JSON serialisable data. """ fields = collections.OrderedDict() for field, type_cls in self.__fields__.items(): value = getattr(self, field) fields[field] = type_cls.tojson(value) return fields
[docs] def toascii(self) -> 'OrderedDict[str, str]': """Serialise data model as ASCII log format. Returns: An :obj:`OrderedDict` mapping each field and serialised text data. """ fields = collections.OrderedDict() for field, type_cls in self.__fields__.items(): value = getattr(self, field) fields[field] = type_cls.toascii(value) return fields
[docs] def asdict(self, dict_factory: 'Optional[Type[dict]]' = None) -> 'dict[str, Any]': """Convert data model as a dictionary mapping field names to field values. Args: dict_factory: If given, ``dict_factory`` will be used instead of built-in :obj:`dict`. Returns: A dictionary mapping field names to field values. """ if dict_factory is None: dict_factory = dict fields = dict_factory() for field in self.__fields__: value = getattr(self, field) fields[field] = value return fields
[docs] def astuple(self, tuple_factory: 'Optional[Type[tuple]]' = None) -> 'tuple[Any, ...]': """Convert data model as a tuple of field values. Args: tuple_factory: If given, ``tuple_factory`` will be used instead of built-in :class:`~collections.namedtuple`. Returns: A tuple of field values. """ field_names = [] # type: list[str] field_value = [] # type: list[Any] for field in self.__fields__: value = getattr(self, field) field_names.append(field) field_value.append(value) if tuple_factory is None: model_name = type(self).__name__ named_tuple = collections.namedtuple(model_name, field_names) # type: ignore[misc] return named_tuple(*field_value) return tuple_factory(field_value)
[docs]def new_model(name: 'str', **fields: 'Any') -> 'Type[Model]': """Create a data model dynamically with the appropriate fields. Args: name: data model name **fields: defined fields of the data model Returns: Created data model. Examples: Typically, we define a data model by subclassing the :obj:`Model` class, as following: .. code-block:: python class MyLog(Model): field_one = StringType() field_two = SetType(element_type=PortType) when defining dynamically with :func:`new_model`, the definition above can be rewrote to: .. code-block:: python MyLog = new_model('MyLog', field_one=StringType(), field_two=SetType(element_type=PortType)) """ def gen_body(ns: 'dict[str, Any]') -> 'None': """Generate ``exec_body``.""" for name, type_cls in fields.items(): ns[name] = type_cls return types.new_class(name, (Model,), exec_body=gen_body)