# Copyright (C) 2024-2025 DAI-Labor and others
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
"""Events used to labeling data. The event handler can be used by the user to create
events. Each event contains a string with conditions used to determine whether a data
point should be labeled.
Author: Jonathan Ackerschewski, Fabian Hofmann
Modified: 04.11.24
"""
import logging
import sys
from datetime import datetime
from typing import Callable, Self, Optional
import pyparsing as pp
[docs]
class Event:
"""Specific event, with a start and an end timestamp, its label, and conditions
whether a data point should be labeled as such (function evaluating to true if
that is the case, false otherwise).
"""
start_time: datetime
end_time: datetime
label: str
_condition_fn: Callable[[list[dict]], bool]
def __init__(
self,
start_time: datetime,
end_time: datetime,
label: str,
condition_fn: Callable[[list[dict]], bool],
):
"""Creates an event, which takes place between start_time and end_time. Data
points evaluated true by the condition function should be labeled with the
provided label.
:param start_time: Start time of event.
:param end_time: End time of event.
:param label: Label by which data points falling in the event should be labeled.
:param condition_fn: Condition function used to evaluate if a given data point
falls within the event.
"""
self.start_time = start_time
self.end_time = end_time
self.label = label
self._condition_fn = condition_fn
[docs]
def evaluate(self, timestamp: datetime, data: list[dict]) -> bool:
"""Evaluates a single data point using the condition function provided in the
constructor. The timestamp is used to determine whether the data point falls
within the events time frame. Additional meta information can be provided by
passing multiple dictionaries in the data parameter. The dictionaries will be
searched in the provided order. The first value found will be used for
comparisons in the condition function.
:param timestamp: The timestamp of the data point.
:param data: A single data point and additional meta information. The data
is searched in the provided order. E.g. if two dictionaries contain the key
timestamp and the condition function uses this key, then the value of the first
dictionary will be used.
"""
if self.start_time <= timestamp <= self.end_time:
return self._condition_fn(data)
else:
return False
[docs]
class EventParser:
"""Parser for conditions of events. It takes an expression, parses it, and returns
a function, which evaluates if a given data point fulfils the condition.
The condition has to follow the following grammar:
exp := pars + (binary_op + pars)? |
unary_op + pars
pars := operand | '(' + exp + ')'
operand := word + comparator + word
word := [any character except [] !"'<=>\\()] |
'[' + [any character except []!"'<=>\\()] + ']' Note that whitespaces are
allowed with brackets
comparator := '=' | 'in'
binary_op := 'and' | 'or'
unary_op := 'not'
For comparators, the feature in the dictionary is always expected on the left side
of the comparator, except with the 'in' operator, where it is expected on the right.
Some example expressions are:
ip.addr = 10.1.1.1 When the function is called with a dictionary, it will
be searched for the key ip.addr. Its value will be
compared to 10.1.1.1
tcp in protocols The dictionary will be searched for the key protocols.
The function 'tcp in <value of protocols>' will be
evaluated.
ip.addr = 10.1.1.1 and tcp in protocols
(ip.addr = 10.1.1.1 or ip.addr = 192.168.1.1) and tcp in protocols
not (ip.addr = 10.1.1.1 or ip.addr = 192.168.1.1) and tcp in protocols
The returned function can be called using a list of dictionaries. The dictionaries
will be searched in the provided order and the first occurrence of a feature in one
of the dictionaries will be used. This can be used to provide meta information about
a data point additionally to the data point itself.
"""
# The comparators and operators used by the parser
_comparators: list[str] = ["=", "in"]
_binary_operators: list[str] = ["and", "or"]
_unary_operators: list[str] = ["not"]
# Identifiers used inside the parser to mark specific tokens
_var1: str = "var1"
_var2: str = "var2"
_op: str = "op"
_parser: pp.ParserElement
def __init__(self):
"""Creates the grammar for the conditions. The specific grammar can be seen
in the description of this class.
"""
pp.ParserElement.enablePackrat()
sys.setrecursionlimit(3000)
base_word = pp.Word(pp.printables, exclude_chars="[] !\"'<=>\\()")
bracket_word = pp.Word(pp.printables + " ", exclude_chars="[]!\"'<=>\\()")
comparator = pp.oneOf(self._comparators)
boperator = pp.oneOf(self._binary_operators)
uoperator = pp.oneOf(self._unary_operators)
lpar = pp.Suppress("(")
rpar = pp.Suppress(")")
lbr = pp.Suppress("[")
rbr = pp.Suppress("]")
self._parser = pp.Forward()
word = base_word | lbr + bracket_word + rbr
operand = pp.Group(word(self._var1) + comparator(self._op) + word(self._var2))
pars = operand | lpar + self._parser + rpar
self._parser <<= pp.Group(
pars(self._var1)
+ pp.Optional(boperator(self._op) + self._parser(self._var2))
) | pp.Group(uoperator(self._op) + pars(self._var1))
[docs]
def parse(self, expression: str) -> Callable[[list[dict]], bool]:
"""Parses the given expression and returns a function that evaluates data points
passed to it. This method can raise a parse error if the expression is invalid.
:param expression: Expression (condition) to parse.
:raises ParseError: Any condition/expression does not follow parser's grammar.
"""
tree = self._parser.parseString(expression, parseAll=True)
return self._process_child(tree)
def _process_child(
self, tree: pp.ParseResults
) -> Optional[Callable[[list[dict]], bool]]:
"""Recursively processes the provided parse tree and returns a function that
evaluates the data points passed to it.
:param tree: The parse tree to process
"""
result = {}
if isinstance(tree, pp.ParseResults):
# Evaluate all children first
if tree.haskeys():
for key, _ in reversed(tree.asDict().items()):
result[key] = self._process_child(tree.pop())
else:
return self._process_child(tree.pop())
else:
# On leaf node do nothing
return None
return self._create_fn(tree.asDict(), result)
def _create_fn(
self, dictionary: dict, result: dict
) -> Callable[[list[dict]], bool]:
"""Creates a function based on the provided input. The provided dictionary
should contain any of the keys _op, _var1, and _var2 from this class.
When the key _op is any of the binary operators, _var1 and _var2 have to be
present in the result dictionary and contain a function. If the operator is
any unary operator, only _var1 needs to be present in the result dictionary.
If the operator is any comparator, the result dictionary is ignored and the
dictionary needs to contain the _var1 and _var2 keys.
The returned function can raise key errors if the dictionaries passed to it
do not contain the keys required by the generated function.
:param dictionary: Dictionary used to evaluate the function to generate.
:param result: Dictionary containing results of previous runs of this function.
:raises NotImplementedError: Operators used are unsupported by this class.
Only happens when class has been modified/overridden.
"""
if self._op not in dictionary:
return list(result.values())[0]
operation = dictionary[self._op]
if operation in self._binary_operators:
return self._create_binary_fn(result, operation)
if operation in self._unary_operators:
return self._create_unary_fn(result, operation)
if operation in self._comparators:
return self._create_comparator_fn(dictionary, operation)
raise NotImplementedError(
f"Operation {operation} not supported in EventParser."
)
def _create_binary_fn(
self, result: dict, operation: str
) -> Callable[[list[dict]], bool]:
"""Creates the binary function part. The results of previous runs of the
create_fn method are used in this function to combine them using binary
functions (and, or operators).
:param result: Dictionary containing the results of previous runs.
:param operation: Operation to perform.
:raises NotImplementedError: Operators used are unsupported by this class.
Only happens when class has been modified/overridden.
"""
match operation:
case "and":
return lambda data: result[self._var1](data) and result[self._var2](
data
)
case "or":
return lambda data: result[self._var1](data) or result[self._var2](data)
case _:
raise NotImplementedError(
f"Operation {operation} not supported in EventParser."
)
def _create_unary_fn(
self, result: dict, operation: str
) -> Callable[[list[dict]], bool]:
"""Creates the unary function part. The results of previous runs of the
create_fn method are used in this function to combine them using unary
functions (not operator).
:param result: Dictionary containing the results of previous runs
:param operation: Operation to perform.
:raises NotImplementedError: Operators used are unsupported by this class.
Only happens when class has been modified/overridden.
"""
match operation:
case "not":
return lambda data: not result[self._var1](data)
case _:
raise NotImplementedError(
f"Operation {operation} not supported in EventParser."
)
def _create_comparator_fn(
self, dictionary: dict, operation: str
) -> Callable[[list[dict]], bool]:
"""Creates the comparisons of the function. The dictionary should contain
a feature, operation, and value. Based on the comparator, a function is
returned, which performs the desired comparison.
:param dictionary: Dictionary containing feature and value to use for the
generated function.
:param operation: Operation to perform.
:raises NotImplementedError: Operators used are unsupported by this class.
Only happens when class has been modified/overridden.
"""
match operation:
case "=":
return (
lambda data: _get_value(dictionary[self._var1][0], data)
== dictionary[self._var2][0]
)
case "in":
return (
lambda data: dictionary[self._var1][0]
in _get_value(dictionary[self._var2][0], data)
if _get_value(dictionary[self._var2][0], data)
else False
)
case _:
raise NotImplementedError(
f"Operation {operation} not supported in EventParser."
)
[docs]
class EventHandler:
"""Event handler used to create events and automatically label data points. Events
can be added to the class using the add_event() method. The events are matched to
data points in added order, i.e. first matching event will be used to label
a given data point.
"""
_parser: EventParser
_events: list[Event]
_default_label: str
_label_feature: str
_error_label: str
_hide_errors: bool
def __init__(
self,
default_label: str = "benign",
label_feature: str = "label",
error_label: str = "error",
hide_errors: bool = False,
name: str = "",
):
"""Creates an event handler used to label data points.
:param default_label: Label used when no event matches data point.
:param label_feature: Feature in data point for which label will be set.
:param error_label: Error label to use if an error is encountered during
processing.
:param hide_errors: Catches any key errors occurring in process() method when
encountering data points not containing features used by the conditions of
an event, only printing them out in the logs instead of exciting, labeling
the data point as erroneous.
:param name: Name of event handler for logging purposes.
"""
self._logger = logging.getLogger(name)
self._parser = EventParser()
self._events = []
self._default_label = default_label
self._label_feature = label_feature
self._error_label = error_label
self._hide_errors = hide_errors
[docs]
def add_event(
self, start_time: datetime, end_time: datetime, label: str, condition: str = ""
) -> Self:
"""Adds an event to the event handler. The events will be evaluated in the
order they are provided. Each event has a start and end time, a label that will
be used to label data points that fall under that event, and an optional
condition. The condition is a string and has to follow a certain grammar:
exp := pars + (binary_op + pars)? |
unary_op + pars
pars := operand | '(' + exp + ')'
operand := word + comparator + word
word := [any character except [] !"'<=>\\()] |
'[' + [any character except []!"'<=>\\()] + ']' Note that whitespaces
are allowed with
brackets
comparator := '=' | 'in'
binary_op := 'and' | 'or'
unary_op := 'not'
For comparators, the feature in the dictionary is always expected on the left
side of the comparator, except with the 'in' operator, where it is expected
on the right.
Some example expressions are:
ip.addr = 10.1.1.1 When the function is called with a dictionary, it will
be searched for the key ip.addr. Its value will be
compared to 10.1.1.1
tcp in protocols The dictionary will be searched for the key protocols.
The function 'tcp in <value of protocols>' will be
evaluated.
Concatenation examples are:
ip.addr = 10.1.1.1 and tcp in protocols
(ip.addr = 10.1.1.1 or ip.addr = 192.168.1.1) and tcp in protocols
not (ip.addr = 10.1.1.1 or ip.addr = 192.168.1.1) and tcp in protocols
The returned function can be called using a list of dictionaries. The
dictionaries will be searched in the provided order and the first occurrence
of a feature in one of the dictionaries will be used. This can be used to
provide meta information about a data point additionally to the data point
itself.
:param start_time: Start time of event.
:param end_time: End time of event.
:param label: Label of event.
:param condition: Condition(s) data points have to fulfill for this event.
"""
self._logger.debug(f"Adding new event with condition {condition}")
if condition:
self._events.append(
Event(start_time, end_time, label, self._parser.parse(condition))
)
else:
self._events.append(Event(start_time, end_time, label, lambda _: True))
return self
[docs]
def process(
self,
timestamp: datetime,
data_point: dict,
meta_data: list[dict] = None,
) -> dict:
"""Iterates through all events and checks for each event if it applies to
the provided data point. If it does, the data point will be labeled with the
label provided by the event. If no event matches the data point, it will be
labeled with the default label.
:param timestamp: Timestamp of data point.
:param data_point: Data point to label.
:param meta_data: Additional meta information to label data point. Has
preference over data point when checking conditions.
processing and errors are suppressed.
:return: Labelled data point.
:raises KeyError: Data points does not contain feature used by a conditions and
errors are not suppressed, i.e. redirected to log + data point is assigned
the error label.
"""
if meta_data is None:
meta_data = []
data = meta_data + [data_point]
for event in self._events:
try:
if event.evaluate(timestamp, data):
data_point[self._label_feature] = event.label
return data_point
except KeyError as e:
if not self._hide_errors:
raise e
self._logger.error(f"Error while evaluating event: {e}")
data_point[self._label_feature] = self._error_label
return data_point
data_point[self._label_feature] = self._default_label
return data_point
def _get_value(feature: str, data: list[dict]) -> object:
"""Iterates through the provided dictionaries and returns the value of the first
occurrence of the provided feature.
:param feature: The feature to find.
:param data: List of dictionaries to search for the feature.
"""
for dictionary in data:
if feature in dictionary:
return dictionary[feature]
raise KeyError(f"Could not find {feature} in {data}")