Datasources
Data Handler
Core class wrapper component for stream processing both finite and infinite data sources into sample-wise data points, each being passed to further (ML) tasks once and in order. For this, a data source is required (see the docstring of the data source module) that provides the origin of any data points being processed for further (ML) tasks, and a data processor that prepares the data samples by applying processing steps to them (see the docstring of the data processor module).
- class daisy.data_sources.data_handler.DataHandler(data_source: DataSource, data_processor: DataProcessor, name: str = '', multithreading: bool = False, buffer_size: int = 1024)[source]
Bases:
object
A wrapper around a customizable data source that yields data points as objects as they come, before stream processing them using another, customizable data processor. Data points, which can come from arbitrary sources, are thus processed and converted into numpy vectors/arrays for ML tasks. Note that there is also the option to keep the object/dict format in case only stream processing is required.
Supports the processing of data points in both synchronous and asynchronous fashion by default.
- close()[source]
Shuts down any thread running in the background to load data into the data handler iff in multithreading mode. Can be reopened (and closed) an arbitrary number of times.
- open()[source]
Opens the data handler for data point retrieval. Must be called before data can be retrieved; in multithreading mode also starts the loader thread as daemon.
- Returns:
Event object to check whether data handler has completed processing
every data point and may be closed. Only useful when iterating through a source manually since __iter__() automatically stops yielding objects when completed.
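A minimal usage sketch, based only on the signatures documented here (a list-backed SimpleDataSource and an empty, pass-through DataProcessor):

    from daisy.data_sources.data_handler import DataHandler
    from daisy.data_sources.data_processor import DataProcessor
    from daisy.data_sources.data_source import SimpleDataSource

    # Wrap a plain finite generator as a data source; an empty processor is a noop.
    source = SimpleDataSource(iter([{"a": 1}, {"a": 2}]))
    processor = DataProcessor(name="noop")

    handler = DataHandler(source, processor, name="example")
    handler.open()            # must be called before data can be retrieved
    for point in handler:     # yields each processed data point once, in order
        print(point)
    handler.close()           # stops any background loader thread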
Data Processor
Base class data processor and its relevant pre-built processing function steps for generic data in the form of objects and dictionaries. The data processor processes individual data points using any number of user-defined functions. These functions can either be defined from scratch or chosen from a list of pre-built ones.
- class daisy.data_sources.data_processor.DataProcessor(name: str = '')[source]
Bases:
object
Base class for generic data stream processing, in pipelined fashion. The processing steps are implemented as functions independent of each other, carried out in one specific order for each data point in the stream. For these functions, there is also a list of pre-built processing steps as methods for ease of use, but any customized function can also be added using add_func().
Extensions of this base class merely manipulate _functions through the use of add_func() to provide additional pre-built processing steps.
- add_func(func: Callable[[object], object]) Self [source]
Adds a function to the end of the processor's function list.
- Parameters:
func – The function to add to the processor.
- dict_to_array(nn_aggregator: Callable[[str, object], object]) Self [source]
Adds a function to the processor that takes a data point which is a dictionary and lazily transforms it into a numpy array without further processing, aggregating any value that is a list into a singular value based on a pre-defined function that operates only on each singular feature.
- Parameters:
nn_aggregator – Aggregator, which maps non-numerical features to integers
or floats on a value-by-value basis.
- dict_to_json()[source]
Adds a function to the processor that takes a data point which is a dictionary and converts it to a JSON string.
- flatten_dict(separator: str = '.') Self [source]
Adds a function to the processor that creates a flat dictionary (a dictionary without sub-dictionaries) from the given dictionary. The keys of sub-dictionaries are merged into the parent dictionary by combining the keys and adding a separator: {a: {b: c, d: e}, f: g} becomes {a.b: c, a.d: e, f: g} assuming the separator as ‘.’. However, redundant parent keys are greedily eliminated from the dictionary and further collisions cause an error.
- Parameters:
separator – Separator to use.
- keep_dict_feature(features: list) Self [source]
Adds a function to the processor that takes a data point as a dictionary and keeps all given features.
- Parameters:
features – List of features to keep.
- Returns:
The processor itself (Self), enabling method chaining.
- process(o_point: object) object [source]
Processes the given data point using the provided functions. The functions are carried out in the order they were added. If no functions were provided, the data point is returned unchanged (noop).
Note process() is usually called by the data handler during data processing and should not be called directly.
- remove_dict_features(features: list) Self [source]
Adds a function to the processor that takes a data point as a dictionary and removes all given features from it.
- Parameters:
features – List of features to remove.
- select_dict_features(features: list, default_value=None) Self [source]
Adds a function to the processor that takes a data point which is a dictionary and selects features to keep. If a feature should be kept but isn’t present in the data point, it will be added with the default value.
- Parameters:
features – List of features to select.
default_value – Default value if feature is not in data point.
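Since the pre-built step methods return Self, a pipeline can be assembled by chaining them. A sketch (the aggregator below is a hypothetical stand-in, not part of the library):

    from daisy.data_sources.data_processor import DataProcessor

    # Hypothetical aggregator: maps a single non-numerical feature value to a number.
    def nn_aggregator(key: str, value: object) -> float:
        try:
            return float(value)
        except (TypeError, ValueError):
            return float(hash(str(value)) % 2**16)

    processor = (
        DataProcessor(name="example")
        .flatten_dict(separator=".")                          # flatten nested dicts
        .select_dict_features(["ip.len", "ip.ttl"], default_value=0)
        .add_func(lambda d: {k.lower(): v for k, v in d.items()})  # any custom step
        .dict_to_array(nn_aggregator)                         # finally: numpy array
    )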
- daisy.data_sources.data_processor.flatten_dict(dictionary: (<class 'dict'>, <class 'list'>), separator: str = '.', par_key: str = '') dict [source]
Creates a flat dictionary (a dictionary without sub-dictionaries) from the given dictionary. The keys of sub-dictionaries are merged into the parent dictionary by combining the keys and adding a separator: {a: {b: c, d: e}, f: g} becomes {a.b: c, a.d: e, f: g} assuming the separator as ‘.’. However, redundant parent keys are greedily eliminated from the dictionary.
- Parameters:
dictionary – Dictionary to flatten.
separator – Separator to use.
par_key – Key of the parent dictionary.
- Returns:
Flat dictionary with keys merged and separated using the separator.
- Raises:
ValueError – If greedily flattening the dictionary causes key collisions.
- daisy.data_sources.data_processor.keep_feature(d_point: dict, f_features: list) dict [source]
Takes a data point as a dictionary and removes all features not in the given list.
- Parameters:
d_point – Dictionary of data point.
f_features – List of features to keep.
- Returns:
Dictionary of data point with features kept.
- daisy.data_sources.data_processor.remove_feature(d_point: dict, f_features: list) dict [source]
Takes a data point as a dictionary and removes all given features from it.
- Parameters:
d_point – Dictionary of data point.
f_features – List of features to remove.
- Returns:
Dictionary of data point with features removed.
- daisy.data_sources.data_processor.select_feature(d_point: dict, f_features: list, default_value=None) dict [source]
Takes a data point as a dictionary and selects features to keep. If a feature should be kept but isn’t present in the data point, it will be added with the default value.
- Parameters:
d_point – Dictionary of data point.
f_features – List of features to select.
default_value – Default value if feature is not in original data point.
- Returns:
Dictionary of data point with selected features.
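The module-level helpers can also be used on their own, outside a DataProcessor. A small sketch (expected results follow the docstrings above):

    from daisy.data_sources.data_processor import flatten_dict, select_feature

    nested = {"a": {"b": 1, "d": 2}, "f": 3}
    flat = flatten_dict(nested)                       # {"a.b": 1, "a.d": 2, "f": 3}
    subset = select_feature(flat, ["a.b", "x"], default_value=0)
    # {"a.b": 1, "x": 0} -- the missing feature "x" is filled with the default value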
Data Relay
A number of useful tools that build on top of the data handler module to provide relays of data points, either over a network via communication endpoints or directly to local file(s) on disk. Both wrap around DataHandler and thus process the data stream as it yields data points. Can be used for arbitrarily large data streams of arbitrary kind.
- class daisy.data_sources.data_relay.CSVFileRelay(data_handler: DataHandler, target_file: str | Path, name: str = '', header_buffer_size: int = 1000, headers: tuple[str, ...] = None, overwrite_file: bool = False, separator: str = ',', default_missing_value: object = '')[source]
Bases:
object
A union of a data handler and a (CSV) file handler to retrieve data points from the former and store them in the file. This allows pre-processing data points from a stream and re-using them at a later time by replaying the file.
Note that such a relay requires an intact dictionary containing values for all fields of the data point header’s parameters.
- start(blocking: bool = False)[source]
Starts the CSV file relay along with the data source itself. Non-blocking by default, as the relay is started in the background to allow stopping it afterward.
- Parameters:
blocking – Whether the relay should block until all data points have been processed.
- Returns:
Event object to check whether the file relay has completed processing every data point and may be closed. Only useful when calling start() non-blocking, otherwise it is implicitly used to wait for completion.
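A sketch of dumping a processed stream to disk (the data handler is the same kind of object sketched in the Data Handler section; the target file name is a placeholder):

    from daisy.data_sources.data_handler import DataHandler
    from daisy.data_sources.data_processor import DataProcessor
    from daisy.data_sources.data_relay import CSVFileRelay
    from daisy.data_sources.data_source import SimpleDataSource

    handler = DataHandler(
        SimpleDataSource(iter([{"a": 1, "b": 2}, {"a": 3, "b": 4}])),
        DataProcessor(),
    )
    relay = CSVFileRelay(handler, target_file="points.csv", overwrite_file=True)
    relay.start(blocking=True)   # blocks until every data point has been written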
- class daisy.data_sources.data_relay.DataHandlerRelay(data_handler: DataHandler, endpoint: StreamEndpoint, name: str = '')[source]
Bases:
object
A union of a data handler and a stream endpoint to retrieve data points from the former and relay them over the latter. This allows the disaggregation of the actual data handler from the other processing steps. For example, the relay could be deployed with or without an actual processor on one host, and the data is forwarded over the network to another host running a data handler with a SimpleRemoteDataSource to receive and further process the data. This chain could also be continued beyond a single host pair.
- start(blocking: bool = False)[source]
Starts the data handler relay along with any other objects in this union (data handler, endpoint). Non-blocking by default, as the relay is started in the background to allow stopping it afterward.
- Parameters:
blocking – Whether the relay should block until all data points have been processed.
- Returns:
Event object to check whether the relay has completed processing every data point and may be closed. Only useful when calling start() non-blocking, otherwise it is implicitly used to wait for completion.
Data Source
A collection of the core interface and base classes for the first component of any data handler (see the docstring of the data handler class), which provides the origin of any data points being processed for further (ML) tasks. Supports generic generators, but also remote communication endpoints that hand over generic data points in a streaming manner, and any other implementations of the DataSource class. Note that each different kind of data may need its own implementation of DataSource.
- class daisy.data_sources.data_source.CSVFileDataSource(files: str | list[str], name: str = '')[source]
Bases:
DataSource
This implementation of the DataSource reads one or multiple CSV files and yields their content. The output of this class is a stream of dictionaries containing the headers (first row) of the CSV files as keys and the values of each line as values. Each CSV file is therefore expected to have a header line as its first row.
- class daisy.data_sources.data_source.DataSource(name: str = '')[source]
Bases:
ABC
An abstract wrapper around a generator-like structure that has to yield data points as objects as they come for processing. That generator may be infinite or finite, as long as it is bounded on both sides by the following two methods that must be implemented:
open(): Enables the “generator” to provision data points.
close(): Closes the “generator”.
Note that as DataHandler wraps itself around given data sources to retrieve objects, open() and close() do not need to be implemented to be idempotent and arbitrarily permutable. Same can be assumed for __iter__() as it will only be called when the data source has been opened already. At the same time, __iter__() must be exhausted after close() has been called.
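An illustrative sketch of a custom implementation; the exact abstract interface is an assumption based on the signatures documented here:

    from daisy.data_sources.data_source import DataSource

    class ListDataSource(DataSource):
        """Finite source backed by an in-memory list (illustrative only)."""

        def __init__(self, points: list, name: str = ""):
            super().__init__(name=name)
            self._points = points
            self._opened = False

        def open(self):
            self._opened = True       # enable provisioning of data points

        def close(self):
            self._opened = False      # __iter__() must be exhausted afterwards

        def __iter__(self):
            for point in self._points:
                if not self._opened:
                    return            # stop yielding once closed
                yield point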
- class daisy.data_sources.data_source.SimpleDataSource(generator: Iterator[object], name: str = '')[source]
Bases:
DataSource
The simplest productive data source: an actual wrapper around a generator that is always open and cannot be closed, yielding data points as objects as they are produced. Can be infinite or finite; either way, no control over the generator is natively supported.
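For example, any (possibly infinite) generator can be wrapped directly:

    import itertools

    from daisy.data_sources.data_source import SimpleDataSource

    # An infinite source of dictionaries; the wrapper exerts no control over it.
    counting_source = SimpleDataSource(({"i": i} for i in itertools.count()))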
- class daisy.data_sources.data_source.SimpleRemoteDataSource(endpoint: StreamEndpoint, name: str = '')[source]
Bases:
DataSource
The simple wrapper implementation to support and handle remote streaming endpoints of the Endpoint module as data sources. Considered infinite in nature, as it allows the generation of data point objects from a connected endpoint, until the client closes the data source.
Events
Events used to label data. The event handler can be used by the user to create events. Each event contains a string with conditions used to determine whether a data point should be labeled.
- class daisy.data_sources.events.Event(start_time: datetime, end_time: datetime, label: str, condition_fn: Callable[[list[dict]], bool])[source]
Bases:
object
A specific event, with a start and an end timestamp, its label, and a condition that determines whether a data point should be labeled as such (a function evaluating to true if that is the case, false otherwise).
- end_time: datetime
- evaluate(timestamp: datetime, data: list[dict]) bool [source]
Evaluates a single data point using the condition function provided in the constructor. The timestamp is used to determine whether the data point falls within the event's time frame. Additional meta information can be provided by passing multiple dictionaries in the data parameter. The dictionaries will be searched in the provided order. The first value found will be used for comparisons in the condition function.
- Parameters:
timestamp – The timestamp of the data point.
data – A single data point and additional meta information. The data
is searched in the provided order. E.g. if two dictionaries contain the key timestamp and the condition function uses this key, then the value of the first dictionary will be used.
- label: str
- start_time: datetime
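A small sketch of constructing and evaluating an event directly (timestamps and the condition are made up for illustration):

    from datetime import datetime, timedelta

    from daisy.data_sources.events import Event

    start = datetime(2023, 3, 1, 12, 0)
    event = Event(
        start_time=start,
        end_time=start + timedelta(hours=1),
        label="attack",
        condition_fn=lambda dicts: any(d.get("ip.addr") == "10.1.1.1" for d in dicts),
    )

    # True only if the timestamp falls within the event's time frame
    # and the condition function evaluates to True for the given dictionaries.
    event.evaluate(start + timedelta(minutes=5), [{"ip.addr": "10.1.1.1"}])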
- class daisy.data_sources.events.EventHandler(default_label: str = 'benign', label_feature: str = 'label', error_label: str = 'error', hide_errors: bool = False, name: str = '')[source]
Bases:
object
Event handler used to create events and automatically label data points. Events can be added to the class using the add_event() method. The events are matched to data points in added order, i.e. first matching event will be used to label a given data point.
- add_event(start_time: datetime, end_time: datetime, label: str, condition: str = '') Self [source]
Adds an event to the event handler. The events will be evaluated in the order they are provided. Each event has a start and end time, a label that will be used to label data points that fall under that event, and an optional condition. The condition is a string and has to follow a certain grammar:
exp := pars + (binary_op + pars)? | unary_op + pars
pars := operand | ‘(’ + exp + ‘)’
operand := word + comparator + word
word := [any character except []!”’<=>()] | ‘[’ + [any character except []!”’<=>()] + ‘]’ (note that whitespaces are allowed within brackets)
comparator := ‘=’ | ‘in’
binary_op := ‘and’ | ‘or’
unary_op := ‘not’
For comparators, the feature in the dictionary is always expected on the left side of the comparator, except with the ‘in’ operator, where it is expected on the right.
Some example expressions are:
- ip.addr = 10.1.1.1: When the function is called with a dictionary, it will be searched for the key ip.addr. Its value will be compared to 10.1.1.1.
- tcp in protocols: The dictionary will be searched for the key protocols. The expression ‘tcp in <value of protocols>’ will be evaluated.
Concatenation examples are:
- ip.addr = 10.1.1.1 and tcp in protocols
- (ip.addr = 10.1.1.1 or ip.addr = 192.168.1.1) and tcp in protocols
- not (ip.addr = 10.1.1.1 or ip.addr = 192.168.1.1) and tcp in protocols
The returned function can be called using a list of dictionaries. The dictionaries will be searched in the provided order and the first occurrence of a feature in one of the dictionaries will be used. This can be used to provide meta information about a data point additionally to the data point itself.
- Parameters:
start_time – Start time of event.
end_time – End time of event.
label – Label of event.
condition – Condition(s) data points have to fulfill for this event.
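A sketch of labeling a single data point with one event (times, label, and condition are made up for illustration):

    from datetime import datetime

    from daisy.data_sources.events import EventHandler

    event_handler = EventHandler(default_label="benign", label_feature="label")
    event_handler.add_event(
        start_time=datetime(2023, 3, 1, 12, 0),
        end_time=datetime(2023, 3, 1, 13, 0),
        label="attack",
        condition="ip.addr = 10.1.1.1 and tcp in protocols",
    )

    point = {"ip.addr": "10.1.1.1", "protocols": "ip:tcp"}
    labeled = event_handler.process(datetime(2023, 3, 1, 12, 30), point)
    # labeled["label"] should be "attack" if the condition matches, otherwise "benign"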
- process(timestamp: datetime, data_point: dict, meta_data: list[dict] = None) dict [source]
Iterates through all events and checks for each event if it applies to the provided data point. If it does, the data point will be labeled with the label provided by the event. If no event matches the data point, it will be labeled with the default label.
- Parameters:
timestamp – Timestamp of data point.
data_point – Data point to label.
meta_data – Additional meta information to label the data point. Has preference over the data point when checking conditions.
- Returns:
Labelled data point.
- Raises:
KeyError – The data point does not contain a feature used by a condition and errors are not suppressed; with errors suppressed, they are instead redirected to the log and the data point is assigned the error label.
- class daisy.data_sources.events.EventParser[source]
Bases:
object
Parser for conditions of events. It takes an expression, parses it, and returns a function, which evaluates if a given data point fulfils the condition.
The condition has to follow the following grammar:
exp := pars + (binary_op + pars)? | unary_op + pars
pars := operand | ‘(’ + exp + ‘)’
operand := word + comparator + word
word := [any character except []!”’<=>()] | ‘[’ + [any character except []!”’<=>()] + ‘]’ (note that whitespaces are allowed within brackets)
comparator := ‘=’ | ‘in’
binary_op := ‘and’ | ‘or’
unary_op := ‘not’
For comparators, the feature in the dictionary is always expected on the left side of the comparator, except with the ‘in’ operator, where it is expected on the right.
Some example expressions are:
- ip.addr = 10.1.1.1: When the function is called with a dictionary, it will be searched for the key ip.addr. Its value will be compared to 10.1.1.1.
- tcp in protocols: The dictionary will be searched for the key protocols. The expression ‘tcp in <value of protocols>’ will be evaluated.
Concatenation examples are:
- ip.addr = 10.1.1.1 and tcp in protocols
- (ip.addr = 10.1.1.1 or ip.addr = 192.168.1.1) and tcp in protocols
- not (ip.addr = 10.1.1.1 or ip.addr = 192.168.1.1) and tcp in protocols
The returned function can be called using a list of dictionaries. The dictionaries will be searched in the provided order and the first occurrence of a feature in one of the dictionaries will be used. This can be used to provide meta information about a data point additionally to the data point itself.
- parse(expression: str) Callable[[list[dict]], bool] [source]
Parses the given expression and returns a function that evaluates data points passed to it. This method can raise a parse error if the expression is invalid.
- Parameters:
expression – Expression (condition) to parse.
- Raises:
ParseError – Any condition/expression does not follow parser’s grammar.
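A minimal sketch of using the parser directly (the expected results assume the grammar described above):

    from daisy.data_sources.events import EventParser

    condition = EventParser().parse("ip.addr = 10.1.1.1 and tcp in protocols")
    condition([{"ip.addr": "10.1.1.1", "protocols": "ip:tcp"}])     # expected: True
    condition([{"ip.addr": "192.168.1.1", "protocols": "ip:udp"}])  # expected: False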
Network Traffic
Implementations of the data handler helper interface that allows the processing and provisioning of pyshark packets, either via file inputs, live capture, or a remote source that generates packets in either fashion.
- LivePysharkDataSource - DataSource which simply yields captured packets from a list of interfaces.
- PcapDataSource - DataSource which is able to load pcap files sequentially and yield their packets.
- PysharkProcessor - Offers additional processing step options to process pyshark packet objects.
There is also a module specialized for traffic of cohda boxes (V2X), that offers additional functionalities:
- demo_202303 - Event tags for labeling purposes for the March23 dataset.
- class daisy.data_sources.network_traffic.LivePysharkDataSource(name: str = '', interfaces: list = 'any', bpf_filter: str = '')[source]
Bases:
DataSource
The wrapper implementation to support and handle pyshark live captures as data sources. Considered infinite in nature, as it allows the generation of pyshark packets until the capture is stopped. Beware that you might have to use root privileges to obtain data from this data source; if privileges are missing, pyshark might return neither data points nor any warning.
- class daisy.data_sources.network_traffic.PcapDataSource(*file_names: str, try_counter: int = 3, name: str = '')[source]
Bases:
DataSource
The wrapper implementation to support and handle any number of pcap files as data sources. Finite: finishes after all files have been processed. Warning: not entirely compliant with the data source abstract class: it is neither fully thread safe, nor does its __iter__() method shut down after close() has been called. Due to its finite nature this is acceptable, however, as this data source is nearly always closed only once all data points have been retrieved.
- class daisy.data_sources.network_traffic.PysharkProcessor(name: str = '')[source]
Bases:
DataProcessor
Extension of the data processor base class with pre-built processing steps specifically for pyshark packets.
- classmethod create_simple_processor(name: str = '', f_features: list[str, ...] = ('meta.len', 'meta.time', 'meta.time_epoch', 'meta.protocols', 'ip.addr', 'sll.halen', 'sll.pkttype', 'sll.eth', 'sll.hatype', 'sll.unused', 'ipv6.tclass', 'ipv6.flow', 'ipv6.nxt', 'ipv6.src_host', 'ipv6.host', 'ipv6.hlim', 'sll.ltype', 'cohda.Type', 'cohda.Ret', 'cohda.llc.MKxIFMsg.Ret', 'ipv6.addr', 'ipv6.dst', 'ipv6.plen', 'tcp.stream', 'tcp.payload', 'tcp.urgent_pointer', 'tcp.port', 'tcp.options.nop', 'tcp.options.timestamp', 'tcp.flags', 'tcp.window_size_scalefactor', 'tcp.dstport', 'tcp.len', 'tcp.checksum', 'tcp.window_size', 'tcp.srcport', 'tcp.checksum.status', 'tcp.nxtseq', 'tcp.status', 'tcp.analysis.bytes_in_flight', 'tcp.analysis.push_bytes_sent', 'tcp.ack', 'tcp.hdr_len', 'tcp.seq', 'tcp.window_size_value', 'data.data', 'data.len', 'tcp.analysis.acks_frame', 'tcp.analysis.ack_rtt', 'eth.src.addr', 'eth.src.eth.src_resolved', 'eth.src.ig', 'eth.src.src_resolved', 'eth.src.addr_resolved', 'ip.proto', 'ip.dst_host', 'ip.flags', 'ip.len', 'ip.checksum', 'ip.checksum.status', 'ip.version', 'ip.host', 'ip.status', 'ip.id', 'ip.hdr_len', 'ip.ttl'), nn_aggregator: ~typing.Callable[[str, object], object] = <function pcap_nn_aggregator>) Self [source]
Creates a simple pyshark processor selecting specific features from each data point (nan if not existing) and transforming them into numpy vectors, ready to be further processed by detection models.
- Parameters:
name – Name of processor for logging purposes.
f_features – Features to extract from the packets.
nn_aggregator – Aggregator, which should map non-numerical features to
integers / floats.
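A sketch of a complete pcap pipeline using the pieces documented on this page (the capture file name is a placeholder):

    from daisy.data_sources.data_handler import DataHandler
    from daisy.data_sources.network_traffic import PcapDataSource, PysharkProcessor

    source = PcapDataSource("capture.pcap")                  # placeholder file name
    processor = PysharkProcessor.create_simple_processor(name="pcap")

    handler = DataHandler(source, processor)
    handler.open()
    for vector in handler:        # numpy vectors ready for detection models
        ...
    handler.close()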
- daisy.data_sources.network_traffic.create_pyshark_processor(name: str = '', f_features: list[str, ...] = ('meta.len', 'meta.time', 'meta.time_epoch', 'meta.protocols', 'ip.addr', 'sll.halen', 'sll.pkttype', 'sll.eth', 'sll.hatype', 'sll.unused', 'ipv6.tclass', 'ipv6.flow', 'ipv6.nxt', 'ipv6.src_host', 'ipv6.host', 'ipv6.hlim', 'sll.ltype', 'cohda.Type', 'cohda.Ret', 'cohda.llc.MKxIFMsg.Ret', 'ipv6.addr', 'ipv6.dst', 'ipv6.plen', 'tcp.stream', 'tcp.payload', 'tcp.urgent_pointer', 'tcp.port', 'tcp.options.nop', 'tcp.options.timestamp', 'tcp.flags', 'tcp.window_size_scalefactor', 'tcp.dstport', 'tcp.len', 'tcp.checksum', 'tcp.window_size', 'tcp.srcport', 'tcp.checksum.status', 'tcp.nxtseq', 'tcp.status', 'tcp.analysis.bytes_in_flight', 'tcp.analysis.push_bytes_sent', 'tcp.ack', 'tcp.hdr_len', 'tcp.seq', 'tcp.window_size_value', 'data.data', 'data.len', 'tcp.analysis.acks_frame', 'tcp.analysis.ack_rtt', 'eth.src.addr', 'eth.src.eth.src_resolved', 'eth.src.ig', 'eth.src.src_resolved', 'eth.src.addr_resolved', 'ip.proto', 'ip.dst_host', 'ip.flags', 'ip.len', 'ip.checksum', 'ip.checksum.status', 'ip.version', 'ip.host', 'ip.status', 'ip.id', 'ip.hdr_len', 'ip.ttl'), nn_aggregator: ~typing.Callable[[str, object], object] = <function pcap_nn_aggregator>)[source]
Creates a DataProcessor using functions specifically for pyshark packets, selecting specific features from each data point (nan if not existing) and transforming them into numpy vectors, ready to be further processed by detection models.
- Parameters:
name – The name for logging purposes.
f_features – The features to extract from the packets.
nn_aggregator – The aggregator, which should map non-numerical features to integers / floats.
- daisy.data_sources.network_traffic.demo_202303_label_data_point(client_id: int, d_point: dict) dict [source]
Labels the data points according to the events for the demo 202303.
- Parameters:
client_id – Client ID.
d_point – Data point as dictionary.
- Returns:
Labeled data point.
- daisy.data_sources.network_traffic.dict_to_json(dictionary: dict) str [source]
Takes a dictionary and returns a JSON object in the form of a string.
- Parameters:
dictionary – The dictionary to convert to json string.
- Returns:
A JSON string from the dictionary.
- daisy.data_sources.network_traffic.dict_to_numpy_array(d_point: dict, nn_aggregator: Callable[[str, object], object]) ndarray [source]
Transforms the pyshark data point directly into a numpy array without further processing, aggregating any value that is a list into a singular value.
- Parameters:
d_point – Data point as dictionary.
nn_aggregator – Aggregator, which maps non-numerical features to integers or floats.
- Returns:
Data point as vector.
- daisy.data_sources.network_traffic.packet_to_dict(p: Packet) dict [source]
Takes a single pyshark packet and converts it into a dictionary.
- Parameters:
p – Packet to convert.
- Returns:
Dictionary generated from the packet.
- daisy.data_sources.network_traffic.pcap_nn_aggregator(key: str, value: object) int | float [source]
Simple, exemplary value aggregator. Takes a non-numerical (i.e. string) key-value pair and attempts to convert it into an integer / float. This example does not take the key into account, but only checks the type of the value to proceed. Note that IPv6 addresses are lazily converted to 32 bit (collisions may occur).
- Parameters:
key – Name of pair, which is always a string.
value – Arbitrary non-numerical value to be converted.
- Returns:
Converted numerical value.
- Raises:
ValueError – If value cannot be converted.