Skip to content

Core RPC

The core module provides the server, connection, transport interface, error types, and convenience functions for defining and running RPC services.

Typical Usage

Most users only need serve_pipe (testing) or connect (subprocess):

from vgi_rpc import serve_pipe, connect

# In-process (tests)
with serve_pipe(MyService, MyServiceImpl()) as proxy:
    proxy.my_method(arg=42)

# Subprocess
with connect(MyService, ["python", "worker.py"]) as proxy:
    proxy.my_method(arg=42)

For more control, use RpcServer and RpcConnection directly.

API Reference

RpcServer

RpcServer

RpcServer(
    protocol: type,
    implementation: object,
    *,
    external_location: ExternalLocationConfig | None = None,
    server_id: str | None = None,
    enable_describe: bool = False,
    ipc_validation: IpcValidation = FULL
)

Dispatches RPC requests to an implementation over IO-stream transports.

Initialize with a protocol type and its implementation.

PARAMETER DESCRIPTION
protocol

The Protocol class defining the RPC interface.

TYPE: type

implementation

Object implementing all methods from protocol.

TYPE: object

external_location

Optional ExternalLocation configuration.

TYPE: ExternalLocationConfig | None DEFAULT: None

server_id

Optional server identifier; auto-generated if None.

TYPE: str | None DEFAULT: None

enable_describe

When True, the server handles __describe__ requests returning machine-readable method metadata.

TYPE: bool DEFAULT: False

ipc_validation

Validation level for incoming IPC batches. Defaults to FULL for maximum safety.

TYPE: IpcValidation DEFAULT: FULL

Source code in vgi_rpc/rpc/_server.py
def __init__(
    self,
    protocol: type,
    implementation: object,
    *,
    external_location: ExternalLocationConfig | None = None,
    server_id: str | None = None,
    enable_describe: bool = False,
    ipc_validation: IpcValidation = IpcValidation.FULL,
) -> None:
    """Initialize with a protocol type and its implementation.

    Args:
        protocol: The Protocol class defining the RPC interface.
        implementation: Object implementing all methods from *protocol*.
        external_location: Optional ExternalLocation configuration.
        server_id: Optional server identifier; auto-generated if ``None``.
        enable_describe: When ``True``, the server handles ``__describe__``
            requests returning machine-readable method metadata.
        ipc_validation: Validation level for incoming IPC batches.
            Defaults to ``FULL`` for maximum safety.

    """
    self._protocol = protocol
    self._impl = implementation
    self._ipc_validation = ipc_validation
    self._methods = rpc_methods(protocol)
    self._external_config = external_location
    self._server_id = server_id if server_id is not None else uuid.uuid4().hex[:12]
    self._dispatch_hook: _DispatchHook | None = None
    _validate_implementation(protocol, implementation, self._methods)

    if enable_describe:
        from vgi_rpc.introspect import DESCRIBE_METHOD_NAME, build_describe_batch

        self._describe_batch: pa.RecordBatch | None
        self._describe_metadata: pa.KeyValueMetadata | None
        self._describe_batch, self._describe_metadata = build_describe_batch(
            protocol.__name__, self._methods, self._server_id
        )
        # Register __describe__ as a synthetic unary method so normal dispatch handles it.
        self._methods = {
            **self._methods,
            DESCRIBE_METHOD_NAME: RpcMethodInfo(
                name=DESCRIBE_METHOD_NAME,
                params_schema=_EMPTY_SCHEMA,
                result_schema=self._describe_batch.schema,
                result_type=type(None),
                method_type=MethodType.UNARY,
                has_return=True,
                doc="Return machine-readable metadata about all server methods.",
            ),
        }
    else:
        self._describe_batch = None
        self._describe_metadata = None

    # Detect which impl methods accept a `ctx` parameter.
    self._ctx_methods: frozenset[str] = frozenset(
        name
        for name in self._methods
        if (method := getattr(implementation, name, None)) is not None
        and "ctx" in inspect.signature(method).parameters
    )

    _logger.info(
        "RpcServer created for %s (server_id=%s, methods=%d)",
        protocol.__name__,
        self._server_id,
        len(self._methods),
        extra={"server_id": self._server_id, "protocol": protocol.__name__, "method_count": len(self._methods)},
    )

methods property

methods: Mapping[str, RpcMethodInfo]

Return method metadata for this server's protocol.

implementation property

implementation: object

The implementation object.

external_config property

external_config: ExternalLocationConfig | None

The ExternalLocation configuration, if any.

server_id property

server_id: str

Short random identifier for this server instance.

protocol_name property

protocol_name: str

Name of the Protocol class this server implements.

ctx_methods property

ctx_methods: frozenset[str]

Method names whose implementations accept a ctx parameter.

describe_enabled property

describe_enabled: bool

Whether __describe__ introspection is enabled.

ipc_validation property

ipc_validation: IpcValidation

Validation level for incoming IPC batches.

serve

serve(transport: RpcTransport) -> None

Serve RPC requests in a loop until the transport is closed.

Source code in vgi_rpc/rpc/_server.py
def serve(self, transport: RpcTransport) -> None:
    """Serve RPC requests in a loop until the transport is closed."""
    while True:
        try:
            self.serve_one(transport)
        except (EOFError, StopIteration):
            break
        except (BrokenPipeError, ConnectionResetError, ConnectionAbortedError):
            _logger.debug(
                "serve loop ending due to broken pipe",
                exc_info=True,
                extra={"server_id": self._server_id},
            )
            break
        except pa.ArrowInvalid:
            _logger.warning(
                "serve loop ending due to ArrowInvalid",
                exc_info=True,
                extra={"server_id": self._server_id},
            )
            break

serve_one

serve_one(transport: RpcTransport) -> None

Handle a single RPC call (any method type) over the given transport.

Protocol-level errors (VersionError, RpcError from missing metadata) are caught, written back as error responses, and the method returns normally so the serve loop can continue.

RAISES DESCRIPTION
ArrowInvalid

If the incoming data is not valid Arrow IPC. An error response is written to transport before raising so the client can read a structured RpcError.

Source code in vgi_rpc/rpc/_server.py
def serve_one(self, transport: RpcTransport) -> None:
    """Handle a single RPC call (any method type) over the given transport.

    Protocol-level errors (``VersionError``, ``RpcError`` from missing
    metadata) are caught, written back as error responses, and the
    method returns normally so the serve loop can continue.

    Raises:
        pa.ArrowInvalid: If the incoming data is not valid Arrow IPC.
            An error response is written to *transport* before raising so
            the client can read a structured ``RpcError``.

    """
    token = _current_request_id.set(_generate_request_id())
    stats = CallStatistics()
    stats_token = _current_call_stats.set(stats)
    md_token = _current_request_metadata.set(None)
    dynamic_shm: ShmSegment | None = None
    try:
        try:
            method_name, kwargs = _read_request(transport.reader, self._ipc_validation)
        except pa.ArrowInvalid as exc:
            with contextlib.suppress(BrokenPipeError, OSError):
                _write_error_stream(transport.writer, _EMPTY_SCHEMA, exc, server_id=self._server_id)
            raise
        except (VersionError, RpcError) as exc:
            with contextlib.suppress(BrokenPipeError, OSError):
                _write_error_stream(transport.writer, _EMPTY_SCHEMA, exc, server_id=self._server_id)
            return

        info = self._methods.get(method_name)
        if info is None:
            available = sorted(self._methods.keys())
            _write_error_stream(
                transport.writer,
                _EMPTY_SCHEMA,
                AttributeError(f"Unknown method: '{method_name}'. Available methods: {available}"),
                server_id=self._server_id,
            )
            return

        _deserialize_params(kwargs, info.param_types, self._ipc_validation)

        try:
            _validate_params(info.name, kwargs, info.param_types)
        except TypeError as exc:
            err_schema = info.result_schema if info.method_type == MethodType.UNARY else _EMPTY_SCHEMA
            _write_error_stream(transport.writer, err_schema, exc, server_id=self._server_id)
            return

        # Determine SHM segment: prefer transport-level, fall back to request metadata
        shm = transport.shm if isinstance(transport, ShmPipeTransport) else None
        if shm is None:
            dynamic_shm = _maybe_attach_shm(_current_request_metadata.get())
            shm = dynamic_shm

        if info.method_type == MethodType.UNARY:
            self._serve_unary(transport, info, kwargs, stats=stats, shm=shm)
        elif info.method_type == MethodType.STREAM:
            self._serve_stream(transport, info, kwargs, stats=stats, shm=shm)
    finally:
        if dynamic_shm is not None:
            with contextlib.suppress(BufferError):
                dynamic_shm.close()
        _current_request_metadata.reset(md_token)
        _current_call_stats.reset(stats_token)
        _current_request_id.reset(token)

RpcConnection

RpcConnection

RpcConnection(
    protocol: type[P],
    transport: RpcTransport,
    on_log: Callable[[Message], None] | None = None,
    *,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = FULL
)

Context manager that provides a typed RPC proxy over a transport.

The type parameter P is the Protocol class, enabling IDE autocompletion for all methods defined on the protocol::

with RpcConnection(MyProtocol, transport) as svc:
    result = svc.add(a=1, b=2)   # IDE sees MyProtocol methods

Initialize with a protocol type and transport.

Source code in vgi_rpc/rpc/_client.py
def __init__(
    self,
    protocol: type[P],
    transport: RpcTransport,
    on_log: Callable[[Message], None] | None = None,
    *,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
) -> None:
    """Initialize with a protocol type and transport."""
    self._protocol = protocol
    self._transport = transport
    self._on_log = on_log
    self._external_config = external_location
    self._ipc_validation = ipc_validation

__enter__

__enter__() -> P

Enter the context and return a typed proxy.

Source code in vgi_rpc/rpc/_client.py
def __enter__(self) -> P:
    """Enter the context and return a typed proxy."""
    if wire_transport_logger.isEnabledFor(logging.DEBUG):
        wire_transport_logger.debug("RpcConnection open: protocol=%s", self._protocol.__name__)
    return cast(
        P,
        _RpcProxy(
            self._protocol,
            self._transport,
            self._on_log,
            external_config=self._external_config,
            ipc_validation=self._ipc_validation,
        ),
    )

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Close the transport.

Source code in vgi_rpc/rpc/_client.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Close the transport."""
    if wire_transport_logger.isEnabledFor(logging.DEBUG):
        wire_transport_logger.debug("RpcConnection close: protocol=%s", self._protocol.__name__)
    self._transport.close()

RpcTransport

RpcTransport

Bases: Protocol

Bidirectional byte stream transport.

reader property

reader: IOBase

Readable binary stream.

writer property

writer: IOBase

Writable binary stream.

close

close() -> None

Close the transport.

Source code in vgi_rpc/rpc/_transport.py
def close(self) -> None:
    """Close the transport."""
    ...

RpcMethodInfo

RpcMethodInfo dataclass

RpcMethodInfo(
    name: str,
    params_schema: Schema,
    result_schema: Schema,
    result_type: object,
    method_type: MethodType,
    has_return: bool,
    doc: str | None,
    param_defaults: dict[str, object] = dict(),
    param_types: dict[str, object] = dict(),
    header_type: (
        type[ArrowSerializableDataclass] | None
    ) = None,
)

Metadata for a single RPC method, derived from Protocol type hints.

Produced by :func:rpc_methods when introspecting a Protocol class. Each instance describes one method's wire-protocol details: its Arrow schemas, parameter types and defaults, and the original docstring.

ATTRIBUTE DESCRIPTION
name

Method name as it appears on the Protocol.

TYPE: str

params_schema

Arrow schema for the serialized request parameters.

TYPE: Schema

result_schema

Arrow schema for the serialized response (unary only; empty schema for stream methods).

TYPE: Schema

result_type

The raw Python return-type annotation (e.g. float, Stream[MyState]).

TYPE: object

method_type

Whether this is a UNARY or STREAM call.

TYPE: MethodType

has_return

True when the unary method returns a value (False for -> None or stream methods).

TYPE: bool

doc

The method's docstring from the Protocol class, or None if no docstring was provided.

TYPE: str | None

param_defaults

Mapping of parameter name to default value for parameters that have defaults in the Protocol signature.

TYPE: dict[str, object]

param_types

Mapping of parameter name to its Python type annotation (excludes self and return).

TYPE: dict[str, object]

header_type

For stream methods with a header, the concrete ArrowSerializableDataclass subclass for the header. None when the method has no header.

TYPE: type[ArrowSerializableDataclass] | None

MethodType

MethodType

Bases: Enum

Classification of RPC method patterns.

Errors

RpcError

RpcError(
    error_type: str,
    error_message: str,
    remote_traceback: str,
    *,
    request_id: str = ""
)

Bases: Exception

Raised on the client side when the server reports an error.

Initialize with error details from the remote side.

Source code in vgi_rpc/rpc/_common.py
def __init__(self, error_type: str, error_message: str, remote_traceback: str, *, request_id: str = "") -> None:
    """Initialize with error details from the remote side."""
    self.error_type = error_type
    self.error_message = error_message
    self.remote_traceback = remote_traceback
    self.request_id = request_id
    super().__init__(f"{error_type}: {error_message}")

VersionError

Bases: Exception

Raised when a request has a missing or incompatible protocol version.

See also IPCError in the Serialization module.

CallStatistics

CallStatistics dataclass

CallStatistics(
    input_batches: int = 0,
    output_batches: int = 0,
    input_rows: int = 0,
    output_rows: int = 0,
    input_bytes: int = 0,
    output_bytes: int = 0,
)

Mutable accumulator of per-call I/O counters for usage accounting.

Created at dispatch start and populated as batches flow through the server. Surfaced through the access log and OTel dispatch hook.

Byte measurement: uses pa.RecordBatch.get_total_buffer_size() which reports logical Arrow buffer sizes (O(columns), negligible cost). This is an approximation — it does not include IPC framing overhead (padding, schema messages, EOS markers).

ATTRIBUTE DESCRIPTION
input_batches

Number of input batches read by the server.

TYPE: int

output_batches

Number of output batches written by the server.

TYPE: int

input_rows

Total rows across all input batches.

TYPE: int

output_rows

Total rows across all output batches.

TYPE: int

input_bytes

Approximate logical bytes across all input batches.

TYPE: int

output_bytes

Approximate logical bytes across all output batches.

TYPE: int

record_input

record_input(batch: RecordBatch) -> None

Record an input batch's row count and buffer size.

Source code in vgi_rpc/rpc/_common.py
def record_input(self, batch: pa.RecordBatch) -> None:
    """Record an input batch's row count and buffer size."""
    self.input_batches += 1
    self.input_rows += batch.num_rows
    self.input_bytes += batch.get_total_buffer_size()

record_output

record_output(batch: RecordBatch) -> None

Record an output batch's row count and buffer size.

Source code in vgi_rpc/rpc/_common.py
def record_output(self, batch: pa.RecordBatch) -> None:
    """Record an output batch's row count and buffer size."""
    self.output_batches += 1
    self.output_rows += batch.num_rows
    self.output_bytes += batch.get_total_buffer_size()

Convenience Functions

run_server

run_server(
    protocol_or_server: type | RpcServer,
    implementation: object | None = None,
) -> None

Serve RPC requests over stdin/stdout.

This is the recommended entry point for subprocess workers. Accepts either a (protocol, implementation) pair or a pre-built RpcServer.

PARAMETER DESCRIPTION
protocol_or_server

A Protocol class (requires implementation) or an already-constructed RpcServer.

TYPE: type | RpcServer

implementation

The implementation object. Required when protocol_or_server is a Protocol class; must be None when passing an RpcServer.

TYPE: object | None DEFAULT: None

RAISES DESCRIPTION
TypeError

On invalid argument combinations.

Source code in vgi_rpc/rpc/__init__.py
def run_server(protocol_or_server: type | RpcServer, implementation: object | None = None) -> None:
    """Serve RPC requests over stdin/stdout.

    This is the recommended entry point for subprocess workers.  Accepts
    either a ``(protocol, implementation)`` pair or a pre-built ``RpcServer``.

    Args:
        protocol_or_server: A Protocol class (requires *implementation*) or
            an already-constructed ``RpcServer``.
        implementation: The implementation object.  Required when
            *protocol_or_server* is a Protocol class; must be ``None`` when
            passing an ``RpcServer``.

    Raises:
        TypeError: On invalid argument combinations.

    """
    if isinstance(protocol_or_server, RpcServer):
        if implementation is not None:
            raise TypeError("implementation must be None when passing an RpcServer")
        server = protocol_or_server
    elif isinstance(protocol_or_server, type):
        if implementation is None:
            raise TypeError("implementation is required when passing a Protocol class")
        server = RpcServer(protocol_or_server, implementation)
    else:
        raise TypeError(f"Expected a Protocol class or RpcServer, got {type(protocol_or_server).__name__}")
    serve_stdio(server)

connect

connect(
    protocol: type[P],
    cmd: list[str],
    *,
    on_log: Callable[[Message], None] | None = None,
    external_location: ExternalLocationConfig | None = None,
    stderr: StderrMode = INHERIT,
    stderr_logger: Logger | None = None,
    ipc_validation: IpcValidation = FULL
) -> Iterator[P]

Connect to a subprocess RPC server.

Context manager that spawns a subprocess, yields a typed proxy, and cleans up on exit.

PARAMETER DESCRIPTION
protocol

The Protocol class defining the RPC interface.

TYPE: type[P]

cmd

Command to spawn the subprocess worker.

TYPE: list[str]

on_log

Optional callback for log messages from the server.

TYPE: Callable[[Message], None] | None DEFAULT: None

external_location

Optional ExternalLocation configuration for resolving and producing externalized batches.

TYPE: ExternalLocationConfig | None DEFAULT: None

stderr

How to handle the child's stderr stream (see :class:StderrMode).

TYPE: StderrMode DEFAULT: INHERIT

stderr_logger

Logger for StderrMode.PIPE output; ignored for other modes. Defaults to logging.getLogger("vgi_rpc.subprocess.stderr").

TYPE: Logger | None DEFAULT: None

ipc_validation

Validation level for incoming IPC batches.

TYPE: IpcValidation DEFAULT: FULL

YIELDS DESCRIPTION
P

A typed RPC proxy supporting all methods defined on protocol.

Source code in vgi_rpc/rpc/__init__.py
@contextlib.contextmanager
def connect[P](
    protocol: type[P],
    cmd: list[str],
    *,
    on_log: Callable[[Message], None] | None = None,
    external_location: ExternalLocationConfig | None = None,
    stderr: StderrMode = StderrMode.INHERIT,
    stderr_logger: logging.Logger | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
) -> Iterator[P]:
    """Connect to a subprocess RPC server.

    Context manager that spawns a subprocess, yields a typed proxy, and
    cleans up on exit.

    Args:
        protocol: The Protocol class defining the RPC interface.
        cmd: Command to spawn the subprocess worker.
        on_log: Optional callback for log messages from the server.
        external_location: Optional ExternalLocation configuration for
            resolving and producing externalized batches.
        stderr: How to handle the child's stderr stream (see :class:`StderrMode`).
        stderr_logger: Logger for ``StderrMode.PIPE`` output; ignored for
            other modes.  Defaults to
            ``logging.getLogger("vgi_rpc.subprocess.stderr")``.
        ipc_validation: Validation level for incoming IPC batches.

    Yields:
        A typed RPC proxy supporting all methods defined on *protocol*.

    """
    transport = SubprocessTransport(cmd, stderr=stderr, stderr_logger=stderr_logger)
    try:
        with RpcConnection(
            protocol, transport, on_log=on_log, external_location=external_location, ipc_validation=ipc_validation
        ) as proxy:
            yield proxy
    finally:
        transport.close()

serve_pipe

serve_pipe(
    protocol: type[P],
    implementation: object,
    *,
    on_log: Callable[[Message], None] | None = None,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation | None = None
) -> Iterator[P]

Start an in-process pipe server and yield a typed client proxy.

Useful for tests and demos — no subprocess needed. A background thread runs RpcServer.serve() on the server side of a pipe pair.

PARAMETER DESCRIPTION
protocol

The Protocol class defining the RPC interface.

TYPE: type[P]

implementation

The implementation object.

TYPE: object

on_log

Optional callback for log messages from the server.

TYPE: Callable[[Message], None] | None DEFAULT: None

external_location

Optional ExternalLocation configuration for resolving and producing externalized batches.

TYPE: ExternalLocationConfig | None DEFAULT: None

ipc_validation

Validation level for incoming IPC batches. When None (the default), both components use IpcValidation.FULL.

TYPE: IpcValidation | None DEFAULT: None

YIELDS DESCRIPTION
P

A typed RPC proxy supporting all methods defined on protocol.

Source code in vgi_rpc/rpc/__init__.py
@contextlib.contextmanager
def serve_pipe[P](
    protocol: type[P],
    implementation: object,
    *,
    on_log: Callable[[Message], None] | None = None,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation | None = None,
) -> Iterator[P]:
    """Start an in-process pipe server and yield a typed client proxy.

    Useful for tests and demos — no subprocess needed.  A background thread
    runs ``RpcServer.serve()`` on the server side of a pipe pair.

    Args:
        protocol: The Protocol class defining the RPC interface.
        implementation: The implementation object.
        on_log: Optional callback for log messages from the server.
        external_location: Optional ExternalLocation configuration for
            resolving and producing externalized batches.
        ipc_validation: Validation level for incoming IPC batches.
            When ``None`` (the default), both components use
            ``IpcValidation.FULL``.

    Yields:
        A typed RPC proxy supporting all methods defined on *protocol*.

    """
    client_transport, server_transport = make_pipe_pair()
    server = RpcServer(
        protocol,
        implementation,
        external_location=external_location,
        ipc_validation=ipc_validation if ipc_validation is not None else IpcValidation.FULL,
    )
    thread = threading.Thread(target=server.serve, args=(server_transport,), daemon=True)
    thread.start()
    try:
        with RpcConnection(
            protocol,
            client_transport,
            on_log=on_log,
            external_location=external_location,
            ipc_validation=ipc_validation if ipc_validation is not None else IpcValidation.FULL,
        ) as proxy:
            yield proxy
    finally:
        client_transport.close()
        thread.join(timeout=5)
        server_transport.close()

describe_rpc

describe_rpc(
    protocol: type,
    *,
    methods: Mapping[str, RpcMethodInfo] | None = None
) -> str

Return a human-readable description of an RPC protocol's methods.

Source code in vgi_rpc/rpc/__init__.py
def describe_rpc(protocol: type, *, methods: Mapping[str, RpcMethodInfo] | None = None) -> str:
    """Return a human-readable description of an RPC protocol's methods."""
    if methods is None:
        methods = rpc_methods(protocol)
    lines: list[str] = [f"RPC Protocol: {protocol.__name__}", ""]

    for name, info in sorted(methods.items()):
        lines.append(f"  {name}({info.method_type.value})")
        lines.append(f"    params: {info.params_schema}")
        if info.method_type == MethodType.UNARY:
            lines.append(f"    result: {info.result_schema}")
        if info.doc:
            lines.append(f"    doc: {info.doc.strip()}")
        lines.append("")

    return "\n".join(lines)

Constants

REQUEST_VERSION module-attribute

REQUEST_VERSION = b'1'