Logging

vgi-rpc has two logging systems: client-directed logs (transmitted over the wire to the caller) and server-side logs (stdlib logging for observability).

Client-Directed Logging

Server methods emit log messages that travel to the client's on_log callback:

from vgi_rpc import CallContext, Level


class MyServiceImpl:
    def process(self, data: str, ctx: CallContext) -> str:
        ctx.client_log(Level.INFO, "Processing started", input_size=str(len(data)))
        result = data.upper()
        ctx.client_log(Level.DEBUG, "Processing complete")
        return result

The client receives them via the on_log callback:

from vgi_rpc import Message, serve_pipe


def handle_log(msg: Message) -> None:
    print(f"[{msg.level.value}] {msg.message}")


# MyService is the Protocol class that MyServiceImpl implements (defined elsewhere).
with serve_pipe(MyService, MyServiceImpl(), on_log=handle_log) as proxy:
    proxy.process(data="hello")
    # [INFO] Processing started
    # [DEBUG] Processing complete
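
A client may prefer to route these messages into its own stdlib logging instead of printing them. The following is a minimal sketch under stated assumptions: it takes the level name as a string (the example above prints msg.level.value as INFO and DEBUG), and the mapping table, the TRACE_LEVEL number, and the forward_client_log helper are hypothetical names that are not part of vgi_rpc.

```python
import logging
from typing import Optional

# Hypothetical numeric level for TRACE; stdlib logging has no TRACE level.
TRACE_LEVEL = 5

# Assumed mapping from vgi-rpc level names to stdlib levels (not library-defined).
_LEVEL_MAP = {
    "EXCEPTION": logging.CRITICAL,
    "ERROR": logging.ERROR,
    "WARN": logging.WARNING,
    "INFO": logging.INFO,
    "DEBUG": logging.DEBUG,
    "TRACE": TRACE_LEVEL,
}


def forward_client_log(
    logger: logging.Logger,
    level_name: str,
    message: str,
    extra: Optional[dict[str, object]] = None,
) -> None:
    """Re-emit a client-directed log message on a stdlib logger."""
    # Unknown level names fall back to INFO.
    stdlib_level = _LEVEL_MAP.get(level_name, logging.INFO)
    # Nest extras under one key so they cannot collide with LogRecord attributes.
    logger.log(stdlib_level, message, extra={"client_extra": extra or {}})
```

Inside an on_log callback this would be called as forward_client_log(my_logger, msg.level.value, msg.message, msg.extra), where my_logger is any logger the client application owns.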

Server-Side Logging

Use ctx.logger for server-side observability. Every log record automatically includes server_id, method, principal, and remote_addr:

from vgi_rpc import CallContext


class MyServiceImpl:
    def process(self, data: str, ctx: CallContext) -> str:
        ctx.logger.info("Processing request", extra={"input_size": len(data)})
        return data.upper()
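
Because ctx.logger is described as stdlib logging, keys passed through extra become attributes on the LogRecord. The sketch below illustrates that mechanism with a plain logging.Logger standing in for ctx.logger; the logger name follows the hierarchy table on this page, and the format string is only an illustration.

```python
import io
import logging

# Stand-in for ctx.logger: a plain stdlib logger (assumption for this demo).
logger = logging.getLogger("vgi_rpc.service.MyService")
logger.setLevel(logging.INFO)
logger.propagate = False  # keep the demo output isolated

stream = io.StringIO()
handler = logging.StreamHandler(stream)
# Keys passed via extra= become LogRecord attributes usable in the format string.
handler.setFormatter(
    logging.Formatter("%(levelname)s %(name)s %(message)s input_size=%(input_size)s")
)
logger.addHandler(handler)

logger.info("Processing request", extra={"input_size": 5})
output = stream.getvalue()
```

A format string that names an extra field fails for records that lack it, which is one reason a formatter that emits whatever extras are present (such as a JSON formatter) tends to be more robust.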

Access log fields

Every completed RPC call emits one structured INFO record on the vgi_rpc.access logger with per-call I/O statistics:

Field           Type   Description
server_id       str    Server instance identifier
protocol        str    Protocol class name
method          str    Method name
method_type     str    "unary" or "stream"
principal       str    Caller identity (empty if anonymous)
auth_domain     str    Authentication scheme (empty if anonymous)
remote_addr     str    Client address (HTTP only)
duration_ms     float  Call duration in milliseconds
status          str    "ok" or "error"
error_type      str    Exception class name (empty on success)
http_status     int    HTTP response status code (HTTP transport only)
input_batches   int    Number of input batches read by the server
output_batches  int    Number of output batches written by the server
input_rows      int    Total input rows
output_rows     int    Total output rows
input_bytes     int    Approximate logical input bytes
output_bytes    int    Approximate logical output bytes

Note: Byte counts use pa.RecordBatch.get_total_buffer_size() — logical Arrow buffer sizes without IPC framing overhead.
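
These fields can drive simple alerting. As a hedged sketch, the filter below keeps only access records that failed or ran longer than a threshold. SlowOrFailedFilter and ListHandler are hypothetical helpers, and the three records are synthetic stand-ins for what the framework would emit; only the field names (status, duration_ms, method) come from the table above.

```python
import logging


class SlowOrFailedFilter(logging.Filter):
    """Pass only access records that failed or exceeded a duration threshold."""

    def __init__(self, threshold_ms: float) -> None:
        super().__init__()
        self.threshold_ms = threshold_ms

    def filter(self, record: logging.LogRecord) -> bool:
        # getattr guards against records that lack the access-log fields.
        status = getattr(record, "status", "ok")
        duration_ms = getattr(record, "duration_ms", 0.0)
        return status == "error" or duration_ms > self.threshold_ms


class ListHandler(logging.Handler):
    """Hypothetical handler that collects records in memory for the demo."""

    def __init__(self) -> None:
        super().__init__()
        self.records = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


access = logging.getLogger("vgi_rpc.access")
access.setLevel(logging.INFO)
alerts = ListHandler()
alerts.addFilter(SlowOrFailedFilter(threshold_ms=500.0))
access.addHandler(alerts)

# Synthetic records standing in for real access-log entries.
access.info("call", extra={"method": "add", "status": "ok", "duration_ms": 3.2})
access.info("call", extra={"method": "scan", "status": "ok", "duration_ms": 912.0})
access.info("call", extra={"method": "add", "status": "error", "duration_ms": 1.1})

flagged = [r.method for r in alerts.records]
```

The filter is attached to the handler, not the logger, so other handlers on vgi_rpc.access still see every record.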

Logger hierarchy

Logger name                 Purpose
vgi_rpc                     Root — add handlers here
vgi_rpc.access              Auto access log (one per call)
vgi_rpc.service.<Protocol>  Developer ctx.logger per service
vgi_rpc.rpc                 Framework lifecycle
vgi_rpc.http                HTTP transport lifecycle
vgi_rpc.external            External storage operations
vgi_rpc.subprocess.stderr   Child process stderr (StderrMode.PIPE)
vgi_rpc.wire.request        Request serialization / deserialization
vgi_rpc.wire.response       Response serialization / deserialization
vgi_rpc.wire.batch          Batch classification (log / error / data dispatch)
vgi_rpc.wire.stream         Stream session lifecycle
vgi_rpc.wire.transport      Transport lifecycle (pipe, subprocess)
vgi_rpc.wire.http           HTTP client requests / responses
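
The hierarchy follows stdlib logging rules: a handler on vgi_rpc receives records from every child, while each child's effective level is inherited from its nearest configured ancestor. The sketch below demonstrates this with synthetic messages emitted directly on the named loggers; ListHandler is a hypothetical in-memory handler, and no vgi_rpc code is involved.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical handler that collects records in memory for the demo."""

    def __init__(self) -> None:
        super().__init__()
        self.records = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


root = logging.getLogger("vgi_rpc")
root.setLevel(logging.INFO)
collector = ListHandler()
root.addHandler(collector)

# Only this child is raised to DEBUG; its siblings inherit INFO from vgi_rpc.
logging.getLogger("vgi_rpc.wire.request").setLevel(logging.DEBUG)

logging.getLogger("vgi_rpc.wire.request").debug("request detail")    # kept
logging.getLogger("vgi_rpc.wire.response").debug("response detail")  # dropped
logging.getLogger("vgi_rpc.http").info("listening")                  # kept

seen = [(r.name, r.getMessage()) for r in collector.records]
```

A single handler on the root is therefore enough; per-logger setLevel calls control how much detail reaches it.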

Wire protocol debugging

From the CLI, use --debug for quick diagnostics or --log-logger for targeted debugging:

# All vgi_rpc loggers at DEBUG
vgi-rpc --debug describe --cmd "python worker.py"

# Target specific wire loggers
vgi-rpc --log-level DEBUG --log-logger vgi_rpc.wire.request call add --cmd "python worker.py" a=1 b=2

# List all available loggers
vgi-rpc loggers

In Python, enable the vgi_rpc.wire hierarchy at DEBUG to see exactly what flows over the wire — request/response batches, metadata, batch classification, stream lifecycle, and transport events:

import logging

logging.getLogger("vgi_rpc.wire").setLevel(logging.DEBUG)
logging.getLogger("vgi_rpc.wire").addHandler(logging.StreamHandler())

Sample output for a unary add(a=1.0, b=2.0) -> 3.0 call:

DEBUG vgi_rpc.wire.request  Send request: method=add, type=unary, defaults_applied=[]
DEBUG vgi_rpc.wire.request  Write request: method=add, schema=(a: double, b: double), kwargs={a=1.0, b=2.0}, metadata={...}
DEBUG vgi_rpc.wire.request  Read request batch: RecordBatch(rows=1, cols=2, schema=(a: double, b: double), bytes=16), metadata={...}
DEBUG vgi_rpc.wire.response Write result batch: RecordBatch(rows=1, cols=1, schema=(result: double), bytes=8), route=inline
DEBUG vgi_rpc.wire.batch    Classify batch: rows=1, no metadata -> data
DEBUG vgi_rpc.wire.response Read unary response: method=add, result_type=float

All formatting is gated behind isEnabledFor guards, so a disabled logger costs only the level check and no message formatting.
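
The guard pattern can be sketched as follows. This is an illustration of the general stdlib technique, not the library's actual code: describe_batch and log_request are hypothetical names, and the list of calls exists only to show when formatting runs.

```python
import logging

logger = logging.getLogger("vgi_rpc.wire.request")
format_calls = []  # records each time the expensive summary is built


def describe_batch(rows: int, cols: int) -> str:
    """Hypothetical stand-in for an expensive batch summary."""
    format_calls.append((rows, cols))
    return f"RecordBatch(rows={rows}, cols={cols})"


def log_request(rows: int, cols: int) -> None:
    # The summary is built only when DEBUG is enabled for this logger.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("Read request batch: %s", describe_batch(rows, cols))


logger.setLevel(logging.INFO)
log_request(1, 2)  # guard is false: describe_batch is never called
calls_when_disabled = len(format_calls)

logger.setLevel(logging.DEBUG)
log_request(1, 2)  # guard is true: the summary is built once
calls_when_enabled = len(format_calls)
```

Without the guard, describe_batch would run on every call because its result is an argument to logger.debug, even when the record is later discarded.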

What to enable when

Rather than enabling everything, pick the loggers that match your scenario:

Scenario                                  Loggers to enable
Method calls return wrong results         vgi_rpc.wire.request + vgi_rpc.wire.response — see schemas, kwargs, and result types
Client can't connect or hangs             vgi_rpc.wire.transport — see pipe/subprocess lifecycle and fd numbers
Streaming batches lost or out of order    vgi_rpc.wire.stream — see each tick, exchange, and close event
Log/error batches not arriving at client  vgi_rpc.wire.batch — see how each batch is classified (data vs. log vs. error)
HTTP transport issues                     vgi_rpc.wire.http — see HTTP request URLs, response status codes, and body sizes
Need everything                           vgi_rpc.wire — all six loggers at once

Selectively enable only what you need:

import logging

# Debug only request serialization / deserialization
logging.getLogger("vgi_rpc.wire.request").setLevel(logging.DEBUG)
logging.getLogger("vgi_rpc.wire.request").addHandler(logging.StreamHandler())
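
For short debugging sessions it can be convenient to enable loggers temporarily and restore them afterwards. The context manager below is a hypothetical helper (debug_loggers is not part of vgi_rpc) built only on stdlib logging.

```python
import contextlib
import logging
from collections.abc import Iterator
from typing import Optional


@contextlib.contextmanager
def debug_loggers(*names: str, handler: Optional[logging.Handler] = None) -> Iterator[None]:
    """Temporarily set the named loggers to DEBUG, then restore them."""
    if handler is None:
        handler = logging.StreamHandler()
    loggers = [logging.getLogger(name) for name in names]
    previous = [lg.level for lg in loggers]
    for lg in loggers:
        lg.setLevel(logging.DEBUG)
        lg.addHandler(handler)
    try:
        yield
    finally:
        # Restore the original levels and detach the temporary handler.
        for lg, level in zip(loggers, previous):
            lg.setLevel(level)
            lg.removeHandler(handler)
```

Usage would look like `with debug_loggers("vgi_rpc.wire.request", "vgi_rpc.wire.response"): ...` around the failing call. Passing both a logger and one of its ancestors would print each record twice, so list sibling loggers only.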

Debugging cross-language implementations

When building a vgi-rpc client or server in another language, wire logging on the Python side lets you see the exact Arrow IPC bytes and metadata your implementation needs to produce or consume. Run the Python side with full wire debugging to use it as a reference:

import logging

# Full wire + access logging for interop debugging
logging.basicConfig(level=logging.DEBUG, format="%(name)-30s %(message)s")
logging.getLogger("vgi_rpc.wire").setLevel(logging.DEBUG)
logging.getLogger("vgi_rpc.access").setLevel(logging.INFO)

What to verify for a non-Python client (sending requests to a Python server):

  1. Schema metadata — The request IPC stream schema must include vgi_rpc.method in schema-level metadata. wire.request logs show the expected schema and metadata for every inbound request.
  2. Batch metadata — Every request batch must include vgi_rpc.request_version set to "1" in batch-level custom metadata. Missing or wrong version triggers a VersionError.
  3. Single-row batches — Requests are always exactly one row. The column order must match the schema.
  4. IPC stream framing — Each request/response is a complete Arrow IPC stream (schema message + record batch messages + EOS continuation bytes 0xFFFFFFFF 0x00000000). Multiple streams are written sequentially on the same byte stream.

What to verify for a non-Python server (receiving requests from a Python client):

  1. Response classification — wire.batch logs show how the Python client classifies each batch it reads. A data batch has num_rows > 0 or no vgi_rpc.log_level in custom metadata. A log/error batch has num_rows == 0 with vgi_rpc.log_level and vgi_rpc.log_message set.
  2. Error propagation — Errors are zero-row batches with vgi_rpc.log_level set to EXCEPTION, vgi_rpc.log_message as the error text, and optionally vgi_rpc.log_extra with JSON {"error_type": "...", "traceback": "..."}.
  3. Stream protocol — wire.stream logs show the expected lockstep sequence: init → (tick/input → output)* → close. Your server must write exactly one output batch per input batch.
  4. Server identity — Include vgi_rpc.server_id in batch metadata on log and error batches for tracing. The Python client doesn't require it, but it helps with debugging.
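
The classification rule in item 1 can be sketched as a small function. This is an illustration of the rule as stated on this page, not the client's actual implementation: classify_batch is a hypothetical name, metadata is modeled as a bytes-to-bytes mapping, and treating a zero-row batch with only one of the two log keys as data is an assumption.

```python
from collections.abc import Mapping
from typing import Optional

LOG_LEVEL_KEY = b"vgi_rpc.log_level"
LOG_MESSAGE_KEY = b"vgi_rpc.log_message"


def classify_batch(num_rows: int, metadata: Optional[Mapping[bytes, bytes]]) -> str:
    """Classify a response batch as "data", "log", or "error"."""
    meta = metadata or {}
    # A log/error batch has zero rows and carries both log keys.
    is_log = num_rows == 0 and LOG_LEVEL_KEY in meta and LOG_MESSAGE_KEY in meta
    if not is_log:
        return "data"
    # Errors are log batches whose level is EXCEPTION.
    return "error" if meta[LOG_LEVEL_KEY] == b"EXCEPTION" else "log"
```

A server in another language can use the same decision table in reverse: emit zero rows plus both keys for logs and errors, and never attach vgi_rpc.log_level to a zero-row data batch.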

Common cross-language pitfalls:

  • Arrow metadata keys and values must be UTF-8 encoded bytes — some Arrow libraries default to other encodings
  • The IPC stream ends with an explicit 8-byte EOS marker (0xFFFFFFFF continuation token + 0x00000000 metadata length); closing the connection alone does not terminate the stream
  • Zero-row batches are valid Arrow IPC — ensure your library can write and read them
  • For HTTP transport, the content type must be application/vnd.apache.arrow.stream and the entire request/response body is a single IPC stream
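
When inspecting captured bytes from another implementation, the end-of-stream marker is easy to check by hand. The sketch below builds the 8-byte marker with the stdlib struct module; ends_with_eos is a hypothetical helper that only inspects the tail of a single captured stream.

```python
import struct

# 4-byte continuation token followed by a 4-byte zero length, little-endian.
EOS_MARKER = struct.pack("<Ii", 0xFFFFFFFF, 0)


def ends_with_eos(stream_bytes: bytes) -> bool:
    """Return True if the captured bytes end with the IPC end-of-stream marker."""
    return stream_bytes.endswith(EOS_MARKER)
```

Since multiple IPC streams are written back to back on the same byte stream, this check applies to each stream's bytes separately, not to the whole connection capture.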

Production JSON logging

import logging

from vgi_rpc.logging_utils import VgiJsonFormatter

handler = logging.StreamHandler()
handler.setFormatter(VgiJsonFormatter())
logging.getLogger("vgi_rpc").addHandler(handler)
logging.getLogger("vgi_rpc").setLevel(logging.INFO)

API Reference

Level

Bases: Enum

Severity levels for log messages emitted during function processing.

Levels are ordered from most to least severe. Use the appropriate level to indicate the nature of the message:

ATTRIBUTE DESCRIPTION
EXCEPTION

Unrecoverable error that terminated processing.

ERROR

Significant error that may affect results but didn't terminate.

WARN

Potential issue that should be reviewed but isn't necessarily wrong.

INFO

General informational message about processing status.

DEBUG

Detailed information useful for debugging.

TRACE

Fine-grained tracing information for detailed diagnostics.

Message

Message(level: Level, message: str, **kwargs: object)

Log message emitted during RPC method processing.

Messages are emitted via the emit_client_log callback or OutputCollector.client_log() and transmitted to the client as zero-row batches with log metadata.

ATTRIBUTE DESCRIPTION
level

Severity level indicating the nature of the message.

message

Human-readable log message text.

extra

Additional arbitrary key-value pairs to include in the JSON output.

TYPE: dict[str, object] | None

Create a log message with level, message text, and optional extras.

Source code in vgi_rpc/log.py
def __init__(self, level: Level, message: str, **kwargs: object) -> None:
    """Create a log message with level, message text, and optional extras."""
    self.level = level
    self.message = message
    self.extra: dict[str, object] | None = kwargs if kwargs else None

MAX_TRACEBACK_CHARS class-attribute

MAX_TRACEBACK_CHARS: int = 16000

Maximum character length for formatted tracebacks before truncation.

MAX_TRACEBACK_FRAMES class-attribute

MAX_TRACEBACK_FRAMES: int = 5

Maximum number of stack frames to include in structured frame data.

__eq__

__eq__(other: object) -> bool

Compare log messages by level, message, and extra fields.

Source code in vgi_rpc/log.py
def __eq__(self, other: object) -> bool:
    """Compare log messages by level, message, and extra fields."""
    if not isinstance(other, Message):
        return NotImplemented
    return self.level == other.level and self.message == other.message and self.extra == other.extra

__repr__

__repr__() -> str

Return a string representation suitable for debugging.

Source code in vgi_rpc/log.py
def __repr__(self) -> str:
    """Return a string representation suitable for debugging."""
    if self.extra:
        return f"Message({self.level!r}, {self.message!r}, **{self.extra!r})"
    return f"Message({self.level!r}, {self.message!r})"

exception classmethod

exception(message: str, **kwargs: object) -> Message

Create an EXCEPTION level log message.

Additional kwargs are stored in the extra field.

Source code in vgi_rpc/log.py
@classmethod
def exception(cls, message: str, **kwargs: object) -> Message:
    """Create an EXCEPTION level log message.

    Additional kwargs are stored in the extra field.
    """
    return cls(Level.EXCEPTION, message, **kwargs)

error classmethod

error(message: str, **kwargs: object) -> Message

Create an ERROR level log message.

Additional kwargs are stored in the extra field.

Source code in vgi_rpc/log.py
@classmethod
def error(cls, message: str, **kwargs: object) -> Message:
    """Create an ERROR level log message.

    Additional kwargs are stored in the extra field.
    """
    return cls(Level.ERROR, message, **kwargs)

warn classmethod

warn(message: str, **kwargs: object) -> Message

Create a WARN level log message.

Additional kwargs are stored in the extra field.

Source code in vgi_rpc/log.py
@classmethod
def warn(cls, message: str, **kwargs: object) -> Message:
    """Create a WARN level log message.

    Additional kwargs are stored in the extra field.
    """
    return cls(Level.WARN, message, **kwargs)

info classmethod

info(message: str, **kwargs: object) -> Message

Create an INFO level log message.

Additional kwargs are stored in the extra field.

Source code in vgi_rpc/log.py
@classmethod
def info(cls, message: str, **kwargs: object) -> Message:
    """Create an INFO level log message.

    Additional kwargs are stored in the extra field.
    """
    return cls(Level.INFO, message, **kwargs)

debug classmethod

debug(message: str, **kwargs: object) -> Message

Create a DEBUG level log message.

Additional kwargs are stored in the extra field.

Source code in vgi_rpc/log.py
@classmethod
def debug(cls, message: str, **kwargs: object) -> Message:
    """Create a DEBUG level log message.

    Additional kwargs are stored in the extra field.
    """
    return cls(Level.DEBUG, message, **kwargs)

trace classmethod

trace(message: str, **kwargs: object) -> Message

Create a TRACE level log message.

Additional kwargs are stored in the extra field.

Source code in vgi_rpc/log.py
@classmethod
def trace(cls, message: str, **kwargs: object) -> Message:
    """Create a TRACE level log message.

    Additional kwargs are stored in the extra field.
    """
    return cls(Level.TRACE, message, **kwargs)

add_to_metadata

add_to_metadata(
    metadata: dict[str, str] | None = None,
) -> dict[str, str]

Add log message fields to an existing metadata dictionary.

Creates a new dictionary with log-related keys added. Does not mutate the input dictionary.

PARAMETER DESCRIPTION
metadata

Existing metadata dict to augment, or None to create new.

TYPE: dict[str, str] | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, str]

New dict containing original entries plus:

  • vgi_rpc.log_level: The Level value (e.g., "INFO", "EXCEPTION")
  • vgi_rpc.log_message: The human-readable message text
  • vgi_rpc.log_extra: JSON string with extra kwargs (omitted when empty)

Source code in vgi_rpc/log.py
def add_to_metadata(
    self,
    metadata: dict[str, str] | None = None,
) -> dict[str, str]:
    """Add log message fields to an existing metadata dictionary.

    Creates a new dictionary with log-related keys added. Does not mutate
    the input dictionary.

    Args:
        metadata: Existing metadata dict to augment, or None to create new.

    Returns:
        New dict containing original entries plus:
        - vgi_rpc.log_level: The Level value (e.g., "INFO", "EXCEPTION")
        - vgi_rpc.log_message: The human-readable message text
        - vgi_rpc.log_extra: JSON string with extra kwargs (omitted when empty)

    """
    result = dict(metadata) if metadata else {}
    level_key: str = LOG_LEVEL_KEY.decode()
    message_key: str = LOG_MESSAGE_KEY.decode()
    result[level_key] = self.level.value
    result[message_key] = self.message
    if self.extra:
        extra_key: str = LOG_EXTRA_KEY.decode()
        result[extra_key] = json.dumps(self.extra)
    return result
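
For implementers on the receiving side, the inverse operation may look like the sketch below. parse_log_metadata is a hypothetical helper, not a vgi_rpc function; the three key names are taken from the docstring above, and the trace_id entry is an invented example of a pre-existing metadata key.

```python
import json
from typing import Optional

LOG_LEVEL_KEY = "vgi_rpc.log_level"
LOG_MESSAGE_KEY = "vgi_rpc.log_message"
LOG_EXTRA_KEY = "vgi_rpc.log_extra"


def parse_log_metadata(
    metadata: dict[str, str],
) -> Optional[tuple[str, str, dict[str, object]]]:
    """Return (level, message, extra), or None if the log keys are absent."""
    if LOG_LEVEL_KEY not in metadata or LOG_MESSAGE_KEY not in metadata:
        return None
    # log_extra is omitted when there are no extras, so default to an empty dict.
    raw_extra = metadata.get(LOG_EXTRA_KEY)
    extra = json.loads(raw_extra) if raw_extra else {}
    return metadata[LOG_LEVEL_KEY], metadata[LOG_MESSAGE_KEY], extra


# Metadata shaped like the documented add_to_metadata() output.
sample = {
    "trace_id": "abc",  # hypothetical pre-existing entry
    LOG_LEVEL_KEY: "INFO",
    LOG_MESSAGE_KEY: "Processing started",
    LOG_EXTRA_KEY: json.dumps({"input_size": "5"}),
}
parsed = parse_log_metadata(sample)
```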

from_exception classmethod

from_exception(exc: BaseException) -> Message

Produce a Message from an exception.

Source code in vgi_rpc/log.py
@classmethod
def from_exception(cls, exc: BaseException) -> Message:
    """Produce a Message from an exception."""
    tb_exc = traceback.TracebackException.from_exception(
        exc,
        capture_locals=False,
    )

    formatted_tb = "".join(tb_exc.format())
    if len(formatted_tb) > cls.MAX_TRACEBACK_CHARS:
        formatted_tb = formatted_tb[: cls.MAX_TRACEBACK_CHARS] + "\n… <traceback truncated>"

    # Short, semantic summary (LLM anchor)
    summary = f"{type(exc).__name__}: {exc}"

    extra: dict[str, object] = {
        "exception_type": type(exc).__name__,
        "exception_message": str(exc),
        "traceback": formatted_tb,
    }

    if tb_exc.__cause__:
        cause_str = "".join(tb_exc.__cause__.format())
        if len(cause_str) > cls.MAX_TRACEBACK_CHARS:
            cause_str = cause_str[: cls.MAX_TRACEBACK_CHARS] + "\n… <traceback truncated>"
        extra["cause"] = cause_str

    if tb_exc.__context__ and not tb_exc.__suppress_context__:
        context_str = "".join(tb_exc.__context__.format())
        if len(context_str) > cls.MAX_TRACEBACK_CHARS:
            context_str = context_str[: cls.MAX_TRACEBACK_CHARS] + "\n… <traceback truncated>"
        extra["context"] = context_str

    extra["frames"] = [
        {
            "file": f.filename,
            "line": f.lineno,
            "function": f.name,
            "code": f.line,
        }
        for f in tb_exc.stack[-cls.MAX_TRACEBACK_FRAMES :]
    ]

    return cls(
        Level.EXCEPTION,
        summary,
        **extra,
    )
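
To see what the frames entry contains, the frame-extraction step can be reproduced with the stdlib alone. The sketch below mirrors that part of from_exception; summarize_frames and parse_port are hypothetical names used only for the demonstration.

```python
import traceback


def summarize_frames(exc: BaseException, max_frames: int = 5) -> list[dict[str, object]]:
    """Return the innermost stack frames of an exception as plain dicts."""
    tb_exc = traceback.TracebackException.from_exception(exc, capture_locals=False)
    return [
        {"file": f.filename, "line": f.lineno, "function": f.name, "code": f.line}
        # Negative slicing keeps the frames closest to where the error was raised.
        for f in tb_exc.stack[-max_frames:]
    ]


def parse_port(text: str) -> int:
    """Hypothetical function that fails for non-numeric input."""
    return int(text)


try:
    parse_port("not-a-number")
except ValueError as exc:
    frames = summarize_frames(exc)
    summary = f"{type(exc).__name__}: {exc}"
```

The last element of frames describes parse_port, the innermost Python frame; the slice keeps at most max_frames entries, matching the MAX_TRACEBACK_FRAMES default of 5.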

VgiJsonFormatter

Bases: Formatter

JSON formatter that emits all structured extra fields.

Standard fields (timestamp, level, logger, message) are always present and cannot be overwritten by extra fields with the same name. Every other non-default attribute on the LogRecord is emitted as an additional key — this is future-proof because new fields added via extra appear automatically without updating the formatter.

Exception information is included under the "exception" key when present.

Non-serializable values are coerced to strings via default=str.

format

format(record: LogRecord) -> str

Format a log record as a single-line JSON string.

Source code in vgi_rpc/logging_utils.py
def format(self, record: logging.LogRecord) -> str:
    """Format a log record as a single-line JSON string."""
    record.message = record.getMessage()
    obj: dict[str, object] = {
        "timestamp": self.formatTime(record),
        "level": record.levelname,
        "logger": record.name,
        "message": record.message,
        # Emit extra fields, excluding default record attrs and reserved output keys
        **{k: v for k, v in record.__dict__.items() if k not in _DEFAULT_RECORD_ATTRS and k not in _RESERVED_KEYS},
    }
    if record.exc_info and record.exc_info[1]:
        obj["exception"] = self.formatException(record.exc_info)
    if record.stack_info:
        obj["stack_info"] = self.formatStack(record.stack_info)
    return json.dumps(obj, default=str)
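
The listing refers to _DEFAULT_RECORD_ATTRS and _RESERVED_KEYS without showing them. The sketch below shows one way such sets could be derived and how the extra-field filtering then behaves. It is an assumption, not the library's definition: SketchJsonFormatter is a simplified stand-in, and both sets are guesses based on the class description.

```python
import json
import logging

# Assumed derivation: attribute names present on every LogRecord, plus the
# two that Formatter adds lazily.
_DEFAULT_RECORD_ATTRS = frozenset(
    logging.LogRecord("", 0, "", 0, "", (), None).__dict__
) | {"message", "asctime"}
# Assumed to be the four standard output keys named in the class description.
_RESERVED_KEYS = frozenset({"timestamp", "level", "logger", "message"})


class SketchJsonFormatter(logging.Formatter):
    """Simplified stand-in mirroring the format() logic shown above."""

    def format(self, record: logging.LogRecord) -> str:
        obj = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            # Anything left on the record after filtering is an extra field.
            **{
                k: v
                for k, v in record.__dict__.items()
                if k not in _DEFAULT_RECORD_ATTRS and k not in _RESERVED_KEYS
            },
        }
        return json.dumps(obj, default=str)


# Synthetic record with two extra fields attached, as extra= would do.
record = logging.LogRecord("vgi_rpc.access", logging.INFO, "demo.py", 1, "call %s", ("add",), None)
record.duration_ms = 3.2
record.status = "ok"

payload = json.loads(SketchJsonFormatter().format(record))
```

Deriving the default set from a freshly constructed LogRecord keeps it in step with the running Python version, which matters because newer releases add attributes such as taskName.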