Source code for daisy.data_sources.data_relay

# Copyright (C) 2024-2025 DAI-Labor and others
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
"""A number of useful tools that build on top of the data handler module, to provide
relays of data points, either over a network over communication endpoints or directly
to local file(s) on disk. Both wrap around DataHandler and thus process the data
stream as it yields data points. Can be used for arbitrarily large arbitrary data
streams.

Author: Fabian Hofmann, Jonathan Ackerschewski
Modified: 17.04.23
"""

import logging
import threading
from collections import OrderedDict
from pathlib import Path
from typing import IO, Iterable

from daisy.communication import StreamEndpoint
from .data_handler import DataHandler


class DataHandlerRelay:
    """A union of a data handler and a stream endpoint to retrieve data points from
    the former and relay them over the latter. This allows the disaggregation of the
    actual data handler from the other processing steps. For example, the relay could
    be deployed, with or without an actual processor, on one host, and the data is
    forwarded over the network to another host running a data handler with a
    SimpleRemoteDataSource to receive and further process the data. This chain could
    also be continued beyond a single host pair.
    """

    _logger: logging.Logger

    _data_handler: DataHandler
    _endpoint: StreamEndpoint

    _relay: threading.Thread
    _started: bool
    _completed: threading.Event

    def __init__(
        self, data_handler: DataHandler, endpoint: StreamEndpoint, name: str = ""
    ):
        """Creates a new data handler relay.

        :param data_handler: Data handler to relay data points from.
        :param endpoint: Streaming endpoint to which data points are relayed.
        :param name: Name of data source relay for logging purposes.
        """
        self._logger = logging.getLogger(name)
        self._logger.info("Initializing data handler relay...")

        self._started = False
        self._completed = threading.Event()

        self._data_handler = data_handler
        self._endpoint = endpoint
        self._logger.info("Data handler relay initialized.")
    def start(self, blocking: bool = False):
        """Starts the data handler relay along with any other objects in this union
        (data handler, endpoint). Non-blocking by default, as the relay is started in
        the background to allow stopping it afterward.

        :param blocking: Whether the relay should block until all data points have
            been processed.
        :return: Event object to check whether the relay has completed processing
            every data point and may be closed. Only useful when calling start()
            non-blocking, otherwise it is implicitly used to wait for completion.
        """
        self._logger.info("Starting data handler relay...")
        if self._started:
            raise RuntimeError("Relay has already been started!")
        self._started = True
        self._completed.clear()

        try:
            self._data_handler.open()
        except RuntimeError:
            pass
        try:
            self._endpoint.start()
        except RuntimeError:
            pass
        self._relay = threading.Thread(target=self._create_relay, daemon=True)
        self._relay.start()
        self._logger.info("Data handler relay started.")

        if blocking:
            self._completed.wait()
            self._logger.info("Relay has processed all data points and may be closed.")
        return self._completed
    def stop(self):
        """Closes and stops the data handler and the endpoint and joins the relay
        thread into the current thread. Can be restarted (and stopped) an arbitrary
        number of times.
        """
        self._logger.info("Stopping data handler relay...")
        if not self._started:
            raise RuntimeError("Relay has not been started!")
        self._started = False

        try:
            self._data_handler.close()
        except RuntimeError:
            pass
        try:
            self._endpoint.stop()
        except RuntimeError:
            pass
        self._relay.join()
        self._logger.info("Data handler relay stopped.")
    def _create_relay(self):
        """Actual relay, directly forwards data points from its data handler to its
        endpoint (both might be async).
        """
        self._logger.info("Starting to relay data points from data handler...")
        for d_point in self._data_handler:
            try:
                self._endpoint.send(d_point)
            except RuntimeError:
                # stop() was called
                break
        self._logger.info("Data source exhausted, or relay closed.")
        self._completed.set()

    def __del__(self):
        if self._started:
            self.stop()
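
A minimal usage sketch for DataHandlerRelay (not part of the module source): it relies
only on the start()/stop() API shown above, and the import paths follow the module
layout of this page. The DataHandler and StreamEndpoint constructions are placeholders,
since their real signatures are defined elsewhere in daisy and may differ.

import logging

from daisy.communication import StreamEndpoint
from daisy.data_sources.data_handler import DataHandler
from daisy.data_sources.data_relay import DataHandlerRelay

logging.basicConfig(level=logging.INFO)

# Placeholder construction -- replace with the actual DataHandler/StreamEndpoint
# setup of your deployment (data source, processor, remote address, ...).
handler: DataHandler = ...
endpoint: StreamEndpoint = ...

relay = DataHandlerRelay(handler, endpoint, name="example-relay")

# Non-blocking start: the returned Event is set once the data handler is exhausted.
completed = relay.start(blocking=False)
completed.wait(timeout=60.0)  # or completed.wait() to wait indefinitely
relay.stop()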
class CSVFileRelay:
    """A union of a data handler and a (csv) file handler to retrieve data points
    from the former and store them in the file. This allows the pre-processing of
    data points from a stream and re-using them at a later time by replaying the
    file.

    Note that such a relay requires an intact dictionary containing values for all
    fields of the data point header's parameters.
    """

    _logger: logging.Logger

    _data_handler: DataHandler
    _file: Path

    _header_buffer_size: int
    _headers: tuple[str, ...]
    _headers_provided: bool
    _separator: str
    _default_missing_value: object

    _d_point_counter: int
    _d_point_buffer: list
    _header_buffer: OrderedDict
    _do_buffer: bool

    _relay: threading.Thread
    _started: bool
    _completed: threading.Event

    def __init__(
        self,
        data_handler: DataHandler,
        target_file: str | Path,
        name: str = "",
        header_buffer_size: int = 1000,
        headers: tuple[str, ...] | None = None,
        overwrite_file: bool = False,
        separator: str = ",",
        default_missing_value: object = "",
    ):
        """Creates a new CSV file relay.

        :param data_handler: The data handler providing the data points to write to
            file. The processor used by the data handler is expected to return data
            points as a dictionary containing all values for all fields in the
            header's parameter.
        :param target_file: The path to the (new) CSV file. The parent directories
            will be created if not existent.
        :param name: Name of the relay for logging purposes.
        :param header_buffer_size: Number of packets to buffer to generate a common
            header via auto-detection. Note it is not guaranteed that all
            features/headers of all data points in the (possibly infinite) stream
            will be discovered; for example, a data point with new features could
            arrive after the discovery has completed. On the other hand, if the
            buffer size is greater than or equal to the number of data points, the
            entire stream is used for discovery.
        :param headers: If this is provided, the auto header discovery will be turned
            off and the provided headers will be used instead.
        :param overwrite_file: Whether the file should be overwritten if it exists.
        :param separator: Separator used in the CSV file.
        :param default_missing_value: Default value if a feature is not present in a
            data point.
        :raises ValueError:
            * If the buffer size is negative or 0.
            * If an invalid CSV separator is provided.
            * If the file path provided is not valid.
        """
        self._logger = logging.getLogger(name)
        self._logger.info("Initializing file relay...")

        self._started = False
        self._completed = threading.Event()

        if headers is None and header_buffer_size <= 0:
            raise ValueError("Header buffer size must be greater than 0.")
        if separator == '"':
            raise ValueError(f"'{separator}' is not allowed as a separator.")
        if not target_file:
            raise ValueError("File to write to required.")

        self._file = Path(target_file)
        if self._file.is_dir():
            raise ValueError("File path points to a directory instead of a file.")

        # create parent directories, then touch the file,
        # to check whether it exists and is a valid path
        parent_dir = self._file.parent
        parent_dir.mkdir(parents=True, exist_ok=True)
        try:
            self._file.touch(exist_ok=False)
        except FileNotFoundError:
            raise ValueError("File points to an invalid path.")
        except FileExistsError:
            if not overwrite_file:
                raise ValueError("File already exists and should not be overwritten.")

        self._data_handler = data_handler
        self._separator = separator
        if headers is not None:
            self._header_buffer_size = 0
            self._headers = headers
            self._headers_provided = True
        else:
            self._header_buffer_size = header_buffer_size
            self._headers = ()
            self._headers_provided = False
        self._default_missing_value = default_missing_value

        self._d_point_counter = 0
        self._d_point_buffer = []
        self._header_buffer = OrderedDict()
        self._do_buffer = not self._headers_provided

        self._logger.info("File relay initialized.")
    def start(self, blocking: bool = False):
        """Starts the csv file relay along with the data handler itself. Non-blocking
        by default, as the relay is started in the background to allow stopping it
        afterward.

        :param blocking: Whether the relay should block until all data points have
            been processed.
        :return: Event object to check whether the file relay has completed
            processing every data point and may be closed. Only useful when calling
            start() non-blocking, otherwise it is implicitly used to wait for
            completion.
        """
        self._logger.info("Starting file relay...")
        if self._started:
            raise RuntimeError("Relay has already been started!")
        self._started = True
        self._completed.clear()

        try:
            self._data_handler.open()
        except RuntimeError:
            pass
        self._relay = threading.Thread(target=self._create_relay, daemon=True)
        self._relay.start()
        self._logger.info("File relay started.")

        if blocking:
            self._completed.wait()
            self._logger.info(
                "File relay has processed all data points and may be closed."
            )
        return self._completed
    def stop(self):
        """Closes and stops the data handler and joins the relay thread into the
        current thread. Can be restarted (and stopped) an arbitrary number of times.

        Note that stopping the relay will not reset the csv file, only recreating
        the relay will do that.
        """
        self._logger.info("Stopping file relay...")
        if not self._started:
            raise RuntimeError("Relay has not been started!")
        self._started = False

        try:
            self._data_handler.close()
        except RuntimeError:
            pass
        self._relay.join()
        self._logger.info("File relay stopped.")
    def _create_relay(self):
        """Actual relay, directly writes data points from its data source to its
        filehandle in csv style.

        :raises TypeError: Retrieved data point is not of type dictionary. Only
            dictionaries are supported.
        """
        self._logger.info("Starting to relay data points from data source...")
        if self._headers_provided:
            self._logger.info(f"Using headers: {self._headers}")
        else:
            self._logger.info("Attempting to discover headers...")

        with open(self._file, "w") as file:
            if self._headers_provided:
                self._write_line_to_file(file, self._headers)
            try:
                self._iterate_data_points(file=file)
            except RuntimeError:
                # stop() was called
                pass
            if self._do_buffer:
                # buffer was never flushed (stream shorter than the buffer)
                self._process_buffer(file=file)
        self._logger.info("Data source exhausted, or relay closed.")
        self._completed.set()

    def _iterate_data_points(self, file: IO):
        """Iterates through data points and writes them to the csv file.

        :param file: File to write to.
        :raises TypeError: Data point is not of type dictionary. Only dictionaries
            are supported.
        """
        for d_point in self._data_handler:
            try:
                if not self._started:
                    # stop() was called
                    break
                self._process_data_point(file=file, d_point=d_point)
            except RuntimeError:
                # stop() was called
                break

    def _process_data_point(self, file: IO, d_point: dict):
        """Processes a single data point. Writes it to the buffer or file.

        :param file: File to write to.
        :param d_point: Data point to process.
        :raises TypeError: Data point is not of type dictionary. Only dictionaries
            are supported.
        """
        if not isinstance(d_point, dict):
            raise TypeError("Received data point that is not of type dictionary.")

        if self._d_point_counter % 100 == 0:
            self._logger.debug(f"Received packet {self._d_point_counter}.")

        if self._d_point_counter < self._header_buffer_size:
            self._header_buffer.update(OrderedDict.fromkeys(d_point.keys()))
            self._d_point_buffer += [d_point]
        else:
            if self._do_buffer:
                self._process_buffer(file=file)
                self._do_buffer = False
            self._write_data_point_to_file(file, d_point)
        self._d_point_counter += 1

    def _process_buffer(self, file: IO):
        """Processes the buffer by detecting the headers and writing its contents to
        the csv file.

        :param file: File to write to.
        """
        self._headers = tuple(self._header_buffer)
        self._logger.info(
            "Headers found with buffer size of "
            f"{self._header_buffer_size}: {self._headers}"
        )
        self._write_line_to_file(file, self._headers)
        for d_point_in_buffer in self._d_point_buffer:
            self._write_data_point_to_file(file, d_point_in_buffer)

    def _write_data_point_to_file(self, file: IO, d_point: dict):
        """Writes a single data point to the csv file.

        :param file: File to write to.
        :param d_point: Data point to write.
        """
        values = map(lambda topic: self._get_value(d_point, topic), self._headers)
        self._write_line_to_file(file, values)

    def _write_line_to_file(self, file: IO, line: Iterable[str]):
        """Writes a single line to the csv file.

        :param file: File to write to.
        :param line: Line to write.
        """
        file.write(f"{self._separator.join(line)}\n")

    def _get_value(self, d_point: dict, topic: str) -> str:
        """Retrieves a value from a data point, transforming it into a string to
        prepare it for writing. If it contains csv separators, it is put into
        quotation marks to escape these characters and make sure it stays a single
        entry.

        :param d_point: Data point to retrieve value from.
        :param topic: Topic to retrieve from data point.
        :return: Value of data point.
        """
        string_value = str(d_point.get(topic, self._default_missing_value))
        if self._separator in string_value:
            string_value = string_value.replace('"', '\\"')
            string_value = f'"{string_value}"'
        return string_value

    def __del__(self):
        if self._started:
            self.stop()