HTTP Transport

HTTP transport using Falcon (server) and httpx (client). Requires pip install vgi-rpc[http].

Quick Start

Server

Create a WSGI app and serve it with any WSGI server (waitress, gunicorn, etc.):

from vgi_rpc import RpcServer, make_wsgi_app

server = RpcServer(MyService, MyServiceImpl())
app = make_wsgi_app(server)
# serve `app` with waitress, gunicorn, etc.

Client

from vgi_rpc import http_connect

with http_connect(MyService, "http://localhost:8080") as proxy:
    result = proxy.echo(message="hello")  # proxy is typed as MyService

Testing (no real server)

make_sync_client wraps a Falcon TestClient so you can test the full HTTP stack in-process:

from vgi_rpc import RpcServer
from vgi_rpc.http import http_connect, make_sync_client

server = RpcServer(MyService, MyServiceImpl())
client = make_sync_client(server)

with http_connect(MyService, client=client) as proxy:
    assert proxy.echo(message="hello") == "hello"

Landing Page

By default, GET {prefix} (e.g. GET /vgi when prefix="/vgi", or GET / with the default empty prefix) returns an HTML landing page showing the vgi-rpc logo, the protocol name, server ID, and links. When the server has enable_describe=True, the landing page includes a link to the describe page.

To disable the landing page:

app = make_wsgi_app(server, enable_landing_page=False)

POST {prefix} returns 405 Method Not Allowed — it does not interfere with RPC routing.

Describe Page

When the server has enable_describe=True, GET {prefix}/describe (e.g. GET /vgi/describe) returns an HTML page listing all methods, their parameters (name, type, default), return types, docstrings, and method type badges (UNARY / STREAM). The __describe__ introspection method is filtered out.

Both enable_describe=True on the RpcServer and enable_describe_page=True (the default) on make_wsgi_app() are required.

To disable only the HTML page while keeping the __describe__ RPC method available:

app = make_wsgi_app(server, enable_describe_page=False)

Reserved path

When the describe page is active, the path {prefix}/describe is reserved for the HTML page. If your service has an RPC method literally named describe, you must set enable_describe_page=False.
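
As a minimal sketch of the rule above, the check below decides whether the describe page has to be turned off for a given protocol class. The helper defines_describe_method and the two example protocols are hypothetical names introduced for illustration; they are not part of vgi-rpc.

```python
from typing import Protocol


def defines_describe_method(protocol: type) -> bool:
    """Return True if the protocol class has a callable attribute named 'describe'."""
    return callable(getattr(protocol, "describe", None))


class Echo(Protocol):
    """Hypothetical service with no name clash."""

    def echo(self, message: str) -> str: ...


class Catalog(Protocol):
    """Hypothetical service whose RPC method collides with {prefix}/describe."""

    def describe(self, table: str) -> str: ...


# The result would be passed as enable_describe_page=... to make_wsgi_app().
enable_describe_page = not defines_describe_method(Catalog)
```

With a protocol such as Catalog the flag comes out False, so the HTML page is disabled while the __describe__ RPC method stays available.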

Not-Found Page

By default, make_wsgi_app() installs a friendly HTML 404 page for any request that does not match an RPC route. If someone navigates to the server root or a random path in a browser, they see the vgi-rpc logo, the service protocol name, and a link to vgi-rpc.query.farm instead of a generic error.

This does not affect RPC clients — a request to a valid RPC route for a non-existent method still returns a machine-readable Arrow IPC error with HTTP 404.

To disable the page:

app = make_wsgi_app(server, enable_not_found_page=False)

API Reference

Server

make_wsgi_app

make_wsgi_app(
    server: RpcServer,
    *,
    prefix: str = "",
    signing_key: bytes | None = None,
    max_stream_response_bytes: int | None = None,
    max_stream_response_time: float | None = None,
    max_request_bytes: int | None = None,
    authenticate: (
        Callable[[Request], AuthContext] | None
    ) = None,
    cors_origins: str | Iterable[str] | None = None,
    cors_max_age: int | None = 7200,
    upload_url_provider: UploadUrlProvider | None = None,
    max_upload_bytes: int | None = None,
    otel_config: object | None = None,
    sentry_config: object | None = None,
    token_ttl: int = 3600,
    compression_level: int | None = 3,
    enable_not_found_page: bool = True,
    enable_landing_page: bool = True,
    enable_describe_page: bool = True,
    enable_health_endpoint: bool = True,
    repo_url: str | None = None,
    oauth_resource_metadata: (
        OAuthResourceMetadata | None
    ) = None
) -> App[Request, Response]

Create a Falcon WSGI app that serves RPC requests over HTTP.

PARAMETER DESCRIPTION
server

The RpcServer instance to serve.

TYPE: RpcServer

prefix

URL prefix for all RPC endpoints (default "" — root).

TYPE: str DEFAULT: ''

signing_key

HMAC key for signing state tokens. When None (the default), a random 32-byte key is generated per process. This means state tokens issued by one worker are invalid in another — you must provide a shared key for multi-process deployments (e.g. gunicorn with multiple workers).

TYPE: bytes | None DEFAULT: None

max_stream_response_bytes

When set, producer stream responses may buffer multiple batches in a single HTTP response up to this size before emitting a continuation token. The client transparently resumes via POST /{method}/exchange. When None (the default) and max_stream_response_time is also None, each produce cycle emits one batch per HTTP response for incremental streaming.

TYPE: int | None DEFAULT: None

max_stream_response_time

When set, producer stream responses may buffer multiple batches up to this many seconds of wall time before emitting a continuation token. Can be combined with max_stream_response_bytes — the response breaks on whichever limit is reached first.

TYPE: float | None DEFAULT: None

max_request_bytes

When set, the value is advertised via the VGI-Max-Request-Bytes response header on every response (including OPTIONS). Clients can use http_capabilities() to discover this limit and decide whether to use external storage for large payloads. Advertisement only — no server-side enforcement. None (default) omits the header.

TYPE: int | None DEFAULT: None

authenticate

Optional callback that extracts an AuthContext from a Falcon Request. When provided, every request is authenticated before dispatch. The callback should raise ValueError (bad credentials) or PermissionError (forbidden) on failure; both are mapped to HTTP 401. Other exceptions propagate as 500.

TYPE: Callable[[Request], AuthContext] | None DEFAULT: None

cors_origins

Allowed origins for CORS. Pass "*" to allow all origins, a single origin string like "https://example.com", or an iterable of origin strings. None (the default) disables CORS headers. Uses Falcon's built-in CORSMiddleware which also handles preflight OPTIONS requests automatically.

TYPE: str | Iterable[str] | None DEFAULT: None

cors_max_age

Value for the Access-Control-Max-Age header on preflight OPTIONS responses, in seconds. 7200 (2 hours) by default. None omits the header. Only effective when cors_origins is set.

TYPE: int | None DEFAULT: 7200

upload_url_provider

Optional provider for generating pre-signed upload URLs. When set, the __upload_url__/init endpoint is enabled and VGI-Upload-URL-Support: true is advertised on every response.

TYPE: UploadUrlProvider | None DEFAULT: None

max_upload_bytes

When set (and upload_url_provider is set), advertised via the VGI-Max-Upload-Bytes header. Informs clients of the maximum size they may upload to vended URLs. Advertisement only — no server-side enforcement.

TYPE: int | None DEFAULT: None

otel_config

Optional OtelConfig for OpenTelemetry instrumentation. When provided, instrument_server() is called and _OtelFalconMiddleware is prepended for W3C trace propagation. Requires pip install vgi-rpc[otel].

TYPE: object | None DEFAULT: None

sentry_config

Optional SentryConfig for Sentry error reporting. When provided, instrument_server_sentry() is called. Requires pip install vgi-rpc[sentry].

TYPE: object | None DEFAULT: None

token_ttl

Maximum age of stream state tokens in seconds. Tokens older than this are rejected with HTTP 400. Default is 3600 (1 hour). Set to 0 to disable expiry checking.

TYPE: int DEFAULT: 3600

compression_level

Zstandard compression level for HTTP request/response bodies. 3 (the default) installs _CompressionMiddleware at level 3. Valid range is 1-22. None disables compression entirely.

TYPE: int | None DEFAULT: 3

enable_not_found_page

When True (the default), requests to paths that do not match any RPC route receive a friendly HTML 404 page. Set to False to use Falcon's default 404 behaviour instead.

TYPE: bool DEFAULT: True

enable_landing_page

When True (the default), GET {prefix} returns a friendly HTML landing page showing the protocol name, server ID, and links. Set to False to disable.

TYPE: bool DEFAULT: True

enable_describe_page

When True (the default) and the server has enable_describe=True, GET {prefix}/describe returns an HTML page listing all methods, parameters, and types. The path {prefix}/describe is reserved when active — an RPC method named describe would need the page disabled.

TYPE: bool DEFAULT: True

enable_health_endpoint

When True (the default), GET {prefix}/health returns a JSON health check response with the server's status, ID, and protocol name. The endpoint bypasses authentication. Set to False to disable.

TYPE: bool DEFAULT: True

repo_url

Optional URL to the service's source repository (e.g. a GitHub URL). When provided, a "Source repository" link appears on the landing page and describe page.

TYPE: str | None DEFAULT: None

oauth_resource_metadata

Optional OAuthResourceMetadata for RFC 9728 OAuth discovery. When provided, serves /.well-known/oauth-protected-resource and adds WWW-Authenticate: Bearer resource_metadata="..." to 401 responses.

TYPE: OAuthResourceMetadata | None DEFAULT: None

RETURNS DESCRIPTION
App[Request, Response]

A Falcon application with routes for unary and stream RPC calls.

Source code in vgi_rpc/http/_server.py
def make_wsgi_app(
    server: RpcServer,
    *,
    prefix: str = "",
    signing_key: bytes | None = None,
    max_stream_response_bytes: int | None = None,
    max_stream_response_time: float | None = None,
    max_request_bytes: int | None = None,
    authenticate: Callable[[falcon.Request], AuthContext] | None = None,
    cors_origins: str | Iterable[str] | None = None,
    cors_max_age: int | None = 7200,
    upload_url_provider: UploadUrlProvider | None = None,
    max_upload_bytes: int | None = None,
    otel_config: object | None = None,
    sentry_config: object | None = None,
    token_ttl: int = 3600,
    compression_level: int | None = 3,
    enable_not_found_page: bool = True,
    enable_landing_page: bool = True,
    enable_describe_page: bool = True,
    enable_health_endpoint: bool = True,
    repo_url: str | None = None,
    oauth_resource_metadata: OAuthResourceMetadata | None = None,
) -> falcon.App[falcon.Request, falcon.Response]:
    """Create a Falcon WSGI app that serves RPC requests over HTTP.

    Args:
        server: The RpcServer instance to serve.
        prefix: URL prefix for all RPC endpoints (default ``""`` — root).
        signing_key: HMAC key for signing state tokens.  When ``None``
            (the default), a random 32-byte key is generated **per process**.
            This means state tokens issued by one worker are invalid in
            another — you **must** provide a shared key for multi-process
            deployments (e.g. gunicorn with multiple workers).
        max_stream_response_bytes: When set, producer stream responses may
            buffer multiple batches in a single HTTP response up to this
            size before emitting a continuation token.  The client
            transparently resumes via ``POST /{method}/exchange``.
            When ``None`` (the default) and ``max_stream_response_time``
            is also ``None``, each produce cycle emits one batch per HTTP
            response for incremental streaming.
        max_stream_response_time: When set, producer stream responses may
            buffer multiple batches up to this many seconds of wall time
            before emitting a continuation token.  Can be combined with
            ``max_stream_response_bytes`` — the response breaks on
            whichever limit is reached first.
        max_request_bytes: When set, the value is advertised via the
            ``VGI-Max-Request-Bytes`` response header on every response
            (including OPTIONS).  Clients can use ``http_capabilities()``
            to discover this limit and decide whether to use external
            storage for large payloads.  Advertisement only — no
            server-side enforcement.  ``None`` (default) omits the header.
        authenticate: Optional callback that extracts an :class:`AuthContext`
            from a Falcon ``Request``.  When provided, every request is
            authenticated before dispatch.  The callback should raise
            ``ValueError`` (bad credentials) or ``PermissionError``
            (forbidden) on failure — these are mapped to HTTP 401.
            Other exceptions propagate as 500.
        cors_origins: Allowed origins for CORS.  Pass ``"*"`` to allow all
            origins, a single origin string like ``"https://example.com"``,
            or an iterable of origin strings.  ``None`` (the default)
            disables CORS headers.  Uses Falcon's built-in
            ``CORSMiddleware`` which also handles preflight OPTIONS
            requests automatically.
        cors_max_age: Value for the ``Access-Control-Max-Age`` header on
            preflight OPTIONS responses, in seconds.  ``7200`` (2 hours)
            by default.  ``None`` omits the header.  Only effective when
            ``cors_origins`` is set.
        upload_url_provider: Optional provider for generating pre-signed
            upload URLs.  When set, the ``__upload_url__/init`` endpoint
            is enabled and ``VGI-Upload-URL-Support: true`` is advertised
            on every response.
        max_upload_bytes: When set (and ``upload_url_provider`` is set),
            advertised via the ``VGI-Max-Upload-Bytes`` header.  Informs
            clients of the maximum size they may upload to vended URLs.
            Advertisement only — no server-side enforcement.
        otel_config: Optional ``OtelConfig`` for OpenTelemetry instrumentation.
            When provided, ``instrument_server()`` is called and
            ``_OtelFalconMiddleware`` is prepended for W3C trace propagation.
            Requires ``pip install vgi-rpc[otel]``.
        sentry_config: Optional ``SentryConfig`` for Sentry error reporting.
            When provided, ``instrument_server_sentry()`` is called.
            Requires ``pip install vgi-rpc[sentry]``.
        token_ttl: Maximum age of stream state tokens in seconds.  Tokens
            older than this are rejected with HTTP 400.  Default is 3600
            (1 hour).  Set to ``0`` to disable expiry checking.
        compression_level: Zstandard compression level for HTTP request/
            response bodies.  ``3`` (the default) installs
            ``_CompressionMiddleware`` at level 3.  Valid range is 1-22.
            ``None`` disables compression entirely.
        enable_not_found_page: When ``True`` (the default), requests to
            paths that do not match any RPC route receive a friendly HTML
            404 page.  Set to ``False`` to use Falcon's default 404
            behaviour instead.
        enable_landing_page: When ``True`` (the default), ``GET {prefix}``
            returns a friendly HTML landing page showing the protocol name,
            server ID, and links.  Set to ``False`` to disable.
        enable_describe_page: When ``True`` (the default) **and** the server
            has ``enable_describe=True``, ``GET {prefix}/describe`` returns
            an HTML page listing all methods, parameters, and types.  The
            path ``{prefix}/describe`` is reserved when active — an RPC
            method named ``describe`` would need the page disabled.
        enable_health_endpoint: When ``True`` (the default),
            ``GET {prefix}/health`` returns a JSON health check response
            with the server's status, ID, and protocol name.  The endpoint
            bypasses authentication.  Set to ``False`` to disable.
        repo_url: Optional URL to the service's source repository (e.g. a
            GitHub URL).  When provided, a "Source repository" link appears
            on the landing page and describe page.
        oauth_resource_metadata: Optional ``OAuthResourceMetadata`` for
            RFC 9728 OAuth discovery.  When provided, serves
            ``/.well-known/oauth-protected-resource`` and adds
            ``WWW-Authenticate: Bearer resource_metadata="..."`` to 401
            responses.

    Returns:
        A Falcon application with routes for unary and stream RPC calls.

    """
    if signing_key is None:
        warnings.warn(
            "No signing_key provided; generating a random per-process key. "
            "State tokens will be invalid across workers — pass a shared key "
            "for multi-process deployments.",
            stacklevel=2,
        )
        signing_key = os.urandom(32)
    # OpenTelemetry instrumentation (optional)
    if otel_config is not None:
        from vgi_rpc.otel import OtelConfig, _OtelFalconMiddleware, instrument_server

        if not isinstance(otel_config, OtelConfig):
            raise TypeError(f"otel_config must be an OtelConfig instance, got {type(otel_config).__name__}")
        instrument_server(server, otel_config)

    # Sentry error reporting (optional)
    if sentry_config is not None:
        from vgi_rpc.sentry import SentryConfig, instrument_server_sentry

        if not isinstance(sentry_config, SentryConfig):
            raise TypeError(f"sentry_config must be a SentryConfig instance, got {type(sentry_config).__name__}")
        instrument_server_sentry(server, sentry_config)

    app_handler = _HttpRpcApp(
        server,
        signing_key,
        max_stream_response_bytes,
        max_stream_response_time,
        max_request_bytes,
        upload_url_provider,
        max_upload_bytes,
        token_ttl,
    )
    middleware: list[Any] = [_DrainRequestMiddleware(), _RequestIdMiddleware(), _AccessLogContextMiddleware()]

    # Compression middleware decompresses request bodies and compresses
    # responses — must come before auth so handlers read plaintext bodies.
    if compression_level is not None:
        middleware.append(_CompressionMiddleware(compression_level))

    # OTel middleware must come before auth so spans cover the full request
    if otel_config is not None:
        middleware.append(_OtelFalconMiddleware())

    # Always expose auth and request-id headers; capability headers are
    # appended conditionally below.
    cors_expose: list[str] = ["WWW-Authenticate", _REQUEST_ID_HEADER, "X-VGI-Content-Encoding", RPC_ERROR_HEADER]

    # Build capability headers
    capability_headers: dict[str, str] = {}
    if max_request_bytes is not None:
        capability_headers[MAX_REQUEST_BYTES_HEADER] = str(max_request_bytes)
        cors_expose.append(MAX_REQUEST_BYTES_HEADER)
    if upload_url_provider is not None:
        capability_headers[UPLOAD_URL_HEADER] = "true"
        cors_expose.append(UPLOAD_URL_HEADER)
        if max_upload_bytes is not None:
            capability_headers[MAX_UPLOAD_BYTES_HEADER] = str(max_upload_bytes)
            cors_expose.append(MAX_UPLOAD_BYTES_HEADER)

    # OAuth resource metadata (RFC 9728)
    from vgi_rpc.http._oauth import OAuthResourceMetadata as _OAuthMeta
    from vgi_rpc.http._oauth import _build_www_authenticate

    www_authenticate: str | None = None
    _validated_oauth_metadata: _OAuthMeta | None = None
    if oauth_resource_metadata is not None:
        if not isinstance(oauth_resource_metadata, _OAuthMeta):
            raise TypeError(
                f"oauth_resource_metadata must be an OAuthResourceMetadata instance, "
                f"got {type(oauth_resource_metadata).__name__}"
            )
        _validated_oauth_metadata = oauth_resource_metadata
        www_authenticate = _build_www_authenticate(_validated_oauth_metadata, prefix)

    if cors_origins is not None:
        cors_kwargs: dict[str, Any] = {
            "allow_origins": cors_origins,
            "expose_headers": cors_expose,
        }
        middleware.append(falcon.CORSMiddleware(**cors_kwargs))
        if cors_max_age is not None:
            middleware.append(_CorsMaxAgeMiddleware(cors_max_age))
    # OAuth PKCE browser flow — only when authenticate + OAuth metadata + client_id
    _pkce_active = False
    _pkce_user_info_html: str | None = None
    _exempt_prefixes_list: list[str] = []
    if enable_health_endpoint:
        _exempt_prefixes_list.append(f"{prefix}/health")
    if (
        authenticate is not None
        and _validated_oauth_metadata is not None
        and _validated_oauth_metadata.client_id is not None
    ):
        from urllib.parse import urlparse as _urlparse

        from vgi_rpc.http._bearer import chain_authenticate
        from vgi_rpc.http._oauth_pkce import (
            _create_oidc_discovery,
            _derive_session_key,
            _OAuthCallbackResource,
            _OAuthLogoutResource,
            _OAuthPkceMiddleware,
            build_user_info_html,
            make_cookie_authenticate,
        )

        _pkce_issuer = _validated_oauth_metadata.authorization_servers[0]
        _pkce_oidc_discovery = _create_oidc_discovery(_pkce_issuer)
        _pkce_session_key = _derive_session_key(signing_key)
        _pkce_resource_parsed = _urlparse(_validated_oauth_metadata.resource)
        _pkce_secure = _pkce_resource_parsed.scheme == "https"
        _pkce_redirect_uri = f"{_pkce_resource_parsed.scheme}://{_pkce_resource_parsed.netloc}{prefix}/_oauth/callback"

        if not _pkce_secure and _pkce_resource_parsed.hostname not in ("localhost", "127.0.0.1", "::1"):
            _logger.warning(
                "OAuth PKCE is configured without HTTPS (%s) — cookies will not be Secure. "
                "This is acceptable for local development but not for production.",
                _validated_oauth_metadata.resource,
            )

        # Wrap authenticate to also accept tokens from a cookie
        _pkce_cookie_auth = make_cookie_authenticate(authenticate)
        authenticate = chain_authenticate(authenticate, _pkce_cookie_auth)

        _pkce_client_id: str = _validated_oauth_metadata.client_id
        _pkce_client_secret = _validated_oauth_metadata.client_secret
        _pkce_use_id_token = _validated_oauth_metadata.use_id_token_as_bearer
        _pkce_scope = (
            " ".join(_validated_oauth_metadata.scopes_supported)
            if _validated_oauth_metadata.scopes_supported
            else "openid email"
        )
        _exempt_prefixes_list.append(f"{prefix}/_oauth/")
        _pkce_active = True
        _pkce_user_info_html = build_user_info_html(prefix)

    on_auth_failure: Callable[[str | None, str], None] | None = None
    if authenticate is not None and otel_config is not None:
        from vgi_rpc.otel import OtelConfig as _OtelCfg
        from vgi_rpc.otel import make_auth_failure_counter

        assert isinstance(otel_config, _OtelCfg)  # validated above
        on_auth_failure = make_auth_failure_counter(otel_config, server.protocol_name)
    middleware.append(
        _AuthMiddleware(
            authenticate,
            www_authenticate=www_authenticate,
            on_auth_failure=on_auth_failure,
            exempt_prefixes=tuple(_exempt_prefixes_list),
        )
    )
    if authenticate is not None and _pkce_active:
        middleware.append(
            _OAuthPkceMiddleware(
                session_key=_pkce_session_key,
                oidc_discovery=_pkce_oidc_discovery,
                client_id=_pkce_client_id,
                prefix=prefix,
                secure_cookie=_pkce_secure,
                redirect_uri=_pkce_redirect_uri,
                scope=_pkce_scope,
            )
        )
    if capability_headers:
        middleware.append(_CapabilitiesMiddleware(capability_headers))
    app: falcon.App[falcon.Request, falcon.Response] = falcon.App(middleware=middleware or None)
    app.set_error_serializer(_error_serializer)

    # OAuth well-known endpoint (must be before RPC routes)
    if _validated_oauth_metadata is not None:
        from vgi_rpc.http._oauth import _OAuthResourceMetadataResource

        well_known = _OAuthResourceMetadataResource(_validated_oauth_metadata)
        app.add_route("/.well-known/oauth-protected-resource", well_known)
        if prefix and prefix != "/":
            app.add_route(f"/.well-known/oauth-protected-resource{prefix}", well_known)

    app.add_route(f"{prefix}/{{method}}", _RpcResource(app_handler))
    app.add_route(f"{prefix}/{{method}}/init", _StreamInitResource(app_handler))
    app.add_route(f"{prefix}/{{method}}/exchange", _ExchangeResource(app_handler))
    if upload_url_provider is not None:
        app.add_route(f"{prefix}/__upload_url__/init", _UploadUrlResource(app_handler))

    # OAuth PKCE callback and logout routes (must be before not-found sink)
    if _pkce_active:
        app.add_route(
            f"{prefix}/_oauth/callback",
            _OAuthCallbackResource(
                session_key=_pkce_session_key,
                oidc_discovery=_pkce_oidc_discovery,
                client_id=_pkce_client_id,
                client_secret=_pkce_client_secret,
                use_id_token=_pkce_use_id_token,
                prefix=prefix,
                secure_cookie=_pkce_secure,
                redirect_uri=_pkce_redirect_uri,
            ),
        )
        app.add_route(f"{prefix}/_oauth/logout", _OAuthLogoutResource(prefix, _pkce_secure))

    # Describe page — GET {prefix}/describe (requires both flags and server support)
    describe_page_active = enable_describe_page and server.describe_enabled
    if describe_page_active:
        describe_html = _build_describe_html(server, prefix, repo_url)
        if _pkce_user_info_html:
            describe_html = describe_html.replace(b"</body>", _pkce_user_info_html.encode() + b"\n</body>")
        app.add_route(f"{prefix}/describe", _DescribePageResource(describe_html))

    # Health endpoint — GET {prefix}/health
    if enable_health_endpoint:
        app.add_route(f"{prefix}/health", _HealthResource(server.server_id, server.protocol_name))

    # Landing page — GET {prefix}
    if enable_landing_page:
        describe_path = f"{prefix}/describe" if describe_page_active else None
        landing_body = _build_landing_html(prefix, server.protocol_name, server.server_id, describe_path, repo_url)
        if _pkce_user_info_html:
            landing_body = landing_body.replace(b"</body>", _pkce_user_info_html.encode() + b"\n</body>")
        app.add_route(prefix or "/", _LandingPageResource(landing_body))

    if enable_not_found_page:
        app.add_sink(_make_not_found_sink(prefix, server.protocol_name))

    _logger.info(
        "WSGI app created for %s (server_id=%s, prefix=%s, auth=%s)",
        server.protocol_name,
        server.server_id,
        prefix,
        "enabled" if authenticate is not None else "disabled",
        extra={
            "server_id": server.server_id,
            "protocol": server.protocol_name,
            "prefix": prefix,
            "auth_enabled": authenticate is not None,
        },
    )

    return app

Client

http_connect

http_connect(
    protocol: type[P],
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    on_log: Callable[[Message], None] | None = None,
    client: Client | _SyncTestClient | None = None,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = FULL,
    retry: HttpRetryConfig | None = None,
    compression_level: int | None = 3
) -> Iterator[P]

Connect to an HTTP RPC server and yield a typed proxy.

PARAMETER DESCRIPTION
protocol

The Protocol class defining the RPC interface.

TYPE: type[P]

base_url

Base URL of the server (e.g. http://localhost:8000). Required when client is None; ignored when a pre-built client is provided. The internally-created client follows redirects transparently.

TYPE: str | None DEFAULT: None

prefix

URL prefix matching the server's prefix. When None (the default), auto-detected from a _SyncTestClient's .prefix attribute, or "" for other clients.

TYPE: str | None DEFAULT: None

on_log

Optional callback for log messages from the server.

TYPE: Callable[[Message], None] | None DEFAULT: None

client

Optional HTTP client — httpx.Client for production, or a _SyncTestClient from make_sync_client() for testing.

TYPE: Client | _SyncTestClient | None DEFAULT: None

external_location

Optional ExternalLocationConfig for resolving and producing externalized batches.

TYPE: ExternalLocationConfig | None DEFAULT: None

ipc_validation

Validation level for incoming IPC batches.

TYPE: IpcValidation DEFAULT: FULL

retry

Optional retry configuration for transient HTTP failures. When None (the default), no retries are attempted.

TYPE: HttpRetryConfig | None DEFAULT: None

compression_level

Zstandard compression level for request bodies. 3 (the default) compresses requests and adds Content-Encoding: zstd. None disables request compression (httpx still auto-decompresses server responses).

TYPE: int | None DEFAULT: 3

YIELDS DESCRIPTION
P

A typed RPC proxy supporting all methods defined on protocol.

RAISES DESCRIPTION
ValueError

If base_url is None and client is None.

Source code in vgi_rpc/http/_client.py
@contextlib.contextmanager
def http_connect[P](
    protocol: type[P],
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    on_log: Callable[[Message], None] | None = None,
    client: httpx.Client | _SyncTestClient | None = None,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
    retry: HttpRetryConfig | None = None,
    compression_level: int | None = 3,
) -> Iterator[P]:
    """Connect to an HTTP RPC server and yield a typed proxy.

    Args:
        protocol: The Protocol class defining the RPC interface.
        base_url: Base URL of the server (e.g. ``http://localhost:8000``).
            Required when *client* is ``None``; ignored when a pre-built
            *client* is provided.  The internally-created client follows
            redirects transparently.
        prefix: URL prefix matching the server's prefix.  When ``None``
            (the default), auto-detected from a ``_SyncTestClient``'s
            ``.prefix`` attribute, or ``""`` for other clients.
        on_log: Optional callback for log messages from the server.
        client: Optional HTTP client — ``httpx.Client`` for production,
            or a ``_SyncTestClient`` from ``make_sync_client()`` for testing.
        external_location: Optional ExternalLocationConfig for
            resolving and producing externalized batches.
        ipc_validation: Validation level for incoming IPC batches.
        retry: Optional retry configuration for transient HTTP failures.
            When ``None`` (the default), no retries are attempted.
        compression_level: Zstandard compression level for request bodies.
            ``3`` (the default) compresses requests and adds
            ``Content-Encoding: zstd``.  ``None`` disables request
            compression (httpx still auto-decompresses server responses).

    Yields:
        A typed RPC proxy supporting all methods defined on *protocol*.

    Raises:
        ValueError: If *base_url* is ``None`` and *client* is ``None``.

    """
    own_client = client is None
    if client is None:
        if base_url is None:
            raise ValueError("base_url is required when client is not provided")
        client = httpx.Client(base_url=base_url, follow_redirects=True)

    # Auto-detect prefix from _SyncTestClient when not explicitly provided
    url_prefix = getattr(client, "prefix", "") if prefix is None else prefix
    try:
        yield cast(
            P,
            _HttpProxy(
                protocol,
                client,
                url_prefix,
                on_log,
                external_config=external_location,
                ipc_validation=ipc_validation,
                retry_config=retry,
                compression_level=compression_level,
            ),
        )
    finally:
        if own_client:
            client.close()
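The ownership and prefix rules above can be sketched without any HTTP dependency. This is an illustration only, not the library's code: `FakeClient` and `connect` are hypothetical stand-ins for `httpx.Client` and `http_connect`, and only the close-if-owned and prefix-fallback logic follows the source listing.

```python
import contextlib
from typing import Iterator, Optional, Tuple


class FakeClient:
    """Hypothetical stand-in for httpx.Client; it only records close()."""

    def __init__(self, prefix: Optional[str] = None) -> None:
        self.closed = False
        if prefix is not None:
            # Mimics the .prefix attribute that a _SyncTestClient carries.
            self.prefix = prefix

    def close(self) -> None:
        self.closed = True


@contextlib.contextmanager
def connect(
    base_url: Optional[str] = None,
    *,
    prefix: Optional[str] = None,
    client: Optional[FakeClient] = None,
) -> Iterator[Tuple[FakeClient, str]]:
    """Yield (client, resolved_prefix); close the client only if we made it."""
    own_client = client is None
    if client is None:
        if base_url is None:
            raise ValueError("base_url is required when client is not provided")
        client = FakeClient()
    # An explicit prefix wins; otherwise fall back to client.prefix, then "".
    url_prefix = getattr(client, "prefix", "") if prefix is None else prefix
    try:
        yield client, url_prefix
    finally:
        if own_client:
            client.close()


# A caller-supplied client stays open after the block exits.
shared = FakeClient(prefix="/vgi")
with connect(client=shared) as (_, shared_prefix):
    pass

# An internally created client is closed on exit.
with connect("http://localhost:8000") as (owned, owned_prefix):
    pass
```

In practice this means a pre-built client can be reused across several `http_connect` blocks, and the caller remains responsible for closing it.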

http_introspect

http_introspect(
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    client: Client | _SyncTestClient | None = None,
    ipc_validation: IpcValidation = FULL,
    retry: HttpRetryConfig | None = None
) -> ServiceDescription

Send a __describe__ request over HTTP and return a ServiceDescription.

PARAMETER DESCRIPTION
base_url

Base URL of the server (e.g. http://localhost:8000). Required when client is None.

TYPE: str | None DEFAULT: None

prefix

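URL prefix matching the server's prefix. When None (the default), the prefix is read from the client's .prefix attribute (set on a _SyncTestClient) and falls back to "" for other clients.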

TYPE: str | None DEFAULT: None

client

Optional HTTP client (httpx.Client or _SyncTestClient).

TYPE: Client | _SyncTestClient | None DEFAULT: None

ipc_validation

Validation level for incoming IPC batches.

TYPE: IpcValidation DEFAULT: FULL

retry

Optional retry configuration for transient HTTP failures.

TYPE: HttpRetryConfig | None DEFAULT: None

RETURNS DESCRIPTION
ServiceDescription

A ServiceDescription with all method metadata.

RAISES DESCRIPTION
RpcError

If the server does not support introspection or returns an error.

ValueError

If base_url is None and client is None.

Source code in vgi_rpc/http/_client.py
def http_introspect(
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    client: httpx.Client | _SyncTestClient | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
    retry: HttpRetryConfig | None = None,
) -> ServiceDescription:
    """Send a ``__describe__`` request over HTTP and return a ``ServiceDescription``.

    Args:
        base_url: Base URL of the server (e.g. ``http://localhost:8000``).
            Required when *client* is ``None``.
        prefix: URL prefix matching the server's prefix.  ``None``
            auto-detects from ``_SyncTestClient``.
        client: Optional HTTP client (``httpx.Client`` or ``_SyncTestClient``).
        ipc_validation: Validation level for incoming IPC batches.
        retry: Optional retry configuration for transient HTTP failures.

    Returns:
        A ``ServiceDescription`` with all method metadata.

    Raises:
        RpcError: If the server does not support introspection or returns
            an error.
        ValueError: If *base_url* is ``None`` and *client* is ``None``.

    """
    from vgi_rpc.introspect import DESCRIBE_METHOD_NAME, parse_describe_batch

    own_client = client is None
    if client is None:
        if base_url is None:
            raise ValueError("base_url is required when client is not provided")
        client = httpx.Client(base_url=base_url, follow_redirects=True)
    if prefix is None:
        prefix = getattr(client, "prefix", "")

    try:
        # Build a minimal request: empty params with __describe__ method name
        req_buf = BytesIO()
        request_metadata = pa.KeyValueMetadata(
            {
                b"vgi_rpc.method": DESCRIBE_METHOD_NAME.encode(),
                b"vgi_rpc.request_version": b"1",
            }
        )
        with ipc.new_stream(req_buf, _EMPTY_SCHEMA) as writer:
            writer.write_batch(
                pa.RecordBatch.from_pydict({}, schema=_EMPTY_SCHEMA),
                custom_metadata=request_metadata,
            )

        resp = _post_with_retry(
            client,
            f"{prefix}/{DESCRIBE_METHOD_NAME}",
            content=req_buf.getvalue(),
            headers={"Content-Type": _ARROW_CONTENT_TYPE},
            config=retry,
        )

        reader = _open_response_stream(resp.content, resp.status_code, ipc_validation)
        # Skip log batches
        while True:
            batch, custom_metadata = reader.read_next_batch_with_custom_metadata()
            if not _dispatch_log_or_error(batch, custom_metadata):
                break
        _drain_stream(reader)

        return parse_describe_batch(batch, custom_metadata)
    finally:
        if own_client:
            client.close()

http_capabilities

http_capabilities(
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    client: Client | _SyncTestClient | None = None,
    retry: HttpRetryConfig | None = None
) -> HttpServerCapabilities

Discover server capabilities via an OPTIONS request.

Sends OPTIONS {prefix}/__capabilities__ and reads capability headers (VGI-Max-Request-Bytes, VGI-Upload-URL-Support, VGI-Max-Upload-Bytes) from the response.

PARAMETER DESCRIPTION
base_url

Base URL of the server (e.g. http://localhost:8000). Required when client is None.

TYPE: str | None DEFAULT: None

prefix

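URL prefix matching the server's prefix. When None (the default), the prefix is read from the client's .prefix attribute (set on a _SyncTestClient) and falls back to "" for other clients.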

TYPE: str | None DEFAULT: None

client

Optional HTTP client (httpx.Client or _SyncTestClient).

TYPE: Client | _SyncTestClient | None DEFAULT: None

retry

Optional retry configuration for transient HTTP failures.

TYPE: HttpRetryConfig | None DEFAULT: None

RETURNS DESCRIPTION
HttpServerCapabilities

An HttpServerCapabilities with discovered values.

RAISES DESCRIPTION
ValueError

If base_url is None and client is None.

Source code in vgi_rpc/http/_client.py
def http_capabilities(
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    client: httpx.Client | _SyncTestClient | None = None,
    retry: HttpRetryConfig | None = None,
) -> HttpServerCapabilities:
    """Discover server capabilities via an OPTIONS request.

    Sends ``OPTIONS {prefix}/__capabilities__`` and reads capability
    headers (``VGI-Max-Request-Bytes``, ``VGI-Upload-URL-Support``,
    ``VGI-Max-Upload-Bytes``) from the response.

    Args:
        base_url: Base URL of the server (e.g. ``http://localhost:8000``).
            Required when *client* is ``None``.
        prefix: URL prefix matching the server's prefix.  ``None``
            auto-detects from ``_SyncTestClient``.
        client: Optional HTTP client (``httpx.Client`` or ``_SyncTestClient``).
        retry: Optional retry configuration for transient HTTP failures.

    Returns:
        An ``HttpServerCapabilities`` with discovered values.

    Raises:
        ValueError: If *base_url* is ``None`` and *client* is ``None``.

    """
    own_client = client is None
    if client is None:
        if base_url is None:
            raise ValueError("base_url is required when client is not provided")
        client = httpx.Client(base_url=base_url, follow_redirects=True)
    if prefix is None:
        prefix = getattr(client, "prefix", "")

    try:
        url = f"{prefix}/__capabilities__"
        resp = _options_with_retry(client, url, config=retry)
        headers = resp.headers

        max_req: int | None = None
        raw = headers.get(MAX_REQUEST_BYTES_HEADER) or headers.get(MAX_REQUEST_BYTES_HEADER.lower())
        if raw is not None:
            with contextlib.suppress(ValueError):
                max_req = int(raw)

        upload_raw = headers.get(UPLOAD_URL_HEADER) or headers.get(UPLOAD_URL_HEADER.lower())
        upload_support = upload_raw == "true" if upload_raw is not None else False

        max_upload: int | None = None
        upload_bytes_raw = headers.get(MAX_UPLOAD_BYTES_HEADER) or headers.get(MAX_UPLOAD_BYTES_HEADER.lower())
        if upload_bytes_raw is not None:
            with contextlib.suppress(ValueError):
                max_upload = int(upload_bytes_raw)

        return HttpServerCapabilities(
            max_request_bytes=max_req,
            upload_url_support=upload_support,
            max_upload_bytes=max_upload,
        )
    finally:
        if own_client:
            client.close()
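The header handling above reduces to a small pure function, sketched here for illustration. `Capabilities` and `parse_capabilities` are hypothetical names, not part of vgi-rpc; the header names are the ones listed under Header Constants. Lower-casing all header names once is a simplification of the two-step lookup in the source.

```python
import contextlib
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class Capabilities:
    """Hypothetical mirror of HttpServerCapabilities for this sketch."""

    max_request_bytes: Optional[int] = None
    upload_url_support: bool = False
    max_upload_bytes: Optional[int] = None


def _parse_int(raw: Optional[str]) -> Optional[int]:
    """Return int(raw), or None when the header is absent or malformed."""
    if raw is None:
        return None
    with contextlib.suppress(ValueError):
        return int(raw)
    return None


def parse_capabilities(headers: Mapping[str, str]) -> Capabilities:
    # Normalize names once so lookups do not depend on header casing.
    lowered = {name.lower(): value for name, value in headers.items()}
    return Capabilities(
        max_request_bytes=_parse_int(lowered.get("vgi-max-request-bytes")),
        # Only the literal string "true" enables upload support.
        upload_url_support=lowered.get("vgi-upload-url-support") == "true",
        max_upload_bytes=_parse_int(lowered.get("vgi-max-upload-bytes")),
    )


caps = parse_capabilities(
    {
        "VGI-Max-Request-Bytes": "1048576",
        "vgi-upload-url-support": "true",
        "VGI-Max-Upload-Bytes": "not-a-number",
    }
)
```

As in the source, a malformed integer header is treated the same as a missing one, so a misconfigured server degrades to "no advertised limit" instead of raising.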

request_upload_urls

request_upload_urls(
    base_url: str | None = None,
    *,
    count: int = 1,
    prefix: str | None = None,
    client: Client | _SyncTestClient | None = None,
    retry: HttpRetryConfig | None = None
) -> list[UploadUrl]

Request pre-signed upload URLs from the server's __upload_url__ endpoint.

The server must have been configured with an upload_url_provider in make_wsgi_app().

PARAMETER DESCRIPTION
base_url

Base URL of the server (e.g. http://localhost:8000). Required when client is None.

TYPE: str | None DEFAULT: None

count

Number of upload URLs to request (default 1, max 100).

TYPE: int DEFAULT: 1

prefix

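URL prefix matching the server's prefix. When None (the default), the prefix is read from the client's .prefix attribute (set on a _SyncTestClient) and falls back to "" for other clients.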

TYPE: str | None DEFAULT: None

client

Optional HTTP client (httpx.Client or _SyncTestClient).

TYPE: Client | _SyncTestClient | None DEFAULT: None

retry

Optional retry configuration for transient HTTP failures.

TYPE: HttpRetryConfig | None DEFAULT: None

RETURNS DESCRIPTION
list[UploadUrl]

A list of UploadUrl objects with pre-signed PUT and GET URLs.

RAISES DESCRIPTION
RpcError

If the server does not support upload URLs (404) or returns an error.

ValueError

If base_url is None and client is None.

Source code in vgi_rpc/http/_client.py
def request_upload_urls(
    base_url: str | None = None,
    *,
    count: int = 1,
    prefix: str | None = None,
    client: httpx.Client | _SyncTestClient | None = None,
    retry: HttpRetryConfig | None = None,
) -> list[UploadUrl]:
    """Request pre-signed upload URLs from the server's ``__upload_url__`` endpoint.

    The server must have been configured with an ``upload_url_provider``
    in ``make_wsgi_app()``.

    Args:
        base_url: Base URL of the server (e.g. ``http://localhost:8000``).
            Required when *client* is ``None``.
        count: Number of upload URLs to request (default 1, max 100).
        prefix: URL prefix matching the server's prefix.  ``None``
            auto-detects from ``_SyncTestClient``.
        client: Optional HTTP client (``httpx.Client`` or ``_SyncTestClient``).
        retry: Optional retry configuration for transient HTTP failures.

    Returns:
        A list of ``UploadUrl`` objects with pre-signed PUT and GET URLs.

    Raises:
        RpcError: If the server does not support upload URLs (404) or
            returns an error.
        ValueError: If *base_url* is ``None`` and *client* is ``None``.

    """
    own_client = client is None
    if client is None:
        if base_url is None:
            raise ValueError("base_url is required when client is not provided")
        client = httpx.Client(base_url=base_url, follow_redirects=True)
    if prefix is None:
        prefix = getattr(client, "prefix", "")

    try:
        # Build request IPC with standard wire protocol metadata
        req_buf = BytesIO()
        _write_request(req_buf, _UPLOAD_URL_METHOD, _UPLOAD_URL_PARAMS_SCHEMA, {"count": count})

        resp = _post_with_retry(
            client,
            f"{prefix}/__upload_url__/init",
            content=req_buf.getvalue(),
            headers={"Content-Type": _ARROW_CONTENT_TYPE},
            config=retry,
        )

        # Without an upload_url_provider the route doesn't exist and the
        # request falls through to _StreamInitResource → 404.
        if resp.status_code == HTTPStatus.NOT_FOUND:
            raise RpcError("NotSupported", "Server does not support upload URLs", "")

        reader = _open_response_stream(resp.content, resp.status_code)
        urls: list[UploadUrl] = []
        try:
            while True:
                try:
                    batch, custom_metadata = reader.read_next_batch_with_custom_metadata()
                except StopIteration:
                    break

                if _dispatch_log_or_error(batch, custom_metadata):
                    continue

                for i in range(batch.num_rows):
                    upload_url = batch.column("upload_url")[i].as_py()
                    download_url = batch.column("download_url")[i].as_py()
                    expires_at = batch.column("expires_at")[i].as_py()
                    urls.append(UploadUrl(upload_url=upload_url, download_url=download_url, expires_at=expires_at))
        except RpcError:
            _drain_stream(reader)
            raise
        _drain_stream(reader)
        return urls
    finally:
        if own_client:
            client.close()
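The row loop at the end of the function turns three parallel columns into one object per row. A minimal sketch of that column-to-row step follows, using plain Python lists in place of Arrow columns. `UploadSlot` and `rows_from_columns` are hypothetical names, the URLs are placeholders, and `expires_at` is kept as an opaque value because its concrete type is not shown in this section.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class UploadSlot:
    """Hypothetical stand-in for UploadUrl."""

    upload_url: str
    download_url: str
    expires_at: object


def rows_from_columns(columns: Dict[str, list]) -> List[UploadSlot]:
    names = ("upload_url", "download_url", "expires_at")
    # zip() walks the three columns in lockstep, producing one tuple per row.
    return [UploadSlot(*row) for row in zip(*(columns[name] for name in names))]


slots = rows_from_columns(
    {
        "upload_url": ["https://example.com/put/1", "https://example.com/put/2"],
        "download_url": ["https://example.com/get/1", "https://example.com/get/2"],
        "expires_at": ["2030-01-01T00:00:00Z", "2030-01-01T00:00:00Z"],
    }
)
```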

Capabilities

HttpServerCapabilities dataclass

HttpServerCapabilities(
    max_request_bytes: int | None = None,
    upload_url_support: bool = False,
    max_upload_bytes: int | None = None,
)

Capabilities advertised by an HTTP RPC server.

ATTRIBUTE DESCRIPTION
max_request_bytes

Maximum request body size the server advertises, or None if the server does not advertise a limit. The value is advisory only: the server does not enforce it.

TYPE: int | None

upload_url_support

Whether the server supports the __upload_url__ endpoint for client-side uploads.

TYPE: bool

max_upload_bytes

Maximum upload size the server advertises for client-vended URLs, or None if not advertised. The value is advisory only: the server does not enforce it.

TYPE: int | None
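Because both limits are advertisements, it falls to the client to act on them. The sketch below shows one possible client-side policy; it is an assumption for illustration, not behavior taken from vgi-rpc. `Capabilities` is a hypothetical stand-in with the same three fields, and `choose_route` is a hypothetical helper.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Capabilities:
    """Hypothetical stand-in with the same fields as HttpServerCapabilities."""

    max_request_bytes: Optional[int] = None
    upload_url_support: bool = False
    max_upload_bytes: Optional[int] = None


def choose_route(payload_bytes: int, caps: Capabilities) -> str:
    """Return "inline", "upload", or "reject" for a payload of the given size."""
    # No advertised request limit, or the payload fits: send it in the body.
    if caps.max_request_bytes is None or payload_bytes <= caps.max_request_bytes:
        return "inline"
    # Too large for the body: use a pre-signed upload URL if the server offers one.
    if caps.upload_url_support and (
        caps.max_upload_bytes is None or payload_bytes <= caps.max_upload_bytes
    ):
        return "upload"
    return "reject"


limited = Capabilities(max_request_bytes=1000, upload_url_support=True, max_upload_bytes=5000)
```

With `limited`, a 500-byte payload goes inline, a 2000-byte payload goes through an upload URL, and a 9000-byte payload is rejected before any request is sent.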

Stream Session

HttpStreamSession

HttpStreamSession(
    client: Client | _SyncTestClient,
    url_prefix: str,
    method: str,
    state_bytes: bytes | None,
    output_schema: Schema,
    on_log: Callable[[Message], None] | None = None,
    *,
    external_config: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = FULL,
    pending_batches: list[AnnotatedBatch] | None = None,
    finished: bool = False,
    header: object | None = None,
    retry_config: HttpRetryConfig | None = None,
    compression_level: int | None = None
)

Client-side handle for a stream over HTTP (both producer and exchange patterns).

For producer streams, iterate over the session: __iter__() yields the batches from each batched response and follows continuation tokens transparently. For exchange streams, call exchange(), which sends one input batch and returns the corresponding output batch.

Supports context manager protocol for convenience.

Initialize with HTTP client, method details, and initial state.

Source code in vgi_rpc/http/_client.py
def __init__(
    self,
    client: httpx.Client | _SyncTestClient,
    url_prefix: str,
    method: str,
    state_bytes: bytes | None,
    output_schema: pa.Schema,
    on_log: Callable[[Message], None] | None = None,
    *,
    external_config: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
    pending_batches: list[AnnotatedBatch] | None = None,
    finished: bool = False,
    header: object | None = None,
    retry_config: HttpRetryConfig | None = None,
    compression_level: int | None = None,
) -> None:
    """Initialize with HTTP client, method details, and initial state."""
    self._client = client
    self._url_prefix = url_prefix
    self._method = method
    self._state_bytes = state_bytes
    self._output_schema = output_schema
    self._on_log = on_log
    self._external_config = external_config
    self._ipc_validation = ipc_validation
    self._pending_batches: list[AnnotatedBatch] = pending_batches or []
    self._finished = finished
    self._header = header
    self._retry_config = retry_config
    self._compression_level = compression_level

header property

header: object | None

The stream header, or None if the stream has no header.

typed_header

typed_header(header_type: type[H]) -> H

Return the stream header narrowed to the expected type.

PARAMETER DESCRIPTION
header_type

The expected header dataclass type.

TYPE: type[H]

RETURNS DESCRIPTION
H

The header, typed as header_type.

RAISES DESCRIPTION
TypeError

If the header is None or not an instance of header_type.

Source code in vgi_rpc/http/_client.py
def typed_header[H: ArrowSerializableDataclass](self, header_type: type[H]) -> H:
    """Return the stream header narrowed to the expected type.

    Args:
        header_type: The expected header dataclass type.

    Returns:
        The header, typed as *header_type*.

    Raises:
        TypeError: If the header is ``None`` or not an instance of
            *header_type*.

    """
    if self._header is None:
        raise TypeError(f"Stream has no header (expected {header_type.__name__})")
    if not isinstance(self._header, header_type):
        raise TypeError(f"Header type mismatch: expected {header_type.__name__}, got {type(self._header).__name__}")
    return self._header
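The narrowing logic is easy to exercise on its own. In the sketch below, `JobHeader` and `narrow_header` are hypothetical names, and a plain dataclass stands in for an `ArrowSerializableDataclass` header. The two checks and their messages follow the source listing.

```python
from dataclasses import dataclass
from typing import Optional, Type, TypeVar

H = TypeVar("H")


@dataclass
class JobHeader:
    """Hypothetical header type used only for this sketch."""

    job_id: str


def narrow_header(header: Optional[object], header_type: Type[H]) -> H:
    """Return header typed as header_type, or raise TypeError."""
    if header is None:
        raise TypeError(f"Stream has no header (expected {header_type.__name__})")
    if not isinstance(header, header_type):
        raise TypeError(
            f"Header type mismatch: expected {header_type.__name__}, "
            f"got {type(header).__name__}"
        )
    return header


header = narrow_header(JobHeader(job_id="abc"), JobHeader)

# A missing header raises TypeError; it does not return None.
try:
    narrow_header(None, JobHeader)
except TypeError as exc:
    missing_message = str(exc)
```

Raising for a missing header lets callers use the result without a separate `None` check, at the cost of needing `header` (the property) when absence is an expected case.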

exchange

exchange(input_batch: AnnotatedBatch) -> AnnotatedBatch

Send an input batch and receive the output batch.

PARAMETER DESCRIPTION
input_batch

The input batch to send.

TYPE: AnnotatedBatch

RETURNS DESCRIPTION
AnnotatedBatch

The output batch from the server.

RAISES DESCRIPTION
RpcError

If the server reports an error or the stream has finished.

Source code in vgi_rpc/http/_client.py
def exchange(self, input_batch: AnnotatedBatch) -> AnnotatedBatch:
    """Send an input batch and receive the output batch.

    Args:
        input_batch: The input batch to send.

    Returns:
        The output batch from the server.

    Raises:
        RpcError: If the server reports an error or the stream has finished.

    """
    if self._state_bytes is None:
        raise RpcError("ProtocolError", "Stream has finished — no state token available", "")

    batch_to_write = input_batch.batch
    cm_to_write = input_batch.custom_metadata

    # Client-side externalization for large inputs
    if self._external_config is not None:
        batch_to_write, cm_to_write = maybe_externalize_batch(batch_to_write, cm_to_write, self._external_config)

    # Write input batch with state in metadata
    req_buf = BytesIO()
    state_md = pa.KeyValueMetadata({STATE_KEY: self._state_bytes})
    merged = merge_metadata(cm_to_write, state_md)
    with ipc.new_stream(req_buf, batch_to_write.schema) as writer:
        writer.write_batch(batch_to_write, custom_metadata=merged)

    if wire_http_logger.isEnabledFor(logging.DEBUG):
        wire_http_logger.debug(
            "HTTP stream exchange: method=%s, input=%s",
            self._method,
            fmt_batch(batch_to_write),
        )
    # Exchange calls are NOT retried: the server's process() method may
    # have side effects, and a proxy 502 after server processing would
    # cause duplicate execution.  Only init/unary/continuation are retried.
    resp = self._client.post(
        f"{self._url_prefix}/{self._method}/exchange",
        content=self._prepare_body(req_buf.getvalue()),
        headers=self._build_headers(),
    )
    if wire_http_logger.isEnabledFor(logging.DEBUG):
        wire_http_logger.debug(
            "HTTP stream exchange response: method=%s, status=%d, size=%d",
            self._method,
            resp.status_code,
            len(resp.content),
        )

    # Read response — log batches + data batch with state
    reader = _open_response_stream(resp.content, resp.status_code, self._ipc_validation)
    try:
        ab = _read_batch_with_log_check(reader, self._on_log, self._external_config)
    except RpcError:
        _drain_stream(reader)
        raise

    # Extract updated state from metadata
    if ab.custom_metadata is not None:
        new_state = ab.custom_metadata.get(STATE_KEY)
        if new_state is not None:
            self._state_bytes = new_state

    # Strip state token from user-visible metadata
    user_cm = strip_keys(ab.custom_metadata, STATE_KEY)

    _drain_stream(reader)
    return AnnotatedBatch(batch=ab.batch, custom_metadata=user_cm)
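The state-token round trip in `exchange()` can be sketched with plain dictionaries in place of Arrow metadata. Everything named here is hypothetical: the `STATE_KEY` value, the `Session` class, and `running_sum_server`, which plays the role of a stateless server whose only memory is the token it hands back.

```python
from typing import Callable, Dict, List, Optional, Tuple

STATE_KEY = b"state"  # hypothetical key; the library defines its own STATE_KEY

Metadata = Dict[bytes, bytes]
# A "server" here is any function mapping (rows, metadata) -> (rows, metadata).
Server = Callable[[List[int], Metadata], Tuple[List[int], Metadata]]


def running_sum_server(rows: List[int], metadata: Metadata) -> Tuple[List[int], Metadata]:
    """Hypothetical stateless server: the running total lives only in the token."""
    total = int(metadata[STATE_KEY].decode()) + sum(rows)
    return [total], {STATE_KEY: str(total).encode(), b"unit": b"items"}


class Session:
    def __init__(self, server: Server, state: Optional[bytes]) -> None:
        self._server = server
        self._state = state

    def exchange(self, rows: List[int]) -> Tuple[List[int], Metadata]:
        if self._state is None:
            raise RuntimeError("Stream has finished: no state token available")
        # Attach the current token to the outgoing metadata.
        out_rows, out_md = self._server(rows, {STATE_KEY: self._state})
        # Adopt the refreshed token if the server sent one.
        new_state = out_md.get(STATE_KEY)
        if new_state is not None:
            self._state = new_state
        # Hide the token from the caller-visible metadata.
        user_md = {k: v for k, v in out_md.items() if k != STATE_KEY}
        return out_rows, user_md


session = Session(running_sum_server, state=b"0")
first_rows, first_md = session.exchange([1, 2, 3])
second_rows, second_md = session.exchange([10])
```

The second call sees the total from the first only because the session replaced its token in between, which is why a failed exchange cannot be blindly replayed with a stale token.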

__iter__

__iter__() -> Iterator[AnnotatedBatch]

Iterate over output batches from a producer stream.

Yields pre-loaded batches from init, then follows continuation tokens.

Source code in vgi_rpc/http/_client.py
def __iter__(self) -> Iterator[AnnotatedBatch]:
    """Iterate over output batches from a producer stream.

    Yields pre-loaded batches from init, then follows continuation tokens.
    """
    # Yield pre-loaded batches from init response
    yield from self._pending_batches
    self._pending_batches.clear()

    if self._finished:
        return

    # Follow continuation tokens
    if self._state_bytes is None:
        return

    reader: ValidatedReader | None = None
    try:
        reader = self._send_continuation(self._state_bytes)
        while True:
            try:
                batch, custom_metadata = reader.read_next_batch_with_custom_metadata()
            except StopIteration:
                break

            # Check for continuation token (zero-row batch with STATE_KEY)
            if batch.num_rows == 0 and custom_metadata is not None:
                token = custom_metadata.get(STATE_KEY)
                if token is not None:
                    if not isinstance(token, bytes):
                        raise TypeError(f"Expected bytes for state token, got {type(token).__name__}")
                    _drain_stream(reader)
                    reader = self._send_continuation(token)
                    continue

            # Dispatch log/error batches
            if _dispatch_log_or_error(batch, custom_metadata, self._on_log):
                continue

            resolved_batch, resolved_cm = resolve_external_location(
                batch, custom_metadata, self._external_config, self._on_log, reader.ipc_validation
            )
            yield AnnotatedBatch(batch=resolved_batch, custom_metadata=resolved_cm)
    except RpcError:
        if reader is not None:
            _drain_stream(reader)
        raise
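The continuation loop can be sketched as a generator over an injected fetch function. The data shapes are assumptions made for this illustration: a response is a list of `(rows, token)` pairs, and a pair with no rows and a token is a continuation marker. Log dispatch, external-location resolution, and the `_finished` flag are left out.

```python
from typing import Callable, Dict, Iterator, List, Optional, Tuple

Batch = Tuple[List[int], Optional[bytes]]
Fetch = Callable[[bytes], List[Batch]]


def iterate(
    pending: List[List[int]], token: Optional[bytes], fetch: Fetch
) -> Iterator[List[int]]:
    # Batches that arrived with the init response come first.
    yield from pending
    while token is not None:
        response = fetch(token)
        token = None
        for rows, next_token in response:
            if not rows and next_token is not None:
                # Continuation marker: abandon this response and fetch the next.
                token = next_token
                break
            yield rows


# Hypothetical server pages keyed by continuation token.
PAGES: Dict[bytes, List[Batch]] = {
    b"t1": [([3, 4], None), ([], b"t2")],
    b"t2": [([5], None)],
}

batches = list(iterate([[1, 2]], b"t1", PAGES.__getitem__))
```

The caller sees one flat sequence of batches. The number of HTTP round trips behind it depends only on how many continuation markers the server emits.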

close

close() -> None

Close the session (no-op for HTTP — stateless).

Source code in vgi_rpc/http/_client.py
def close(self) -> None:
    """Close the session (no-op for HTTP — stateless)."""

cancel

cancel() -> None

Signal the server to discard stream state and stop processing.

Sends a POST {prefix}/{method}/exchange carrying vgi_rpc.cancel metadata alongside the current state token. The server invokes state.on_cancel(ctx) (if defined) and releases the state.

Idempotent and best-effort: network failures are swallowed. After cancel(), the session is marked finished: further exchange() calls raise RpcError, and iteration yields any batches still buffered from the init response and then stops.

Source code in vgi_rpc/http/_client.py
def cancel(self) -> None:
    """Signal the server to discard stream state and stop processing.

    Sends a ``POST {prefix}/{method}/exchange`` carrying ``vgi_rpc.cancel``
    metadata alongside the current state token. The server invokes
    ``state.on_cancel(ctx)`` (if defined) and releases the state.

    Idempotent and best-effort: network failures are swallowed. After
    ``cancel()``, the session is marked finished: further ``exchange()``
    calls raise ``RpcError``, and iteration yields any batches still
    buffered from the init response and then stops.
    """
    if self._finished or self._state_bytes is None:
        self._finished = True
        self._state_bytes = None
        return
    token = self._state_bytes
    self._finished = True
    self._state_bytes = None
    if wire_http_logger.isEnabledFor(logging.DEBUG):
        wire_http_logger.debug("HTTP stream cancel: method=%s", self._method)
    req_buf = BytesIO()
    cancel_md = pa.KeyValueMetadata({STATE_KEY: token, CANCEL_KEY: b"1"})
    with ipc.new_stream(req_buf, _EMPTY_SCHEMA) as writer:
        writer.write_batch(empty_batch(_EMPTY_SCHEMA), custom_metadata=cancel_md)
    try:
        resp = self._client.post(
            f"{self._url_prefix}/{self._method}/exchange",
            content=self._prepare_body(req_buf.getvalue()),
            headers=self._build_headers(),
        )
    except Exception:
        return
    with contextlib.suppress(Exception):
        reader = _open_response_stream(resp.content, resp.status_code, self._ipc_validation)
        _drain_stream(reader)
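The idempotent, best-effort shape of `cancel()` is shown below in isolation. `Session` and its `send` callable are hypothetical; the point is the ordering, where local state is cleared before any I/O, so a network failure cannot leave the session half-cancelled.

```python
from typing import Callable, List, Optional


class Session:
    """Hypothetical session holding a state token and a send function."""

    def __init__(self, state: Optional[bytes], send: Callable[[bytes], None]) -> None:
        self._state = state
        self._finished = False
        self._send = send

    def cancel(self) -> None:
        token = self._state
        already_done = self._finished or token is None
        # Mark the session finished before any I/O so a failure cannot undo it.
        self._finished = True
        self._state = None
        if already_done or token is None:
            return
        try:
            self._send(token)
        except Exception:
            # Best-effort: the server may never learn about the cancel.
            pass


sent: List[bytes] = []
ok = Session(b"tok", sent.append)
ok.cancel()
ok.cancel()  # second call is a no-op and sends nothing


def failing_send(token: bytes) -> None:
    raise ConnectionError("network down")


flaky = Session(b"tok", failing_send)
flaky.cancel()  # does not raise
```

One consequence of swallowing failures is that server-side state may outlive a cancelled session.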

__enter__

__enter__() -> HttpStreamSession

Enter the context.

Source code in vgi_rpc/http/_client.py
def __enter__(self) -> HttpStreamSession:
    """Enter the context."""
    return self

__exit__

__exit__(
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: TracebackType | None,
) -> None

Exit the context.

Source code in vgi_rpc/http/_client.py
def __exit__(
    self,
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: TracebackType | None,
) -> None:
    """Exit the context."""
    self.close()

Testing

make_sync_client

make_sync_client(
    server: RpcServer,
    *,
    prefix: str = "",
    signing_key: bytes | None = None,
    max_stream_response_bytes: int | None = None,
    max_request_bytes: int | None = None,
    authenticate: (
        Callable[[Request], AuthContext] | None
    ) = None,
    default_headers: dict[str, str] | None = None,
    upload_url_provider: UploadUrlProvider | None = None,
    max_upload_bytes: int | None = None,
    otel_config: object | None = None,
    sentry_config: object | None = None,
    token_ttl: int = 3600,
    compression_level: int | None = 3,
    enable_not_found_page: bool = True,
    enable_landing_page: bool = True,
    enable_describe_page: bool = True,
    enable_health_endpoint: bool = True,
    repo_url: str | None = None,
    oauth_resource_metadata: (
        OAuthResourceMetadata | None
    ) = None
) -> _SyncTestClient

Create a synchronous test client for an RpcServer.

Uses falcon.testing.TestClient internally — no real HTTP server needed.

PARAMETER DESCRIPTION
server

The RpcServer to test.

TYPE: RpcServer

prefix

URL prefix for RPC endpoints (default "" — root).

TYPE: str DEFAULT: ''

signing_key

HMAC key for signing state tokens (see make_wsgi_app for details).

TYPE: bytes | None DEFAULT: None

max_stream_response_bytes

See make_wsgi_app.

TYPE: int | None DEFAULT: None

max_request_bytes

See make_wsgi_app.

TYPE: int | None DEFAULT: None

authenticate

See make_wsgi_app.

TYPE: Callable[[Request], AuthContext] | None DEFAULT: None

default_headers

Headers merged into every request (e.g. auth tokens).

TYPE: dict[str, str] | None DEFAULT: None

upload_url_provider

See make_wsgi_app.

TYPE: UploadUrlProvider | None DEFAULT: None

max_upload_bytes

See make_wsgi_app.

TYPE: int | None DEFAULT: None

otel_config

See make_wsgi_app.

TYPE: object | None DEFAULT: None

sentry_config

See make_wsgi_app.

TYPE: object | None DEFAULT: None

token_ttl

See make_wsgi_app.

TYPE: int DEFAULT: 3600

compression_level

See make_wsgi_app.

TYPE: int | None DEFAULT: 3

enable_not_found_page

See make_wsgi_app.

TYPE: bool DEFAULT: True

enable_landing_page

See make_wsgi_app.

TYPE: bool DEFAULT: True

enable_describe_page

See make_wsgi_app.

TYPE: bool DEFAULT: True

enable_health_endpoint

See make_wsgi_app.

TYPE: bool DEFAULT: True

repo_url

See make_wsgi_app.

TYPE: str | None DEFAULT: None

oauth_resource_metadata

See make_wsgi_app.

TYPE: OAuthResourceMetadata | None DEFAULT: None

RETURNS DESCRIPTION
_SyncTestClient

A sync client that can be passed to http_connect(client=...).

Source code in vgi_rpc/http/_testing.py
def make_sync_client(
    server: RpcServer,
    *,
    prefix: str = "",
    signing_key: bytes | None = None,
    max_stream_response_bytes: int | None = None,
    max_request_bytes: int | None = None,
    authenticate: Callable[[falcon.Request], AuthContext] | None = None,
    default_headers: dict[str, str] | None = None,
    upload_url_provider: UploadUrlProvider | None = None,
    max_upload_bytes: int | None = None,
    otel_config: object | None = None,
    sentry_config: object | None = None,
    token_ttl: int = 3600,
    compression_level: int | None = 3,
    enable_not_found_page: bool = True,
    enable_landing_page: bool = True,
    enable_describe_page: bool = True,
    enable_health_endpoint: bool = True,
    repo_url: str | None = None,
    oauth_resource_metadata: OAuthResourceMetadata | None = None,
) -> _SyncTestClient:
    """Create a synchronous test client for an RpcServer.

    Uses ``falcon.testing.TestClient`` internally — no real HTTP server needed.

    Args:
        server: The RpcServer to test.
        prefix: URL prefix for RPC endpoints (default ``""`` — root).
        signing_key: HMAC key for signing state tokens (see
            ``make_wsgi_app`` for details).
        max_stream_response_bytes: See ``make_wsgi_app``.
        max_request_bytes: See ``make_wsgi_app``.
        authenticate: See ``make_wsgi_app``.
        default_headers: Headers merged into every request (e.g. auth tokens).
        upload_url_provider: See ``make_wsgi_app``.
        max_upload_bytes: See ``make_wsgi_app``.
        otel_config: See ``make_wsgi_app``.
        sentry_config: See ``make_wsgi_app``.
        token_ttl: See ``make_wsgi_app``.
        compression_level: See ``make_wsgi_app``.
        enable_not_found_page: See ``make_wsgi_app``.
        enable_landing_page: See ``make_wsgi_app``.
        enable_describe_page: See ``make_wsgi_app``.
        enable_health_endpoint: See ``make_wsgi_app``.
        repo_url: See ``make_wsgi_app``.
        oauth_resource_metadata: See ``make_wsgi_app``.

    Returns:
        A sync client that can be passed to ``http_connect(client=...)``.

    """
    app = make_wsgi_app(
        server,
        prefix=prefix,
        signing_key=signing_key,
        max_stream_response_bytes=max_stream_response_bytes,
        max_request_bytes=max_request_bytes,
        authenticate=authenticate,
        upload_url_provider=upload_url_provider,
        max_upload_bytes=max_upload_bytes,
        otel_config=otel_config,
        sentry_config=sentry_config,
        token_ttl=token_ttl,
        compression_level=compression_level,
        enable_not_found_page=enable_not_found_page,
        enable_landing_page=enable_landing_page,
        enable_describe_page=enable_describe_page,
        enable_health_endpoint=enable_health_endpoint,
        repo_url=repo_url,
        oauth_resource_metadata=oauth_resource_metadata,
    )
    return _SyncTestClient(app, default_headers=default_headers, prefix=prefix)

Header Constants

MAX_REQUEST_BYTES_HEADER module-attribute

MAX_REQUEST_BYTES_HEADER = 'VGI-Max-Request-Bytes'

MAX_UPLOAD_BYTES_HEADER module-attribute

MAX_UPLOAD_BYTES_HEADER = 'VGI-Max-Upload-Bytes'

UPLOAD_URL_HEADER module-attribute

UPLOAD_URL_HEADER = 'VGI-Upload-URL-Support'