# -*- coding: utf-8 -*-
# pylint: disable=ungrouped-imports,unsubscriptable-object
"""Bro/Zeek data types."""
import abc
import ctypes
import datetime
import decimal
import ipaddress
import json
import warnings
from typing import TYPE_CHECKING, Any, Generic, List, Set, TypeVar, Union, cast, overload
from mypy_extensions import TypedDict
from zlogging._aux import decimal_toascii, expand_typing, float_toascii
from zlogging._compat import enum
from zlogging._exc import (BroDeprecationWarning, ZeekNotImplemented, ZeekTypeError, ZeekValueError,
ZeekValueWarning)
from zlogging.enum import globals as enum_generator
# Generic type variable; used by pass-through signatures such as ``AnyType.parse``.
_T = TypeVar('_T')
# Type variable for the element type of container types; bound to ``_SimpleType``.
_S = TypeVar('_S', bound='_SimpleType')

# Names below are imported/defined for static type checkers only. In the
# visible code they are referenced exclusively from string annotations, so
# they are never needed at runtime.
if TYPE_CHECKING:
    from collections import OrderedDict
    from ctypes import c_int64 as int64
    from ctypes import c_uint16 as uint16
    from ctypes import c_uint64 as uint64
    from datetime import datetime as DateTimeType
    from datetime import timedelta as TimeDeltaType
    from decimal import Decimal
    from ipaddress import IPv4Address, IPv4Network, IPv6Address, IPv6Network
    from json import JSONEncoder
    from typing import NoReturn, Optional, Type

    from typing_extensions import Literal

    # Textual input accepted by the ``parse`` methods.
    AnyStr = Union[str, bytes]
    # Bytes-like values accepted by ``StringType``.
    ByteString = Union[bytes, bytearray, memoryview]
    # Either flavour of IP address / IP network.
    IPAddress = Union[IPv4Address, IPv6Address]
    IPNetwork = Union[IPv4Network, IPv6Network]

# Public API of the module. ``AnyType`` and the abstract helper classes are
# not listed here.
__all__ = [
    'AddrType', 'BoolType', 'CountType', 'DoubleType', 'EnumType',
    'IntervalType', 'IntType', 'PortType', 'RecordType', 'SetType',
    'StringType', 'SubnetType', 'TimeType', 'VectorType',
]
class BaseType(metaclass=abc.ABCMeta):
    """Base Bro/Zeek data type.

    Each instance keeps the three log-format placeholders (*empty*, *unset*
    and the ``set``/``vector`` separator) in two parallel forms: as
    :obj:`bytes` and as ASCII-decoded :obj:`str`.

    Args:
        empty_field: Placeholder for empty field.
        unset_field: Placeholder for unset field.
        set_separator: Separator for ``set``/``vector`` fields.
        *args: Arbitrary positional arguments.
        **kwargs: Arbitrary keyword arguments.

    """

    #: Placeholder for empty field.
    empty_field: 'bytes'
    str_empty_field: 'str'

    #: Placeholder for unset field.
    unset_field: 'bytes'
    str_unset_field: 'str'

    #: Separator for ``set``/``vector`` fields.
    set_separator: 'bytes'
    str_set_separator: 'str'

    @property
    @abc.abstractmethod
    def python_type(self) -> 'Any':
        """Corresponding Python type annotation."""

    @property
    @abc.abstractmethod
    def zeek_type(self) -> 'str':
        """Corresponding Zeek type name."""

    @property
    def bro_type(self) -> 'str':
        """Corresponding Bro type name (deprecated alias of ``zeek_type``)."""
        # stacklevel=2 attributes the warning to the code that accessed the
        # property rather than to this line.
        warnings.warn("Use of 'bro_type' is deprecated. "
                      "Please use 'zeek_type' instead.", BroDeprecationWarning, stacklevel=2)
        return self.zeek_type

    @staticmethod
    def _normalise_placeholder(value: 'Optional[AnyStr]', default: 'bytes') -> 'tuple[AnyStr, str]':
        """Return the ``(bytes, str)`` pair for one placeholder argument.

        Args:
            value: Placeholder supplied by the caller, or :data:`None`.
            default: Bytes placeholder to use when ``value`` is :data:`None`.

        Returns:
            The placeholder in its bytes form and in its ASCII text form. A
            non-``str`` ``value`` is stored unchanged as the bytes form.

        """
        if value is None:
            return default, default.decode('ascii')
        if isinstance(value, str):
            return value.encode('ascii'), value
        return value, value.decode('ascii')

    def __init__(self,  # pylint: disable=unused-argument,keyword-arg-before-vararg
                 empty_field: 'Optional[AnyStr]' = None, unset_field: 'Optional[AnyStr]' = None,
                 set_separator: 'Optional[AnyStr]' = None, *args: 'Any', **kwargs: 'Any') -> 'None':
        self.empty_field, self.str_empty_field = self._normalise_placeholder(empty_field, b'(empty)')
        self.unset_field, self.str_unset_field = self._normalise_placeholder(unset_field, b'-')
        self.set_separator, self.str_set_separator = self._normalise_placeholder(set_separator, b',')

        # Cached class name, used by ``__repr__``.
        self._name = type(self).__name__

    def __call__(self, data: 'Any') -> 'Any':
        """Parse ``data`` from string.

        This is a proxy method which calls to :meth:`~zlogging.types.BaseType.parse`
        of the type implementation. :data:`None` is passed through untouched.

        """
        if data is None:
            return data
        return self.parse(data)

    def __str__(self) -> 'str':
        """Returns the corresponding Zeek type name."""
        return self.zeek_type

    def __repr__(self) -> 'str':
        return (f'{self._name}(empty_field={self.str_empty_field!r}, '
                f'unset_field={self.str_unset_field!r}, set_separator={self.str_set_separator!r})')

    @abc.abstractmethod
    def parse(self, data: 'Any') -> 'Any':
        """Parse ``data`` from string."""

    @abc.abstractmethod
    def tojson(self, data: 'Any') -> 'Any':
        """Serialize ``data`` as JSON log format."""

    @abc.abstractmethod
    def toascii(self, data: 'Any') -> 'str':
        """Serialize ``data`` as ASCII log format."""
class _SimpleType(BaseType):  # pylint: disable=abstract-method
    """Simple (non-container) data type.

    The simple types of the Bro/Zeek script language are ``bool``,
    ``count``, ``int``, ``double``, ``time``, ``interval``, ``string``,
    ``addr``, ``port``, ``subnet`` and ``enum``.

    ``any``, the arbitrary data type, is included as well in order to
    support the arbitrary typing required by
    :class:`~zlogging.loader.JSONParser`.

    """
class AnyType(_SimpleType):
    """Bro/Zeek ``any`` data type.

    Args:
        empty_field: Placeholder for empty field.
        unset_field: Placeholder for unset field.
        set_separator: Separator for ``set``/``vector`` fields.
        json_encoder: JSON encoder class for :meth:`~zlogging.types.AnyType.tojson`
            method calls.
        *args: Arbitrary positional arguments.
        **kwargs: Arbitrary keyword arguments.

    Note:
        The :class:`~zlogging.types.AnyType` is only used for arbitrary typing
        as required in :class:`~zlogging.loader.JSONParser`. It is **NOT** a
        valid type of Bro/Zeek logging framework.

    """

    #: JSON encoder class for :meth:`~zlogging.types.AnyType.tojson` method calls.
    json_encoder: 'Type[JSONEncoder]'

    @property
    def python_type(self) -> 'Any':
        """Corresponding Python type annotation."""
        return Any

    @property
    def zeek_type(self) -> 'Literal["any"]':
        """Corresponding Zeek type name."""
        return 'any'

    def __init__(self,  # pylint: disable=unused-argument,keyword-arg-before-vararg
                 empty_field: 'Optional[AnyStr]' = None, unset_field: 'Optional[AnyStr]' = None,
                 set_separator: 'Optional[AnyStr]' = None, json_encoder: 'Optional[Type[JSONEncoder]]' = None,
                 *args: 'Any', **kwargs: 'Any') -> 'None':
        # Fall back to the standard library encoder when none is supplied.
        self.json_encoder = json.JSONEncoder if json_encoder is None else json_encoder
        super().__init__(empty_field=empty_field, unset_field=unset_field, set_separator=set_separator)

    def parse(self, data: '_T') -> 'Optional[_T]':
        """Parse ``data`` from string.

        Args:
            data: raw data

        Returns:
            ``data`` itself, unchanged, unless it equals the *unset*
            placeholder, in which case :data:`None` is returned.

        """
        # Text is encoded only for the comparison; the caller gets the
        # original object back.
        comparable = data.encode('ascii') if isinstance(data, str) else data
        return None if comparable == self.unset_field else data

    def tojson(self, data: 'Any') -> 'Any':
        """Serialize ``data`` as JSON log format.

        Args:
            data: raw data

        Returns:
            ``data`` itself when it is JSON serialisable with the configured
            encoder.

        Notes:
            If :func:`json.dumps` raises :exc:`TypeError` for ``data``, a
            :obj:`dict` is returned instead, with ``data`` holding the
            :obj:`str` form of the raw data and ``error`` holding the error
            message.

        """
        try:
            # Dry run: the encoded text is discarded, only success matters.
            json.dumps(data, cls=self.json_encoder)
        except TypeError as error:
            fallback = {
                'data': str(data),
                'error': str(error),
            }
            return fallback
        else:
            return data

    def toascii(self, data: 'Any') -> 'str':
        """Serialize ``data`` as ASCII log format.

        Args:
            data: raw data

        Returns:
            The *unset* placeholder for :data:`None`, otherwise ``str(data)``.

        """
        return self.str_unset_field if data is None else str(data)
class BoolType(_SimpleType):
    """Bro/Zeek ``bool`` data type.

    Args:
        empty_field: Placeholder for empty field.
        unset_field: Placeholder for unset field.
        set_separator: Separator for ``set``/``vector`` fields.
        *args: Arbitrary positional arguments.
        **kwargs: Arbitrary keyword arguments.

    """

    @property
    def python_type(self) -> 'Type[bool]':
        """Corresponding Python type annotation."""
        return bool

    @property
    def zeek_type(self) -> 'Literal["bool"]':
        """Corresponding Zeek type name."""
        return 'bool'

    @overload
    def parse(self, data: 'Literal["T", b"T"]') -> 'Literal[True]': ...
    @overload
    def parse(self, data: 'Literal["F", b"F"]') -> 'Literal[False]': ...
    @overload
    def parse(self, data: 'AnyStr') -> 'Optional[bool]': ...

    def parse(self, data: 'Union[AnyStr, bool]') -> 'Optional[bool]':
        """Parse ``data`` from string.

        Args:
            data: raw data

        Returns:
            The parsed boolean. A :obj:`bool` input is returned as is; the
            *unset* placeholder yields :data:`None`.

        Raises:
            ZeekValueError: If ``data`` is neither *unset* nor one of ``T``
                (:data:`True`) / ``F`` (:data:`False`).

        """
        if isinstance(data, bool):
            return data

        raw = data.encode('ascii') if isinstance(data, str) else data
        # The unset check comes first so that a custom placeholder wins over
        # the literal ``T``/``F`` tokens.
        if raw == self.unset_field:
            return None
        if raw == b'T':
            return True
        if raw == b'F':
            return False

        shown = raw.decode('ascii')
        raise ZeekValueError('invalid bool value: %s' % shown)  # pylint: disable=consider-using-f-string

    @overload
    def tojson(self, data: 'Literal[True]') -> 'Literal[True]': ...
    @overload
    def tojson(self, data: 'Literal[False]') -> 'Literal[False]': ...
    @overload
    def tojson(self, data: 'None') -> 'None': ...

    def tojson(self, data: 'Optional[bool]') -> 'Optional[bool]':
        """Serialize ``data`` as JSON log format.

        Args:
            data: raw data

        Returns:
            ``data`` unchanged; booleans and :data:`None` are already JSON
            serialisable.

        """
        return data

    @overload
    def toascii(self, data: 'Literal[True]') -> 'Literal["T"]': ...
    @overload
    def toascii(self, data: 'Literal[False]') -> 'Literal["F"]': ...
    @overload
    def toascii(self, data: 'None') -> 'str': ...

    def toascii(self, data: 'Optional[bool]') -> 'str':
        """Serialize ``data`` as ASCII log format.

        Args:
            data: raw data

        Returns:
            The *unset* placeholder for :data:`None`, ``T`` for a truthy
            value and ``F`` otherwise.

        """
        if data is None:
            return self.str_unset_field
        if data:
            return 'T'
        return 'F'
class CountType(_SimpleType):
    """Bro/Zeek ``count`` data type.

    Values are represented as :class:`ctypes.c_uint64` instances.

    Args:
        empty_field: Placeholder for empty field.
        unset_field: Placeholder for unset field.
        set_separator: Separator for ``set``/``vector`` fields.
        *args: Arbitrary positional arguments.
        **kwargs: Arbitrary keyword arguments.

    """

    @property
    def python_type(self) -> 'Type[uint64]':
        """Corresponding Python type annotation."""
        return ctypes.c_uint64

    @property
    def zeek_type(self) -> 'Literal["count"]':
        """Corresponding Zeek type name."""
        return 'count'

    @overload
    def parse(self, data: 'AnyStr') -> 'Optional[uint64]': ...
    @overload
    def parse(self, data: 'int') -> 'uint64': ...
    @overload
    def parse(self, data: 'uint64') -> 'uint64': ...

    def parse(self, data: 'Union[AnyStr, int, uint64]') -> 'Optional[uint64]':
        """Parse ``data`` from string.

        Args:
            data: raw data

        Returns:
            The value as an unsigned 64-bit integer. An existing
            :class:`ctypes.c_uint64` is returned as is; the *unset*
            placeholder yields :data:`None`.

        """
        wrapper = ctypes.c_uint64
        if isinstance(data, wrapper):
            return data
        if isinstance(data, int):
            return wrapper(data)

        raw = data.encode('ascii') if isinstance(data, str) else data
        if raw == self.unset_field:
            return None
        return wrapper(int(raw))

    @overload
    def tojson(self, data: 'uint64') -> 'int': ...
    @overload
    def tojson(self, data: 'None') -> 'None': ...

    def tojson(self, data: 'Optional[uint64]') -> 'Optional[int]':
        """Serialize ``data`` as JSON log format.

        Args:
            data: raw data

        Returns:
            The plain :obj:`int` held by ``data``, or :data:`None`.

        """
        return None if data is None else data.value

    def toascii(self, data: 'Optional[uint64]') -> 'str':
        """Serialize ``data`` as ASCII log format.

        Args:
            data: raw data

        Returns:
            The decimal text of the number, or the *unset* placeholder for
            :data:`None`.

        """
        return self.str_unset_field if data is None else str(data.value)
class IntType(_SimpleType):
    """Bro/Zeek ``int`` data type.

    Values are represented as :class:`ctypes.c_int64` instances.

    Args:
        empty_field: Placeholder for empty field.
        unset_field: Placeholder for unset field.
        set_separator: Separator for ``set``/``vector`` fields.
        *args: Arbitrary positional arguments.
        **kwargs: Arbitrary keyword arguments.

    """

    @property
    def python_type(self) -> 'Type[int64]':
        """Corresponding Python type annotation."""
        return ctypes.c_int64

    @property
    def zeek_type(self) -> 'Literal["int"]':
        """Corresponding Zeek type name."""
        return 'int'

    @overload
    def parse(self, data: 'AnyStr') -> 'Optional[int64]': ...
    @overload
    def parse(self, data: 'int') -> 'int64': ...
    @overload
    def parse(self, data: 'int64') -> 'int64': ...

    def parse(self, data: 'Union[AnyStr, int, int64]') -> 'Optional[int64]':
        """Parse ``data`` from string.

        Args:
            data: raw data

        Returns:
            The value as a signed 64-bit integer. An existing
            :class:`ctypes.c_int64` is returned as is; the *unset*
            placeholder yields :data:`None`.

        """
        wrapper = ctypes.c_int64
        if isinstance(data, wrapper):
            return data
        if isinstance(data, int):
            return wrapper(data)

        raw = data.encode('ascii') if isinstance(data, str) else data
        if raw == self.unset_field:
            return None
        return wrapper(int(raw))

    @overload
    def tojson(self, data: 'int64') -> 'int': ...
    @overload
    def tojson(self, data: 'None') -> 'None': ...

    def tojson(self, data: 'Optional[int64]') -> 'Optional[int]':
        """Serialize ``data`` as JSON log format.

        Args:
            data: raw data

        Returns:
            The plain :obj:`int` held by ``data``, or :data:`None`.

        """
        return None if data is None else data.value

    def toascii(self, data: 'Optional[int64]') -> 'str':
        """Serialize ``data`` as ASCII log format.

        Args:
            data: raw data

        Returns:
            The decimal text of the number, or the *unset* placeholder for
            :data:`None`.

        """
        return self.str_unset_field if data is None else str(data.value)
class DoubleType(_SimpleType):
    """Bro/Zeek ``double`` data type.

    Values are represented as :class:`decimal.Decimal` instances.

    Args:
        empty_field: Placeholder for empty field.
        unset_field: Placeholder for unset field.
        set_separator: Separator for ``set``/``vector`` fields.
        *args: Arbitrary positional arguments.
        **kwargs: Arbitrary keyword arguments.

    """

    @property
    def python_type(self) -> 'Type[Decimal]':
        """Corresponding Python type annotation."""
        return decimal.Decimal

    @property
    def zeek_type(self) -> 'Literal["double"]':
        """Corresponding Zeek type name."""
        return 'double'

    @overload
    def parse(self, data: 'AnyStr') -> 'Optional[Decimal]': ...
    @overload
    def parse(self, data: 'Union[int, float]') -> 'Decimal': ...
    @overload
    def parse(self, data: 'Decimal') -> 'Decimal': ...

    def parse(self, data: 'Union[AnyStr, int, float, Decimal]') -> 'Optional[Decimal]':
        """Parse ``data`` from string.

        Args:
            data: raw data

        Returns:
            The parsed numeral data. A :class:`~decimal.Decimal` input is
            returned as is; the *unset* placeholder yields :data:`None`.

        Note:
            The value is constructed exactly, with every input digit kept.
            Earlier revisions wrapped the construction in a
            ``decimal.localcontext()`` with ``prec = 6``, but the
            :class:`~decimal.Decimal` constructor ignores context precision,
            so that wrapper had no effect and has been removed.

        """
        if isinstance(data, decimal.Decimal):
            return data
        if isinstance(data, (int, float)):
            return decimal.Decimal(data)

        if isinstance(data, str):
            data = data.encode('ascii')
        if data == self.unset_field:
            return None
        return decimal.Decimal(data.decode('ascii'))

    @overload
    def tojson(self, data: 'Decimal') -> 'float': ...
    @overload
    def tojson(self, data: 'None') -> 'None': ...

    def tojson(self, data: 'Optional[Decimal]') -> 'Optional[float]':
        """Serialize ``data`` as JSON log format.

        Args:
            data: raw data

        Returns:
            float: The JSON serialisable numeral data, or :data:`None`.

        """
        if data is None:
            return None
        return float(data)

    def toascii(self, data: 'Optional[Decimal]') -> 'str':
        """Serialize ``data`` as ASCII log format.

        Args:
            data: raw data

        Returns:
            str: The ASCII representation of numeral data as produced by
            ``decimal_toascii``, or the *unset* placeholder for :data:`None`.

        """
        if data is None:
            return self.str_unset_field
        return decimal_toascii(data, self.str_unset_field)
class TimeType(_SimpleType):
    """Bro/Zeek ``time`` data type.

    Values are represented as :class:`datetime.datetime` instances built
    with :meth:`datetime.datetime.fromtimestamp`.

    Args:
        empty_field: Placeholder for empty field.
        unset_field: Placeholder for unset field.
        set_separator: Separator for ``set``/``vector`` fields.
        *args: Arbitrary positional arguments.
        **kwargs: Arbitrary keyword arguments.

    """

    @property
    def python_type(self) -> 'Type[DateTimeType]':
        """Any: Corresponding Python type annotation."""
        return datetime.datetime

    @property
    def zeek_type(self) -> 'Literal["time"]':
        """str: Corresponding Zeek type name."""
        return 'time'

    @overload
    def parse(self, data: 'AnyStr') -> 'Optional[DateTimeType]': ...
    @overload
    def parse(self, data: 'Union[int, float]') -> 'DateTimeType': ...
    @overload
    def parse(self, data: 'DateTimeType') -> 'DateTimeType': ...

    def parse(self, data: 'Union[AnyStr, int, float, DateTimeType]') -> 'Optional[DateTimeType]':
        """Parse ``data`` from string.

        Args:
            data: raw data; a :class:`~datetime.datetime`, a numeric POSIX
                timestamp (:obj:`int` or :obj:`float`), or its textual form.

        Returns:
            The parsed timestamp. A :class:`~datetime.datetime` input is
            returned as is; the *unset* placeholder yields :data:`None`.

        """
        if isinstance(data, datetime.datetime):
            return data
        # Whole-number timestamps arrive as ``int`` (e.g. from JSON input) and
        # must take the numeric path too, otherwise they reach ``.decode()``.
        if isinstance(data, (int, float)):
            return datetime.datetime.fromtimestamp(data)

        if isinstance(data, str):
            data = data.encode('ascii')
        if data == self.unset_field:
            return None

        # ``Decimal`` construction is exact regardless of context precision,
        # so no local decimal context is needed here.
        value = decimal.Decimal(data.decode('ascii'))
        return datetime.datetime.fromtimestamp(float(value))

    @overload
    def tojson(self, data: 'DateTimeType') -> 'float': ...
    @overload
    def tojson(self, data: 'None') -> 'None': ...

    def tojson(self, data: 'Optional[DateTimeType]') -> 'Optional[float]':
        """Serialize ``data`` as JSON log format.

        Args:
            data: raw data

        Returns:
            float: The POSIX timestamp of ``data``, or :data:`None`.

        """
        if data is None:
            return None
        return data.timestamp()

    def toascii(self, data: 'Optional[DateTimeType]') -> 'str':
        """Serialize ``data`` as ASCII log format.

        Args:
            data: raw data

        Returns:
            str: The POSIX timestamp of ``data`` formatted by
            ``float_toascii``, or the *unset* placeholder for :data:`None`.

        """
        if data is None:
            return self.str_unset_field
        return float_toascii(data.timestamp(), self.str_unset_field)
class IntervalType(_SimpleType):
    """Bro/Zeek ``interval`` data type.

    Values are represented as :class:`datetime.timedelta` instances.

    Args:
        empty_field: Placeholder for empty field.
        unset_field: Placeholder for unset field.
        set_separator: Separator for ``set``/``vector`` fields.
        *args: Arbitrary positional arguments.
        **kwargs: Arbitrary keyword arguments.

    Attributes:
        empty_field (bytes): Placeholder for empty field.
        unset_field (bytes): Placeholder for unset field.
        set_separator (bytes): Separator for ``set``/``vector`` fields.

    """

    @property
    def python_type(self) -> 'Type[TimeDeltaType]':
        """Any: Corresponding Python type annotation."""
        return datetime.timedelta

    @property
    def zeek_type(self) -> 'Literal["interval"]':
        """str: Corresponding Zeek type name."""
        return 'interval'

    @overload
    def parse(self, data: 'AnyStr') -> 'Optional[TimeDeltaType]': ...
    @overload
    def parse(self, data: 'Union[int, float]') -> 'TimeDeltaType': ...
    @overload
    def parse(self, data: 'TimeDeltaType') -> 'TimeDeltaType': ...

    def parse(self, data: 'Union[AnyStr, int, float, TimeDeltaType]') -> 'Optional[TimeDeltaType]':
        """Parse ``data`` from string.

        Args:
            data: raw data; a :class:`~datetime.timedelta`, a number of
                seconds (:obj:`int` or :obj:`float`), or its textual form.

        Returns:
            The parsed interval, truncated to microsecond resolution. A
            :class:`~datetime.timedelta` input is returned as is; the
            *unset* placeholder yields :data:`None`.

        Raises:
            ValueError: If ``data`` is not a finite decimal number.

        """
        if isinstance(data, datetime.timedelta):
            return data

        if isinstance(data, (int, float)):
            text = repr(data)
        else:
            raw = data.encode('ascii') if isinstance(data, str) else data
            if raw == self.unset_field:
                return None
            text = raw.decode('ascii')

        try:
            # Fixed-point formatting expands exponent notation (``1e-05``)
            # into plain digits without rounding.
            plain = format(decimal.Decimal(text), 'f')
        except decimal.InvalidOperation as error:
            raise ValueError('invalid interval value: %s' % text) from error  # pylint: disable=consider-using-f-string

        # The sign applies to the whole value, so it is split off before the
        # integral and fractional digits are combined.
        negative = plain.startswith('-')
        int_part, _, flt_part = plain.lstrip('+-').partition('.')
        microseconds = int(int_part or '0') * 1000000 + int(flt_part.ljust(6, '0')[:6])
        return datetime.timedelta(microseconds=-microseconds if negative else microseconds)

    @overload
    def tojson(self, data: 'TimeDeltaType') -> 'float': ...
    @overload
    def tojson(self, data: 'None') -> 'None': ...

    def tojson(self, data: 'Optional[TimeDeltaType]') -> 'Optional[float]':
        """Serialize ``data`` as JSON log format.

        Args:
            data: raw data

        Returns:
            float: The interval length in seconds, or :data:`None`.

        """
        if data is None:
            return None
        return data.total_seconds()

    def toascii(self, data: 'Optional[TimeDeltaType]') -> 'str':
        """Serialize ``data`` as ASCII log format.

        Args:
            data: raw data

        Returns:
            str: The interval length in seconds formatted by
            ``float_toascii``, or the *unset* placeholder for :data:`None`.

        """
        if data is None:
            return self.str_unset_field
        return float_toascii(data.total_seconds(), self.str_unset_field)
class StringType(_SimpleType):
    """Bro/Zeek ``string`` data type.

    Args:
        empty_field: Placeholder for empty field.
        unset_field: Placeholder for unset field.
        set_separator: Separator for ``set``/``vector`` fields.
        *args: Arbitrary positional arguments.
        **kwargs: Arbitrary keyword arguments.

    """

    @property
    def python_type(self) -> 'Any':
        """Any: Corresponding Python type annotation."""
        return Union[bytes, memoryview, bytearray]

    @property
    def zeek_type(self) -> 'Literal["string"]':
        """str: Corresponding Zeek type name."""
        return 'string'

    def parse(self, data: 'Union[AnyStr, ByteString]') -> 'Optional[bytes]':
        """Parse ``data`` from string.

        Args:
            data: raw data

        Returns:
            The string as :obj:`bytes`. The *empty* placeholder yields
            ``b''`` and the *unset* placeholder yields :data:`None`.

        """
        # Bring every accepted input flavour down to plain bytes first.
        if isinstance(data, bytearray):
            raw = bytes(data)
        elif isinstance(data, memoryview):
            raw = data.tobytes()
        elif isinstance(data, str):
            raw = data.encode('ascii')
        else:
            raw = data

        # The empty placeholder is tested before the unset one.
        if raw == self.empty_field:
            return b''
        if raw == self.unset_field:
            return None
        return raw

    @overload
    def tojson(self, data: 'ByteString') -> 'str': ...
    @overload
    def tojson(self, data: 'None') -> 'None': ...

    def tojson(self, data: 'Optional[ByteString]') -> 'Optional[str]':
        """Serialize ``data`` as JSON log format.

        Args:
            data: raw data

        Returns:
            str: The ASCII-decoded text of ``data``, or :data:`None`.

        """
        if data is None:
            return None
        if isinstance(data, bytearray):
            payload = bytes(data)
        elif isinstance(data, memoryview):
            payload = data.tobytes()
        else:
            payload = data
        return payload.decode('ascii')

    def toascii(self, data: 'Optional[ByteString]') -> 'str':
        """Serialize ``data`` as ASCII log format.

        Args:
            data: raw data

        Returns:
            str: The ASCII-decoded text of ``data``; the *unset* placeholder
            for :data:`None` and the *empty* placeholder for empty data.

        """
        if data is None:
            return self.str_unset_field
        if isinstance(data, bytearray):
            payload = bytes(data)
        elif isinstance(data, memoryview):
            payload = data.tobytes()
        else:
            payload = data
        return payload.decode('ascii') if payload else self.str_empty_field
class AddrType(_SimpleType):
    """Bro/Zeek ``addr`` data type.

    Values are represented as :mod:`ipaddress` address objects.

    Args:
        empty_field: Placeholder for empty field.
        unset_field: Placeholder for unset field.
        set_separator: Separator for ``set``/``vector`` fields.
        *args: Arbitrary positional arguments.
        **kwargs: Arbitrary keyword arguments.

    """

    @property
    def python_type(self) -> 'Any':
        """Any: Corresponding Python type annotation."""
        return Union[ipaddress.IPv4Address, ipaddress.IPv6Address]

    @property
    def zeek_type(self) -> 'str':
        """str: Corresponding Zeek type name."""
        return 'addr'

    @overload
    def parse(self, data: 'AnyStr') -> 'Optional[IPAddress]': ...
    @overload
    def parse(self, data: 'IPAddress') -> 'IPAddress': ...

    def parse(self, data: 'Union[AnyStr, IPAddress]') -> 'Optional[IPAddress]':
        """Parse ``data`` from string.

        Args:
            data: raw data

        Returns:
            The IP address built by :func:`ipaddress.ip_address`. An address
            object is returned as is; the *unset* placeholder yields
            :data:`None`.

        """
        address_types = (ipaddress.IPv4Address, ipaddress.IPv6Address)
        if isinstance(data, address_types):
            return data

        raw = data.encode('ascii') if isinstance(data, str) else data
        if raw == self.unset_field:
            return None
        return ipaddress.ip_address(raw.decode('ascii'))

    @overload
    def tojson(self, data: 'IPAddress') -> 'str': ...
    @overload
    def tojson(self, data: 'None') -> 'None': ...

    def tojson(self, data: 'Optional[IPAddress]') -> 'Optional[str]':
        """Serialize ``data`` as JSON log format.

        Args:
            data: raw data

        Returns:
            str: ``str(data)``, or :data:`None`.

        """
        return None if data is None else str(data)

    def toascii(self, data: 'Optional[IPAddress]') -> 'str':
        """Serialize ``data`` as ASCII log format.

        Args:
            data: raw data

        Returns:
            str: ``str(data)``, or the *unset* placeholder for :data:`None`.

        """
        return self.str_unset_field if data is None else str(data)
class PortType(_SimpleType):
    """Bro/Zeek ``port`` data type.

    Values are represented as :class:`ctypes.c_uint16` instances.

    Args:
        empty_field: Placeholder for empty field.
        unset_field: Placeholder for unset field.
        set_separator: Separator for ``set``/``vector`` fields.
        *args: Arbitrary positional arguments.
        **kwargs: Arbitrary keyword arguments.

    """

    @property
    def python_type(self) -> 'Type[uint16]':
        """Any: Corresponding Python type annotation."""
        return ctypes.c_uint16

    @property
    def zeek_type(self) -> 'Literal["port"]':
        """str: Corresponding Zeek type name."""
        return 'port'

    @overload
    def parse(self, data: 'AnyStr') -> 'Optional[uint16]': ...
    @overload
    def parse(self, data: 'int') -> 'uint16': ...
    @overload
    def parse(self, data: 'uint16') -> 'uint16': ...

    def parse(self, data: 'Union[AnyStr, int, uint16]') -> 'Optional[uint16]':
        """Parse ``data`` from string.

        Args:
            data: raw data

        Returns:
            The port number as an unsigned 16-bit integer. An existing
            :class:`ctypes.c_uint16` is returned as is; the *unset*
            placeholder yields :data:`None`.

        """
        wrapper = ctypes.c_uint16
        if isinstance(data, wrapper):
            return data
        if isinstance(data, int):
            return wrapper(data)

        raw = data.encode('ascii') if isinstance(data, str) else data
        if raw == self.unset_field:
            return None
        return wrapper(int(raw))

    @overload
    def tojson(self, data: 'uint16') -> 'int': ...
    @overload
    def tojson(self, data: 'None') -> 'None': ...

    def tojson(self, data: 'Optional[uint16]') -> 'Optional[int]':
        """Serialize ``data`` as JSON log format.

        Args:
            data: raw data

        Returns:
            int: The plain port number held by ``data``, or :data:`None`.

        """
        return None if data is None else data.value

    def toascii(self, data: 'Optional[uint16]') -> 'str':
        """Serialize ``data`` as ASCII log format.

        Args:
            data: raw data

        Returns:
            str: The decimal text of the port number, or the *unset*
            placeholder for :data:`None`.

        """
        return self.str_unset_field if data is None else str(data.value)
class SubnetType(_SimpleType):
    """Bro/Zeek ``subnet`` data type.

    Values are represented as :mod:`ipaddress` network objects.

    Args:
        empty_field: Placeholder for empty field.
        unset_field: Placeholder for unset field.
        set_separator: Separator for ``set``/``vector`` fields.
        *args: Arbitrary positional arguments.
        **kwargs: Arbitrary keyword arguments.

    """

    @property
    def python_type(self) -> 'Any':
        """Any: Corresponding Python type annotation."""
        return Union[ipaddress.IPv4Network, ipaddress.IPv6Network]

    @property
    def zeek_type(self) -> 'Literal["subnet"]':
        """str: Corresponding Zeek type name."""
        return 'subnet'

    @overload
    def parse(self, data: 'AnyStr') -> 'Optional[IPNetwork]': ...
    @overload
    def parse(self, data: 'IPNetwork') -> 'IPNetwork': ...

    def parse(self, data: 'Union[AnyStr, IPNetwork]') -> 'Optional[IPNetwork]':
        """Parse ``data`` from string.

        Args:
            data: raw data

        Returns:
            The IP network built by :func:`ipaddress.ip_network`. A network
            object is returned as is; the *unset* placeholder yields
            :data:`None`.

        """
        network_types = (ipaddress.IPv4Network, ipaddress.IPv6Network)
        if isinstance(data, network_types):
            return data

        raw = data.encode('ascii') if isinstance(data, str) else data
        if raw == self.unset_field:
            return None
        return ipaddress.ip_network(raw.decode('ascii'))

    @overload
    def tojson(self, data: 'IPNetwork') -> 'str': ...
    @overload
    def tojson(self, data: 'None') -> 'None': ...

    def tojson(self, data: 'Optional[IPNetwork]') -> 'Optional[str]':
        """Serialize ``data`` as JSON log format.

        Args:
            data: raw data

        Returns:
            str: ``str(data)``, or :data:`None`.

        """
        return None if data is None else str(data)

    def toascii(self, data: 'Optional[IPNetwork]') -> 'str':
        """Serialize ``data`` as ASCII log format.

        Args:
            data: raw data

        Returns:
            str: ``str(data)``, or the *unset* placeholder for :data:`None`.

        """
        return self.str_unset_field if data is None else str(data)
class EnumType(_SimpleType):
    """Bro/Zeek ``enum`` data type.

    Args:
        empty_field: Placeholder for empty field.
        unset_field: Placeholder for unset field.
        set_separator: Separator for ``set``/``vector`` fields.
        namespaces: Namespaces to be loaded.
        bare: If :data:`True`, do not load ``zeek`` namespace by default.
        enum_hook: Additional enum to be included in the namespace.
        *args: Arbitrary positional arguments.
        **kwargs: Arbitrary keyword arguments.

    """

    #: Namespaces to be loaded.
    enum_namespaces: 'dict[str, enum.Enum]'

    @property
    def python_type(self) -> 'Any':
        """Any: Corresponding Python type annotation."""
        return enum.Enum

    @property
    def zeek_type(self) -> 'str':
        """str: Corresponding Zeek type name."""
        return 'enum'

    def __init__(self,  # pylint: disable=unused-argument,keyword-arg-before-vararg
                 empty_field: 'Optional[AnyStr]' = None,
                 unset_field: 'Optional[AnyStr]' = None,
                 set_separator: 'Optional[AnyStr]' = None,
                 namespaces: 'Optional[list[str]]' = None,
                 bare: bool = False,
                 enum_hook: 'Optional[dict[str, enum.Enum]]' = None,
                 *args: 'Any', **kwargs: 'Any') -> 'None':
        super().__init__(empty_field=empty_field, unset_field=unset_field, set_separator=set_separator)

        selected = [] if namespaces is None else namespaces
        lookup = enum_generator(*selected, bare=bare)
        # Hook entries are merged last, so they override generated ones.
        if enum_hook is not None:
            lookup.update(enum_hook)
        self.enum_namespaces = lookup

    def __repr__(self) -> 'str':
        return (f'{self._name}(empty_field={self.str_empty_field!r}, unset_field={self.str_unset_field!r}, '
                f'set_separator={self.str_set_separator!r}, enum_namespaces={self.enum_namespaces!r})')

    @overload
    def parse(self, data: 'AnyStr') -> 'Optional[enum.Enum]': ...
    @overload
    def parse(self, data: 'enum.Enum') -> 'enum.Enum': ...

    def parse(self, data: 'Union[AnyStr, enum.Enum]') -> 'Optional[enum.Enum]':
        """Parse ``data`` from string.

        Args:
            data: raw data

        Returns:
            The enum member registered under the given name. An enum member
            is returned as is; the *unset* placeholder yields :data:`None`.
            For an unregistered name, a member of a freshly created
            ``<unknown>`` flag enum carrying that name is returned.

        Warns:
            ZeekValueWarning: If ``data`` is not defined in the enum namespace.

        """
        if isinstance(data, enum.Enum):
            return data

        raw = data.encode('ascii') if isinstance(data, str) else data
        if raw == self.unset_field:
            return None

        name = raw.decode('ascii')
        member = self.enum_namespaces.get(name)
        if member is not None:
            return member

        warnings.warn('unrecognised enum value: %s' % name, ZeekValueWarning)  # pylint: disable=consider-using-f-string
        # Build a one-member placeholder enum so the caller still receives an
        # enum object whose ``name`` is the unrecognised value.
        placeholder = enum.IntFlag('<unknown>', {
            name: enum.auto(),
        }, module='zlogging.enum', qualname='zlogging.enum.<unknown>')
        return getattr(placeholder, name)

    @overload
    def tojson(self, data: 'enum.Enum') -> 'str': ...
    @overload
    def tojson(self, data: 'None') -> 'None': ...  # type: ignore[misc]

    def tojson(self, data: 'Optional[enum.Enum]') -> 'Optional[str]':
        """Serialize ``data`` as JSON log format.

        Args:
            data: raw data

        Returns:
            str: The member name, or :data:`None`.

        """
        return None if data is None else data.name

    def toascii(self, data: 'Optional[enum.Enum]') -> 'str':
        """Serialize ``data`` as ASCII log format.

        Args:
            data: raw data

        Returns:
            str: The member name, or the *unset* placeholder for :data:`None`.

        """
        return self.str_unset_field if data is None else data.name
class _GenericType(BaseType, Generic[_S]):  # pylint: disable=abstract-method
    """Generic (container) data type.

    The generic types of the Bro/Zeek script language are ``set`` and
    ``vector``, also known as *container* types.

    """

    #: Data type of container's elements.
    element_type: '_S'
[docs]class SetType(_GenericType, Generic[_S]):
"""Bro/Zeek ``set`` data type.
Args:
empty_field: Placeholder for empty field.
unset_field: Placeholder for unset field.
set_separator: Separator for ``set``/``vector`` fields.
element_type: Data type of container's elements.
*args: Arbitrary positional arguments.
**kwargs: Arbitrary keyword arguments.
Raises:
:exc:`ZeekTypeError`: If ``element_type`` is not supplied.
:exc:`ZeekValueError`: If ``element_type`` is not a valid Bro/Zeek data type.
Example:
As a *generic* data type, the class supports the typing proxy as introduced
:pep:`484`:
.. code-block:: python
>>> SetType[StringType]
which is the same **at runtime** as following:
.. code-block:: python
>>> SetType(element_type=StringType())
Note:
A valid ``element_type`` should be a *simple* data type, i.e. a subclass
of :class:`~zlogging.types._SimpleType`.
"""
@property
def python_type(self) -> 'Any':
"""Any: Corresponding Python type annotation."""
python_type = self.element_type.python_type
return Set[python_type] # type: ignore[valid-type]
@property
def zeek_type(self) -> 'str':
"""str: Corresponding Zeek type name."""
return 'set[%s]' % self.element_type.zeek_type # pylint: disable=consider-using-f-string
def __init__(self, # pylint: disable=unused-argument,keyword-arg-before-vararg
empty_field: 'Optional[AnyStr]' = None,
unset_field: 'Optional[AnyStr]' = None,
set_separator: 'Optional[AnyStr]' = None,
element_type: 'Optional[Union[_S, Type[_S]]]' = None,
*args: 'Any', **kwargs: 'Any') -> 'None':
super().__init__(empty_field=empty_field, unset_field=unset_field, set_separator=set_separator)
if element_type is None:
raise ZeekTypeError("__init__() missing 1 required positional argument: 'element_type'")
if not isinstance(element_type, _SimpleType):
if isinstance(element_type, type) and issubclass(element_type, _SimpleType):
element_type = element_type(empty_field=empty_field, unset_field=unset_field, set_separator=set_separator) # pylint: disable=line-too-long
else:
raise ZeekValueError('invalid element type: %s' % type(element_type).__name__) # pylint: disable=consider-using-f-string
self.element_type = cast('_S', element_type)
def __repr__(self) -> 'str':
    """Return a constructor-like description listing the field options and element type."""
    name = self._name
    arguments = ', '.join((
        f'empty_field={self.str_empty_field}',
        f'unset_field={self.str_unset_field}',
        f'set_separator={self.str_set_separator}',
        f'element_type={self.element_type}',
    ))
    return f'{name}({arguments})'
@overload
def parse(self, data: 'AnyStr') -> 'Optional[set[_S]]': ...
@overload
def parse(self, data: 'set[_S]') -> 'set[_S]': ...
def parse(self, data: 'Union[AnyStr, set[_S]]') -> 'Optional[set[_S]]':
    """Parse ``data`` into a set of parsed elements.

    Args:
        data: raw data; either a string/bytes log value, or a :obj:`set`
            whose members are each passed through the element type.

    Returns:
        The parsed set data. If ``data`` equals the *unset* placeholder,
        :data:`None` is returned; if it equals the *empty* placeholder, an
        empty set is returned.

    """
    if isinstance(data, set):
        return {self.element_type(member) for member in data}

    # placeholders and separator are compared/split as bytes, so normalise text input first
    raw = data.encode('ascii') if isinstance(data, str) else data

    if raw == self.unset_field:
        return None
    if raw == self.empty_field:
        return set()
    return set(map(self.element_type, raw.split(self.set_separator)))
@overload
def tojson(self, data: 'set[_S]') -> 'list[Optional[_T]]': ...
@overload
def tojson(self, data: 'None') -> 'None': ...
def tojson(self, data: 'Optional[set[_S]]') -> 'Optional[list[Optional[_T]]]':
    """Serialize ``data`` as JSON log format.

    Args:
        data: raw data

    Returns:
        The JSON serialisable set data as a sorted list, with a
        :data:`None` element (if any) placed first. If ``data`` is
        :data:`None`, :data:`None` is returned.

    """
    if data is None:
        return None

    serialised = [self.element_type.tojson(element) for element in data]
    # ``None`` cannot be ordered against other values; the leading flag in the
    # key decides such comparisons so ``None`` itself is never compared with
    # ``<``, while non-``None`` values keep their natural order.
    serialised.sort(key=lambda value: (value is not None, value))
    return serialised
def toascii(self, data: 'Optional[set[_S]]') -> 'str':
    """Serialize ``data`` as ASCII log format.

    Args:
        data: raw data

    Returns:
        The *unset* placeholder when ``data`` is :data:`None`, the *empty*
        placeholder when ``data`` has no elements, otherwise the sorted
        ASCII forms of the elements joined by the set separator.

    """
    if data is None:
        return self.str_unset_field
    if not data:
        return self.str_empty_field

    separator = self.str_set_separator
    rendered = [self.element_type.toascii(member) for member in data]
    rendered.sort()
    return separator.join(rendered)
class VectorType(_GenericType, Generic[_S]):
    """Bro/Zeek ``vector`` data type.

    Args:
        empty_field: Placeholder for empty field.
        unset_field: Placeholder for unset field.
        set_separator: Separator for ``set``/``vector`` fields.
        element_type: Data type of container's elements.
        *args: Arbitrary positional arguments.
        **kwargs: Arbitrary keyword arguments.

    Raises:
        :exc:`ZeekTypeError`: If ``element_type`` is not supplied.
        :exc:`ZeekValueError`: If ``element_type`` is not a valid Bro/Zeek data type.

    Example:
        As a *generic* data type, the class supports the typing proxy as introduced
        in :pep:`484`:

        .. code-block:: python

           >>> VectorType[StringType]

        which is the same **at runtime** as following:

        .. code-block:: python

           >>> VectorType(element_type=StringType())

    Note:
        A valid ``element_type`` should be a *simple* data type, i.e. a subclass
        of :class:`~zlogging.types._SimpleType`.

    """

    @property
    def python_type(self) -> 'Any':
        """Any: Corresponding Python type annotation."""
        python_type = self.element_type.python_type
        return List[python_type]  # type: ignore[valid-type]

    @property
    def zeek_type(self) -> 'str':
        """str: Corresponding Zeek type name."""
        return f'vector[{self.element_type.zeek_type}]'

    def __init__(self,  # pylint: disable=unused-argument,keyword-arg-before-vararg
                 empty_field: 'Optional[AnyStr]' = None,
                 unset_field: 'Optional[AnyStr]' = None,
                 set_separator: 'Optional[AnyStr]' = None,
                 element_type: 'Optional[Union[_S, Type[_S]]]' = None,
                 *args: 'Any', **kwargs: 'Any') -> 'None':
        super().__init__(empty_field=empty_field, unset_field=unset_field, set_separator=set_separator)

        if element_type is None:
            raise ZeekTypeError("__init__() missing 1 required positional argument: 'element_type'")

        if not isinstance(element_type, _SimpleType):
            if isinstance(element_type, type) and issubclass(element_type, _SimpleType):
                # a bare class was supplied: build it with the container's own field options
                element_type = element_type(empty_field=empty_field, unset_field=unset_field, set_separator=set_separator)  # pylint: disable=line-too-long
            else:
                raise ZeekValueError('invalid element type: %s' % type(element_type).__name__)  # pylint: disable=consider-using-f-string
        self.element_type = cast('_S', element_type)

    def __repr__(self) -> 'str':
        return (f'{self._name}(empty_field={self.str_empty_field}, unset_field={self.str_unset_field}, '
                f'set_separator={self.str_set_separator}, element_type={self.element_type})')

    @overload
    def parse(self, data: 'AnyStr') -> 'Optional[list[_S]]': ...
    @overload
    def parse(self, data: 'list[_S]') -> 'list[_S]': ...

    def parse(self, data: 'Union[AnyStr, list[_S]]') -> 'Optional[list[_S]]':
        """Parse ``data`` from string.

        Args:
            data: raw data; either a string/bytes log value, or a :obj:`list`
                whose items are each passed through the element type.

        Returns:
            The parsed list data. If ``data`` is *unset*, :data:`None` will
            be returned; if it is *empty*, an empty list is returned.

        """
        if isinstance(data, list):
            return [self.element_type(element) for element in data]

        # placeholders and separator are compared/split as bytes
        if isinstance(data, str):
            data = data.encode('ascii')

        if data == self.unset_field:
            return None
        if data == self.empty_field:
            return []
        return [self.element_type(element) for element in data.split(self.set_separator)]

    @overload
    def tojson(self, data: 'list[_S]') -> 'list[Optional[_T]]': ...
    @overload
    def tojson(self, data: 'None') -> 'None': ...

    def tojson(self, data: 'Optional[list[_S]]') -> 'Optional[list[Optional[_T]]]':
        """Serialize ``data`` as JSON log format.

        Args:
            data: raw data

        Returns:
            The JSON serialisable list data, in the same order as ``data``.
            If ``data`` is :data:`None`, :data:`None` is returned.

        """
        if data is None:
            return None
        return [self.element_type.tojson(element) for element in data]

    def toascii(self, data: 'Optional[list[_S]]') -> 'str':
        """Serialize ``data`` as ASCII log format.

        Args:
            data: raw data

        Returns:
            The *unset* placeholder when ``data`` is :data:`None`, the *empty*
            placeholder when ``data`` has no elements, otherwise the ASCII
            forms of the elements, in order, joined by the set separator.

        """
        if data is None:
            return self.str_unset_field
        if not data:
            return self.str_empty_field
        return self.str_set_separator.join(self.element_type.toascii(element) for element in data)
class _VariadicType(BaseType):  # pylint: disable=abstract-method
    """Variadic data type.

    In Bro/Zeek script language, such variadic type refers to ``record``, which
    is also a *container* type.

    """

    #: Data type of container's elements.
    element_mapping: 'OrderedDict[str, Union[_SimpleType, _GenericType]]'

    def parse(self, data: 'Any') -> 'NoReturn':
        """Reject parsing; a variadic data type does not support it.

        Args:
            data: data to process (ignored)

        Raises:
            :exc:`ZeekNotImplemented`: Always.

        """
        raise ZeekNotImplemented()

    def tojson(self, data: 'Any') -> 'NoReturn':
        """Reject JSON serialisation; a variadic data type does not support it.

        Args:
            data: data to process (ignored)

        Raises:
            :exc:`ZeekNotImplemented`: Always.

        """
        raise ZeekNotImplemented()

    def toascii(self, data: 'Any') -> 'NoReturn':
        """Reject ASCII serialisation; a variadic data type does not support it.

        Args:
            data: data to process (ignored)

        Raises:
            :exc:`ZeekNotImplemented`: Always.

        """
        raise ZeekNotImplemented()
class RecordType(_VariadicType):
    """Bro/Zeek ``record`` data type.

    Args:
        empty_field: Placeholder for empty field.
        unset_field: Placeholder for unset field.
        set_separator: Separator for ``set``/``vector`` fields.
        *args: Arbitrary positional arguments.
        **element_mapping: Data type of each field of the record, keyed by
            field name.

    Raises:
        :exc:`ZeekValueError`: If a field of ``element_mapping`` is not a valid
            Bro/Zeek data type; if ``empty_field``, ``unset_field`` or
            ``set_separator`` of a field differs from the record's own; or if
            a field is given a Zeek type different from the one already
            declared for it.

    Note:
        A valid ``element_mapping`` should be a *simple* or *generic* data type,
        i.e. a subclass of :class:`~zlogging.types._SimpleType` or
        :class:`~zlogging.types._GenericType`.

    See Also:
        See :func:`~zlogging._aux.expand_typing` for more information about
        processing the fields.

    """

    @property
    def python_type(self) -> 'Any':
        """Corresponding Python type annotation."""
        annotations = {
            name: field_type.python_type
            for name, field_type in self.element_mapping.items()
        }  # type: dict[str, Any]
        return TypedDict('record', annotations, total=False)

    @property
    def zeek_type(self) -> 'Literal["record"]':
        """Corresponding Zeek type name."""
        return 'record'

    def __new__(cls, *args: 'Any', **kwargs: 'Any') -> 'RecordType':  # pylint: disable=unused-argument
        # recomputed on every instantiation and stored on the class; ``__init__`` reads it back
        cls._expanded = expand_typing(cls, ZeekValueError)
        return super().__new__(cls)

    def __init__(self,  # pylint: disable=unused-argument,keyword-arg-before-vararg
                 empty_field: 'Optional[AnyStr]' = None,
                 unset_field: 'Optional[AnyStr]' = None,
                 set_separator: 'Optional[AnyStr]' = None,
                 *args: 'Any', **element_mapping: 'Union[Type[_SimpleType], _SimpleType, _GenericType]') -> 'None':
        super().__init__(empty_field=empty_field, unset_field=unset_field, set_separator=set_separator)

        # the three options that must agree between the record and its fields
        option_names = ('empty_field', 'unset_field', 'set_separator')

        expanded = self._expanded
        if expanded['_inited']:
            for option in option_names:
                own, declared = getattr(self, option), expanded[option]
                if own != declared:
                    raise ZeekValueError(f"inconsistent value of '{option}': {own!r} and {declared!r}")

        fields = expanded['fields']
        for field, expanded_type in fields.items():
            if not isinstance(expanded_type, (_SimpleType, _GenericType)):
                raise ZeekValueError(f'invalid element type of field {field!r}: {type(expanded_type).__name__}')

        for field, element_type in element_mapping.items():
            if isinstance(element_type, (_SimpleType, _GenericType)):
                # an already-built type must carry the same options as the record
                for option in option_names:
                    own, given = getattr(self, option), getattr(element_type, option)
                    if own != given:
                        raise ZeekValueError(f"inconsistent value of '{option}': {own!r} and {given!r}")
                resolved = element_type
            elif isinstance(element_type, type) and issubclass(element_type, _SimpleType):
                # a bare simple-type class is instantiated with the record's constructor options
                resolved = element_type(empty_field=empty_field, unset_field=unset_field, set_separator=set_separator)  # pylint: disable=line-too-long
            else:
                raise ZeekValueError(f'invalid element type of field {field!r}: {type(element_type).__name__}')

            # a keyword field may override an expanded one only with the same Zeek type
            existed = fields.get(field)
            if existed is not None and resolved.zeek_type != existed.zeek_type:
                raise ZeekValueError(f'inconsistent data type of {field!r} field: {resolved!r} and {existed!r}')
            fields[field] = resolved

        self.element_mapping = fields

    def __repr__(self) -> 'str':
        name = self._name
        arguments = ', '.join((
            f'empty_field={self.str_empty_field!r}',
            f'unset_field={self.str_unset_field!r}',
            f'set_separator={self.str_set_separator!r}',
            f'element_mapping={self.element_mapping!r}',
        ))
        return f'{name}({arguments})'