Streaming Utils

Utility class providing helpers for working with streaming byte sources, supporting both synchronous and asynchronous producers.

These functions are useful for:
  • Merging multiple byte streams (sync or async)

  • Reading from buffered readers in chunks

  • Applying transformations (processing functions) to each chunk

  • Yielding processed or raw output as an async generator

Supports interaction with:
  • File-like objects exposing .read()

  • Async streaming sources (e.g., websocket streams, async file reads)

  • Iterables or async iterables of raw bytes

Methods

chain_streams(*streams) async staticmethod

Chain multiple byte streams (sync or async) into a single asynchronous generator, yielding chunks from each stream in order.

This function abstracts the difference between:

  • Synchronous iterables of bytes (Iterable[bytes])

  • Asynchronous iterables of bytes (AsyncIterable[bytes])

making it easy to combine them uniformly.

Parameters
  • streams (*Iterable[bytes] | *AsyncIterable[bytes]) - Any number of streams. Each stream must yield raw bytes. A stream is treated as asynchronous if it defines __aiter__(); otherwise it is iterated synchronously.

Yields
  • bytes - Byte chunks from each stream in sequence.
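The dispatch described above can be illustrated with a minimal sketch. This is not the library's implementation: chain_streams_sketch, async_source, and collect_chained are hypothetical names introduced here, and the only behavior assumed is what the description states, namely that a stream counts as asynchronous when it defines __aiter__().

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Iterable, Union

ByteStream = Union[Iterable[bytes], AsyncIterable[bytes]]


async def chain_streams_sketch(*streams: ByteStream) -> AsyncIterator[bytes]:
    """Hypothetical stand-in for chain_streams; not the library's code."""
    for stream in streams:
        if hasattr(stream, "__aiter__"):
            # Asynchronous source: drain it with `async for`.
            async for chunk in stream:
                yield chunk
        else:
            # Synchronous source: a plain `for` loop is enough.
            for chunk in stream:
                yield chunk


async def async_source() -> AsyncIterator[bytes]:
    # Illustrative async producer, e.g. a socket or websocket reader.
    for chunk in (b"c", b"d"):
        await asyncio.sleep(0)
        yield chunk


async def collect_chained() -> list:
    # Mix a synchronous list with an async generator; order is preserved.
    return [
        chunk
        async for chunk in chain_streams_sketch([b"a", b"b"], async_source())
    ]


if __name__ == "__main__":
    print(asyncio.run(collect_chained()))  # [b'a', b'b', b'c', b'd']
```

In this sketch each stream is fully drained before the next one starts. Note that a bare bytes object is itself an iterable of integers, so a single buffer should be wrapped in a list (for example [data]) before being passed as a stream.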

process_read_stream(file_stream, processor, chunk_size, yield_processed=True) async staticmethod

Read a file/stream in chunks and optionally process each chunk before yielding it. Supports both synchronous and asynchronous .read() methods.

This helper is useful when streaming large files or socket data and applying on-the-fly transformations, such as:

  • hashing

  • compression / decompression

  • encryption / decryption

  • line parsing

  • transcoding

Parameters
  • file_stream (AsyncBufferedReader | BufferedReader) - Object with a .read(chunk_size) method. If the method is async, its result is awaited automatically.

  • processor (Callable[[bytes], T] | Callable[[bytes], Any]) - Function applied to each chunk.

    • If yield_processed=True, the output of this function is yielded.

    • If yield_processed=False, the raw chunk is yielded instead.

  • chunk_size (int) - Maximum number of bytes to read per iteration.

  • yield_processed (bool, default True) - Controls what gets yielded:

    • True: yield processor(chunk)

    • False: yield the raw bytes chunk

Yields
  • Union[T, bytes] - Either the value of type T returned by processor(chunk) when yield_processed=True, or the raw bytes chunk when yield_processed=False.
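The read loop described above can be sketched as follows. This is an illustration under stated assumptions, not the library's code: process_read_stream_sketch, AsyncBytesReader, hash_while_copying, and uppercase_async are hypothetical names, and the sketch assumes that an empty read marks the end of the stream and that processor is called on every chunk even when yield_processed=False.

```python
import asyncio
import hashlib
import inspect
import io
from typing import Any, AsyncIterator, Callable


async def process_read_stream_sketch(
    file_stream: Any,
    processor: Callable[[bytes], Any],
    chunk_size: int,
    yield_processed: bool = True,
) -> AsyncIterator[Any]:
    """Hypothetical stand-in for process_read_stream; not the library's code."""
    while True:
        chunk = file_stream.read(chunk_size)
        if inspect.isawaitable(chunk):
            # An async .read() returns an awaitable; resolve it here.
            chunk = await chunk
        if not chunk:
            # Assumption: an empty read signals end of stream.
            break
        processed = processor(chunk)
        yield processed if yield_processed else chunk


class AsyncBytesReader:
    """Illustrative async reader backed by an in-memory buffer."""

    def __init__(self, data: bytes) -> None:
        self._buffer = io.BytesIO(data)

    async def read(self, size: int) -> bytes:
        return self._buffer.read(size)


async def hash_while_copying(data: bytes, chunk_size: int = 4):
    digest = hashlib.sha256()
    chunks = []
    # digest.update returns None, so the raw chunks are requested instead.
    async for chunk in process_read_stream_sketch(
        io.BytesIO(data), digest.update, chunk_size, yield_processed=False
    ):
        chunks.append(chunk)
    return chunks, digest.hexdigest()


async def uppercase_async(data: bytes, chunk_size: int = 4) -> list:
    # Async reader plus yield_processed=True: the processor output is yielded.
    reader = AsyncBytesReader(data)
    return [
        item
        async for item in process_read_stream_sketch(
            reader, bytes.upper, chunk_size
        )
    ]
```

With yield_processed=False the processor is used only for its side effect, which suits hashing: the digest is updated while the unmodified chunks pass through to the consumer. With yield_processed=True the consumer receives whatever the processor returns, as in the uppercase example.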

© 2026
