# -*- coding: utf-8 -*-
# pylint: disable=ungrouped-imports,unsubscriptable-object
"""Bro/Zeek log loader."""
import abc
import collections
import datetime
import io
import json
import re
import warnings
from typing import TYPE_CHECKING, TypeVar, cast
from zlogging._aux import readline
from zlogging._data import ASCIIInfo, JSONInfo
from zlogging._exc import (ASCIIParserError, ASCIIParserWarning, JSONParserError, JSONParserWarning,
ParserError, ZeekValueError)
from zlogging.model import new_model
from zlogging.types import (AddrType, AnyType, BaseType, BoolType, CountType, DoubleType, EnumType,
IntervalType, IntType, PortType, SetType, StringType, SubnetType,
TimeType, VectorType)
__all__ = [
'parse', 'parse_ascii', 'parse_json',
'loads', 'loads_ascii', 'loads_json',
'load', 'load_ascii', 'load_json',
'ASCIIParser', 'JSONParser',
]
#: Type variable bound to ``_SimpleType`` (forward reference, resolved by
#: type checkers only).
_S = TypeVar('_S', bound='_SimpleType')

if TYPE_CHECKING:
    # Everything below is needed only by the quoted annotations in this
    # module, so it is imported for static analysis and never at runtime.
    from collections import OrderedDict
    from io import BufferedReader as BinaryFile
    from os import PathLike
    from typing import Any, Optional, Type, Union

    from typing_extensions import Literal

    from zlogging._data import Info
    from zlogging.model import Model
    from zlogging.types import _SimpleType

    # ``Union`` is imported only under this guard, so the alias has to live
    # inside it as well; it is referenced solely from quoted annotations.
    AnyStr = Union[str, bytes]
class BaseParser(metaclass=abc.ABCMeta):
    """Basic log parser.

    Abstract base of the concrete parsers. Subclasses must provide
    :attr:`format`, :meth:`parse_file` and :meth:`parse_line`; the remaining
    methods are thin conveniences built on top of those.

    """

    @property
    @abc.abstractmethod
    def format(self) -> 'str':
        """Log file format."""

    def parse(self, filename: 'PathLike[str]', model: 'Optional[Type[Model]]' = None) -> 'Info':
        """Parse log file.

        Args:
            filename: Log file name.
            model: Field declarations of current log.

        Returns:
            The parsed log as an :class:`~zlogging._data.ASCIIInfo` or :class:`~zlogging._data.JSONInfo`.

        """
        # The file is opened in binary mode because ``parse_file`` expects a
        # binary file object.
        with open(filename, 'rb') as file:
            return self.parse_file(file, model=model)

    @abc.abstractmethod
    def parse_file(self, file: 'BinaryFile', model: 'Optional[Type[Model]]' = None) -> 'Info':
        """Parse log file.

        Args:
            file: Log file object opened in binary mode.
            model: Field declarations of current log.

        Returns:
            :class:`~zlogging._data.Info`: The parsed log as a :class:`~zlogging.model.Model` per line.

        """

    @abc.abstractmethod
    def parse_line(self, line: 'bytes', lineno: 'Optional[int]' = 0,
                   model: 'Optional[Type[Model]]' = None) -> 'Model':
        """Parse log line as one-line record.

        Args:
            line: A simple line of log.
            lineno: Line number of current line.
            model: Field declarations of current log.

        Returns:
            The parsed log as a plain :class:`~zlogging.model.Model`.

        """

    def load(self, file: 'BinaryFile') -> 'Info':
        """Parse log file.

        Shorthand for :meth:`parse_file` without an explicit ``model``.

        Args:
            file: Log file object opened in binary mode.

        Returns:
            :class:`~zlogging._data.Info`: The parsed log as a :class:`~zlogging.model.Model` per line.

        """
        return self.parse_file(file)

    def loads(self, line: 'bytes', lineno: 'Optional[int]' = 0) -> 'Model':
        """Parse log line as one-line record.

        Shorthand for :meth:`parse_line` without an explicit ``model``.

        Args:
            line: A simple line of log.
            lineno: Line number of current line.

        Returns:
            The parsed log as a plain :class:`~zlogging.model.Model`.

        """
        return self.parse_line(line, lineno)
class JSONParser(BaseParser):
    """JSON log parser.

    Args:
        model: Field declarations for :class:`~zlogging.loader.JSONParser`,
            as in JSON logs the field typing information are omitted by
            the Bro/Zeek logging framework.

    Warns:
        JSONParserWarning: If ``model`` is not specified.

    """

    #: Field declarations for :class:`~zlogging.loader.JSONParser`,
    #: as in JSON logs the field typing information are omitted by
    #: the Bro/Zeek logging framework.
    model: 'Optional[Type[Model]]'

    @property
    def format(self) -> 'Literal["json"]':
        """Log file format."""
        return 'json'

    def __init__(self, model: 'Optional[Type[Model]]' = None) -> 'None':
        if model is None:
            warnings.warn('missing log data model specification', JSONParserWarning)
        self.model = model

    if TYPE_CHECKING:
        # Typing-only stub that narrows the inherited return type.
        def parse(self, filename: 'PathLike[str]', model: 'Optional[Type[Model]]' = None) -> 'JSONInfo':  # pylint: disable=signature-differs,line-too-long
            ...

    def parse_file(self, file: 'BinaryFile', model: 'Optional[Type[Model]]' = None) -> 'JSONInfo':
        """Parse log file.

        Every non-blank line of ``file`` is parsed as one JSON record.

        Args:
            file: Log file object opened in binary mode.
            model: Field declarations of current log.

        Returns:
            The parsed log as a :class:`~zlogging.model.Model` per line.

        Raises:
            :exc:`JSONParserError`: If any line fails to parse.

        """
        data = []
        for index, line in enumerate(file, start=1):
            # Whitespace-only lines (e.g. a trailing blank line) carry no
            # record; skip them but keep counting so that ``lineno`` still
            # matches the position in the file.
            if not line.strip():
                continue
            data.append(self.parse_line(line, lineno=index, model=model))
        return JSONInfo(
            data=data
        )

    def parse_line(self, line: 'bytes', lineno: 'Optional[int]' = 0,
                   model: 'Optional[Type[Model]]' = None) -> 'Model':
        """Parse log line as one-line record.

        Args:
            line: A simple line of log.
            lineno: Line number of current line.
            model: Field declarations of current log. Falls back to the
                parser-wide :attr:`model`; if neither is set, an ad-hoc
                model is built with every field typed as
                :class:`~zlogging.types.AnyType`.

        Returns:
            The parsed log as a plain :class:`~zlogging.model.Model`.

        Raises:
            :exc:`JSONParserError`: If failed to deserialise the ``line``
                from JSON, or if the line is not a JSON object.

        """
        try:
            data = json.loads(line)  # type: dict[str, Any]
        except json.JSONDecodeError as error:
            raise JSONParserError(error.msg, lineno) from error

        # A record must be a JSON object: its keys become model fields. Any
        # other JSON value would otherwise surface as an unrelated
        # AttributeError/TypeError further down.
        if not isinstance(data, dict):
            raise JSONParserError('log record is not a JSON object', lineno)

        model_cls = model or self.model
        if model_cls is None:
            model_cls = new_model('<unknown>', **{field: AnyType() for field in data})
        return model_cls(**data)
class ASCIIParser(BaseParser):
    """ASCII log parser.

    Args:
        type_hook: Bro/Zeek type parser hooks. User may customise subclasses of
            :class:`~zlogging.types.BaseType` to modify parsing behaviours.
        enum_namespaces: Namespaces to be loaded.
        bare: If :data:`True`, do not load ``zeek`` namespace by default.

    """

    #: Bro/Zeek type parser hooks.
    __type__: 'dict[str, Type[BaseType]]'
    #: Namespaces to be loaded.
    enum_namespaces: 'list[str]'
    #: If :data:`True`, do not load ``zeek`` namespace by default.
    bare: 'bool'

    @property
    def format(self) -> 'Literal["ascii"]':
        """Log file format."""
        return 'ascii'

    def __init__(self, type_hook: 'Optional[dict[str, Type[BaseType]]]' = None,
                 enum_namespaces: 'Optional[list[str]]' = None, bare: bool = False) -> 'None':
        # Default mapping from the type names found in the ``#types`` header
        # line to the classes that parse them; ``type_hook`` entries override
        # or extend it.
        self.__type__ = {
            'bool': BoolType,
            'count': CountType,
            'int': IntType,
            'double': DoubleType,
            'time': TimeType,
            'interval': IntervalType,
            'string': StringType,
            'addr': AddrType,
            'port': PortType,
            'subnet': SubnetType,
            'enum': EnumType,
            'set': SetType,
            'vector': VectorType,
        }  # type: dict[str, Type[BaseType]]
        if type_hook is not None:
            self.__type__.update(type_hook)

        self.enum_namespaces = enum_namespaces or []
        self.bare = bare

    if TYPE_CHECKING:
        # Typing-only stub that narrows the inherited return type.
        def parse(self, filename: 'PathLike[str]', model: 'Optional[Type[Model]]' = None) -> 'ASCIIInfo':  # pylint: disable=signature-differs,line-too-long
            ...

    def _make_field_type(self, type_: 'str', empty_field: 'bytes', unset_field: 'bytes',
                         set_separator: 'bytes') -> 'BaseType':
        """Build the type parser instance for one ``#types`` header entry.

        Args:
            type_: Type name as written in the log header, e.g. ``count``,
                ``set[addr]`` or ``vector[string]``.
            empty_field: Placeholder for empty fields.
            unset_field: Placeholder for unset fields.
            set_separator: Separator of ``set``/``vector`` elements.

        Returns:
            The type parser instance for the field.

        Raises:
            KeyError: If the (element) type name is not registered.

        """
        args = (empty_field, unset_field, set_separator)

        match_set = re.match(r'set\[(?P<type>.+?)\]', type_)
        if match_set is not None:
            ele_type = cast('Type[_SimpleType]', self.__type__[match_set.group('type')])
            return SetType(*args, element_type=ele_type(*args))

        match_vector = re.match(r'^vector\[(?P<type>.+?)\]', type_)
        if match_vector is not None:
            ele_type = cast('Type[_SimpleType]', self.__type__[match_vector.group('type')])
            return VectorType(*args, element_type=ele_type(*args))

        # ``enum`` is always handled by ``EnumType`` itself because it needs
        # the namespace configuration of this parser.
        if type_ == 'enum':
            return EnumType(*args, namespaces=self.enum_namespaces, bare=self.bare)

        ele_type = cast('Type[_SimpleType]', self.__type__[type_])
        return ele_type(*args)

    def parse_file(self, file: 'BinaryFile', model: 'Optional[Type[Model]]' = None) -> 'ASCIIInfo':
        """Parse log file.

        The eight header lines are consumed first, in this fixed order:
        separator, set separator, empty field, unset field, path, open time,
        field names, field types. Every following line is a record until a
        line starting with ``#`` (the close line) is met.

        Args:
            file: Log file object opened in binary mode.
            model: Field declarations of current log. This parameter is
                only kept for API compatibility with its base class
                :class:`~zlogging.loader.BaseParser`, and will **NOT**
                be used at runtime.

        Returns:
            The parsed log as a :class:`~zlogging.model.Model` per line.

        Warns:
            ASCIIParserWarning: If the ASCII log file exited with error, see
                :attr:`ASCIIInfo.exit_with_error <zlogging._data.ASCIIInfo.exit_with_error>`
                for more information.

        """
        # NOTE(review): ``readline`` presumably reads one line and splits it
        # on the given separator -- confirm against ``zlogging._aux``.

        # data separator (written escaped in the header, e.g. ``\x09``)
        separator = readline(file, b' ', maxsplit=1)[1].decode('unicode_escape').encode('ascii')
        # set separator
        set_separator = readline(file, separator, maxsplit=1)[1]
        # empty field
        empty_field = readline(file, separator, maxsplit=1)[1]
        # unset field
        unset_field = readline(file, separator, maxsplit=1)[1]
        # log path
        path = readline(file, separator, maxsplit=1, decode=True)[1]
        # log open time
        open_time = datetime.datetime.strptime(readline(file, separator, maxsplit=1, decode=True)[1],
                                               r'%Y-%m-%d-%H-%M-%S')
        # log model (field names)
        model_line = readline(file, separator, decode=True)[1:]
        # log field types
        types_line = readline(file, separator, decode=True)[1:]

        field_parser = []  # type: list[tuple[str, BaseType]]
        model_fields = collections.OrderedDict()
        for (field, type_) in zip(model_line, types_line):
            type_cls = self._make_field_type(type_, empty_field, unset_field, set_separator)
            field_parser.append((field, type_cls))
            model_fields[field] = type_cls
        model_cls = new_model(path, **model_fields)

        data = []
        # ``lineno`` counts records from 1, i.e. header lines are excluded.
        for index, line in enumerate(file, start=1):
            if line.startswith(b'#'):
                # close line: carries the log close time
                close_time = datetime.datetime.strptime(line.strip().split(separator)[1].decode(),
                                                        r'%Y-%m-%d-%H-%M-%S')
                exit_with_error = False
                break
            # Forward the separator declared in the header; otherwise records
            # of logs written with a non-TAB separator would be split on the
            # ``parse_line`` default instead.
            data.append(self.parse_line(line, lineno=index, model=model_cls,
                                        separator=separator, parser=field_parser))
        else:
            # the file ended without a close line
            warnings.warn('log file exited with error', ASCIIParserWarning)
            close_time = datetime.datetime.now()
            exit_with_error = True

        return ASCIIInfo(
            path=cast('PathLike[str]', path),
            open=open_time,
            close=close_time,
            data=data,
            exit_with_error=exit_with_error,
        )

    def parse_line(self, line: 'bytes', lineno: 'Optional[int]' = 0,  # pylint: disable=arguments-differ
                   model: 'Optional[Type[Model]]' = None, separator: 'Optional[bytes]' = b'\x09',
                   parser: 'Optional[list[tuple[str, BaseType]]]' = None) -> 'Model':
        """Parse log line as one-line record.

        Args:
            line: A simple line of log.
            lineno: Line number of current line.
            model: Field declarations of current log. If omitted, an ad-hoc
                model is built with every field typed as
                :class:`~zlogging.types.AnyType`.
            separator: Data separator.
            parser: Field data type parsers, as ``(field name, type parser)``
                pairs in column order.

        Returns:
            The parsed log as a plain :class:`~zlogging.model.Model`.

        Raises:
            :exc:`ASCIIParserError`: If ``parser`` is not provided; if the
                line holds more fields than ``parser`` declares; or failed
                to serialise ``line`` as ASCII.

        """
        if parser is None:
            raise ASCIIParserError("parse_line() missing 1 required positional argument: 'parser'")

        fields = line.strip().split(separator)
        # Surplus columns have no declared parser; report them as a parser
        # error rather than letting a bare IndexError escape.
        if len(fields) > len(parser):
            raise ASCIIParserError('expected at most %d fields, got %d' % (len(parser), len(fields)),
                                   lineno)

        data = collections.OrderedDict()  # type: OrderedDict[str, Any]
        for (field_name, field_type), value in zip(parser, fields):
            try:
                data[field_name] = field_type(value)
            except ZeekValueError as error:
                raise ASCIIParserError(str(error), lineno, field_name) from error

        if model is None:
            model = new_model('<unknown>', **{field: AnyType() for field in data})
        return model(**data)
def parse_json(filename: 'PathLike[str]', parser: 'Optional[Type[JSONParser]]' = None,  # pylint: disable=unused-argument,keyword-arg-before-vararg
               model: 'Optional[Type[Model]]' = None, *args: 'Any', **kwargs: 'Any') -> 'JSONInfo':
    """Parse JSON log file.

    Args:
        filename: Log file name.
        parser: Parser class; defaults to :class:`~zlogging.loader.JSONParser`.
        model: Field declarations for :class:`~zlogging.loader.JSONParser`,
            as in JSON logs the field typing information are omitted by the
            Bro/Zeek logging framework.
        *args: Arbitrary positional arguments (ignored).
        **kwargs: Arbitrary keyword arguments (ignored).

    Returns:
        The parsed JSON log data.

    """
    # Extra arguments are accepted and deliberately dropped so that the
    # format-agnostic ``parse`` can forward one argument set to either the
    # JSON or the ASCII entry point.
    if parser is None:
        parser = JSONParser
    return parser(model).parse(filename)
def load_json(file: 'BinaryFile', parser: 'Optional[Type[JSONParser]]' = None,  # pylint: disable=unused-argument,keyword-arg-before-vararg
              model: 'Optional[Type[Model]]' = None, *args: 'Any', **kwargs: 'Any') -> 'JSONInfo':
    """Parse JSON log file.

    Args:
        file: Log file object opened in binary mode.
        parser: Parser class; defaults to :class:`~zlogging.loader.JSONParser`.
        model: Field declarations for :class:`~zlogging.loader.JSONParser`,
            as in JSON logs the field typing information are omitted by the
            Bro/Zeek logging framework.
        *args: Arbitrary positional arguments (ignored).
        **kwargs: Arbitrary keyword arguments (ignored).

    Returns:
        The parsed JSON log data.

    """
    # Extra arguments are accepted and deliberately dropped so that the
    # format-agnostic ``load`` can forward one argument set to either the
    # JSON or the ASCII entry point.
    if parser is None:
        parser = JSONParser
    return parser(model).parse_file(file)
def loads_json(data: 'AnyStr', parser: 'Optional[Type[JSONParser]]' = None,  # pylint: disable=unused-argument,keyword-arg-before-vararg
               model: 'Optional[Type[Model]]' = None, *args: 'Any', **kwargs: 'Any') -> 'JSONInfo':
    """Parse JSON log string.

    Args:
        data: Log string as binary or encoded string.
        parser: Parser class; defaults to :class:`~zlogging.loader.JSONParser`.
        model: Field declarations for :class:`~zlogging.loader.JSONParser`,
            as in JSON logs the field typing information are omitted by the
            Bro/Zeek logging framework.
        *args: Arbitrary positional arguments (ignored).
        **kwargs: Arbitrary keyword arguments (ignored).

    Returns:
        The parsed JSON log data.

    """
    if isinstance(data, str):
        # UTF-8 is a superset of ASCII, so plain-ASCII input is unaffected,
        # while text holding non-ASCII characters no longer fails with
        # UnicodeEncodeError before parsing even starts.
        data = data.encode('utf-8')

    if parser is None:
        parser = JSONParser
    json_parser = parser(model)

    # Wrap the bytes in an in-memory binary file so that the regular
    # file-based code path can be reused.
    with io.BytesIO(data) as file:
        return json_parser.parse_file(file)  # type: ignore[arg-type]
def parse_ascii(filename: 'PathLike[str]', parser: 'Optional[Type[ASCIIParser]]' = None,  # pylint: disable=unused-argument,keyword-arg-before-vararg
                type_hook: 'Optional[dict[str, Type[BaseType]]]' = None,
                enum_namespaces: 'Optional[list[str]]' = None,
                bare: 'bool' = False, *args: 'Any', **kwargs: 'Any') -> 'ASCIIInfo':
    """Parse ASCII log file.

    Args:
        filename: Log file name.
        parser: Parser class; defaults to :class:`~zlogging.loader.ASCIIParser`.
        type_hook: Bro/Zeek type parser hooks. User may customise subclasses of
            :class:`~zlogging.types.BaseType` to modify parsing behaviours.
        enum_namespaces: Namespaces to be loaded.
        bare: If :data:`True`, do not load ``zeek`` namespace by default.
        *args: Arbitrary positional arguments (ignored).
        **kwargs: Arbitrary keyword arguments (ignored).

    Returns:
        The parsed ASCII log data.

    """
    # Extra arguments are accepted and deliberately dropped so that the
    # format-agnostic ``parse`` can forward one argument set to either the
    # JSON or the ASCII entry point.
    if parser is None:
        parser = ASCIIParser
    return parser(type_hook, enum_namespaces, bare).parse(filename)
def load_ascii(file: 'BinaryFile', parser: 'Optional[Type[ASCIIParser]]' = None,  # pylint: disable=unused-argument,keyword-arg-before-vararg
               type_hook: 'Optional[dict[str, Type[BaseType]]]' = None,
               enum_namespaces: 'Optional[list[str]]' = None,
               bare: 'bool' = False, *args: 'Any', **kwargs: 'Any') -> 'ASCIIInfo':
    """Parse ASCII log file.

    Args:
        file: Log file object opened in binary mode.
        parser: Parser class; defaults to :class:`~zlogging.loader.ASCIIParser`.
        type_hook: Bro/Zeek type parser hooks. User may customise subclasses of
            :class:`~zlogging.types.BaseType` to modify parsing behaviours.
        enum_namespaces: Namespaces to be loaded.
        bare: If :data:`True`, do not load ``zeek`` namespace by default.
        *args: Arbitrary positional arguments (ignored).
        **kwargs: Arbitrary keyword arguments (ignored).

    Returns:
        The parsed ASCII log data.

    """
    # Extra arguments are accepted and deliberately dropped so that the
    # format-agnostic ``load`` can forward one argument set to either the
    # JSON or the ASCII entry point.
    if parser is None:
        parser = ASCIIParser
    return parser(type_hook, enum_namespaces, bare).parse_file(file)
def loads_ascii(data: 'AnyStr', parser: 'Optional[Type[ASCIIParser]]' = None,  # pylint: disable=unused-argument,keyword-arg-before-vararg
                type_hook: 'Optional[dict[str, Type[BaseType]]]' = None,
                enum_namespaces: 'Optional[list[str]]' = None,
                bare: 'bool' = False, *args: 'Any', **kwargs: 'Any') -> 'ASCIIInfo':
    """Parse ASCII log string.

    Args:
        data: Log string as binary or encoded string.
        parser: Parser class; defaults to :class:`~zlogging.loader.ASCIIParser`.
        type_hook: Bro/Zeek type parser hooks. User may customise subclasses of
            :class:`~zlogging.types.BaseType` to modify parsing behaviours.
        enum_namespaces: Namespaces to be loaded.
        bare: If :data:`True`, do not load ``zeek`` namespace by default.
        *args: Arbitrary positional arguments (ignored).
        **kwargs: Arbitrary keyword arguments (ignored).

    Returns:
        The parsed ASCII log data.

    """
    if isinstance(data, str):
        # UTF-8 is a superset of ASCII, so plain-ASCII input is unaffected,
        # while text holding non-ASCII characters no longer fails with
        # UnicodeEncodeError before parsing even starts.
        data = data.encode('utf-8')

    if parser is None:
        parser = ASCIIParser
    ascii_parser = parser(type_hook, enum_namespaces, bare)

    # Wrap the bytes in an in-memory binary file so that the regular
    # file-based code path can be reused.
    with io.BytesIO(data) as file:
        return ascii_parser.parse_file(file)  # type: ignore[arg-type]
def parse(filename: 'PathLike[str]', *args: 'Any', **kwargs: 'Any') -> 'Union[JSONInfo, ASCIIInfo]':
    """Parse Bro/Zeek log file.

    The format is detected from the first byte of the file: ``#`` selects
    the ASCII parser, ``{`` the JSON parser.

    Args:
        filename: Log file name.
        *args: See :func:`~zlogging.loader.parse_json` and
            :func:`~zlogging.loader.parse_ascii` for more information.
        **kwargs: See :func:`~zlogging.loader.parse_json` and
            :func:`~zlogging.loader.parse_ascii` for more information.

    Returns:
        The parsed JSON or ASCII log data, depending on the detected format.

    Raises:
        :exc:`ParserError`: If the format of the log file is unknown.

    """
    # Only sniff the first byte here; the selected parser reopens the file.
    with open(filename, 'rb') as file:
        char = file.read(1)

    if char == b'#':
        return parse_ascii(filename, *args, **kwargs)
    if char == b'{':
        return parse_json(filename, *args, **kwargs)
    raise ParserError('unknown format')
def load(file: 'BinaryFile', *args: 'Any', **kwargs: 'Any') -> 'Union[JSONInfo, ASCIIInfo]':
    """Parse Bro/Zeek log file.

    The format is detected from the byte at the current position of
    ``file``: ``#`` selects the ASCII parser, ``{`` the JSON parser.

    Args:
        file: Log file object opened in binary mode; it must support
            ``tell()`` and ``seek()``.
        *args: See :func:`~zlogging.loader.load_json` and
            :func:`~zlogging.loader.load_ascii` for more information.
        **kwargs: See :func:`~zlogging.loader.load_json` and
            :func:`~zlogging.loader.load_ascii` for more information.

    Returns:
        The parsed JSON or ASCII log data, depending on the detected format.

    Raises:
        :exc:`ParserError`: If the format of the log file is unknown.

    """
    # Peek at one byte, then rewind so that the selected parser sees the
    # stream from the position it had on entry.
    tell = file.tell()
    char = file.read(1)
    file.seek(tell, io.SEEK_SET)

    if char == b'#':
        return load_ascii(file, *args, **kwargs)
    if char == b'{':
        return load_json(file, *args, **kwargs)
    raise ParserError('unknown format')
def loads(data: 'AnyStr', *args: 'Any', **kwargs: 'Any') -> 'Union[JSONInfo, ASCIIInfo]':
    """Parse Bro/Zeek log string.

    The format is detected from the first byte of ``data``: ``#`` selects
    the ASCII parser, ``{`` the JSON parser.

    Args:
        data: Log string as binary or encoded string.
        *args: See :func:`~zlogging.loader.loads_json` and
            :func:`~zlogging.loader.loads_ascii` for more information.
        **kwargs: See :func:`~zlogging.loader.loads_json` and
            :func:`~zlogging.loader.loads_ascii` for more information.

    Returns:
        The parsed JSON or ASCII log data, depending on the detected format.

    Raises:
        :exc:`ParserError`: If the format of the log file is unknown.

    """
    if isinstance(data, str):
        # UTF-8 is a superset of ASCII, so plain-ASCII input is unaffected,
        # while text holding non-ASCII characters no longer fails with
        # UnicodeEncodeError before the format is even detected.
        data = data.encode('utf-8')

    if data.startswith(b'#'):
        return loads_ascii(data, *args, **kwargs)
    if data.startswith(b'{'):
        return loads_json(data, *args, **kwargs)
    raise ParserError('unknown format')