Streaming Utils
Utility class providing helpers for working with streaming byte sources, supporting both synchronous and asynchronous producers.
These functions are useful for:
Merging multiple byte streams (sync or async)
Reading from buffered readers in chunks
Applying transformations (processing functions) to each chunk
Yielding processed or raw output as an async generator
Supports interaction with:
File-like objects exposing .read()
Async streaming sources (e.g., websocket streams, async file reads)
Iterables or async iterables of raw bytes
Methods
chain_streams(*streams) async staticmethod
Chain multiple byte streams (sync or async) into a single asynchronous generator, yielding chunks from each stream in order.
This function abstracts the difference between:
Synchronous iterables of bytes (Iterable[bytes])
Asynchronous iterables of bytes (AsyncIterable[bytes])
making it easy to combine them uniformly.
Parameters
streams (*Iterable[bytes] | *AsyncIterable[bytes]) Any number of streams. Each stream must yield raw bytes. A stream is considered asynchronous if it defines __aiter__().
Yields
bytes - Byte chunks from each stream in sequence.
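The dispatch described above can be sketched as follows. This is a minimal stand-in written from the description on this page, not the library's actual implementation: chain_streams_sketch, async_source, and collect_chained are hypothetical names, and the only behavior assumed is that a stream counts as asynchronous when it defines __aiter__().

```python
import asyncio
from typing import AsyncIterable, AsyncIterator, Iterable, List, Union

ByteStream = Union[Iterable[bytes], AsyncIterable[bytes]]


async def chain_streams_sketch(*streams: ByteStream) -> AsyncIterator[bytes]:
    """Hypothetical stand-in for chain_streams, based only on its documentation."""
    for stream in streams:
        if hasattr(stream, "__aiter__"):
            # Asynchronous source: consume it with `async for`.
            async for chunk in stream:
                yield chunk
        else:
            # Synchronous source: a plain loop is enough.
            for chunk in stream:
                yield chunk


async def async_source() -> AsyncIterator[bytes]:
    # Hypothetical async producer, standing in for a websocket or async file read.
    for part in (b"c", b"d"):
        await asyncio.sleep(0)
        yield part


async def collect_chained() -> List[bytes]:
    # Mix a list, an async generator, and a tuple in one call.
    return [
        chunk
        async for chunk in chain_streams_sketch([b"a", b"b"], async_source(), (b"e",))
    ]


chained = asyncio.run(collect_chained())
print(chained)
```

Each stream is drained completely before the next one starts, so the output order follows the argument order regardless of whether a given source is sync or async.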
process_read_stream(file_stream, processor, chunk_size, yield_processed=True) async staticmethod
Read a file/stream in chunks and optionally process each chunk before yielding it. Supports both synchronous and asynchronous .read() methods.
This helper is useful when streaming large files or socket data and applying on-the-fly transformations, such as:
hashing
compression / decompression
encryption / decryption
line parsing
transcoding
Parameters
file_stream (AsyncBufferedReader | BufferedReader) Object with a .read(chunk_size) method. If the method is async, it will be awaited automatically.
processor (Callable[[bytes], T] | Callable[[bytes], Any]) Function applied to each chunk.
If yield_processed=True, the output of this function is yielded.
If yield_processed=False, the raw chunk is yielded instead.
chunk_size (int) Maximum number of bytes to read per iteration.
yield_processed (bool, default True) Controls what gets yielded:
True: yields processor(chunk)
False: yields the raw bytes chunk
Yields
Union[T, bytes] - Either processed items of type T produced by processor() when yield_processed=True, or raw bytes chunks when yield_processed=False.
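A minimal sketch of the read loop, written from the parameter descriptions above rather than from the library's source. The names process_read_stream_sketch, AsyncBytesReader, and demo are hypothetical. The sketch assumes that an empty read marks the end of the stream and that processor is still called on every chunk when yield_processed=False, which is what makes side-effect processors such as hashing useful.

```python
import asyncio
import hashlib
import inspect
import io
from typing import Any, AsyncIterator, Callable


async def process_read_stream_sketch(
    file_stream: Any,
    processor: Callable[[bytes], Any],
    chunk_size: int,
    yield_processed: bool = True,
) -> AsyncIterator[Any]:
    """Hypothetical stand-in for process_read_stream, based only on its documentation."""
    while True:
        chunk = file_stream.read(chunk_size)
        if inspect.isawaitable(chunk):
            # An async .read() returns an awaitable; resolve it here.
            chunk = await chunk
        if not chunk:
            break  # Assumption: an empty read signals end of stream.
        processed = processor(chunk)
        yield processed if yield_processed else chunk


class AsyncBytesReader:
    """Hypothetical async reader wrapping an in-memory buffer."""

    def __init__(self, data: bytes) -> None:
        self._buffer = io.BytesIO(data)

    async def read(self, size: int) -> bytes:
        await asyncio.sleep(0)
        return self._buffer.read(size)


async def demo():
    payload = b"streaming utils example payload"

    # Sync reader, yield_processed=False: hash each chunk, pass raw bytes through.
    digest = hashlib.sha256()
    raw = [
        chunk
        async for chunk in process_read_stream_sketch(
            io.BytesIO(payload), digest.update, 8, yield_processed=False
        )
    ]

    # Async reader, default yield_processed=True: yield the processor's output.
    upper = [
        chunk
        async for chunk in process_read_stream_sketch(
            AsyncBytesReader(payload), bytes.upper, 8
        )
    ]
    return payload, digest.hexdigest(), raw, upper


payload, hex_digest, raw, upper = asyncio.run(demo())
```

In the first call the digest accumulates as a side effect while the caller still receives the unmodified bytes. In the second call the caller receives the transformed chunks instead.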