Skip to content

CLI

Command-line interface for introspecting and calling methods on vgi-rpc services. Requires pip install vgi-rpc[cli].

Registered as the vgi-rpc entry point. Provides describe, call, and loggers commands.

Usage

Describe a service

# Subprocess transport
vgi-rpc describe --cmd "python worker.py"

# HTTP transport
vgi-rpc describe --url http://localhost:8000

# JSON output
vgi-rpc describe --cmd "python worker.py" --format json

Call a method

# Unary call with key=value parameters
vgi-rpc call add --cmd "python worker.py" a=1.0 b=2.0

# JSON parameter input
vgi-rpc call add --url http://localhost:8000 --json '{"a": 1.0, "b": 2.0}'

# Producer stream (table output)
vgi-rpc call countdown --cmd "python worker.py" n=5 --format table

# Exchange stream (pipe JSON lines to stdin)
echo '{"value": 1.0}' | vgi-rpc call accumulate --url http://localhost:8000

List loggers

# Human-readable table
vgi-rpc loggers

# JSON (for scripting / LLM consumption)
vgi-rpc loggers --format json

Debug logging

# Enable DEBUG on all vgi_rpc loggers
vgi-rpc --debug describe --cmd "python worker.py"

# Set a specific level
vgi-rpc --log-level INFO describe --cmd "python worker.py"

# Target specific loggers
vgi-rpc --log-level DEBUG --log-logger vgi_rpc.wire.request call add --cmd "python worker.py" a=1 b=2

# JSON log format on stderr
vgi-rpc --debug --log-format json call add --cmd "python worker.py" a=1 b=2

Output format

Results are printed as one JSON object per row (NDJSON). When a method returns a batch with multiple rows, each row is emitted as its own line rather than a single object with array values. This applies to unary results, producer streams, and exchange responses.

# A stream that emits 2 batches of 3 rows each produces 6 lines:
$ vgi-rpc call generate_rows --cmd "python worker.py" count=6 rows_per_batch=3
{"i": 0, "value": 0}
{"i": 1, "value": 10}
{"i": 2, "value": 20}
{"i": 3, "value": 30}
{"i": 4, "value": 40}
{"i": 5, "value": 50}

With --format table, rows from all batches are collected and displayed as a single aligned table:

$ vgi-rpc call generate_rows --cmd "python worker.py" count=4 --format table
i   value
--  -----
0   0
1   10
2   20
3   30

With --format auto (the default), the CLI uses pretty-printed JSON when stdout is a TTY and compact NDJSON when piped.

With --format arrow, raw Arrow IPC streaming data is written to the output. Use --output/-o to direct binary output to a file:

# Unary result as Arrow IPC
vgi-rpc call add --cmd "python worker.py" a=1.0 b=2.0 --format arrow -o result.arrow

# Producer stream as Arrow IPC
vgi-rpc call generate --cmd "python worker.py" count=100 --format arrow -o data.arrow

# Stream with header: two concatenated IPC streams (header + data)
vgi-rpc call generate_with_header --cmd "python worker.py" count=5 --format arrow -o stream.arrow

The output file contains standard Arrow IPC streaming format data. For unary calls and headerless streams, this is a single IPC stream (schema + batches + EOS). For streams with headers, the file contains two concatenated IPC streams: the header stream first, then the data stream.

Read back a unary or headerless stream result:

import pyarrow as pa
from pyarrow import ipc

with ipc.open_stream(pa.OSFile("result.arrow", "rb")) as reader:
    for batch in reader:
        print(batch.to_pandas())

Read back a stream with header (two concatenated IPC streams):

import pyarrow as pa
from pyarrow import ipc

with open("stream.arrow", "rb") as f:
    # First IPC stream: header
    header_reader = ipc.open_stream(f)
    header_batch = header_reader.read_next_batch()
    print("Header:", header_batch.to_pydict())
    # Drain remaining batches to reach EOS
    for _ in header_reader:
        pass

    # Second IPC stream: data
    data_reader = ipc.open_stream(f)
    for batch in data_reader:
        print(batch.to_pandas())

Without --output, arrow data is written to stdout. A warning is printed to stderr if stdout is a TTY.

Stream headers

Stream methods that declare a header type (e.g. Stream[MyState, MyHeader]) emit a one-time header before the data rows. The CLI reads this header and surfaces it in all output formats.

JSON (--format json): a {"__header__": {...}} line before data rows:

$ vgi-rpc call generate_with_header --cmd "python worker.py" count=3 --format json
{"__header__": {"total_count": 3, "label": "generate"}}
{"i": 0, "value": 0}
{"i": 1, "value": 10}
{"i": 2, "value": 20}

Table (--format table): a Header: section with indented key-value pairs before the data table:

$ vgi-rpc call generate_with_header --cmd "python worker.py" count=3 --format table
Header:
  total_count: 3
  label: generate

i  value
-  -----
0  0
1  10
2  20

Arrow (--format arrow): the header is written as a separate IPC stream before the data IPC stream (see reading back concatenated streams above).

Streams without headers are unaffected — no __header__ line or Header: section appears.

Options

Option Short Description
--url -u HTTP base URL
--cmd -c Subprocess command
--prefix -p URL path prefix (default /vgi)
--format -f Output format: auto, json, table, or arrow
--output -o Output file path (default: stdout)
--verbose -v Show server log messages on stderr
--debug Enable DEBUG on all vgi_rpc loggers to stderr
--log-level Python logging level: DEBUG, INFO, WARNING, ERROR
--log-logger Target specific logger(s), repeatable
--log-format Stderr log format: text (default) or json
--json -j Pass parameters as a JSON string (for call)

--debug is shorthand for --log-level DEBUG. When both are given, --debug wins. --verbose (server-to-client log messages) remains orthogonal to --debug/--log-level (Python logging).

Module Reference

cli

Command-line interface for vgi-rpc services.

Provides describe, call, and loggers commands for introspecting and invoking methods on any vgi-rpc service that has enable_describe=True.

Usage::

vgi-rpc describe --cmd "my-worker"
vgi-rpc call add --cmd "my-worker" a=1.0 b=2.0
vgi-rpc call generate --url http://localhost:8000 count=3
vgi-rpc loggers

OutputFormat

Bases: StrEnum

Output format for CLI commands.

LogLevel

Bases: StrEnum

Python logging level for --log-level.

LogFormat

Bases: StrEnum

Stderr log format for --log-format.

loggers

loggers(ctx: Context) -> None

List all known vgi-rpc logger names with descriptions.

Source code in vgi_rpc/cli.py
@app.command()
def loggers(
    ctx: typer.Context,
) -> None:
    """List all known vgi-rpc logger names with descriptions."""
    config: _CliConfig = ctx.obj
    is_tty = sys.stdout.isatty()
    fmt = config.format

    if fmt == OutputFormat.json or (fmt == OutputFormat.auto and not is_tty):
        data = [{"name": name, "description": desc, "scenario": scenario} for name, desc, scenario in _KNOWN_LOGGERS]
        _print_json(data)
    else:
        rows: list[dict[str, object]] = [
            {"name": name, "description": desc, "scenario": scenario} for name, desc, scenario in _KNOWN_LOGGERS
        ]
        typer.echo(_format_table(rows))

describe

describe(ctx: Context) -> None

Introspect a vgi-rpc service and show its methods.

Source code in vgi_rpc/cli.py
@app.command()
def describe(ctx: typer.Context) -> None:
    """Introspect a vgi-rpc service and show its methods."""
    config: _CliConfig = ctx.obj
    transport: RpcTransport | None = None
    try:
        desc, transport = _get_service_description(config)
    except RpcError as e:
        _emit_rpc_error(e)
        raise typer.Exit(1) from None
    except Exception as e:
        typer.echo(f"Error: {e}", err=True)
        raise typer.Exit(1) from None
    finally:
        if transport is not None:
            transport.close()

    is_tty = sys.stdout.isatty()
    fmt = config.format

    if fmt == OutputFormat.table or (fmt == OutputFormat.auto and is_tty):
        # Table output — use ServiceDescription.__str__()
        output = str(desc)
        # If HTTP, append capabilities
        if config.url:
            try:
                from vgi_rpc.http import http_capabilities

                caps = http_capabilities(config.url, prefix=config.prefix)
                output += "\nCapabilities:\n"
                output += f"  max_request_bytes: {caps.max_request_bytes}\n"
                output += f"  upload_url_support: {caps.upload_url_support}\n"
                output += f"  max_upload_bytes: {caps.max_upload_bytes}\n"
            except Exception:
                pass
        typer.echo(output)
    else:
        # JSON output
        data: dict[str, object] = {
            "protocol_name": desc.protocol_name,
            "server_id": desc.server_id,
            "request_version": desc.request_version,
            "describe_version": desc.describe_version,
            "methods": {
                name: {
                    "method_type": md.method_type.value,
                    "doc": md.doc,
                    "has_return": md.has_return,
                    "param_types": md.param_types,
                    "param_defaults": md.param_defaults,
                }
                for name, md in sorted(desc.methods.items())
            },
        }
        if config.url:
            try:
                from vgi_rpc.http import http_capabilities

                caps = http_capabilities(config.url, prefix=config.prefix)
                data["capabilities"] = {
                    "max_request_bytes": caps.max_request_bytes,
                    "upload_url_support": caps.upload_url_support,
                    "max_upload_bytes": caps.max_upload_bytes,
                }
            except Exception:
                pass
        is_pretty = fmt == OutputFormat.auto and is_tty
        _print_json(data, pretty=is_pretty)

call

call(
    ctx: Context,
    method: Annotated[
        str, Argument(help="Method name to call")
    ],
    args: Annotated[
        list[str] | None,
        Argument(help="key=value parameters"),
    ] = None,
    json_input: Annotated[
        str | None,
        Option("--json", "-j", help="JSON params"),
    ] = None,
    input_file: Annotated[
        str | None,
        Option(
            "--input",
            "-i",
            help="Arrow IPC file for exchange input",
        ),
    ] = None,
    no_stdin: Annotated[
        bool,
        Option(
            "--no-stdin",
            help="Force producer mode (ignore stdin)",
            hidden=True,
        ),
    ] = False,
) -> None

Call a method on a vgi-rpc service.

Source code in vgi_rpc/cli.py
@app.command()
def call(
    ctx: typer.Context,
    method: Annotated[str, typer.Argument(help="Method name to call")],
    args: Annotated[list[str] | None, typer.Argument(help="key=value parameters")] = None,
    json_input: Annotated[str | None, typer.Option("--json", "-j", help="JSON params")] = None,
    input_file: Annotated[str | None, typer.Option("--input", "-i", help="Arrow IPC file for exchange input")] = None,
    no_stdin: Annotated[
        bool, typer.Option("--no-stdin", help="Force producer mode (ignore stdin)", hidden=True)
    ] = False,
) -> None:
    """Call a method on a vgi-rpc service."""
    config: _CliConfig = ctx.obj
    if input_file:
        config.input_file = input_file
    if no_stdin:
        config.no_stdin = True

    transport: RpcTransport | None = None
    try:
        desc, transport = _get_service_description(config)
    except RpcError as e:
        _emit_rpc_error(e)
        raise typer.Exit(1) from None
    except Exception as e:
        typer.echo(f"Error: {e}", err=True)
        raise typer.Exit(1) from None

    if method not in desc.methods:
        if transport is not None:
            transport.close()
        available = ", ".join(sorted(desc.methods.keys()))
        typer.echo(
            f"Error: Unknown method '{method}'. Available: {available}",
            err=True,
        )
        raise typer.Exit(1) from None

    md = desc.methods[method]

    # Parse params
    if json_input and args:
        if transport is not None:
            transport.close()
        raise typer.BadParameter("--json and key=value args are mutually exclusive")

    kwargs: dict[str, object]
    if json_input:
        kwargs = json.loads(json_input)
    elif args:
        kwargs = _parse_key_value_args(args, md.params_schema)
    else:
        kwargs = {}

    # Merge defaults
    merged: dict[str, object] = {**md.param_defaults, **kwargs}

    on_log = _get_on_log(config)

    try:
        if config.cmd or config.unix:
            # Reuse the transport from introspect for the call
            assert transport is not None
            if md.method_type == MethodType.UNARY:
                _call_unary_pipe(transport, md, merged, on_log, config)
            else:
                _call_stream_pipe(transport, md, merged, on_log, config)
        elif config.url:
            if transport is not None:
                transport.close()
                transport = None
            if md.method_type == MethodType.UNARY:
                _call_unary_http(config.url, config.prefix, md, merged, on_log, config)
            else:
                _call_stream_http(
                    config.url,
                    config.prefix,
                    md,
                    merged,
                    on_log,
                    config,
                )
    except RpcError as e:
        _emit_rpc_error(e)
        raise typer.Exit(1) from None
    except typer.Exit:
        raise
    except Exception as e:
        typer.echo(f"Error: {e}", err=True)
        raise typer.Exit(1) from None
    finally:
        if transport is not None:
            transport.close()