Source code for zlogging.dumper

# -*- coding: utf-8 -*-
"""Bro/Zeek log dumper."""

import abc
import json
import os
import time
from typing import TYPE_CHECKING

from zlogging._aux import unicode_escape
from zlogging._exc import ASCIIWriterError, JSONWriterError, WriterFormatError
from zlogging.model import Model

__all__ = [
    'write', 'write_ascii', 'write_json',
    'dumps', 'dumps_ascii', 'dumps_json',
    'dump', 'dump_ascii', 'dump_json',
    'ASCIIWriter', 'JSONWriter',
]

if TYPE_CHECKING:
    from io import TextIOWrapper as TextFile
    from json import JSONEncoder
    from os import PathLike
    from typing import Any, Iterable, Literal, Optional, Type, Union

    AnyStr = Union[str, bytes]


[docs]class BaseWriter(metaclass=abc.ABCMeta): """Basic log writer.""" @property @abc.abstractmethod def format(self) -> str: """Log file format."""
[docs] def write(self, filename: 'PathLike[str]', data: 'Iterable[Model]') -> 'int': """Write log file. Args: filename: Log file name. data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. Returns: The file offset after writing. """ with open(filename, 'w', encoding='ascii') as file: offset = self.write_file(file, data) return offset
[docs] @abc.abstractmethod def write_file(self, file: 'TextFile', data: 'Iterable[Model]') -> 'int': """Write log file. Args: file: Log file object opened in text mode. data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. Returns: The file offset after writing. """
[docs] @abc.abstractmethod def write_line(self, file: 'TextFile', data: 'Model', lineno: 'Optional[int]' = 0) -> 'int': """Write log line as one-line record. Args: file: Log file object opened in text mode. data: Log record. lineno: Line number of current line. Returns: The file offset after writing. """
[docs] @abc.abstractmethod def dump_file(self, data: 'Iterable[Model]') -> 'str': """Serialise records to a log line. Args: data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. Returns: The converted log string. """
[docs] @abc.abstractmethod def dump_line(self, data: 'Model', lineno: 'Optional[int]' = 0) -> 'str': """Serialise one-line record to a log line. Args: data (:obj:`~zlogging.model.Model`): Log record. lineno: Line number of current line. Returns: The converted log string. """
[docs] def dump(self, data: 'Iterable[Model]', file: 'TextFile') -> 'int': """Write log file. Args: data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. file: Log file object opened in text mode. Returns: The file offset after writing. """ return self.write_file(file, data)
[docs] def dumps(self, data: 'Iterable[Model]') -> 'str': """Serialise records to a log line. Args: data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. Returns: The converted log string. """ return self.dump_file(data)
[docs]class JSONWriter(BaseWriter): """JSON log writer. Args: encoder: JSON encoder class. """ #: JSON encoder class. encoder: 'Type[JSONEncoder]' @property def format(self) -> 'Literal["json"]': """Log file format.""" return 'json' def __init__(self, encoder: 'Optional[Type[JSONEncoder]]' = None) -> 'None': if encoder is None: encoder = json.JSONEncoder self.encoder = encoder
[docs] def write_file(self, file: 'TextFile', data: 'Iterable[Model]') -> 'int': """Write log file. Args: file: Log file object opened in text mode. data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. Returns: The file offset after writing. """ offset = -1 for index, line in enumerate(data, start=1): offset = self.write_line(file, line, lineno=index) return offset
[docs] def write_line(self, file: 'TextFile', data: 'Model', lineno: 'Optional[int]' = 0) -> 'int': """Write log line as one-line record. Args: file: Log file object opened in text mode. data: Log record. lineno: Line number of current line. Returns: The file offset after writing. Raises: :exc:`JSONWriterError`: If failed to serialise ``data`` as JSON. """ try: return file.write('%s\n' % json.dumps(data.tojson(), cls=self.encoder)) # pylint: disable=consider-using-f-string except TypeError as error: raise JSONWriterError(str(error), lineno=lineno) from error
[docs] def dump_file(self, data: 'Optional[Iterable[Model]]' = None) -> 'str': """Serialise records to a log line. Args: data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. Returns: The converted log string. """ if data is None: return '' return ''.join(self.dump_line(line, lineno=index) for index, line in enumerate(data, start=1))
[docs] def dump_line(self, data: 'Model', lineno: 'Optional[int]' = 0) -> 'str': """Serialise one-line record to a log line. Args: data: Log record. lineno: Line number of current line. Returns: The converted log string. Raises: :exc:`JSONWriterError`: If failed to serialise ``data`` as JSON. """ try: return '%s\n' % json.dumps(data.tojson(), cls=self.encoder) # pylint: disable=consider-using-f-string except TypeError as error: raise JSONWriterError(str(error), lineno=lineno) from error
[docs]class ASCIIWriter(BaseWriter): """ASCII log writer. Args: separator: Field separator when writing log lines. empty_field: Placeholder for empty field. unset_field: Placeholder for unset field. set_separator: Separator for ``set``/``vector`` fields. """ #: Field separator when writing log lines. separator: 'bytes' str_separator: 'str' #: Placeholder for empty field. empty_field: 'bytes' str_empty_field: 'str' #: Placeholder for unset field. unset_field: 'bytes' str_unset_field: 'str' #: Separator for ``set``/``vector`` fields. set_separator: 'bytes' str_set_separator: 'str' @property def format(self) -> str: """Log file format.""" return 'ascii' def __init__(self, separator: 'Optional[AnyStr]' = None, empty_field: 'Optional[AnyStr]' = None, unset_field: 'Optional[AnyStr]' = None, set_separator: 'Optional[AnyStr]' = None) -> 'None': if separator is None: self.separator = b'\x09' self.str_separator = '\x09' elif isinstance(separator, bytes): self.separator = separator self.str_separator = separator.decode('ascii') else: self.separator = separator.encode('ascii') self.str_separator = separator if empty_field is None: self.empty_field = b'(empty)' self.str_empty_field = '(empty)' elif isinstance(empty_field, str): self.empty_field = empty_field.encode('ascii') self.str_empty_field = empty_field else: self.empty_field = empty_field self.str_empty_field = empty_field.decode('ascii') if unset_field is None: self.unset_field = b'-' self.str_unset_field = '-' elif isinstance(unset_field, str): self.unset_field = unset_field.encode('ascii') self.str_unset_field = unset_field else: self.unset_field = unset_field self.str_unset_field = unset_field.decode('ascii') if set_separator is None: self.set_separator = b',' self.str_set_separator = ',' elif isinstance(set_separator, str): self.set_separator = set_separator.encode('ascii') self.str_set_separator = set_separator else: self.set_separator = set_separator self.str_set_separator = set_separator.decode('ascii')
[docs] def write_file(self, file: 'TextFile', data: 'Iterable[Model]') -> 'int': """Write log file. Args: file: Log file object opened in text mode. data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. Returns: The file offset after writing. """ if data: line = next(data) # type: ignore[call-overload] self.write_head(file, line) self.write_line(file, line, lineno=1) lineno_start = 2 else: self.write_head(file) lineno_start = 1 for index, line in enumerate(data, start=lineno_start): self.write_line(file, line, lineno=index) return self.write_tail(file)
[docs] def write_line(self, file: 'TextFile', data: 'Model', lineno: 'Optional[int]' = 0) -> 'int': """Write log line as one-line record. Args: file: Log file object opened in text mode. data: Log record. lineno: Line number of current line. Returns: The file offset after writing. Raises: :exc:`ASCIIWriterError`: If failed to serialise ``data`` as ASCII. """ try: return file.write('%s\n' % self.str_separator.join(data.toascii().values())) # pylint: disable=consider-using-f-string except TypeError as error: raise ASCIIWriterError(str(error), lineno=lineno) from error
[docs] def write_head(self, file: 'TextFile', data: 'Optional[Model]' = None) -> 'int': """Write header fields of ASCII log file. Args: file: Log file object opened in text mode. data: Log record. Returns: The file offset after writing. """ separator = self.str_separator if data is None: empty_field = self.str_empty_field unset_field = self.str_unset_field set_separator = self.str_set_separator fields = '' types = '' else: empty_field = data.empty_field.decode('ascii') unset_field = data.unset_field.decode('ascii') set_separator = data.set_separator.decode('ascii') line_fields = data.fields fields = separator.join(line_fields.keys()) types = separator.join(field.zeek_type for field in line_fields.values()) file.write('#separator %s\n' % unicode_escape(self.separator)) # pylint: disable=consider-using-f-string file.write('#set_separator%s%s\n' % (separator, set_separator)) # pylint: disable=consider-using-f-string file.write('#empty_field%s%s\n' % (separator, empty_field)) # pylint: disable=consider-using-f-string file.write('#unset_field%s%s\n' % (separator, unset_field)) # pylint: disable=consider-using-f-string file.write('#path%s%s\n' % (separator, os.path.splitext(file.name)[0])) # pylint: disable=consider-using-f-string file.write('#open%s%s\n' % (separator, time.strftime(r'%Y-%m-%d-%H-%M-%S'))) # pylint: disable=consider-using-f-string file.write('#fields%s%s\n' % (separator, fields)) # pylint: disable=consider-using-f-string return file.write('#types%s%s\n' % (separator, types)) # pylint: disable=consider-using-f-string
[docs] def write_tail(self, file: 'TextFile') -> 'int': """Write trailing fields of ASCII log file. Args: file: Log file object opened in text mode. Returns: The file offset after writing. """ return file.write('#close%s%s\n' % (self.str_separator, time.strftime(r'%Y-%m-%d-%H-%M-%S'))) # pylint: disable=consider-using-f-string
[docs] def dump_file(self, data: 'Optional[Iterable[Model]]' = None, name: 'Optional[str]' = None) -> 'str': # pylint: disable=arguments-differ """Serialise records to a log line. Args: data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. name: Log file name. Returns: The converted log string. """ if data: data_iter = iter(data) line = next(data_iter) buffer = self.dump_head(line, name=name) buffer += self.dump_line(line, lineno=1) buffer += ''.join(self.dump_line(line, lineno=index) for index, line in enumerate(data_iter, start=2)) else: buffer = self.dump_head(name=name) buffer += self.dump_tail() return buffer
[docs] def dump_line(self, data: Model, lineno: 'Optional[int]' = 0) -> 'str': """Serialise one-line record to a log line. Args: data: Log record. lineno: Line number of current line. Returns: The converted log string. Raises: :exc:`ASCIIWriterError`: If failed to serialise ``data`` as ASCII. """ try: return '%s\n' % self.str_separator.join(data.toascii().values()) # pylint: disable=consider-using-f-string except TypeError as error: raise ASCIIWriterError(str(error), lineno=lineno) from error
[docs] def dump_head(self, data: 'Optional[Model]' = None, name: 'Optional[str]' = None) -> 'str': """Serialise header fields of ASCII log file. Args: data: Log record. name: Log file name. Returns: The converted log string. """ if name is None: name = '<unknown>' separator = self.str_separator if data is None: empty_field = self.str_empty_field unset_field = self.str_unset_field set_separator = self.str_set_separator fields = '' types = '' else: empty_field = data.empty_field.decode('ascii') unset_field = data.unset_field.decode('ascii') set_separator = data.set_separator.decode('ascii') line_fields = data.fields fields = separator.join(line_fields.keys()) types = separator.join(field.zeek_type for field in line_fields.values()) buffer = '#separator %s\n' % unicode_escape(self.separator) # pylint: disable=consider-using-f-string buffer += '#set_separator%s%s\n' % (separator, set_separator) # pylint: disable=consider-using-f-string buffer += '#empty_field%s%s\n' % (separator, empty_field) # pylint: disable=consider-using-f-string buffer += '#unset_field%s%s\n' % (separator, unset_field) # pylint: disable=consider-using-f-string buffer += '#path%s%s\n' % (separator, os.path.splitext(name)[0]) # pylint: disable=consider-using-f-string buffer += '#open%s%s\n' % (separator, time.strftime(r'%Y-%m-%d-%H-%M-%S')) # pylint: disable=consider-using-f-string buffer += '#fields%s%s\n' % (separator, fields) # pylint: disable=consider-using-f-string buffer += '#types%s%s\n' % (separator, types) # pylint: disable=consider-using-f-string return buffer
[docs] def dump_tail(self) -> 'str': """Serialise trailing fields of ASCII log file. Returns: The converted log string. """ return '#close%s%s\n' % (self.str_separator, time.strftime(r'%Y-%m-%d-%H-%M-%S')) # pylint: disable=consider-using-f-string
[docs]def write_json(data: 'Iterable[Model]', filename: 'PathLike[str]', writer: 'Optional[Type[JSONWriter]]' = None, # pylint: disable=unused-argument,keyword-arg-before-vararg encoder: 'Optional[Type[JSONEncoder]]' = None, *args: 'Any', **kwargs: 'Any') -> 'None': """Write JSON log file. Args: data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. filename: Log file name. writer: Writer class. encoder: JSON encoder class. *args: Arbitrary positional arguments. **kwargs: Arbitrary keyword arguments. """ if writer is None: writer = JSONWriter json_writer = writer(encoder=encoder) json_writer.write(filename, data)
[docs]def dump_json(data: 'Iterable[Model]', file: 'TextFile', writer: 'Optional[Type[JSONWriter]]' = None, # pylint: disable=unused-argument,keyword-arg-before-vararg encoder: 'Optional[Type[JSONEncoder]]' = None, *args: 'Any', **kwargs: 'Any') -> 'None': """Write JSON log file. Args: data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. file: Log file object opened in text mode. writer: Writer class. encoder: JSON encoder class. *args: Arbitrary positional arguments. **kwargs: Arbitrary keyword arguments. """ if writer is None: writer = JSONWriter json_writer = writer(encoder=encoder) json_writer.write_file(file, data)
[docs]def dumps_json(data: 'Optional[Iterable[Model]]' = None, writer: 'Optional[Type[JSONWriter]]' = None, # pylint: disable=unused-argument,keyword-arg-before-vararg encoder: 'Optional[Type[JSONEncoder]]' = None, *args: 'Any', **kwargs: 'Any') -> 'str': """Write JSON log string. Args: data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. writer: Writer class. encoder: JSON encoder class. *args: Arbitrary positional arguments. **kwargs: Arbitrary keyword arguments. Returns: The JSON log string. """ if writer is None: writer = JSONWriter json_writer = writer(encoder=encoder) return json_writer.dump_file(data)
[docs]def write_ascii(data: 'Iterable[Model]', filename: 'PathLike[str]', # pylint: disable=unused-argument,keyword-arg-before-vararg writer: 'Optional[Type[ASCIIWriter]]' = None, separator: 'Optional[AnyStr]' = None, empty_field: 'Optional[AnyStr]' = None, unset_field: 'Optional[AnyStr]' = None, set_separator: 'Optional[AnyStr]' = None, *args: 'Any', **kwargs: 'Any') -> 'None': """Write ASCII log file. Args: data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. filename: Log file name. writer: Writer class. separator: Field separator when writing log lines. empty_field: Placeholder for empty field. unset_field: Placeholder for unset field. set_separator: Separator for ``set``/``vector`` fields. *args: Arbitrary positional arguments. **kwargs: Arbitrary keyword arguments. """ if writer is None: writer = ASCIIWriter ascii_writer = writer(separator=separator, empty_field=empty_field, unset_field=unset_field, set_separator=set_separator) ascii_writer.write(filename, data)
[docs]def dump_ascii(data: 'Iterable[Model]', file: 'TextFile', # pylint: disable=unused-argument,keyword-arg-before-vararg writer: 'Optional[Type[ASCIIWriter]]' = None, separator: 'Optional[AnyStr]' = None, empty_field: 'Optional[AnyStr]' = None, unset_field: 'Optional[AnyStr]' = None, set_separator: 'Optional[AnyStr]' = None, *args: 'Any', **kwargs: 'Any') -> 'None': """Write ASCII log file. Args: data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. file: Log file object opened in text mode. writer: Writer class. separator: Field separator when writing log lines. empty_field: Placeholder for empty field. unset_field: Placeholder for unset field. set_separator: Separator for ``set``/``vector`` fields. *args: Arbitrary positional arguments. **kwargs: Arbitrary keyword arguments. """ if writer is None: writer = ASCIIWriter ascii_writer = writer(separator=separator, empty_field=empty_field, unset_field=unset_field, set_separator=set_separator) ascii_writer.write_file(file, data)
[docs]def dumps_ascii(data: 'Optional[Iterable[Model]]' = None, # pylint: disable=unused-argument,keyword-arg-before-vararg writer: 'Optional[Type[ASCIIWriter]]' = None, separator: 'Optional[AnyStr]' = None, empty_field: 'Optional[AnyStr]' = None, unset_field: 'Optional[AnyStr]' = None, set_separator: 'Optional[AnyStr]' = None, *args: 'Any', **kwargs: 'Any') -> 'str': """Write ASCII log string. Args: data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. writer: Writer class. separator: Field separator when writing log lines. empty_field: Placeholder for empty field. unset_field: Placeholder for unset field. set_separator: Separator for ``set``/``vector`` fields. *args: Arbitrary positional arguments. **kwargs: Arbitrary keyword arguments. Returns: The JSON log string. """ if writer is None: writer = ASCIIWriter ascii_writer = writer(separator=separator, empty_field=empty_field, unset_field=unset_field, set_separator=set_separator) return ascii_writer.dump_file(data)
[docs]def write(data: 'Iterable[Model]', filename: 'PathLike[str]', format: 'str', *args: 'Any', **kwargs: 'Any') -> 'None': # pylint: disable=keyword-arg-before-vararg,redefined-builtin """Write Bro/Zeek log file. Args: data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. filename: Log file name. format: Log format. *args: See :func:`~zlogging.dumper.write_json` and :func:`~zlogging.dumper.write_ascii` for more information. **kwargs: See :func:`~zlogging.dumper.write_json` and :func:`~zlogging.dumper.write_ascii` for more information. Raises: :exc:`WriterFormatError`: If ``format`` is not supported. """ if format == 'ascii': return write_ascii(data, filename, *args, **kwargs) if format == 'json': return write_json(data, filename, *args, **kwargs) raise WriterFormatError('unsupported format: %s' % format) # pylint: disable=consider-using-f-string
[docs]def dump(data: 'Iterable[Model]', file: 'TextFile', format: 'str', *args: 'Any', **kwargs: 'Any') -> 'None': # pylint: disable=keyword-arg-before-vararg,redefined-builtin """Write Bro/Zeek log file. Args: data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. format: Log format. file: Log file object opened in text mode. *args: See :func:`~zlogging.dumper.dump_json` and :func:`~zlogging.dumper.dump_ascii` for more information. **kwargs: See :func:`~zlogging.dumper.dump_json` and :func:`~zlogging.dumper.dump_ascii` for more information. Raises: :exc:`WriterFormatError`: If ``format`` is not supported. """ if format == 'ascii': return dump_ascii(data, file, *args, **kwargs) if format == 'json': return dump_json(data, file, *args, **kwargs) raise WriterFormatError('unsupported format: %s' % format) # pylint: disable=consider-using-f-string
[docs]def dumps(data: 'Iterable[Model]', format: 'str', *args: 'Any', **kwargs: 'Any') -> 'str': # pylint: disable=keyword-arg-before-vararg,redefined-builtin """Write Bro/Zeek log string. Args: data: Log records as an :class:`~typing.Iterable` of :class:`~zlogging.model.Model` per line. format: Log format. *args: See :func:`~zlogging.dumper.dumps_json` and :func:`~zlogging.dumper.dumps_ascii` for more information. **kwargs: See :func:`~zlogging.dumper.dumps_json` and :func:`~zlogging.dumper.dumps_ascii` for more information. Raises: :exc:`WriterFormatError`: If ``format`` is not supported. """ if format == 'ascii': return dumps_ascii(data, *args, **kwargs) if format == 'json': return dumps_json(data, *args, **kwargs) raise WriterFormatError('unsupported format: %s' % format) # pylint: disable=consider-using-f-string