Communication

Message Stream

An efficient, persistent, and stateless communications stream between two endpoints over BSD sockets. Supports SSL (soon) and LZ4 compression.
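Stream compression keeps one compressor state alive for the whole connection instead of compressing each message from scratch. Repeated content across messages can therefore shrink further. The minimal sketch below illustrates that idea only. It uses the standard library's zlib as a stand-in for LZ4, since the lz4 bindings are a third-party package. make_stream_codec is a hypothetical helper and is not part of this module.

```python
import zlib
from typing import Callable, Tuple


def make_stream_codec() -> Tuple[Callable[[bytes], bytes], Callable[[bytes], bytes]]:
    """Return (compress_msg, decompress_msg) closures with persistent state.

    Hypothetical helper; zlib stands in for LZ4. In a real connection the
    compressor lives on the sending side and the decompressor on the receiving side.
    """
    compressor = zlib.compressobj()
    decompressor = zlib.decompressobj()

    def compress_msg(data: bytes) -> bytes:
        # Z_SYNC_FLUSH emits all pending output without ending the stream, so the
        # receiver can decode this message now while the history is kept.
        return compressor.compress(data) + compressor.flush(zlib.Z_SYNC_FLUSH)

    def decompress_msg(data: bytes) -> bytes:
        return decompressor.decompress(data)

    return compress_msg, decompress_msg


compress_msg, decompress_msg = make_stream_codec()
payload = b"sensor reading " * 50
first = compress_msg(payload)
second = compress_msg(payload)  # shares compression history with the first message
assert decompress_msg(first) == payload
assert decompress_msg(second) == payload
```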

class daisy.communication.message_stream.EndpointServer(addr: tuple[str, int], name: str = 'EndpointServer', c_timeout: int = None, send_b_size: int = 65536, recv_b_size: int = 65536, compression: bool = False, marshal_f: ~typing.Callable[[object], bytes] = <built-in function dumps>, unmarshal_f: ~typing.Callable[[bytes], object] = <built-in function loads>, multithreading: bool = False, buffer_size: int = 1024, keep_alive: bool = True)

Bases: object

Helper class to manage a group of (acceptor) connection endpoints listening on the same address. Supports all features of the StreamEndpoint class. It additionally provides thread-safe access to these endpoints, along with polling and management of them as a group.

close_connections(addrs: list[tuple[str, int]], timeout: int = 10, blocking=True)

Checks whether an available connection endpoint exists for each of the given client addresses. Any endpoint found is closed and shut down.

Parameters:
  • addrs – Client addresses to check and close endpoints for.

  • timeout – Timeout for shutting down connection endpoints.

  • blocking – Whether to wait for endpoints to be closed before returning.

get_connections(addrs: list[tuple[str, int]]) → dict[tuple[str, int], StreamEndpoint | None]

Checks whether an available connection endpoint exists for each of the given client addresses, and retrieves it.

Note that this method is itself thread-safe, but a returned endpoint is not guaranteed to still be connected (and available) by the time it is used. If a general timeout is set (see __init__()), the underlying cleanup thread (if enabled) may have closed it in the meantime as a dead endpoint.

Parameters:

addrs – Client addresses to check and retrieve endpoints for.

Returns:

Dictionary of addresses and endpoints (None if not existing).

get_new_connections(n: int = 1, timeout: int = 10) → dict[tuple[str, int], StreamEndpoint]

Checks and retrieves the first n new connections in the underlying queue filled by the connection handler.

Note that this method is itself thread-safe, but a returned endpoint is not guaranteed to still be connected (and available) by the time it is used. If a general timeout is set (see __init__()), the underlying cleanup thread (if enabled) may have closed it in the meantime as a dead endpoint.

Parameters:
  • n – Maximum number of new connections to retrieve.

  • timeout – Maximum time to wait when polling for new connections.

Returns:

Dictionary of new addresses and endpoints of new connections.
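One way to implement the "first n new connections within a timeout" semantics is a single shared deadline over a thread-safe queue.Queue. This is a minimal sketch under the following assumptions:

  • The connection handler puts (address, endpoint) tuples on the queue.
  • The timeout is in seconds.
  • take_new_connections is a hypothetical name and does not reflect the server's internals.

```python
import queue
import time
from typing import Any, Dict, Tuple

Addr = Tuple[str, int]


def take_new_connections(new_conns: queue.Queue, n: int = 1, timeout: float = 10) -> Dict[Addr, Any]:
    """Return up to n (addr -> endpoint) entries taken from new_conns.

    Hypothetical sketch: assumes the connection handler puts (addr, endpoint)
    tuples on a thread-safe queue. Polling stops at n entries or after timeout seconds.
    """
    deadline = time.monotonic() + timeout
    result: Dict[Addr, Any] = {}
    while len(result) < n:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        try:
            addr, endpoint = new_conns.get(timeout=remaining)
        except queue.Empty:
            break  # no further connection arrived in time
        result[addr] = endpoint
    return result
```

Because the deadline is shared rather than set per item, the total wait stays bounded by timeout regardless of n.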

poll_connections() → tuple[dict[tuple[str, int], StreamEndpoint], dict[tuple[str, int], StreamEndpoint]]

Polls the state of all currently available connection endpoints, filtering them by readability and writability.

Note that this method is itself thread-safe, but a returned endpoint is not guaranteed to still be connected (and available) by the time it is used. If a general timeout is set (see __init__()), the underlying cleanup thread (if enabled) may have closed it in the meantime as a dead endpoint.

Returns:

Tuple of two dictionaries of addresses and endpoints: those that can be read from and those that can be written to.

start()

Starts the endpoint server, launching the connection handlers in the background.

Raises:

RuntimeError – If endpoint server has already been started.

stop(timeout=10, blocking=True)

Stops the endpoint server along with all its connection endpoints, cleaning up underlying data structures. This always shuts down all connection endpoints with the given timeout (see stop() of the StreamEndpoint class for more information on this behavior).

Parameters:
  • timeout – Allows each connection endpoint to process remaining messages until timeout. If blocking, this is done for each endpoint in turn, not in parallel.

  • blocking – Whether to wait for endpoints to be closed before exiting.

Raises:

RuntimeError – If endpoint server has not been started.

class daisy.communication.message_stream.EndpointSocket(name: str, addr: tuple[str, int] = None, remote_addr: tuple[str, int] = None, acceptor: bool = True, send_b_size: int = 65536, recv_b_size: int = 65536, keep_alive: bool = True)

Bases: object

A bundle of up to two sockets used to communicate with another endpoint over a persistent TCP connection in a synchronous manner. Supports authentication and encryption over SSL, and stream compression using LZ4. It is thread-safe both for access to the same endpoint socket and for multiple threads using endpoint sockets set to the same address. This is coordinated through a set of class variables, listed below.

Variables:
  • _listen_socks – Active listen sockets, along with a respective lock to access each safely.

  • _acc_r_socks – Pending registered connection cache for each listen socket.

  • _acc_p_socks – Pending unregistered connection queue for each listen socket.

  • _reg_r_addrs – Registered remote addresses.

  • _addr_map – Mapping between registered remote addresses and their aliases.

  • _act_l_counts – Active thread counter for each listen socket. The socket closes if the counter reaches zero.

  • _lock – General-purpose lock to ensure safe access to class variables.

  • _cls_logger – General-purpose logger for class methods.
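The pairing of _listen_socks with _act_l_counts describes reference-counted sharing. All endpoint sockets on the same local address use one listen socket, and the last one to leave closes it. The minimal sketch below assumes a single lock guards all bookkeeping, comparable to the class-level _lock. The SharedListenSockets class and its methods are hypothetical, and the pending-connection caches are omitted.

```python
import socket
import threading
from typing import Dict, Tuple

Addr = Tuple[str, int]


class SharedListenSockets:
    """Hypothetical sketch: one listen socket per local address, reference-counted."""

    def __init__(self) -> None:
        self._lock = threading.Lock()  # guards both dictionaries
        self._socks: Dict[Addr, socket.socket] = {}
        self._counts: Dict[Addr, int] = {}

    def acquire(self, addr: Addr) -> socket.socket:
        with self._lock:
            if addr not in self._socks:
                # First user of this address creates and binds the listen socket.
                self._socks[addr] = socket.create_server(addr)
                self._counts[addr] = 0
            self._counts[addr] += 1
            return self._socks[addr]

    def release(self, addr: Addr) -> None:
        with self._lock:
            self._counts[addr] -= 1
            if self._counts[addr] == 0:
                # Last user gone: close the socket and drop the bookkeeping.
                self._socks.pop(addr).close()
                del self._counts[addr]
```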

close(shutdown: bool = False)

Closes the endpoint socket and, if it is an acceptor, cleans up any underlying data structures. If it is already closed and a shutdown is requested, only the data structures are cleaned up.

open()

Opens the endpoint socket along with its underlying socket(s) and its connection to a/the remote endpoint socket. Blocks until the connection is established.

poll(lazy: bool = False) → tuple[list[bool], tuple[tuple[str, int], tuple[str, int]]]

Polls various state flags and the addresses of the endpoint socket:
  • 0,0: Existence of socket (true if connected).

  • 0,1: Whether there is something to read on the underlying socket.

  • 0,2: Whether one is able to write on the underlying socket.

  • 1,0: Address of endpoint socket, else None.

  • 1,1: Address of remote endpoint socket, else None.

Note that this does not guarantee that the underlying socket is actually connected and available for reading/writing. For example, the connection may have broken down since then and may be in the process of being re-established.

Parameters:

lazy – Whether to skip checking the actual state of the underlying socket and just check for connectivity.

Returns:

Tuple of boolean states (connectivity, readability, writability) and address-pair of endpoint socket.
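For a plain TCP socket, the documented result layout can be reproduced with a zero-timeout select.select call. This is a sketch under two assumptions. First, poll_socket is a hypothetical name. Second, its lazy behavior is a guess: it reports readability and writability as False, because the description only says that lazy mode checks connectivity.

```python
import select
import socket
from typing import List, Optional, Tuple


def poll_socket(sock: Optional[socket.socket], lazy: bool = False) -> Tuple[List[bool], tuple]:
    """Return ([connected, readable, writable], (local_addr, remote_addr)).

    Hypothetical sketch of the documented poll() layout for a plain TCP socket.
    """
    if sock is None or sock.fileno() == -1:
        return [False, False, False], (None, None)
    local_addr = sock.getsockname()
    try:
        remote_addr = sock.getpeername()
    except OSError:
        # getpeername() fails on a socket that is not connected.
        return [False, False, False], (local_addr, None)
    if lazy:
        # Assumption: lazy mode only checks connectivity and reports the
        # read/write states as False instead of querying the socket.
        return [True, False, False], (local_addr, remote_addr)
    # A zero timeout makes select() report the current state without blocking.
    readable, writable, _ = select.select([sock], [sock], [], 0)
    return [True, bool(readable), bool(writable)], (local_addr, remote_addr)
```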

recv(timeout: int = None) → bytes | None

Receives the bytes of a single object sent over the connection, performing simple marshalling (the size is received first, then the bytes of the object). Fault-tolerant to breakdowns and resets of the connection. Blocks by default if timeout is not set.

Parameters:

timeout – Timeout (seconds) to receive an object to return.

Returns:

Received bytes, or None if the endpoint socket has been closed.

Raises:
  • TimeoutError – If timeout set and triggered.

  • RuntimeError – If connection has been terminated by remote endpoint and keep-alive is disabled.

send(p_data: bytes)

Sends the given bytes of a single object over the connection, performing simple marshalling (the size is sent first, then the bytes of the object). Fault-tolerant to breakdowns and resets of the connection. Blocking.

Parameters:

p_data – Bytes to send.

Raises:

RuntimeError – If connection has been terminated by remote endpoint and keep-alive is disabled.
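The size-then-payload marshalling described for send() and recv() is length-prefixed framing. The sketch below shows the idea on a plain socket. It assumes a 4-byte big-endian length header, since the actual header format is not documented here. It also omits the reconnection logic that makes the real methods fault-tolerant. send_msg and recv_msg are hypothetical names.

```python
import socket
import struct

# Assumption: a 4-byte big-endian unsigned length header; the header format
# actually used by EndpointSocket is not documented here.
_HEADER = struct.Struct("!I")


def send_msg(sock: socket.socket, payload: bytes) -> None:
    # Size first, then the payload bytes.
    sock.sendall(_HEADER.pack(len(payload)) + payload)


def _recv_exact(sock: socket.socket, n: int) -> bytes:
    # recv() may return fewer bytes than requested, so loop until n have arrived.
    chunks = []
    while n > 0:
        chunk = sock.recv(n)
        if not chunk:
            raise ConnectionError("connection closed mid-message")
        chunks.append(chunk)
        n -= len(chunk)
    return b"".join(chunks)


def recv_msg(sock: socket.socket) -> bytes:
    (size,) = _HEADER.unpack(_recv_exact(sock, _HEADER.size))
    return _recv_exact(sock, size)
```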

class daisy.communication.message_stream.StreamEndpoint(name: str = 'StreamEndpoint', addr: tuple[str, int] = None, remote_addr: tuple[str, int] = None, acceptor: bool = True, send_b_size: int = 65536, recv_b_size: int = 65536, compression: bool = False, marshal_f: ~typing.Callable[[object], bytes] = <built-in function dumps>, unmarshal_f: ~typing.Callable[[bytes], object] = <built-in function loads>, multithreading: bool = False, buffer_size: int = 1024, keep_alive: bool = True)

Bases: object

One of a pair of endpoints that communicate with one another through a persistent, stateless stream over BSD sockets. Allows the transmission of generic objects in both synchronous and asynchronous fashion. Supports SSL and LZ4 compression for the stream. Thread-safe both for access to the same endpoint and for multiple threads using endpoints set to the same address.
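The marshal_f / unmarshal_f parameters only need to satisfy Callable[[object], bytes] and Callable[[bytes], object]. The defaults appear to be pickle's dumps/loads, given that send() and receive() describe the data as a pickle. The hypothetical JSON-based pair below is one possible replacement. It supports only JSON-serializable objects, and it turns tuples into lists. In exchange, it avoids unpickling data from an untrusted peer, which the Python documentation warns is unsafe. Presumably, both endpoints must use matching functions.

```python
import json
from typing import Callable

# Hypothetical JSON-based replacements for the default dumps/loads pair.
# Any pair works as long as unmarshal_f(marshal_f(obj)) reconstructs obj.


def json_marshal(obj: object) -> bytes:
    return json.dumps(obj, separators=(",", ":")).encode("utf-8")


def json_unmarshal(data: bytes) -> object:
    return json.loads(data.decode("utf-8"))


marshal_f: Callable[[object], bytes] = json_marshal
unmarshal_f: Callable[[bytes], object] = json_unmarshal
```

These functions could then be passed as marshal_f=json_marshal, unmarshal_f=json_unmarshal.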

classmethod create_quick_sender_ep(objects: ~typing.Iterable, remote_addr: tuple[str, int], name: str = 'QuickSenderEndpoint', addr: tuple[str, int] = None, send_b_size: int = 65536, compression: bool = False, marshal_f: ~typing.Callable[[object], bytes] = <built-in function dumps>, blocking=True)

Creates a (simplified) one-time endpoint to send a number of objects to a remote endpoint before shutting down. May be called non-blocking to handle the endpoint entirely in the background.

Parameters:
  • objects – Iterable of objects to send to remote endpoint.

  • remote_addr – Address of remote endpoint to send messages to.

  • name – Name of endpoint for logging purposes.

  • addr – Address of endpoint.

  • send_b_size – Underlying send buffer size of socket.

  • compression – Enables LZ4 stream compression for bandwidth optimization.

  • marshal_f – Marshal function to serialize objects to send into bytes.

  • blocking – Whether endpoint and message handling is to be done synchronously or asynchronously (using threads).

poll() → tuple[list[bool], tuple[tuple[str, int], tuple[str, int]]]

Polls various state flags of the endpoint (see below) and its addresses:

  • 0,0: Existence of underlying socket (true if connected).

  • 0,1: Whether there is something to read in the internal buffer (async) or on the underlying socket (sync).

  • 0,2: Whether one is able to write to the internal buffer (async) or on the underlying socket (sync).

  • 1,0: Address of endpoint, else None.

  • 1,1: Address of remote endpoint, else None.

Note that this does not guarantee that the underlying endpoint socket is actually connected and available for reading/writing. The connection may have broken down since then and may be in the process of being re-established. In multithreading mode, the contents of the internal buffers may also change over time and thus change the read/write state.

Returns:

Tuple of boolean states (connectivity, readability, writability) and address-pair of endpoint.
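In multithreading mode, readability and writability refer to the internal buffers rather than the socket. This sketch assumes the buffers are bounded queue.Queue instances whose capacity is buffer_size. That mapping and the async_rw_flags helper are both assumptions.

```python
import queue
from typing import Tuple


def async_rw_flags(recv_buffer: queue.Queue, send_buffer: queue.Queue) -> Tuple[bool, bool]:
    """Return (readable, writable) for hypothetical internal buffers.

    Readable if the background receiver has buffered at least one object;
    writable if the bounded send buffer still has room. Both are snapshots
    that background threads may invalidate right after the call.
    """
    return not recv_buffer.empty(), not send_buffer.full()


buffer_size = 1024  # assumed to bound both internal buffers
recv_buffer: queue.Queue = queue.Queue(maxsize=buffer_size)
send_buffer: queue.Queue = queue.Queue(maxsize=buffer_size)
```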

receive(timeout: int = None) → object

Generic receive function that receives data as a pickle over the persistent datastream, unpickles it into the respective object, and returns it. Blocks by default if timeout is not set. If multithreading is enabled, objects can still be received after the endpoint has been closed, provided they were already received and stored in the receive buffer.

Parameters:

timeout – Timeout (seconds) to receive an object to return.

Returns:

Received object.

Raises:
  • RuntimeError – If endpoint has not been started or has been terminated by the remote counterpart, and there is nothing to receive asynchronously (multithreading is not enabled).

  • TimeoutError – If timeout set and triggered.
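The timeout and post-close behavior of receive() in multithreading mode can be modeled with a queue-backed buffer. The BufferedReceiver class and its deliver() method are hypothetical. Mapping queue.Empty to TimeoutError and raising RuntimeError once closed and empty mirrors the Raises section above. The sketch does not reproduce the real internals.

```python
import queue
import threading
from typing import Optional


class BufferedReceiver:
    """Hypothetical model of receive() in multithreading mode."""

    def __init__(self, buffer_size: int = 1024) -> None:
        self._buffer: queue.Queue = queue.Queue(maxsize=buffer_size)
        self.closed = threading.Event()

    def deliver(self, obj: object) -> None:
        # Called by a (hypothetical) background receiver thread; blocks if full.
        self._buffer.put(obj)

    def receive(self, timeout: Optional[float] = None) -> object:
        try:
            # Objects already buffered stay receivable even after close.
            return self._buffer.get_nowait()
        except queue.Empty:
            pass
        if self.closed.is_set():
            raise RuntimeError("endpoint closed and nothing left to receive")
        try:
            # timeout=None blocks until an object arrives.
            return self._buffer.get(timeout=timeout)
        except queue.Empty:
            raise TimeoutError("no object received within timeout") from None
```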

classmethod receive_latest_ep_objs(endpoints: ~typing.Iterable[~typing.Self], obj_type: type = <class 'object'>) → dict[Self, Optional]

Endpoint helper function to receive the latest objects of a certain type from a number of endpoints. Note that this also flushes any other messages held by these endpoints, as non-blocking receives are called on them until their buffers are exhausted. Messages of other types are discarded, as are endpoints that are not ready.

Parameters:
  • endpoints – Iterable of endpoints to receive objects from.

  • obj_type – Type of objects to receive. If none given, receives the latest message of any type.

Returns:

Dictionary of each endpoint and its respective latest received object, None if nothing was received for that endpoint.
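The drain-and-keep-latest logic described above can be sketched over any objects with a receive() method. This sketch makes the following assumptions:

  • receive(timeout=0) performs a non-blocking receive and raises TimeoutError once the buffer is exhausted.
  • A RuntimeError marks an endpoint that is not ready, which is then mapped to None.
  • receive_latest is a hypothetical name.

```python
from typing import Any, Dict, Iterable, Optional


def receive_latest(endpoints: Iterable[Any], obj_type: type = object) -> Dict[Any, Optional[Any]]:
    """Drain each endpoint and keep only its latest object of obj_type.

    Hypothetical sketch; assumes receive(timeout=0) raises TimeoutError once
    the endpoint's buffer is exhausted.
    """
    latest: Dict[Any, Optional[Any]] = {}
    for ep in endpoints:
        latest[ep] = None
        while True:
            try:
                obj = ep.receive(timeout=0)
            except TimeoutError:
                break  # buffer exhausted
            except RuntimeError:
                break  # endpoint not ready: nothing (more) for it
            if isinstance(obj, obj_type):
                latest[ep] = obj  # newer objects overwrite older ones
            # objects of other types are simply dropped
    return latest
```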

classmethod select_eps(endpoints: Iterable[Self]) → tuple[list[Self], list[Self]]

Endpoint select helper function to check a number of endpoints for whether objects can be read from or written to them. For simplicity’s sake, it does not mirror the actual UNIX select function, which takes separate lists.

Parameters:

endpoints – Iterable of endpoints to check for readiness.

Returns:

Tuple of lists of endpoints that are read-ready and write-ready, respectively.
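Given the documented poll() layout ([connected, readable, writable], (addr, remote_addr)), the split into read-ready and write-ready lists can be sketched as follows. select_ready is a hypothetical name. It reads only the poll() flags, and whether the real helper checks anything beyond those flags is not stated.

```python
from typing import Any, Iterable, List, Tuple


def select_ready(endpoints: Iterable[Any]) -> Tuple[List[Any], List[Any]]:
    """Split endpoints into (readable, writable) lists using their poll() flags.

    Hypothetical sketch based on the documented poll() layout:
    poll() -> ([connected, readable, writable], (addr, remote_addr)).
    """
    readable: List[Any] = []
    writable: List[Any] = []
    for ep in endpoints:
        (_connected, can_read, can_write), _addrs = ep.poll()
        if can_read:
            readable.append(ep)
        if can_write:
            writable.append(ep)
    return readable, writable
```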

send(obj: object)

Generic send function that sends any object as a pickle over the persistent datastream. If multithreading is enabled, this function is non-blocking.

Parameters:

obj – Object to send.

Raises:

RuntimeError – If endpoint has not been started or has been terminated by the remote counterpart.

start(blocking=True) → Event

Starts the endpoint, either in threaded fashion or as part of the main thread. By doing so, the two endpoints are connected and the datastream is opened.

By default, this method blocks until a connection is established. It returns immediately instead if multithreading is enabled or blocking is not set. In that case, the caller can check the readiness of the connection via the returned event object.

Note that in multithreading mode, objects can already be sent/received before that point. However, they are only stored in internal buffers until the connection is established, which sets the semaphore accordingly to allow the async sender and receiver to proceed.

Parameters:

blocking – Whether to wait for a connection to be established in non-multithreading (sync) mode.

Returns:

Event object to check the endpoint’s readiness to send/receive. Always set if start() was called blocking.

Raises:

RuntimeError – If endpoint has already been started or shut down.
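The returned Event lets a non-blocking caller learn when the connection is up. The minimal sketch below shows that pattern with a retrying connect loop in a daemon thread. start_connect, retry_interval, and the holder dict are hypothetical. The real endpoint additionally gates its async sender and receiver on the connection, which is not modeled here.

```python
import socket
import threading
import time
from typing import Dict, Tuple


def start_connect(remote_addr: Tuple[str, int], blocking: bool = True,
                  retry_interval: float = 0.05) -> Tuple[threading.Event, Dict[str, socket.socket]]:
    """Connect to remote_addr, retrying until it succeeds.

    Hypothetical sketch: returns (ready, holder); holder["sock"] is filled in
    with the connected socket before ready is set.
    """
    ready = threading.Event()
    holder: Dict[str, socket.socket] = {}

    def connect_loop() -> None:
        while True:
            try:
                holder["sock"] = socket.create_connection(remote_addr)
            except OSError:
                time.sleep(retry_interval)  # remote side not reachable yet
                continue
            ready.set()
            return

    if blocking:
        connect_loop()  # returns only once connected, so ready is already set
    else:
        threading.Thread(target=connect_loop, daemon=True).start()
    return ready, holder
```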

stop(shutdown=False, timeout=10, blocking=True)

Stops the endpoint and closes the stream, cleaning up underlying data structures. If multithreading is enabled, waits for both endpoint threads to stop before finishing.

Note that this does not guarantee that all pending objects are sent and received. They may still be in internal buffers, where they are processed if the endpoint is opened again or are discarded by the underlying socket.

By default, this method blocks until the endpoint is fully closed / shut down. It returns immediately instead if multithreading is enabled or blocking is not set. In that case, the caller can check the progress via the returned event object.

Also note that if the endpoint has not been started or has already been closed, setting the shutdown flag still results in the full cleanup of the underlying data structures.

Parameters:
  • shutdown – If set, also cleans up underlying data structures of the socket communication.

  • timeout – Allows the sender thread to process remaining messages until timeout.

  • blocking – Whether to wait for the endpoint to be closed in non-multithreading (sync) mode.

Returns:

Event object to check whether endpoint is closed. Always set if stop() was called blocking.

Raises:

RuntimeError – If endpoint has not been started or already shut down.
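The timeout parameter bounds how long the sender thread may keep flushing remaining messages. Anything left over stays in the internal buffer, as noted above. A minimal sketch of such a shutdown phase follows, assuming a queue-based send buffer and a send_fn callback. drain_until and both of those names are hypothetical.

```python
import queue
import time
from typing import Any, Callable


def drain_until(send_buffer: queue.Queue, send_fn: Callable[[Any], None], timeout: float = 10) -> int:
    """Send buffered objects until the buffer is empty or timeout expires.

    Hypothetical sketch of a sender thread's shutdown phase; returns the number
    of objects left unsent in the buffer.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            obj = send_buffer.get_nowait()
        except queue.Empty:
            break  # everything pending has been handed to send_fn
        send_fn(obj)
    return send_buffer.qsize()
```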

Tests