Streaming¶
Streaming methods return Stream[S] where S is a StreamState subclass. The state's process(input, out, ctx) method is called once per iteration.
- Producer streams — the server pushes data; the client iterates. Call out.finish() to end the stream.
- Exchange streams — the client sends data; the server responds each iteration. The client controls termination via close(). (See the Protocol sketch below.)
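Both kinds are declared as ordinary methods on a service Protocol, parameterized by their state class. A minimal sketch; the method names are illustrative, and the two state classes are the ones defined later on this page:

```python
from __future__ import annotations  # defer evaluation of forward-referenced state classes

from typing import Protocol

from vgi_rpc import Stream


class MyService(Protocol):
    """Illustrative service with one stream method of each kind."""

    # Producer: the client iterates; the server pushes pages until out.finish().
    def search(self, query: str, page_size: int) -> Stream[PaginatedQueryState]: ...

    # Exchange: the client sends a batch per iteration and terminates via close().
    def pipeline(self, threshold: float) -> Stream[StatefulPipeline]: ...
```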
How StreamState Persistence Works¶
StreamState extends ArrowSerializableDataclass, which means every field you define on your state dataclass is automatically serializable via Apache Arrow IPC. This is the key mechanism that lets your state survive across streaming calls.
Any field you add to your state class — counters, accumulators, lists, nested dataclasses, enums — is persisted between iterations. Just define it as a dataclass field:
```python
from dataclasses import dataclass, field

from vgi_rpc import AnnotatedBatch, CallContext, ExchangeState, OutputCollector


@dataclass
class RunningStatsState(ExchangeState):
    """All fields are automatically serialized between calls."""

    count: int = 0                                       # simple counter
    total: float = 0.0                                   # running accumulator
    history: list[float] = field(default_factory=list)  # grows each call
    label: str = ""                                      # set once at init

    def exchange(self, input: AnnotatedBatch, out: OutputCollector, ctx: CallContext) -> None:
        """Each call sees the updated state from the previous call."""
        values = input.batch.column("value").to_pylist()
        self.count += len(values)
        self.total += sum(values)
        self.history.extend(values)
        out.emit_pydict({"count": [self.count], "mean": [self.total / self.count]})
```
Supported field types¶
State fields use the same type mappings as ArrowSerializableDataclass:
| Python type | Arrow type | Example use |
|---|---|---|
| int | int64 | Counters, indices |
| float | float64 | Accumulators, thresholds |
| str | utf8 | Labels, configuration |
| bool | bool_ | Flags |
| bytes | binary | Raw data |
| list[T] | list_<T> | History buffers |
| dict[K, V] | map_<K, V> | Lookup tables |
| Enum | dictionary | Status tracking |
| Optional[T] | nullable T | Optional config |
| Nested dataclass | struct | Grouped sub-state |
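Here is a sketch that exercises the less common rows of the table in one state class; it assumes a plain nested @dataclass maps to a struct as shown above:

```python
from dataclasses import dataclass, field
from enum import Enum

from vgi_rpc import AnnotatedBatch, CallContext, ExchangeState, OutputCollector


class Status(Enum):
    """Enum fields are stored as Arrow dictionary columns."""

    IDLE = "idle"
    BUSY = "busy"


@dataclass
class WindowConfig:
    """Nested dataclass, serialized as an Arrow struct."""

    size: int = 10
    stride: int = 5


@dataclass
class RichState(ExchangeState):
    """Exercises the Enum, struct, Optional, and map rows of the table."""

    status: Status = Status.IDLE                                # Enum -> dictionary
    window: WindowConfig = field(default_factory=WindowConfig)  # nested -> struct
    last_error: str | None = None                               # Optional[str] -> nullable utf8
    lookup: dict[str, int] = field(default_factory=dict)        # dict -> map_<utf8, int64>

    def exchange(self, input: AnnotatedBatch, out: OutputCollector, ctx: CallContext) -> None:
        """All fields above survive each round trip."""
        self.status = Status.BUSY
        out.emit_pydict({"status": [self.status.value]})
```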
Why this matters: pipe vs HTTP¶
The persistence mechanism differs by transport, but your code doesn't need to know:
```mermaid
sequenceDiagram
    participant C as Client
    participant S as Server
    Note over C,S: Pipe / Subprocess Transport
    C->>S: exchange(input_1)
    Note right of S: state lives in memory
    S->>C: output_1
    C->>S: exchange(input_2)
    Note right of S: same object, mutated in-place
    S->>C: output_2
```
With pipe and subprocess transports, the StreamState object stays in server memory. It's mutated in-place — self.count += 1 persists because the same Python object handles every call.
```mermaid
sequenceDiagram
    participant C as Client
    participant S as Server
    Note over C,S: HTTP Transport (stateless)
    C->>S: init()
    S-->>S: state.serialize_to_bytes()
    S->>C: signed state token
    C->>S: exchange(input_1, token)
    S-->>S: StreamState.deserialize_from_bytes(token)
    S-->>S: state.process(input_1)
    S-->>S: state.serialize_to_bytes()
    S->>C: output_1 + updated token
    C->>S: exchange(input_2, updated_token)
    S-->>S: StreamState.deserialize_from_bytes(token)
    S-->>S: state.process(input_2)
    S-->>S: state.serialize_to_bytes()
    S->>C: output_2 + updated token
```
With HTTP transport, each request is stateless. After every process() call, the framework:
- Serializes your state to Arrow IPC bytes via serialize_to_bytes()
- Packs the bytes into an HMAC-signed token (prevents tampering)
- Sends the token back to the client in response metadata
- The client sends the token back with the next exchange
- The server deserializes the state and calls process() again
This all happens transparently — you write the same StreamState subclass regardless of transport.
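Because the token is just your state's Arrow IPC bytes, the round trip is lossless. A quick sketch using the serialization methods named in the diagram above (field values are illustrative):

```python
# Round-trip RunningStatsState (defined earlier) through Arrow IPC bytes,
# the same mechanism HTTP transport uses between exchanges.
state = RunningStatsState(count=3, total=60.0, history=[10.0, 20.0, 30.0], label="demo")
blob = state.serialize_to_bytes()                          # Arrow IPC bytes
restored = RunningStatsState.deserialize_from_bytes(blob)  # reconstructed state
assert restored.count == 3 and restored.history == [10.0, 20.0, 30.0]
```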
Design your state for serialization
Since HTTP transport serializes and deserializes state every exchange, keep state fields to serializable types (see table above). File handles, sockets, database connections, and other non-serializable resources should not be stored in state fields — use CallContext or external storage instead.
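One pattern that satisfies this constraint is to store only a serializable identifier and reopen the resource inside each call. A sketch, assuming a local SQLite file with an items table:

```python
import sqlite3
from dataclasses import dataclass

from vgi_rpc import CallContext, OutputCollector, ProducerState


@dataclass
class SqlitePageState(ProducerState):
    """Stores the *path* (serializable), never the connection object."""

    db_path: str
    offset: int = 0

    def produce(self, out: OutputCollector, ctx: CallContext) -> None:
        """Reopen the resource per call so the state stays serializable."""
        conn = sqlite3.connect(self.db_path)  # reopened on every call
        try:
            rows = conn.execute(
                "SELECT value FROM items LIMIT 10 OFFSET ?", (self.offset,)
            ).fetchall()
        finally:
            conn.close()
        if not rows:
            out.finish()
            return
        self.offset += len(rows)
        out.emit_pydict({"value": [r[0] for r in rows]})
```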
Producer vs Exchange¶
Producer streams¶
A producer stream ignores client input and pushes data until out.finish(). Use ProducerState to skip the unused input parameter:
```python
from dataclasses import dataclass

from vgi_rpc import CallContext, OutputCollector, ProducerState


@dataclass
class PaginatedQueryState(ProducerState):
    """Produces pages of results, tracking the current offset."""

    query: str
    page_size: int = 100
    offset: int = 0          # persisted — increments each call
    exhausted: bool = False  # persisted — signals completion

    def produce(self, out: OutputCollector, ctx: CallContext) -> None:
        """Emit one page per call."""
        if self.exhausted:
            out.finish()
            return
        # ... fetch results from offset ...
        results = list(range(self.offset, self.offset + self.page_size))
        if len(results) < self.page_size:
            self.exhausted = True
        self.offset += self.page_size
        out.emit_pydict({"value": results})
```
Client side:
```python
for batch in proxy.search(query="SELECT *", page_size=100):
    rows = batch.batch.to_pylist()
    # process each page
```
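For batch-at-a-time control instead of a for loop, tick() is the manual equivalent; it raises StopIteration once the server calls out.finish():

```python
session = proxy.search(query="SELECT *", page_size=100)
try:
    while True:
        batch = session.tick()  # one produce() call per tick
        rows = batch.batch.to_pylist()
except StopIteration:
    pass  # the server signaled end of stream
finally:
    session.close()
```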
Exchange streams¶
An exchange stream receives client data each iteration. Use ExchangeState for clarity:
```python
from dataclasses import dataclass, field
from enum import Enum

from vgi_rpc import AnnotatedBatch, CallContext, ExchangeState, OutputCollector


class Phase(Enum):
    """Processing phase."""

    COLLECTING = "collecting"
    PROCESSING = "processing"


@dataclass
class StatefulPipeline(ExchangeState):
    """Exchange state with enum and list fields — all serialized automatically."""

    phase: Phase = Phase.COLLECTING
    buffer: list[float] = field(default_factory=list)
    threshold: float = 100.0

    def exchange(self, input: AnnotatedBatch, out: OutputCollector, ctx: CallContext) -> None:
        """Accumulate values, switch phase when threshold is reached."""
        values = input.batch.column("value").to_pylist()
        self.buffer.extend(values)
        if sum(self.buffer) >= self.threshold and self.phase == Phase.COLLECTING:
            self.phase = Phase.PROCESSING
        out.emit_pydict({
            "phase": [self.phase.value],
            "buffer_size": [len(self.buffer)],
            "total": [sum(self.buffer)],
        })
```
Client side:
```python
with proxy.pipeline(threshold=100.0) as session:
    r1 = session.exchange(AnnotatedBatch.from_pydict({"value": [10.0, 20.0]}))
    # phase=collecting, buffer_size=2, total=30.0
    r2 = session.exchange(AnnotatedBatch.from_pydict({"value": [80.0]}))
    # phase=processing, buffer_size=3, total=110.0
```
Stream Headers¶
Stream methods can send a one-time ArrowSerializableDataclass header before the data stream begins. Declare it as the second type parameter: Stream[S, H]. Headers carry metadata that applies to the entire stream — total row counts, column descriptions, job identifiers, or any fixed information the client needs before processing batches.
Producer with header¶
Define a header dataclass, use it in the Protocol signature, and return it via Stream(..., header=...):
```python
from dataclasses import dataclass
from typing import Protocol

import pyarrow as pa

from vgi_rpc import (
    ArrowSerializableDataclass,
    CallContext,
    OutputCollector,
    ProducerState,
    Stream,
    StreamState,
)


@dataclass(frozen=True)
class JobHeader(ArrowSerializableDataclass):
    """One-time metadata sent before the stream begins."""

    total_rows: int
    description: str


# Protocol declares the header type
class DataService(Protocol):
    """Service with a header-bearing producer stream."""

    def fetch_rows(self, query: str) -> Stream[StreamState, JobHeader]: ...


# Implementation returns the header alongside the stream
@dataclass
class FetchState(ProducerState):
    """Produces query results."""

    remaining: int

    def produce(self, out: OutputCollector, ctx: CallContext) -> None:
        """Emit one batch per call."""
        if self.remaining <= 0:
            out.finish()
            return
        out.emit_pydict({"value": [self.remaining]})
        self.remaining -= 1


class DataServiceImpl:
    """Implementation with stream headers."""

    def fetch_rows(self, query: str) -> Stream[FetchState, JobHeader]:
        """Return a stream with a header."""
        schema = pa.schema([pa.field("value", pa.int64())])
        header = JobHeader(total_rows=100, description=f"Results for: {query}")
        return Stream(output_schema=schema, state=FetchState(remaining=100), header=header)
```
Client side — access the header via session.header:
```python
from vgi_rpc import serve_pipe

with serve_pipe(DataService, DataServiceImpl()) as proxy:
    session = proxy.fetch_rows(query="SELECT *")
    print(session.header)  # JobHeader(total_rows=100, description='Results for: SELECT *')
    for batch in session:
        print(batch.batch.to_pydict())
```
Exchange with header¶
Exchange streams work the same way — the header is available when entering the context manager:
```python
from vgi_rpc import AnnotatedBatch

with serve_pipe(DataService, DataServiceImpl()) as proxy:
    with proxy.transform_rows(factor=2.0) as session:
        print(session.header)  # JobHeader(...)
        result = session.exchange(AnnotatedBatch.from_pydict({"value": [1.0]}))
```
Notes¶
- For streams without a header (Stream[S]), session.header returns None.
- Use session.typed_header(JobHeader) for a typed narrowing that raises TypeError if the header is missing or the wrong type (see the sketch after this list).
- Headers work across all transports (pipe, subprocess, HTTP). For HTTP, the header is included in the /init response only — subsequent /exchange requests do not re-send it.
- Headers are visible in runtime introspection: MethodDescription.has_header and MethodDescription.header_schema.
- The CLI surfaces headers in all output formats: {"__header__": {...}} in JSON, a Header: section in table output, and a separate IPC stream in --format arrow.
API Reference¶
Stream¶
Stream
dataclass
¶
Stream(
output_schema: Schema,
state: S,
input_schema: Schema = _EMPTY_SCHEMA,
header: H | None = None,
)
Return type for stream RPC methods.
Bundles the output schema with a state object whose process(input, out, ctx)
method is called once per input batch.
For producer streams, input_schema is _EMPTY_SCHEMA (the default)
and the client iterates via __iter__ / tick(). For exchange
streams, input_schema is set to the real input schema and the client
uses exchange() / context manager.
The optional second type parameter H specifies a header type — an
ArrowSerializableDataclass that is sent once before the main data
stream begins. Use Stream[MyState, MyHeader] to declare a header;
omit it (Stream[MyState]) for streams without headers.
Client-side stub methods provide accurate types for IDE autocompletion.
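For the exchange case, here is a sketch of what an implementation method might return; the explicit input_schema is what distinguishes it from a producer (StatefulPipeline is defined earlier on this page):

```python
import pyarrow as pa

from vgi_rpc import Stream


class PipelineServiceImpl:
    """Sketch of an exchange-stream implementation method."""

    def pipeline(self, threshold: float) -> Stream[StatefulPipeline]:
        """Non-default input_schema marks this stream as exchange, not producer."""
        return Stream(
            output_schema=pa.schema([
                pa.field("phase", pa.utf8()),
                pa.field("buffer_size", pa.int64()),
                pa.field("total", pa.float64()),
            ]),
            state=StatefulPipeline(threshold=threshold),
            input_schema=pa.schema([pa.field("value", pa.float64())]),
        )
```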
__iter__
¶
__iter__() -> Iterator[AnnotatedBatch]
Iterate over output batches (client-side stub for producer streams).
| RAISES | DESCRIPTION |
|---|---|
| NotImplementedError | Always — this is a server-side type. Use StreamSession.__iter__() on the client. |
__enter__
¶
Enter context manager (client-side stub for exchange streams).
| RAISES | DESCRIPTION |
|---|---|
| NotImplementedError | Always — this is a server-side type. Use StreamSession as a context manager on the client. |
__exit__
¶
__exit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None
Exit context manager (client-side stub).
| RAISES | DESCRIPTION |
|---|---|
| NotImplementedError | Always — this is a server-side type. |
exchange
¶
exchange(input: AnnotatedBatch) -> AnnotatedBatch
Exchange an input batch for an output batch (client-side stub).
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input batch to send. TYPE: AnnotatedBatch |

| RAISES | DESCRIPTION |
|---|---|
| NotImplementedError | Always — this is a server-side type. Use StreamSession.exchange() on the client. |
tick
¶
tick() -> AnnotatedBatch
Send a tick and receive output (client-side stub for producer streams).
| RAISES | DESCRIPTION |
|---|---|
| NotImplementedError | Always — this is a server-side type. Use StreamSession.tick() on the client. |
close
¶
Close the stream (client-side stub).
| RAISES | DESCRIPTION |
|---|---|
| NotImplementedError | Always — this is a server-side type. Use StreamSession.close() on the client. |
StreamState¶
StreamState
¶
Bases: ArrowSerializableDataclass, ABC
Base class for stream state objects.
Subclasses must be dataclasses that define process(input, out, ctx)
which is called once per input batch.
For producer streams, input is a zero-row empty-schema tick
batch that can be ignored; call out.finish() to signal stream end.
For exchange streams, input is real data.
State is mutated in-place across calls.
Extends ArrowSerializableDataclass so that state can be serialized
between requests (required for HTTP transport).
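Most code subclasses ProducerState or ExchangeState (below), but implementing process() directly also works. A minimal producer-style sketch:

```python
from dataclasses import dataclass

from vgi_rpc import AnnotatedBatch, CallContext, OutputCollector, StreamState


@dataclass
class CountdownState(StreamState):
    """Counts down to zero, one batch per tick."""

    n: int

    def process(self, input: AnnotatedBatch, out: OutputCollector, ctx: CallContext) -> None:
        """For a producer stream, `input` is a zero-row tick batch; ignore it."""
        if self.n <= 0:
            out.finish()
            return
        out.emit_pydict({"n": [self.n]})
        self.n -= 1
```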
process
abstractmethod
¶
process(
input: AnnotatedBatch,
out: OutputCollector,
ctx: CallContext,
) -> None
ProducerState¶
ProducerState
¶
Bases: StreamState, ABC
Base class for producer stream state objects.
Subclasses implement produce(out, ctx) instead of process(),
eliminating the phantom input parameter that producer streams
must otherwise ignore.
Call out.finish() from produce() to signal end of stream.
produce
abstractmethod
¶
produce(out: OutputCollector, ctx: CallContext) -> None
Produce output batches into the collector.
| PARAMETER | DESCRIPTION |
|---|---|
| out | The output collector to emit batches into. TYPE: OutputCollector |
| ctx | The call context for this request. TYPE: CallContext |
process
¶
process(
input: AnnotatedBatch,
out: OutputCollector,
ctx: CallContext,
) -> None
ExchangeState¶
ExchangeState
¶
Bases: StreamState, ABC
Base class for exchange stream state objects.
Subclasses implement exchange(input, out, ctx) instead of
process(). Exchange streams must emit exactly one data batch
per call and must not call out.finish().
exchange
abstractmethod
¶
exchange(
input: AnnotatedBatch,
out: OutputCollector,
ctx: CallContext,
) -> None
Process an input batch and emit exactly one output batch.
| PARAMETER | DESCRIPTION |
|---|---|
| input | The input batch from the client. TYPE: AnnotatedBatch |
| out | The output collector to emit the response batch into. TYPE: OutputCollector |
| ctx | The call context for this request. TYPE: CallContext |
process
¶
process(
input: AnnotatedBatch,
out: OutputCollector,
ctx: CallContext,
) -> None
StreamSession¶
StreamSession
¶
StreamSession(
writer_stream: IOBase,
reader_stream: IOBase,
on_log: Callable[[Message], None] | None = None,
*,
external_config: ExternalLocationConfig | None = None,
ipc_validation: IpcValidation = FULL,
shm: ShmSegment | None = None,
header: object | None = None
)
Client-side handle for a stream call (both producer and exchange patterns).
For producer streams, use __iter__() or tick().
For exchange streams, use exchange() with context manager.
Log batches are delivered to the on_log callback.
Not thread-safe: do not share a session across threads.
Initialize with writer/reader streams and optional log callback.
typed_header
¶
Return the stream header narrowed to the expected type.
| PARAMETER | DESCRIPTION |
|---|---|
| header_type | The expected header dataclass type. TYPE: type[H] |

| RETURNS | DESCRIPTION |
|---|---|
| H | The header, typed as header_type. |

| RAISES | DESCRIPTION |
|---|---|
| TypeError | If the header is missing or not an instance of header_type. |
exchange
¶
exchange(input: AnnotatedBatch) -> AnnotatedBatch
Send an input batch, receive the output batch.
Returns an AnnotatedBatch. Log batches are delivered to
the on_log callback before returning. On RpcError, the
session is automatically closed so the transport is clean.
| RAISES | DESCRIPTION |
|---|---|
| StopIteration | When the stream has finished. |
| RpcError | On server-side errors or transport failures. |
tick
¶
tick() -> AnnotatedBatch
Send a tick batch (producer streams) and receive the output batch.
| RETURNS | DESCRIPTION |
|---|---|
| AnnotatedBatch | The next output batch. |

| RAISES | DESCRIPTION |
|---|---|
| StopIteration | When the producer stream has finished. |
| RpcError | On server-side errors or transport failures. |
__iter__
¶
__iter__() -> Iterator[AnnotatedBatch]
Iterate over output batches from a producer stream.
Sends tick batches and yields output batches until the server signals stream completion.
close
¶
Close input stream (signals EOS) and drain remaining output.
__enter__
¶
__enter__() -> StreamSession
__exit__
¶
OutputCollector¶
OutputCollector
¶
OutputCollector(
output_schema: Schema,
*,
prior_data_bytes: int = 0,
server_id: str | None = None,
producer_mode: bool = True
)
Accumulates output batches during a produce/process call.
Enforces that exactly one data batch is emitted per call (plus any number of log batches). Batches are stored in a single ordered list because interleaving order matters for the wire protocol (logs must precede the data batch they annotate).
Initialize with the output schema for this stream.
| PARAMETER | DESCRIPTION |
|---|---|
| output_schema | The Arrow schema for data batches. TYPE: Schema |
| prior_data_bytes | Cumulative data bytes from earlier produce/process calls in this stream. TYPE: int |
| server_id | Optional server identifier injected into log batch metadata. TYPE: str \| None |
| producer_mode | When True, the collector serves a producer stream and out.finish() is permitted. TYPE: bool |
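The invariant in practice, as a sketch (the state class is illustrative):

```python
from dataclasses import dataclass

from vgi_rpc import CallContext, OutputCollector, ProducerState


@dataclass
class OneBatchDemo(ProducerState):
    """Illustrates the exactly-one-data-batch-per-call rule."""

    def produce(self, out: OutputCollector, ctx: CallContext) -> None:
        # Log batches (client_log / emit_client_log_message) should be emitted
        # first; on the wire, logs precede the data batch they annotate.
        out.emit_pydict({"value": [1, 2, 3]})  # the single data batch
        # out.emit_pydict({"value": [4]})      # would raise: already emitted
```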
total_data_bytes
property
¶
Cumulative data bytes emitted across the stream so far.
Includes bytes from prior produce/process calls plus the current
data batch (if one has been emitted). Measured via
pa.RecordBatch.get_total_buffer_size().
data_batch
property
¶
data_batch: AnnotatedBatch
Return the single data batch, or raise if none was emitted.
validate
¶
Assert that exactly one data batch was emitted.
| RAISES | DESCRIPTION |
|---|---|
| RuntimeError | If no data batch was emitted. |
merge_data_metadata
¶
Merge extra metadata into the data batch.
| RAISES | DESCRIPTION |
|---|---|
| RuntimeError | If no data batch was emitted. |
emit
¶
Emit a pre-built data batch. Raises if a data batch was already emitted.
emit_arrays
¶
Build a RecordBatch from arrays using output_schema and emit it.
emit_pydict
¶
Build a RecordBatch from a Python dict using output_schema and emit it.
emit_client_log_message
¶
emit_client_log_message(msg: Message) -> None
Append a zero-row client-directed log batch (used by CallContext.emit_client_log in produce/process).
client_log
¶
client_log(
level: Level, message: str, **extra: str
) -> None
Emit a zero-row client-directed log batch with log metadata.
finish
¶
Signal stream completion for producer streams.
Producer streams (input_schema == _EMPTY_SCHEMA) call this
to signal that no more data will be produced.
| RAISES | DESCRIPTION |
|---|---|
| RuntimeError | If called on an exchange stream (input_schema != _EMPTY_SCHEMA). |
AnnotatedBatch¶
AnnotatedBatch
dataclass
¶
AnnotatedBatch(
batch: RecordBatch,
custom_metadata: KeyValueMetadata | None = None,
_release_fn: Callable[[], None] | None = None,
)
A RecordBatch paired with its custom metadata.
Used as both input and output for all batch I/O in stream methods.
release
¶
Release associated shared memory region.
After calling release(), the batch data may be overwritten — accessing batch column data after release is undefined behavior. No-op if the batch did not come from shared memory.
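A sketch of the intended pattern: copy what you need out of the batch, then release it:

```python
for batch in session:
    values = batch.batch.column("value").to_pylist()  # copy data out first
    batch.release()  # no-op unless the batch came from shared memory
    # ... work with `values`; never touch batch.batch after release ...
```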
from_pydict
classmethod
¶
from_pydict(
data: dict[str, Any], schema: Schema | None = None
) -> AnnotatedBatch
Create from a Python dict, optionally with a schema.