Core RPC¶
The core module provides the server, connection, transport interface, error types, and convenience functions for defining and running RPC services.
Typical Usage¶
Most users only need serve_pipe (testing) or connect (subprocess):
from vgi_rpc import serve_pipe, connect
# In-process (tests)
with serve_pipe(MyService, MyServiceImpl()) as proxy:
proxy.my_method(arg=42)
# Subprocess
with connect(MyService, ["python", "worker.py"]) as proxy:
proxy.my_method(arg=42)
For more control, use RpcServer and RpcConnection directly.
API Reference¶
RpcServer¶
RpcServer
¶
RpcServer(
protocol: type,
implementation: object,
*,
external_location: ExternalLocationConfig | None = None,
server_id: str | None = None,
enable_describe: bool = False,
ipc_validation: IpcValidation = FULL
)
Dispatches RPC requests to an implementation over IO-stream transports.
Initialize with a protocol type and its implementation.
| PARAMETER | DESCRIPTION |
|---|---|
protocol
|
The Protocol class defining the RPC interface.
TYPE:
|
implementation
|
Object implementing all methods from protocol.
TYPE:
|
external_location
|
Optional ExternalLocation configuration.
TYPE:
|
server_id
|
Optional server identifier; auto-generated if
TYPE:
|
enable_describe
|
When
TYPE:
|
ipc_validation
|
Validation level for incoming IPC batches.
Defaults to
TYPE:
|
Source code in vgi_rpc/rpc/_server.py
methods
property
¶
methods: Mapping[str, RpcMethodInfo]
Return method metadata for this server's protocol.
external_config
property
¶
external_config: ExternalLocationConfig | None
The ExternalLocation configuration, if any.
ctx_methods
property
¶
Method names whose implementations accept a ctx parameter.
serve
¶
serve(transport: RpcTransport) -> None
Serve RPC requests in a loop until the transport is closed.
Source code in vgi_rpc/rpc/_server.py
serve_one
¶
serve_one(transport: RpcTransport) -> None
Handle a single RPC call (any method type) over the given transport.
Protocol-level errors (VersionError, RpcError from missing
metadata) are caught, written back as error responses, and the
method returns normally so the serve loop can continue.
| RAISES | DESCRIPTION |
|---|---|
ArrowInvalid
|
If the incoming data is not valid Arrow IPC.
An error response is written to transport before raising so
the client can read a structured |
Source code in vgi_rpc/rpc/_server.py
RpcConnection¶
RpcConnection
¶
RpcConnection(
protocol: type[P],
transport: RpcTransport,
on_log: Callable[[Message], None] | None = None,
*,
external_location: ExternalLocationConfig | None = None,
ipc_validation: IpcValidation = FULL
)
Context manager that provides a typed RPC proxy over a transport.
The type parameter P is the Protocol class, enabling IDE
autocompletion for all methods defined on the protocol::
with RpcConnection(MyProtocol, transport) as svc:
result = svc.add(a=1, b=2) # IDE sees MyProtocol methods
Initialize with a protocol type and transport.
Source code in vgi_rpc/rpc/_client.py
__enter__
¶
Enter the context and return a typed proxy.
Source code in vgi_rpc/rpc/_client.py
__exit__
¶
__exit__(
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> None
Close the transport.
Source code in vgi_rpc/rpc/_client.py
RpcTransport¶
RpcTransport
¶
Bases: Protocol
Bidirectional byte stream transport.
RpcMethodInfo¶
RpcMethodInfo
dataclass
¶
RpcMethodInfo(
name: str,
params_schema: Schema,
result_schema: Schema,
result_type: object,
method_type: MethodType,
has_return: bool,
doc: str | None,
param_defaults: dict[str, object] = dict(),
param_types: dict[str, object] = dict(),
header_type: (
type[ArrowSerializableDataclass] | None
) = None,
)
Metadata for a single RPC method, derived from Protocol type hints.
Produced by :func:rpc_methods when introspecting a Protocol class.
Each instance describes one method's wire-protocol details: its Arrow
schemas, parameter types and defaults, and the original docstring.
| ATTRIBUTE | DESCRIPTION |
|---|---|
name |
Method name as it appears on the Protocol.
TYPE:
|
params_schema |
Arrow schema for the serialized request parameters.
TYPE:
|
result_schema |
Arrow schema for the serialized response (unary only; empty schema for stream methods).
TYPE:
|
result_type |
The raw Python return-type annotation (e.g.
TYPE:
|
method_type |
Whether this is a
TYPE:
|
has_return |
TYPE:
|
doc |
The method's docstring from the Protocol class, or
TYPE:
|
param_defaults |
Mapping of parameter name to default value for parameters that have defaults in the Protocol signature.
TYPE:
|
param_types |
Mapping of parameter name to its Python type annotation
(excludes
TYPE:
|
header_type |
For stream methods with a header, the concrete
TYPE:
|
MethodType¶
MethodType
¶
Bases: Enum
Classification of RPC method patterns.
Errors¶
RpcError
¶
Bases: Exception
Raised on the client side when the server reports an error.
Initialize with error details from the remote side.
Source code in vgi_rpc/rpc/_common.py
VersionError
¶
Bases: Exception
Raised when a request has a missing or incompatible protocol version.
See also IPCError in the Serialization module.
CallStatistics¶
CallStatistics
dataclass
¶
CallStatistics(
input_batches: int = 0,
output_batches: int = 0,
input_rows: int = 0,
output_rows: int = 0,
input_bytes: int = 0,
output_bytes: int = 0,
)
Mutable accumulator of per-call I/O counters for usage accounting.
Created at dispatch start and populated as batches flow through the server. Surfaced through the access log and OTel dispatch hook.
Byte measurement: uses pa.RecordBatch.get_total_buffer_size()
which reports logical Arrow buffer sizes (O(columns), negligible cost).
This is an approximation — it does not include IPC framing
overhead (padding, schema messages, EOS markers).
| ATTRIBUTE | DESCRIPTION |
|---|---|
input_batches |
Number of input batches read by the server.
TYPE:
|
output_batches |
Number of output batches written by the server.
TYPE:
|
input_rows |
Total rows across all input batches.
TYPE:
|
output_rows |
Total rows across all output batches.
TYPE:
|
input_bytes |
Approximate logical bytes across all input batches.
TYPE:
|
output_bytes |
Approximate logical bytes across all output batches.
TYPE:
|
Convenience Functions¶
run_server
¶
run_server(
protocol_or_server: type | RpcServer,
implementation: object | None = None,
) -> None
Serve RPC requests over stdin/stdout.
This is the recommended entry point for subprocess workers. Accepts
either a (protocol, implementation) pair or a pre-built RpcServer.
| PARAMETER | DESCRIPTION |
|---|---|
protocol_or_server
|
A Protocol class (requires implementation) or
an already-constructed
TYPE:
|
implementation
|
The implementation object. Required when
protocol_or_server is a Protocol class; must be
TYPE:
|
| RAISES | DESCRIPTION |
|---|---|
TypeError
|
On invalid argument combinations. |
Source code in vgi_rpc/rpc/__init__.py
connect
¶
connect(
protocol: type[P],
cmd: list[str],
*,
on_log: Callable[[Message], None] | None = None,
external_location: ExternalLocationConfig | None = None,
stderr: StderrMode = INHERIT,
stderr_logger: Logger | None = None,
ipc_validation: IpcValidation = FULL
) -> Iterator[P]
Connect to a subprocess RPC server.
Context manager that spawns a subprocess, yields a typed proxy, and cleans up on exit.
| PARAMETER | DESCRIPTION |
|---|---|
protocol
|
The Protocol class defining the RPC interface.
TYPE:
|
cmd
|
Command to spawn the subprocess worker.
TYPE:
|
on_log
|
Optional callback for log messages from the server.
TYPE:
|
external_location
|
Optional ExternalLocation configuration for resolving and producing externalized batches.
TYPE:
|
stderr
|
How to handle the child's stderr stream (see :class:
TYPE:
|
stderr_logger
|
Logger for
TYPE:
|
ipc_validation
|
Validation level for incoming IPC batches.
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
P
|
A typed RPC proxy supporting all methods defined on protocol. |
Source code in vgi_rpc/rpc/__init__.py
serve_pipe
¶
serve_pipe(
protocol: type[P],
implementation: object,
*,
on_log: Callable[[Message], None] | None = None,
external_location: ExternalLocationConfig | None = None,
ipc_validation: IpcValidation | None = None
) -> Iterator[P]
Start an in-process pipe server and yield a typed client proxy.
Useful for tests and demos — no subprocess needed. A background thread
runs RpcServer.serve() on the server side of a pipe pair.
| PARAMETER | DESCRIPTION |
|---|---|
protocol
|
The Protocol class defining the RPC interface.
TYPE:
|
implementation
|
The implementation object.
TYPE:
|
on_log
|
Optional callback for log messages from the server.
TYPE:
|
external_location
|
Optional ExternalLocation configuration for resolving and producing externalized batches.
TYPE:
|
ipc_validation
|
Validation level for incoming IPC batches.
When
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
P
|
A typed RPC proxy supporting all methods defined on protocol. |
Source code in vgi_rpc/rpc/__init__.py
describe_rpc
¶
describe_rpc(
protocol: type,
*,
methods: Mapping[str, RpcMethodInfo] | None = None
) -> str
Return a human-readable description of an RPC protocol's methods.