Streaming

Streaming methods return Stream[S] where S is a StreamState subclass. The state's process(input, out, ctx) method is called once per iteration.

  • Producer streams — server pushes data; client iterates. Call out.finish() to end.
  • Exchange streams — client sends data; server responds each iteration. Client controls termination via close().

How StreamState Persistence Works

StreamState extends ArrowSerializableDataclass, which means every field you define on your state dataclass is automatically serializable via Apache Arrow IPC. This is the key mechanism that lets your state survive across streaming calls.

Any field you add to your state class — counters, accumulators, lists, nested dataclasses, enums — is persisted between iterations. Just define it as a dataclass field:

from dataclasses import dataclass, field

from vgi_rpc import ExchangeState, AnnotatedBatch, OutputCollector, CallContext


@dataclass
class RunningStatsState(ExchangeState):
    """All fields are automatically serialized between calls."""

    count: int = 0                            # simple counter
    total: float = 0.0                        # running accumulator
    history: list[float] = field(default_factory=list)  # grows each call
    label: str = ""                           # set once at init

    def exchange(self, input: AnnotatedBatch, out: OutputCollector, ctx: CallContext) -> None:
        """Each call sees the updated state from the previous call."""
        values = input.batch.column("value").to_pylist()
        self.count += len(values)
        self.total += sum(values)
        self.history.extend(values)
        out.emit_pydict({"count": [self.count], "mean": [self.total / self.count]})

Supported field types

State fields use the same type mappings as ArrowSerializableDataclass:

Python type       Arrow type   Example use
int               int64        Counters, indices
float             float64      Accumulators, thresholds
str               utf8         Labels, configuration
bool              bool_        Flags
bytes             binary       Raw data
list[T]           list_<T>     History buffers
dict[K, V]        map_<K, V>   Lookup tables
Enum              dictionary   Status tracking
Optional[T]       nullable T   Optional config
Nested dataclass  struct       Grouped sub-state

Why this matters: pipe vs HTTP

The persistence mechanism differs by transport, but your code doesn't need to know:

sequenceDiagram
    participant C as Client
    participant S as Server

    Note over C,S: Pipe / Subprocess Transport
    C->>S: exchange(input_1)
    Note right of S: state lives in memory
    S->>C: output_1
    C->>S: exchange(input_2)
    Note right of S: same object, mutated in-place
    S->>C: output_2

With pipe and subprocess transports, the StreamState object stays in server memory. It's mutated in-place — self.count += 1 persists because the same Python object handles every call.

sequenceDiagram
    participant C as Client
    participant S as Server

    Note over C,S: HTTP Transport (stateless)
    C->>S: init()
    S-->>S: state.serialize_to_bytes()
    S->>C: signed state token
    C->>S: exchange(input_1, token)
    S-->>S: StreamState.deserialize_from_bytes(token)
    S-->>S: state.process(input_1)
    S-->>S: state.serialize_to_bytes()
    S->>C: output_1 + updated token
    C->>S: exchange(input_2, updated_token)
    S-->>S: StreamState.deserialize_from_bytes(token)
    S-->>S: state.process(input_2)
    S-->>S: state.serialize_to_bytes()
    S->>C: output_2 + updated token

With HTTP transport, each request is stateless. After every process() call:

  1. The framework serializes your state to Arrow IPC bytes via serialize_to_bytes()
  2. It packs the bytes into an HMAC-signed token (prevents tampering)
  3. It sends the token back to the client in response metadata
  4. The client sends the token back with the next exchange
  5. The server deserializes the state and calls process() again

This all happens transparently — you write the same StreamState subclass regardless of transport.
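
Conceptually, the signed token works like this stdlib sketch. The framework's real token format and key handling are internal; the SECRET constant and the 32-byte layout here are illustrative only:

```python
import hashlib
import hmac

SECRET = b"server-side-secret"  # illustrative; real key management is internal


def sign_state(state_bytes: bytes) -> bytes:
    """Prefix the serialized state with an HMAC so clients cannot tamper with it."""
    mac = hmac.new(SECRET, state_bytes, hashlib.sha256).digest()
    return mac + state_bytes


def verify_state(token: bytes) -> bytes:
    """Check the HMAC before trusting the embedded state bytes."""
    mac, state_bytes = token[:32], token[32:]
    expected = hmac.new(SECRET, state_bytes, hashlib.sha256).digest()
    if not hmac.compare_digest(mac, expected):
        raise ValueError("state token failed verification")
    return state_bytes


# Round trip: serialize, sign, client holds the token, verify, deserialize.
token = sign_state(b"arrow-ipc-state-bytes")
assert verify_state(token) == b"arrow-ipc-state-bytes"
```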

Design your state for serialization

Since HTTP transport serializes and deserializes state every exchange, keep state fields to serializable types (see table above). File handles, sockets, database connections, and other non-serializable resources should not be stored in state fields — use CallContext or external storage instead.
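
One way to honor this rule (a sketch, not a prescribed pattern): keep only a serializable connection string in state and open the resource inside each call, so nothing non-serializable lives on the state between requests. sqlite3 stands in for any external resource here:

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class DbProbeState:
    """Only serializable fields: a path string and a counter."""

    db_path: str = ":memory:"
    calls: int = 0

    def probe(self) -> int:
        # Open and release the handle within the call; the connection
        # object never becomes a state field.
        conn = sqlite3.connect(self.db_path)
        try:
            (value,) = conn.execute("SELECT 41 + 1").fetchone()
        finally:
            conn.close()
        self.calls += 1
        return value


state = DbProbeState()
result = state.probe()  # opens, queries, closes; state stays serializable
```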

Producer vs Exchange

Producer streams

A producer stream ignores client input and pushes data until out.finish(). Use ProducerState to skip the unused input parameter:

from dataclasses import dataclass

from vgi_rpc import CallContext, OutputCollector, ProducerState


@dataclass
class PaginatedQueryState(ProducerState):
    """Produces pages of results, tracking the current offset."""

    query: str
    page_size: int = 100
    offset: int = 0          # persisted — increments each call
    exhausted: bool = False  # persisted — signals completion

    def produce(self, out: OutputCollector, ctx: CallContext) -> None:
        """Emit one page per call."""
        if self.exhausted:
            out.finish()
            return
        # ... fetch results from offset ...
        results = list(range(self.offset, self.offset + self.page_size))
        if len(results) < self.page_size:
            self.exhausted = True
        self.offset += self.page_size
        out.emit_pydict({"value": results})

Client side:

for batch in proxy.search(query="SELECT *", page_size=100):
    rows = batch.batch.to_pylist()
    # process each page

Exchange streams

An exchange stream receives client data each iteration. Use ExchangeState for clarity:

from dataclasses import dataclass, field
from enum import Enum

from vgi_rpc import AnnotatedBatch, CallContext, ExchangeState, OutputCollector


class Phase(Enum):
    """Processing phase."""

    COLLECTING = "collecting"
    PROCESSING = "processing"


@dataclass
class StatefulPipeline(ExchangeState):
    """Exchange state with enum and list fields — all serialized automatically."""

    phase: Phase = Phase.COLLECTING
    buffer: list[float] = field(default_factory=list)
    threshold: float = 100.0

    def exchange(self, input: AnnotatedBatch, out: OutputCollector, ctx: CallContext) -> None:
        """Accumulate values, switch phase when threshold is reached."""
        values = input.batch.column("value").to_pylist()
        self.buffer.extend(values)

        if sum(self.buffer) >= self.threshold and self.phase == Phase.COLLECTING:
            self.phase = Phase.PROCESSING

        out.emit_pydict({
            "phase": [self.phase.value],
            "buffer_size": [len(self.buffer)],
            "total": [sum(self.buffer)],
        })

Client side:

with proxy.pipeline(threshold=100.0) as session:
    r1 = session.exchange(AnnotatedBatch.from_pydict({"value": [10.0, 20.0]}))
    # phase=collecting, buffer_size=2, total=30.0

    r2 = session.exchange(AnnotatedBatch.from_pydict({"value": [80.0]}))
    # phase=processing, buffer_size=3, total=110.0

Stream Headers

Stream methods can send a one-time ArrowSerializableDataclass header before the data stream begins. Declare it as the second type parameter: Stream[S, H]. Headers carry metadata that applies to the entire stream — total row counts, column descriptions, job identifiers, or any fixed information the client needs before processing batches.

Producer with header

Define a header dataclass, use it in the Protocol signature, and return it via Stream(..., header=...):

from dataclasses import dataclass
from typing import Protocol

import pyarrow as pa

from vgi_rpc import (
    ArrowSerializableDataclass,
    CallContext,
    OutputCollector,
    ProducerState,
    Stream,
    StreamState,
)


@dataclass(frozen=True)
class JobHeader(ArrowSerializableDataclass):
    """One-time metadata sent before the stream begins."""

    total_rows: int
    description: str


# Protocol declares the header type
class DataService(Protocol):
    """Service with a header-bearing producer stream."""

    def fetch_rows(self, query: str) -> Stream[StreamState, JobHeader]: ...


# Implementation returns the header alongside the stream
@dataclass
class FetchState(ProducerState):
    """Produces query results."""

    remaining: int

    def produce(self, out: OutputCollector, ctx: CallContext) -> None:
        """Emit one batch per call."""
        if self.remaining <= 0:
            out.finish()
            return
        out.emit_pydict({"value": [self.remaining]})
        self.remaining -= 1


class DataServiceImpl:
    """Implementation with stream headers."""

    def fetch_rows(self, query: str) -> Stream[FetchState, JobHeader]:
        """Return a stream with a header."""
        schema = pa.schema([pa.field("value", pa.int64())])
        header = JobHeader(total_rows=100, description=f"Results for: {query}")
        return Stream(output_schema=schema, state=FetchState(remaining=100), header=header)

Client side — access the header via session.header:

from vgi_rpc import serve_pipe

with serve_pipe(DataService, DataServiceImpl()) as proxy:
    session = proxy.fetch_rows(query="SELECT *")
    print(session.header)  # JobHeader(total_rows=100, description='Results for: SELECT *')
    for batch in session:
        print(batch.batch.to_pydict())

Exchange with header

Exchange streams work the same way. Assuming the service also declares an exchange method (transform_rows below), the header is available when entering the context manager:

from vgi_rpc import AnnotatedBatch

with serve_pipe(DataService, DataServiceImpl()) as proxy:
    with proxy.transform_rows(factor=2.0) as session:
        print(session.header)  # JobHeader(...)
        result = session.exchange(AnnotatedBatch.from_pydict({"value": [1.0]}))

Notes

  • For streams without a header (Stream[S]), session.header returns None.
  • Use session.typed_header(JobHeader) for a typed narrowing that raises TypeError if the header is missing or the wrong type.
  • Headers work across all transports (pipe, subprocess, HTTP). For HTTP, the header is included in the /init response only — subsequent /exchange requests do not re-send it.
  • Headers are visible in runtime introspection: MethodDescription.has_header and MethodDescription.header_schema.
  • The CLI surfaces headers in all output formats: {"__header__": {...}} in JSON, a Header: section in table, and a separate IPC stream in --format arrow.

API Reference

Stream

Stream dataclass

Stream(
    output_schema: Schema,
    state: S,
    input_schema: Schema = _EMPTY_SCHEMA,
    header: H | None = None,
)

Return type for stream RPC methods.

Bundles the output schema with a state object whose process(input, out, ctx) method is called once per input batch.

For producer streams, input_schema is _EMPTY_SCHEMA (the default) and the client iterates via __iter__ / tick(). For exchange streams, input_schema is set to the real input schema and the client uses exchange() / context manager.

The optional second type parameter H specifies a header type — an ArrowSerializableDataclass that is sent once before the main data stream begins. Use Stream[MyState, MyHeader] to declare a header; omit it (Stream[MyState]) for streams without headers.

Client-side stub methods provide accurate types for IDE autocompletion.

__iter__

__iter__() -> Iterator[AnnotatedBatch]

Iterate over output batches (client-side stub for producer streams).

RAISES               DESCRIPTION
NotImplementedError  Always — this is a server-side type. Use StreamSession on the client.

Source code in vgi_rpc/rpc/_types.py
def __iter__(self) -> Iterator[AnnotatedBatch]:
    """Iterate over output batches (client-side stub for producer streams).

    Raises:
        NotImplementedError: Always — this is a server-side type.
            Use ``StreamSession`` on the client.

    """
    raise NotImplementedError("Stream is a server-side type; iterate StreamSession on the client")

__enter__

__enter__() -> Self

Enter context manager (client-side stub for exchange streams).

RAISES               DESCRIPTION
NotImplementedError  Always — this is a server-side type. Use StreamSession on the client.

Source code in vgi_rpc/rpc/_types.py
def __enter__(self) -> Self:
    """Enter context manager (client-side stub for exchange streams).

    Raises:
        NotImplementedError: Always — this is a server-side type.
            Use ``StreamSession`` on the client.

    """
    raise NotImplementedError("Stream is a server-side type; use StreamSession on the client")

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit context manager (client-side stub).

RAISES               DESCRIPTION
NotImplementedError  Always — this is a server-side type.

Source code in vgi_rpc/rpc/_types.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit context manager (client-side stub).

    Raises:
        NotImplementedError: Always — this is a server-side type.

    """
    raise NotImplementedError

exchange

exchange(input: AnnotatedBatch) -> AnnotatedBatch

Exchange an input batch for an output batch (client-side stub).

PARAMETER               DESCRIPTION
input (AnnotatedBatch)  The input batch to send.

RAISES               DESCRIPTION
NotImplementedError  Always — this is a server-side type. Use StreamSession on the client.

Source code in vgi_rpc/rpc/_types.py
def exchange(self, input: AnnotatedBatch) -> AnnotatedBatch:
    """Exchange an input batch for an output batch (client-side stub).

    Args:
        input: The input batch to send.

    Raises:
        NotImplementedError: Always — this is a server-side type.
            Use ``StreamSession`` on the client.

    """
    raise NotImplementedError

tick

tick() -> AnnotatedBatch

Send a tick and receive output (client-side stub for producer streams).

RAISES               DESCRIPTION
NotImplementedError  Always — this is a server-side type. Use StreamSession on the client.

Source code in vgi_rpc/rpc/_types.py
def tick(self) -> AnnotatedBatch:
    """Send a tick and receive output (client-side stub for producer streams).

    Raises:
        NotImplementedError: Always — this is a server-side type.
            Use ``StreamSession`` on the client.

    """
    raise NotImplementedError

close

close() -> None

Close the stream (client-side stub).

RAISES               DESCRIPTION
NotImplementedError  Always — this is a server-side type. Use StreamSession on the client.

Source code in vgi_rpc/rpc/_types.py
def close(self) -> None:
    """Close the stream (client-side stub).

    Raises:
        NotImplementedError: Always — this is a server-side type.
            Use ``StreamSession`` on the client.

    """
    raise NotImplementedError

StreamState

StreamState

Bases: ArrowSerializableDataclass, ABC

Base class for stream state objects.

Subclasses must be dataclasses that define process(input, out, ctx) which is called once per input batch.

For producer streams, input is a zero-row empty-schema tick batch that can be ignored; call out.finish() to signal stream end.

For exchange streams, input is real data. State is mutated in-place across calls.

Extends ArrowSerializableDataclass so that state can be serialized between requests (required for HTTP transport).

process abstractmethod

process(
    input: AnnotatedBatch,
    out: OutputCollector,
    ctx: CallContext,
) -> None

Process an input batch and emit output into the collector.

Source code in vgi_rpc/rpc/_types.py
@abc.abstractmethod
def process(self, input: AnnotatedBatch, out: OutputCollector, ctx: CallContext) -> None:
    """Process an input batch and emit output into the collector."""
    ...

ProducerState

ProducerState

Bases: StreamState, ABC

Base class for producer stream state objects.

Subclasses implement produce(out, ctx) instead of process(), eliminating the phantom input parameter that producer streams must otherwise ignore.

Call out.finish() from produce() to signal end of stream.

produce abstractmethod

produce(out: OutputCollector, ctx: CallContext) -> None

Produce output batches into the collector.

PARAMETER              DESCRIPTION
out (OutputCollector)  The output collector to emit batches into.
ctx (CallContext)      The call context for this request.

Source code in vgi_rpc/rpc/_types.py
@abc.abstractmethod
def produce(self, out: OutputCollector, ctx: CallContext) -> None:
    """Produce output batches into the collector.

    Args:
        out: The output collector to emit batches into.
        ctx: The call context for this request.

    """
    ...

process

process(
    input: AnnotatedBatch,
    out: OutputCollector,
    ctx: CallContext,
) -> None

Delegate to produce(), ignoring the tick input.

Source code in vgi_rpc/rpc/_types.py
def process(self, input: AnnotatedBatch, out: OutputCollector, ctx: CallContext) -> None:
    """Delegate to ``produce()``, ignoring the tick input."""
    self.produce(out, ctx)

ExchangeState

ExchangeState

Bases: StreamState, ABC

Base class for exchange stream state objects.

Subclasses implement exchange(input, out, ctx) instead of process(). Exchange streams must emit exactly one data batch per call and must not call out.finish().

exchange abstractmethod

exchange(
    input: AnnotatedBatch,
    out: OutputCollector,
    ctx: CallContext,
) -> None

Process an input batch and emit exactly one output batch.

PARAMETER               DESCRIPTION
input (AnnotatedBatch)  The input batch from the client.
out (OutputCollector)   The output collector to emit the response batch into.
ctx (CallContext)       The call context for this request.

Source code in vgi_rpc/rpc/_types.py
@abc.abstractmethod
def exchange(self, input: AnnotatedBatch, out: OutputCollector, ctx: CallContext) -> None:
    """Process an input batch and emit exactly one output batch.

    Args:
        input: The input batch from the client.
        out: The output collector to emit the response batch into.
        ctx: The call context for this request.

    """
    ...

process

process(
    input: AnnotatedBatch,
    out: OutputCollector,
    ctx: CallContext,
) -> None

Delegate to exchange().

Source code in vgi_rpc/rpc/_types.py
def process(self, input: AnnotatedBatch, out: OutputCollector, ctx: CallContext) -> None:
    """Delegate to ``exchange()``."""
    self.exchange(input, out, ctx)

StreamSession

StreamSession

StreamSession(
    writer_stream: IOBase,
    reader_stream: IOBase,
    on_log: Callable[[Message], None] | None = None,
    *,
    external_config: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = FULL,
    shm: ShmSegment | None = None,
    header: object | None = None
)

Client-side handle for a stream call (both producer and exchange patterns).

For producer streams, use __iter__() or tick(). For exchange streams, use exchange() inside a context manager. Log batches are delivered to the on_log callback.

Not thread-safe: do not share a session across threads.

Initialize with writer/reader streams and optional log callback.

Source code in vgi_rpc/rpc/_client.py
def __init__(
    self,
    writer_stream: IOBase,
    reader_stream: IOBase,
    on_log: Callable[[Message], None] | None = None,
    *,
    external_config: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
    shm: ShmSegment | None = None,
    header: object | None = None,
) -> None:
    """Initialize with writer/reader streams and optional log callback."""
    self._writer_stream = writer_stream
    self._reader_stream = reader_stream
    self._on_log = on_log
    self._input_writer: ipc.RecordBatchStreamWriter | None = None
    self._output_reader: ValidatedReader | None = None
    self._closed = False
    self._external_config = external_config
    self._ipc_validation = ipc_validation
    self._shm = shm
    self._header = header

header property

header: object | None

The stream header, or None if the stream has no header.

typed_header

typed_header(header_type: type[H]) -> H

Return the stream header narrowed to the expected type.

PARAMETER              DESCRIPTION
header_type (type[H])  The expected header dataclass type.

RETURNS  DESCRIPTION
H        The header, typed as header_type.

RAISES     DESCRIPTION
TypeError  If the header is None or not an instance of header_type.

Source code in vgi_rpc/rpc/_client.py
def typed_header[H: ArrowSerializableDataclass](self, header_type: type[H]) -> H:
    """Return the stream header narrowed to the expected type.

    Args:
        header_type: The expected header dataclass type.

    Returns:
        The header, typed as *header_type*.

    Raises:
        TypeError: If the header is ``None`` or not an instance of
            *header_type*.

    """
    if self._header is None:
        raise TypeError(f"Stream has no header (expected {header_type.__name__})")
    if not isinstance(self._header, header_type):
        raise TypeError(f"Header type mismatch: expected {header_type.__name__}, got {type(self._header).__name__}")
    return self._header

exchange

exchange(input: AnnotatedBatch) -> AnnotatedBatch

Send an input batch, receive the output batch.

Returns an AnnotatedBatch. Log batches are delivered to the on_log callback before returning. On RpcError, the session is automatically closed so the transport is clean.

RAISES         DESCRIPTION
StopIteration  When the stream has finished.
RpcError       On server-side errors or transport failures.

Source code in vgi_rpc/rpc/_client.py
def exchange(self, input: AnnotatedBatch) -> AnnotatedBatch:
    """Send an input batch, receive the output batch.

    Returns an ``AnnotatedBatch``. Log batches are delivered to
    the ``on_log`` callback before returning. On ``RpcError``, the
    session is automatically closed so the transport is clean.

    Raises:
        StopIteration: When the stream has finished.
        RpcError: On server-side errors or transport failures.

    """
    if wire_stream_logger.isEnabledFor(logging.DEBUG):
        wire_stream_logger.debug("Stream exchange: sending input")
    try:
        self._write_batch(input)
    except _TRANSPORT_ERRORS as exc:
        # Set _closed directly — calling close() would attempt I/O on the broken transport.
        self._closed = True
        raise RpcError("TransportError", f"Transport failed during stream exchange (write): {exc}", "") from exc
    try:
        return self._read_response()
    except RpcError:
        self.close()
        raise
    except _TRANSPORT_ERRORS as exc:
        self._closed = True  # Bypass close() — transport is broken.
        raise RpcError("TransportError", f"Transport failed during stream exchange (read): {exc}", "") from exc

tick

tick() -> AnnotatedBatch

Send a tick batch (producer streams) and receive the output batch.

RETURNS         DESCRIPTION
AnnotatedBatch  The next output batch.

RAISES         DESCRIPTION
StopIteration  When the producer stream has finished.
RpcError       On server-side errors or transport failures.

Source code in vgi_rpc/rpc/_client.py
def tick(self) -> AnnotatedBatch:
    """Send a tick batch (producer streams) and receive the output batch.

    Returns:
        The next output batch.

    Raises:
        StopIteration: When the producer stream has finished.
        RpcError: On server-side errors or transport failures.

    """
    if wire_stream_logger.isEnabledFor(logging.DEBUG):
        wire_stream_logger.debug("Stream tick")
    try:
        self._write_batch(_TICK_BATCH)
    except _TRANSPORT_ERRORS as exc:
        # Set _closed directly — calling close() would attempt I/O on the broken transport.
        self._closed = True
        raise RpcError("TransportError", f"Transport failed during stream tick (write): {exc}", "") from exc
    try:
        return self._read_response()
    except StopIteration:
        self.close()
        raise
    except RpcError:
        self.close()
        raise
    except _TRANSPORT_ERRORS as exc:
        self._closed = True  # Bypass close() — transport is broken.
        raise RpcError("TransportError", f"Transport failed during stream tick (read): {exc}", "") from exc

__iter__

__iter__() -> Iterator[AnnotatedBatch]

Iterate over output batches from a producer stream.

Sends tick batches and yields output batches until the server signals stream completion.

Source code in vgi_rpc/rpc/_client.py
def __iter__(self) -> Iterator[AnnotatedBatch]:
    """Iterate over output batches from a producer stream.

    Sends tick batches and yields output batches until the server
    signals stream completion.
    """
    while True:
        try:
            yield self.tick()
        except StopIteration:
            break

close

close() -> None

Close input stream (signals EOS) and drain remaining output.

Source code in vgi_rpc/rpc/_client.py
def close(self) -> None:
    """Close input stream (signals EOS) and drain remaining output."""
    if self._closed:
        return
    if wire_stream_logger.isEnabledFor(logging.DEBUG):
        wire_stream_logger.debug("Stream close")
    self._closed = True
    if self._input_writer is not None:
        self._input_writer.close()
    else:
        with ipc.new_stream(self._writer_stream, _EMPTY_SCHEMA):
            pass
    if self._output_reader is None:
        try:
            self._output_reader = ValidatedReader(ipc.open_stream(self._reader_stream), self._ipc_validation)
        except (pa.ArrowInvalid, OSError, StopIteration):
            return
    _MAX_DRAIN = 10_000
    with contextlib.suppress(StopIteration, RpcError, pa.ArrowInvalid, OSError):
        for _ in range(_MAX_DRAIN):
            _read_batch_with_log_check(self._output_reader, self._on_log, self._external_config, shm=self._shm)

__enter__

__enter__() -> StreamSession

Enter context manager.

Source code in vgi_rpc/rpc/_client.py
def __enter__(self) -> StreamSession:
    """Enter context manager."""
    return self

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit context manager.

Source code in vgi_rpc/rpc/_client.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit context manager."""
    self.close()

OutputCollector

OutputCollector

OutputCollector(
    output_schema: Schema,
    *,
    prior_data_bytes: int = 0,
    server_id: str | None = None,
    producer_mode: bool = True
)

Accumulates output batches during a produce/process call.

Enforces that exactly one data batch is emitted per call (plus any number of log batches). Batches are stored in a single ordered list because interleaving order matters for the wire protocol (logs must precede the data batch they annotate).

Initialize with the output schema for this stream.

PARAMETER                      DESCRIPTION
output_schema (Schema)         The Arrow schema for data batches.
prior_data_bytes (int = 0)     Cumulative data bytes from earlier produce/process calls in this stream.
server_id (str | None = None)  Optional server identifier injected into log batch metadata.
producer_mode (bool = True)    When True (default), finish() is allowed. Set to False for exchange streams where finish() is not permitted.

Source code in vgi_rpc/rpc/_types.py
def __init__(
    self,
    output_schema: pa.Schema,
    *,
    prior_data_bytes: int = 0,
    server_id: str | None = None,
    producer_mode: bool = True,
) -> None:
    """Initialize with the output schema for this stream.

    Args:
        output_schema: The Arrow schema for data batches.
        prior_data_bytes: Cumulative data bytes from earlier produce/process calls in this stream.
        server_id: Optional server identifier injected into log batch metadata.
        producer_mode: When ``True`` (default), ``finish()`` is allowed.
            Set to ``False`` for exchange streams where ``finish()`` is
            not permitted.

    """
    self._output_schema = output_schema
    self._batches: list[AnnotatedBatch] = []
    self._finished: bool = False
    self._data_batch_idx: int | None = None
    self._prior_data_bytes = prior_data_bytes
    self._producer_mode = producer_mode
    self._server_id = server_id

output_schema property

output_schema: Schema

The output schema for this stream.

finished property

finished: bool

Whether finish() has been called.

total_data_bytes property

total_data_bytes: int

Cumulative data bytes emitted across the stream so far.

Includes bytes from prior produce/process calls plus the current data batch (if one has been emitted). Measured via pa.RecordBatch.get_total_buffer_size().

batches property

batches: list[AnnotatedBatch]

The accumulated batches.

data_batch property

data_batch: AnnotatedBatch

Return the single data batch, or raise if none was emitted.

validate

validate() -> None

Assert that exactly one data batch was emitted.

RAISES        DESCRIPTION
RuntimeError  If no data batch was emitted.

Source code in vgi_rpc/rpc/_types.py
def validate(self) -> None:
    """Assert that exactly one data batch was emitted.

    Raises:
        RuntimeError: If no data batch was emitted.

    """
    if self._data_batch_idx is None:
        raise RuntimeError("No data batch was emitted")

merge_data_metadata

merge_data_metadata(metadata: KeyValueMetadata) -> None

Merge extra metadata into the data batch.

RAISES        DESCRIPTION
RuntimeError  If no data batch was emitted.

Source code in vgi_rpc/rpc/_types.py
def merge_data_metadata(self, metadata: pa.KeyValueMetadata) -> None:
    """Merge extra metadata into the data batch.

    Raises:
        RuntimeError: If no data batch was emitted.

    """
    if self._data_batch_idx is None:
        raise RuntimeError("No data batch was emitted")
    ab = self._batches[self._data_batch_idx]
    self._batches[self._data_batch_idx] = AnnotatedBatch(
        batch=ab.batch,
        custom_metadata=merge_metadata(ab.custom_metadata, metadata),
    )

emit

emit(
    batch: RecordBatch,
    metadata: dict[str, str] | None = None,
) -> None

Emit a pre-built data batch. Raises if a data batch was already emitted.

Source code in vgi_rpc/rpc/_types.py
def emit(
    self,
    batch: pa.RecordBatch,
    metadata: dict[str, str] | None = None,
) -> None:
    """Emit a pre-built data batch. Raises if a data batch was already emitted."""
    if self._data_batch_idx is not None:
        raise RuntimeError("Only one data batch may be emitted per call")
    self._data_batch_idx = len(self._batches)
    custom_metadata = encode_metadata(metadata) if metadata else None
    self._batches.append(AnnotatedBatch(batch=batch, custom_metadata=custom_metadata))

emit_arrays

emit_arrays(
    arrays: Sequence[Array[Any]],
    metadata: dict[str, str] | None = None,
) -> None

Build a RecordBatch from arrays using output_schema and emit it.

Source code in vgi_rpc/rpc/_types.py
def emit_arrays(
    self,
    arrays: Sequence[pa.Array[Any]],
    metadata: dict[str, str] | None = None,
) -> None:
    """Build a RecordBatch from arrays using output_schema and emit it."""
    batch = pa.RecordBatch.from_arrays(arrays, schema=self._output_schema)
    self.emit(batch, metadata=metadata)

emit_pydict

emit_pydict(
    data: dict[str, Any],
    metadata: dict[str, str] | None = None,
) -> None

Build a RecordBatch from a Python dict using output_schema and emit it.

Source code in vgi_rpc/rpc/_types.py
def emit_pydict(
    self,
    data: dict[str, Any],
    metadata: dict[str, str] | None = None,
) -> None:
    """Build a RecordBatch from a Python dict using output_schema and emit it."""
    batch = pa.RecordBatch.from_pydict(data, schema=self._output_schema)
    self.emit(batch, metadata=metadata)

emit_client_log_message

emit_client_log_message(msg: Message) -> None

Append a zero-row client-directed log batch (used by CallContext.emit_client_log in produce/process).

Source code in vgi_rpc/rpc/_types.py
def emit_client_log_message(self, msg: Message) -> None:
    """Append a zero-row client-directed log batch (used by CallContext.emit_client_log in produce/process)."""
    md = msg.add_to_metadata()
    if self._server_id is not None:
        md[SERVER_ID_KEY.decode()] = self._server_id
    custom_metadata = encode_metadata(md)
    self._batches.append(AnnotatedBatch(batch=empty_batch(self._output_schema), custom_metadata=custom_metadata))

client_log

client_log(
    level: Level, message: str, **extra: str
) -> None

Emit a zero-row client-directed log batch with log metadata.

Source code in vgi_rpc/rpc/_types.py
def client_log(self, level: Level, message: str, **extra: str) -> None:
    """Emit a zero-row client-directed log batch with log metadata."""
    self.emit_client_log_message(Message(level, message, **extra))

finish

finish() -> None

Signal stream completion for producer streams.

Producer streams (input_schema == _EMPTY_SCHEMA) call this to signal that no more data will be produced.

RAISES        DESCRIPTION
RuntimeError  If called on an exchange stream (producer_mode=False).

Source code in vgi_rpc/rpc/_types.py
def finish(self) -> None:
    """Signal stream completion for producer streams.

    Producer streams (``input_schema == _EMPTY_SCHEMA``) call this
    to signal that no more data will be produced.

    Raises:
        RuntimeError: If called on an exchange stream (``producer_mode=False``).

    """
    if not self._producer_mode:
        raise RuntimeError(
            "finish() is not allowed on exchange streams; "
            "exchange streams must emit exactly one data batch per call"
        )
    self._finished = True

AnnotatedBatch

AnnotatedBatch dataclass

AnnotatedBatch(
    batch: RecordBatch,
    custom_metadata: KeyValueMetadata | None = None,
    _release_fn: Callable[[], None] | None = None,
)

A RecordBatch paired with its custom metadata.

Used as both input and output for all batch I/O in stream methods.

release

release() -> None

Release associated shared memory region.

After calling release(), the batch data may be overwritten — accessing batch column data after release is undefined behavior. No-op if the batch did not come from shared memory.

Source code in vgi_rpc/rpc/_types.py
def release(self) -> None:
    """Release associated shared memory region.

    After calling release(), the batch data may be overwritten —
    accessing batch column data after release is undefined behavior.
    No-op if the batch did not come from shared memory.
    """
    if self._release_fn is not None:
        self._release_fn()

from_pydict classmethod

from_pydict(
    data: dict[str, Any], schema: Schema | None = None
) -> AnnotatedBatch

Create from a Python dict, optionally with a schema.

Source code in vgi_rpc/rpc/_types.py
@classmethod
def from_pydict(
    cls,
    data: dict[str, Any],
    schema: pa.Schema | None = None,
) -> AnnotatedBatch:
    """Create from a Python dict, optionally with a schema."""
    batch = pa.RecordBatch.from_pydict(data, schema=schema)
    return cls(batch=batch)