External Storage

When RPC payloads grow large — multi-megabyte query results, bulk data imports, large model artifacts — they can exceed the capacity of the underlying transport. HTTP servers and reverse proxies typically enforce request and response body size limits (e.g., 10 MB). Even pipe-based transports become inefficient when serializing very large batches inline.

External storage solves this by transparently offloading oversized Arrow IPC batches to cloud storage (S3, GCS, or any compatible backend). The batch is replaced with a lightweight pointer batch — a zero-row batch carrying a download URL in its metadata. The receiving side resolves the pointer automatically, fetching the actual data in parallel chunks.

This works in both directions:

  • Large outputs — the server externalizes result batches that exceed a size threshold
  • Large inputs — the client uploads input data to a pre-signed URL and sends a pointer in the RPC request

From the caller's perspective, nothing changes. The proxy returns the same typed results; the externalization is invisible.

How It Works

Large Outputs (Server to Client)

When a server method returns a batch that exceeds externalize_threshold_bytes, the framework intercepts the response before writing it to the wire:

  1. The batch is serialized to Arrow IPC format
  2. Optionally compressed with zstd
  3. Uploaded to cloud storage via the configured ExternalStorage backend
  4. The storage backend returns a download URL (typically a pre-signed URL with an expiry)
  5. A pointer batch replaces the original — a zero-row batch with vgi_rpc.location metadata containing the URL
  6. The pointer batch is written to the wire instead of the full data
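
As a mental model, the server-side path corresponds roughly to the sketch below. It illustrates the steps above and is not the framework's internal code: the function and constant names are made up, the choice of schema-level metadata for the URL is an assumption, and only storage.upload() maps onto a documented API (the ExternalStorage protocol later on this page).

import io

import pyarrow as pa
import pyarrow.ipc as ipc
import zstandard  # provided by the vgi-rpc[external] extra

POINTER_KEY = b"vgi_rpc.location"  # metadata key described in step 5


def externalize_batch(batch: pa.RecordBatch, storage, threshold: int) -> pa.RecordBatch:
    """Illustrative sketch: replace an oversized batch with a zero-row pointer batch."""
    if batch.get_total_buffer_size() <= threshold:
        return batch  # small enough to send inline

    # Steps 1-2: serialize to an Arrow IPC stream, then compress with zstd.
    sink = io.BytesIO()
    with ipc.new_stream(sink, batch.schema) as writer:
        writer.write_batch(batch)
    payload = zstandard.ZstdCompressor(level=3).compress(sink.getvalue())

    # Steps 3-4: upload via the ExternalStorage backend; get a download URL back.
    url = storage.upload(payload, batch.schema, content_encoding="zstd")

    # Step 5: zero-row batch whose (schema-level) metadata carries the URL.
    pointer_schema = batch.schema.with_metadata({POINTER_KEY: url.encode()})
    return pa.RecordBatch.from_arrays(
        [pa.array([], type=field.type) for field in pointer_schema],
        schema=pointer_schema,
    )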

On the client side, pointer batches are detected and resolved transparently:

  1. The client reads the pointer batch and extracts the URL from vgi_rpc.location metadata
  2. A HEAD probe determines the content size and whether the server supports range requests
  3. For large payloads (>64 MiB by default), parallel range-request fetching splits the download into chunks, with speculative hedging for slow chunks
  4. For smaller payloads, a single GET request fetches the data
  5. If the data was compressed, it is decompressed automatically
  6. The Arrow IPC stream is parsed back into a RecordBatch and returned to the caller
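
The probe-and-fetch decision (steps 2 to 4) can be sketched as follows. Decompression and IPC parsing (steps 5 and 6) are omitted, the real client runs range requests concurrently with speculative hedging rather than sequentially, and the use of httpx here is purely illustrative.

import httpx

PARALLEL_THRESHOLD = 64 * 1024 * 1024  # FetchConfig.parallel_threshold_bytes default
CHUNK_SIZE = 8 * 1024 * 1024           # FetchConfig.chunk_size_bytes default


def fetch_external(url: str) -> bytes:
    # Step 2: HEAD probe for content size and range support.
    head = httpx.head(url, follow_redirects=True)
    size = int(head.headers.get("Content-Length", "0"))
    supports_ranges = head.headers.get("Accept-Ranges") == "bytes"

    if size > PARALLEL_THRESHOLD and supports_ranges:
        # Step 3: chunked Range requests.  The real client issues these
        # concurrently and hedges slow chunks; this sketch is sequential.
        parts = []
        for start in range(0, size, CHUNK_SIZE):
            end = min(start + CHUNK_SIZE, size) - 1
            resp = httpx.get(url, headers={"Range": f"bytes={start}-{end}"})
            parts.append(resp.content)
        return b"".join(parts)

    # Step 4: a single GET for smaller payloads or servers without range support.
    return httpx.get(url, follow_redirects=True).content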

This works for both unary results and streaming outputs. For streaming, all batches in a single output cycle (including any log batches) are serialized as one IPC stream, uploaded together, and resolved as a unit.

Large Inputs (Client to Server)

For the reverse direction — when a client needs to send large data to the server — the framework provides upload URLs. This is particularly important for HTTP transport where request body size limits are common.

  1. The client calls request_upload_urls() to get one or more pre-signed URL pairs from the server
  2. Each UploadUrl contains an upload_url (for PUT) and a download_url (for GET) — these may be the same or different depending on the storage backend
  3. The client serializes the large batch to Arrow IPC and uploads it directly to the upload_url via HTTP PUT, bypassing the RPC server entirely
  4. The client sends the RPC request with a pointer batch referencing the download_url
  5. The server resolves the pointer batch transparently, fetching the data from storage

This pattern lets clients send arbitrarily large payloads even when the HTTP server enforces strict request size limits — the large data goes directly to cloud storage, and only a small pointer crosses the RPC boundary.

The upload URL endpoint is enabled by passing upload_url_provider to make_wsgi_app(). The server advertises this capability via the VGI-Upload-URL-Support: true response header, and clients can discover it with http_capabilities().

Server Configuration

from vgi_rpc import Compression, ExternalLocationConfig, FetchConfig, RpcServer, S3Storage

storage = S3Storage(bucket="my-bucket", prefix="rpc-data/")
config = ExternalLocationConfig(
    storage=storage,
    externalize_threshold_bytes=1_048_576,  # 1 MiB — batches above this go to S3
    compression=Compression(),              # zstd level 3 by default
    fetch_config=FetchConfig(
        chunk_size_bytes=8 * 1024 * 1024,   # 8 MiB parallel chunks
        max_parallel_requests=8,
    ),
)

server = RpcServer(MyService, MyServiceImpl(), external_location=config)

The same ExternalLocationConfig is passed to the client so it can resolve pointer batches in responses:

from vgi_rpc import connect

with connect(MyService, ["python", "worker.py"], external_location=config) as proxy:
    result = proxy.large_query()  # pointer batches resolved transparently

For HTTP transport with upload URL support:

from vgi_rpc import make_wsgi_app

app = make_wsgi_app(
    server,
    upload_url_provider=storage,       # enables __upload_url__ endpoint
    max_upload_bytes=100 * 1024 * 1024,  # advertise 100 MiB max upload
)

Client Upload URLs

from vgi_rpc import UploadUrl, request_upload_urls

# Get pre-signed upload URLs from the server
urls: list[UploadUrl] = request_upload_urls(
    base_url="http://localhost:8080",
    count=1,
)

# Upload large data directly to storage (bypasses RPC server)
import httpx
ipc_data = serialize_large_batch(my_batch)
httpx.put(urls[0].upload_url, content=ipc_data)

# Send RPC request with pointer to uploaded data
# The server resolves the pointer transparently
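
The pointer batch itself is just a zero-row batch whose metadata carries the download URL under the vgi_rpc.location key described earlier. Below is a hand-rolled sketch of that structure with pyarrow, assuming schema-level metadata and reusing my_batch and urls from the example above; the library may offer its own helper for this step, which is not shown here.

import pyarrow as pa

# Assumption: the URL lives in schema-level metadata under the documented key.
pointer_schema = my_batch.schema.with_metadata(
    {b"vgi_rpc.location": urls[0].download_url.encode()}
)
pointer_batch = pa.RecordBatch.from_arrays(
    [pa.array([], type=field.type) for field in pointer_schema],
    schema=pointer_schema,
)
# pointer_batch stands in for the uploaded data in the RPC request;
# the server resolves it by fetching urls[0].download_url.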

Clients can discover whether the server supports upload URLs:

from vgi_rpc import http_capabilities

caps = http_capabilities("http://localhost:8080")
if caps.upload_url_support:
    urls = request_upload_urls("http://localhost:8080", count=5)

Compression

When compression is enabled, batches are compressed with zstd before upload and decompressed transparently on fetch:

from vgi_rpc import Compression

# Compression() defaults to zstd level 3
config = ExternalLocationConfig(
    storage=storage,
    compression=Compression(),  # algorithm="zstd", level=3
)

# Custom compression level (higher = smaller but slower)
config = ExternalLocationConfig(
    storage=storage,
    compression=Compression(level=9),
)

# Disable compression
config = ExternalLocationConfig(
    storage=storage,
    compression=None,
)

The storage backend stores the Content-Encoding header alongside the object. When the client fetches the data, the Content-Encoding: zstd header triggers automatic decompression.
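
In terms of the zstandard package, the round trip amounts to the following sketch (a statement of the convention, not the library's internal code; ipc_bytes is a placeholder for a serialized Arrow IPC stream):

import zstandard

ipc_bytes = b"\x00" * 1024  # stands in for a serialized Arrow IPC stream

# Upload side: compress, then record content_encoding="zstd" with the object.
compressed = zstandard.ZstdCompressor(level=3).compress(ipc_bytes)

# Fetch side: a Content-Encoding: zstd response header triggers this step.
assert zstandard.ZstdDecompressor().decompress(compressed) == ipc_bytes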

Requires pip install vgi-rpc[external] (installs zstandard).

Parallel Fetching

For large externalized batches, the client fetches data in parallel chunks using range requests. This significantly reduces download time for payloads large enough to cross the parallel threshold (64 MiB by default):

  • A HEAD probe determines the total size and whether the server supports Accept-Ranges: bytes
  • If the payload exceeds parallel_threshold_bytes (default 64 MiB) and ranges are supported, it is split into chunk_size_bytes chunks (default 8 MiB) fetched concurrently
  • Speculative hedging: if a chunk takes longer than speculative_retry_multiplier times the median chunk time, a hedge request is launched in parallel — the first response wins
  • For smaller payloads or servers without range support, a single GET request is used

All of these behaviors are configured through FetchConfig:

from vgi_rpc import FetchConfig

fetch_config = FetchConfig(
    parallel_threshold_bytes=64 * 1024 * 1024,  # 64 MiB
    chunk_size_bytes=8 * 1024 * 1024,            # 8 MiB chunks
    max_parallel_requests=8,                      # concurrent fetches
    timeout_seconds=60.0,                         # overall deadline
    max_fetch_bytes=256 * 1024 * 1024,            # 256 MiB hard cap
    speculative_retry_multiplier=2.0,             # hedge at 2x median
    max_speculative_hedges=4,                     # max hedge requests
)
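
The hedging rule itself is small enough to state in code. The sketch below is a conceptual restatement of the heuristic and its defaults, not the scheduler in vgi_rpc/external_fetch.py:

import statistics


def should_hedge(
    elapsed_s: float,
    completed_chunk_times: list[float],
    multiplier: float = 2.0,   # speculative_retry_multiplier
    hedges_launched: int = 0,
    max_hedges: int = 4,       # max_speculative_hedges (0 = unlimited)
) -> bool:
    """Decide whether a lagging chunk should get a duplicate (hedge) request."""
    if multiplier == 0 or not completed_chunk_times:
        return False  # hedging disabled, or no baseline yet
    if max_hedges and hedges_launched >= max_hedges:
        return False  # hedge budget for this fetch is spent
    return elapsed_s > multiplier * statistics.median(completed_chunk_times)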

Object Lifecycle

Uploaded objects persist indefinitely — vgi-rpc does not delete them. Configure storage-level cleanup policies to auto-expire old data.

S3 lifecycle rule:

aws s3api put-bucket-lifecycle-configuration \
  --bucket MY_BUCKET \
  --lifecycle-configuration '{
    "Rules": [{
      "ID": "expire-vgi-rpc",
      "Filter": {"Prefix": "rpc-data/"},
      "Status": "Enabled",
      "Expiration": {"Days": 1}
    }]
  }'

GCS lifecycle rule:

gsutil lifecycle set <(cat <<EOF
{"rule": [{"action": {"type": "Delete"},
           "condition": {"age": 1, "matchesPrefix": ["rpc-data/"]}}]}
EOF
) gs://MY_BUCKET

URL Validation

By default, external location URLs are validated with https_only_validator, which rejects non-HTTPS URLs. This prevents pointer batches from being used to probe internal networks. You can provide a custom validator via ExternalLocationConfig.url_validator.
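
For example, a custom validator might allow plain HTTP only for a local development endpoint while keeping the HTTPS requirement everywhere else. This is an illustrative validator, not part of the library; it assumes https_only_validator can be imported from vgi_rpc.external (its source module per the reference below) and reuses the storage object configured earlier.

from urllib.parse import urlparse

from vgi_rpc import ExternalLocationConfig
from vgi_rpc.external import https_only_validator


def dev_friendly_validator(url: str) -> None:
    """Raise ValueError to reject a URL; return normally to allow it."""
    if urlparse(url).hostname in ("localhost", "127.0.0.1"):
        return  # allow plain HTTP against a local MinIO/LocalStack endpoint
    https_only_validator(url)  # everything else must be HTTPS


config = ExternalLocationConfig(storage=storage, url_validator=dev_friendly_validator)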

API Reference

Configuration

ExternalLocationConfig dataclass

ExternalLocationConfig(
    storage: ExternalStorage | None = None,
    externalize_threshold_bytes: int = 1048576,
    max_retries: int = 2,
    retry_delay_seconds: float = 0.5,
    fetch_config: FetchConfig = FetchConfig(),
    compression: Compression | None = None,
    url_validator: (
        Callable[[str], None] | None
    ) = https_only_validator,
)

Configuration for ExternalLocation batch support.

.. note:: Trust boundary — When resolution is enabled, the client will fetch URLs embedded in server responses. The default url_validator (https_only_validator) rejects non-HTTPS URLs, but callers should still only connect to trusted RPC servers.

ATTRIBUTE DESCRIPTION
storage

Storage backend for uploading externalized data. Required for production (writing); not needed for resolution-only (reading) scenarios.

TYPE: ExternalStorage | None

externalize_threshold_bytes

Data batch buffer size above which to externalize. Uses batch.get_total_buffer_size() as a fast O(1) estimate.

TYPE: int

max_retries

Number of fetch retries (total attempts = max_retries + 1, capped at 3).

TYPE: int

retry_delay_seconds

Delay between retry attempts.

TYPE: float

fetch_config

Fetch configuration controlling parallelism, timeouts, and size limits. Defaults to sensible values.

TYPE: FetchConfig

compression

Compression settings for externalized data. None disables compression (default).

TYPE: Compression | None

url_validator

Callback invoked before fetching external URLs. Should raise ValueError to reject. Defaults to https_only_validator (HTTPS-only). Set to None to disable validation.

TYPE: Callable[[str], None] | None

Compression dataclass

Compression(
    algorithm: Literal["zstd"] = "zstd", level: int = 3
)

Compression settings for externalized data.

ATTRIBUTE DESCRIPTION
algorithm

Compression algorithm. Currently only "zstd" is supported.

TYPE: Literal['zstd']

level

Compression level (1-22 for zstd).

TYPE: int

FetchConfig dataclass

FetchConfig(
    parallel_threshold_bytes: int = 64 * 1024 * 1024,
    chunk_size_bytes: int = 8 * 1024 * 1024,
    max_parallel_requests: int = 8,
    timeout_seconds: float = 60.0,
    max_fetch_bytes: int = 256 * 1024 * 1024,
    speculative_retry_multiplier: float = 2.0,
    max_speculative_hedges: int = 4,
)

Configuration for parallel range-request fetching.

Maintains a persistent aiohttp.ClientSession backed by a daemon thread. Use as a context manager or call close() to release resources.

ATTRIBUTE DESCRIPTION
parallel_threshold_bytes

Below this size, use a single GET.

TYPE: int

chunk_size_bytes

Size of each Range request chunk.

TYPE: int

max_parallel_requests

Semaphore limit on concurrent requests.

TYPE: int

timeout_seconds

Overall deadline for the fetch.

TYPE: float

max_fetch_bytes

Hard cap on total download size.

TYPE: int

speculative_retry_multiplier

Launch a hedge request for chunks taking longer than multiplier * median. Set to 0 to disable hedging.

TYPE: float

max_speculative_hedges

Maximum number of hedge requests per fetch. Prevents runaway request amplification under adversarial or slow-backend conditions. 0 means no limit (bounded only by chunk count).

TYPE: int

close

close() -> None

Close the pooled session, stop the event loop, and join the thread.

Source code in vgi_rpc/external_fetch.py
def close(self) -> None:
    """Close the pooled session, stop the event loop, and join the thread."""
    pool = self._pool
    with pool.lock:
        if pool.session is not None and pool.loop is not None and not pool.loop.is_closed():
            asyncio.run_coroutine_threadsafe(pool.session.close(), pool.loop).result(timeout=5)
        pool.session = None
        if pool.loop is not None and not pool.loop.is_closed():
            pool.loop.call_soon_threadsafe(pool.loop.stop)
        if pool.thread is not None:
            pool.thread.join(timeout=5)
        pool.loop = None
        pool.thread = None

__del__

__del__() -> None

Safety net: close pool on garbage collection.

Source code in vgi_rpc/external_fetch.py
def __del__(self) -> None:
    """Safety net: close pool on garbage collection."""
    with contextlib.suppress(Exception):
        self.close()

__enter__

__enter__() -> FetchConfig

Enter context manager.

Source code in vgi_rpc/external_fetch.py
def __enter__(self) -> FetchConfig:
    """Enter context manager."""
    return self

__exit__

__exit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Exit context manager, closing the pool.

Source code in vgi_rpc/external_fetch.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit context manager, closing the pool."""
    self.close()

Storage Protocol

ExternalStorage

Bases: Protocol

Pluggable storage interface for externalizing large batches.

Implementations must be thread-safe — upload() may be called concurrently from different server threads.

.. important:: Object lifecycle — vgi-rpc does not manage the lifecycle of uploaded objects. Data uploaded via upload() or generate_upload_url() persists indefinitely unless the server operator configures cleanup. Use storage-level lifecycle rules (e.g. S3 Lifecycle Policies, GCS Object Lifecycle Management) or bucket-level TTLs to automatically expire and delete stale objects.
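
To give a feel for what a custom backend involves, here is a sketch of a storage class that writes objects into a directory served by an existing HTTPS static file server and returns the corresponding URL. The class name, fields, and serving setup are assumptions; it conforms structurally to the upload() signature documented below, and a real deployment would also need the serving layer to emit the stored Content-Encoding.

import uuid
from dataclasses import dataclass
from pathlib import Path

import pyarrow as pa


@dataclass
class StaticDirStorage:
    """Hypothetical backend: write objects under a directory that an existing
    HTTPS static file server (nginx, caddy, ...) exposes at base_url."""

    directory: Path
    base_url: str  # e.g. "https://files.example.com/rpc-data"

    def upload(self, data: bytes, schema: pa.Schema, *,
               content_encoding: str | None = None) -> str:
        ext = ".arrow.zst" if content_encoding == "zstd" else ".arrow"
        name = f"{uuid.uuid4().hex}{ext}"
        # Distinct UUID names keep concurrent calls from different server
        # threads from colliding, satisfying the thread-safety requirement.
        (self.directory / name).write_bytes(data)
        # A real deployment must configure the static server to send
        # Content-Encoding: zstd for .arrow.zst objects so clients decompress.
        return f"{self.base_url}/{name}"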

upload

upload(
    data: bytes,
    schema: Schema,
    *,
    content_encoding: str | None = None
) -> str

Upload serialized IPC data and return a URL for retrieval.

The uploaded object is not automatically deleted — server operators are responsible for configuring object cleanup via storage lifecycle rules or TTLs.

PARAMETER DESCRIPTION
data

Complete Arrow IPC stream bytes.

TYPE: bytes

schema

The schema of the data being uploaded.

TYPE: Schema

content_encoding

Optional encoding applied to data (e.g. "zstd"). Backends should store this so that fetchers can decompress correctly.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
str

A URL (typically pre-signed) that can be fetched to retrieve the uploaded data.

Source code in vgi_rpc/external.py
def upload(self, data: bytes, schema: pa.Schema, *, content_encoding: str | None = None) -> str:
    """Upload serialized IPC data and return a URL for retrieval.

    The uploaded object is not automatically deleted — server
    operators are responsible for configuring object cleanup via
    storage lifecycle rules or TTLs.

    Args:
        data: Complete Arrow IPC stream bytes.
        schema: The schema of the data being uploaded.
        content_encoding: Optional encoding applied to *data*
            (e.g. ``"zstd"``).  Backends should store this so that
            fetchers can decompress correctly.

    Returns:
        A URL (typically pre-signed) that can be fetched to retrieve
        the uploaded data.

    """
    ...

UploadUrlProvider

Bases: Protocol

Generates pre-signed upload URL pairs.

Implementations must be thread-safe — generate_upload_url() may be called concurrently from different server threads.

.. important:: Object lifecycle — vgi-rpc does not manage the lifecycle of uploaded objects. Data uploaded via these URLs persists indefinitely unless the server operator configures cleanup. Use storage-level lifecycle rules (e.g. S3 Lifecycle Policies, GCS Object Lifecycle Management) or bucket-level TTLs to automatically expire and delete stale objects.

generate_upload_url

generate_upload_url(schema: Schema) -> UploadUrl

Generate a pre-signed upload/download URL pair.

The caller receives time-limited PUT and GET URLs for a new storage object. The uploaded object is not automatically deleted — server operators are responsible for configuring object cleanup via storage lifecycle rules or TTLs.

PARAMETER DESCRIPTION
schema

The Arrow schema of the data to be uploaded. Backends may use this for content-type or metadata hints.

TYPE: Schema

RETURNS DESCRIPTION
UploadUrl

An UploadUrl with PUT and GET URLs for the same object.

Source code in vgi_rpc/external.py
def generate_upload_url(self, schema: pa.Schema) -> UploadUrl:
    """Generate a pre-signed upload/download URL pair.

    The caller receives time-limited PUT and GET URLs for a new
    storage object.  **The uploaded object is not automatically
    deleted** — server operators are responsible for configuring
    object cleanup via storage lifecycle rules or TTLs.

    Args:
        schema: The Arrow schema of the data to be uploaded.
            Backends may use this for content-type or metadata hints.

    Returns:
        An ``UploadUrl`` with PUT and GET URLs for the same object.

    """
    ...

UploadUrl dataclass

UploadUrl(
    upload_url: str, download_url: str, expires_at: datetime
)

Pre-signed URL pair for client-side data upload.

S3/GCS pre-signed URLs are signed per HTTP method, so a PUT URL cannot be used for GET — hence two URLs for the same storage object.

ATTRIBUTE DESCRIPTION
upload_url

Pre-signed PUT URL for uploading data.

TYPE: str

download_url

Pre-signed GET URL for downloading the uploaded data.

TYPE: str

expires_at

Expiration time for the pre-signed URLs (UTC).

TYPE: datetime

Validation

https_only_validator

https_only_validator(url: str) -> None

Reject URLs that do not use the https scheme.

This is the default url_validator for ExternalLocationConfig. It prevents the client from issuing requests over plain HTTP or other schemes (ftp, file, etc.) when resolving external-location pointers.

PARAMETER DESCRIPTION
url

The URL to validate.

TYPE: str

RAISES DESCRIPTION
ValueError

If the URL scheme is not https.

Source code in vgi_rpc/external.py
def https_only_validator(url: str) -> None:
    """Reject URLs that do not use the ``https`` scheme.

    This is the default ``url_validator`` for ``ExternalLocationConfig``.
    It prevents the client from issuing requests over plain HTTP or other
    schemes (``ftp``, ``file``, etc.) when resolving external-location
    pointers.

    Args:
        url: The URL to validate.

    Raises:
        ValueError: If the URL scheme is not ``https``.

    """
    from urllib.parse import urlparse

    parsed = urlparse(url)
    if parsed.scheme != "https":
        raise ValueError(f"URL scheme '{parsed.scheme}' not allowed (only 'https' is permitted)")

S3 Backend

Requires pip install vgi-rpc[s3].

S3Storage dataclass

S3Storage(
    bucket: str,
    prefix: str = "vgi-rpc/",
    presign_expiry_seconds: int = 3600,
    region_name: str | None = None,
    endpoint_url: str | None = None,
)

S3-backed ExternalStorage using boto3.

.. important:: Object lifecycle — uploaded objects persist indefinitely. Configure an S3 Lifecycle Policy on the bucket to expire objects under prefix (default vgi-rpc/) after a suitable retention period. See the vgi_rpc.external module documentation for full details and examples.

ATTRIBUTE DESCRIPTION
bucket

S3 bucket name.

TYPE: str

prefix

Key prefix for uploaded objects.

TYPE: str

presign_expiry_seconds

Lifetime of pre-signed GET URLs.

TYPE: int

region_name

AWS region (None uses boto3 default).

TYPE: str | None

endpoint_url

Custom endpoint for S3-compatible services (e.g. MinIO, LocalStack).

TYPE: str | None

S3 Lifecycle Policy: https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lifecycle-mgmt.html
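
Because endpoint_url is accepted, the same backend can target any S3-compatible service. A minimal development sketch pointing at a local MinIO instance follows (hostname, port, and bucket are placeholders; credentials come from the usual boto3 sources):

from vgi_rpc import ExternalLocationConfig, S3Storage

# Point the S3 backend at a local MinIO/LocalStack endpoint for development.
storage = S3Storage(
    bucket="my-bucket",
    prefix="rpc-data/",
    presign_expiry_seconds=900,            # 15-minute pre-signed URLs
    endpoint_url="http://localhost:9000",  # MinIO's default port
)
config = ExternalLocationConfig(storage=storage)

Note that pre-signed URLs minted against a plain-HTTP endpoint like this will be rejected by the client's default https_only_validator, so local setups typically also need a custom url_validator as shown earlier.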

upload

upload(
    data: bytes,
    schema: Schema,
    *,
    content_encoding: str | None = None
) -> str

Upload IPC data to S3 and return a pre-signed GET URL.

PARAMETER DESCRIPTION
data

Serialized Arrow IPC stream bytes.

TYPE: bytes

schema

Schema of the data (unused but required by protocol).

TYPE: Schema

content_encoding

Optional encoding applied to data (e.g. "zstd").

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
str

A pre-signed URL that can be used to download the data.

Source code in vgi_rpc/s3.py
def upload(self, data: bytes, schema: pa.Schema, *, content_encoding: str | None = None) -> str:
    """Upload IPC data to S3 and return a pre-signed GET URL.

    Args:
        data: Serialized Arrow IPC stream bytes.
        schema: Schema of the data (unused but required by protocol).
        content_encoding: Optional encoding applied to *data*
            (e.g. ``"zstd"``).

    Returns:
        A pre-signed URL that can be used to download the data.

    """
    client = self._get_client()

    ext = ".arrow.zst" if content_encoding == "zstd" else ".arrow"
    key = f"{self.prefix}{uuid.uuid4().hex}{ext}"

    put_kwargs: dict[str, str | bytes] = {
        "Bucket": self.bucket,
        "Key": key,
        "Body": data,
        "ContentType": "application/octet-stream",
    }
    if content_encoding is not None:
        put_kwargs["ContentEncoding"] = content_encoding

    t0 = time.monotonic()
    try:
        client.put_object(**put_kwargs)
    except Exception as exc:
        _logger.error(
            "S3 upload failed: bucket=%s key=%s",
            self.bucket,
            key,
            exc_info=True,
            extra={"bucket": self.bucket, "key": key, "error_type": type(exc).__name__},
        )
        raise
    duration_ms = (time.monotonic() - t0) * 1000

    url: str = client.generate_presigned_url(
        "get_object",
        Params={"Bucket": self.bucket, "Key": key},
        ExpiresIn=self.presign_expiry_seconds,
    )
    _logger.debug(
        "S3 upload completed: bucket=%s key=%s (%d bytes, %.1fms)",
        self.bucket,
        key,
        len(data),
        duration_ms,
        extra={"bucket": self.bucket, "key": key, "size_bytes": len(data), "duration_ms": round(duration_ms, 2)},
    )
    return url

generate_upload_url

generate_upload_url(schema: Schema) -> UploadUrl

Generate pre-signed PUT and GET URLs for client-side upload.

The created S3 object is not automatically deleted. Configure S3 Lifecycle Policies on the bucket to expire objects after a suitable retention period.

PARAMETER DESCRIPTION
schema

The Arrow schema of the data to be uploaded (unused but available for metadata hints).

TYPE: Schema

RETURNS DESCRIPTION
UploadUrl

An UploadUrl with PUT and GET pre-signed URLs for the same S3 object.

Source code in vgi_rpc/s3.py
def generate_upload_url(self, schema: pa.Schema) -> UploadUrl:
    """Generate pre-signed PUT and GET URLs for client-side upload.

    The created S3 object is not automatically deleted.  Configure
    S3 Lifecycle Policies on the bucket to expire objects after
    a suitable retention period.

    Args:
        schema: The Arrow schema of the data to be uploaded
            (unused but available for metadata hints).

    Returns:
        An ``UploadUrl`` with PUT and GET pre-signed URLs for the
        same S3 object.

    """
    client = self._get_client()
    key = f"{self.prefix}{uuid.uuid4().hex}.arrow"
    params = {"Bucket": self.bucket, "Key": key}
    expires_at = datetime.now(UTC) + timedelta(seconds=self.presign_expiry_seconds)

    put_url: str = client.generate_presigned_url(
        "put_object",
        Params=params,
        ExpiresIn=self.presign_expiry_seconds,
    )
    get_url: str = client.generate_presigned_url(
        "get_object",
        Params=params,
        ExpiresIn=self.presign_expiry_seconds,
    )

    _logger.debug(
        "S3 upload URL generated: bucket=%s key=%s",
        self.bucket,
        key,
        extra={"bucket": self.bucket, "key": key},
    )
    return UploadUrl(upload_url=put_url, download_url=get_url, expires_at=expires_at)

GCS Backend

Requires pip install vgi-rpc[gcs].

GCSStorage dataclass

GCSStorage(
    bucket: str,
    prefix: str = "vgi-rpc/",
    presign_expiry_seconds: int = 3600,
    project: str | None = None,
)

GCS-backed ExternalStorage using google-cloud-storage.

.. important:: Object lifecycle — uploaded objects persist indefinitely. Configure Object Lifecycle Management on the bucket to delete objects under prefix (default vgi-rpc/) after a suitable retention period. See the vgi_rpc.external module documentation for full details and examples.

ATTRIBUTE DESCRIPTION
bucket

GCS bucket name.

TYPE: str

prefix

Key prefix for uploaded objects.

TYPE: str

presign_expiry_seconds

Lifetime of signed GET URLs.

TYPE: int

project

GCS project ID (None uses Application Default Credentials default project).

TYPE: str | None

Object Lifecycle Management: https://cloud.google.com/storage/docs/lifecycle
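
A matching GCS configuration, assuming GCSStorage is exported from the package top level like S3Storage (its source lives in vgi_rpc/gcs.py):

from vgi_rpc import ExternalLocationConfig, GCSStorage

# Credentials come from the google-cloud-storage defaults
# (Application Default Credentials unless configured otherwise).
storage = GCSStorage(bucket="my-bucket", prefix="rpc-data/", presign_expiry_seconds=3600)
config = ExternalLocationConfig(storage=storage)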

upload

upload(
    data: bytes,
    schema: Schema,
    *,
    content_encoding: str | None = None
) -> str

Upload IPC data to GCS and return a signed GET URL.

PARAMETER DESCRIPTION
data

Serialized Arrow IPC stream bytes.

TYPE: bytes

schema

Schema of the data (unused but required by protocol).

TYPE: Schema

content_encoding

Optional encoding applied to data (e.g. "zstd").

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
str

A signed URL that can be used to download the data.

Source code in vgi_rpc/gcs.py
def upload(self, data: bytes, schema: pa.Schema, *, content_encoding: str | None = None) -> str:
    """Upload IPC data to GCS and return a signed GET URL.

    Args:
        data: Serialized Arrow IPC stream bytes.
        schema: Schema of the data (unused but required by protocol).
        content_encoding: Optional encoding applied to *data*
            (e.g. ``"zstd"``).

    Returns:
        A signed URL that can be used to download the data.

    """
    client = self._get_client()
    bucket = client.bucket(self.bucket)
    ext = ".arrow.zst" if content_encoding == "zstd" else ".arrow"
    blob_name = f"{self.prefix}{uuid.uuid4().hex}{ext}"
    blob = bucket.blob(blob_name)
    if content_encoding is not None:
        blob.content_encoding = content_encoding
    t0 = time.monotonic()
    try:
        blob.upload_from_string(data, content_type="application/octet-stream")
    except Exception as exc:
        _logger.error(
            "GCS upload failed: bucket=%s key=%s",
            self.bucket,
            blob_name,
            exc_info=True,
            extra={"bucket": self.bucket, "key": blob_name, "error_type": type(exc).__name__},
        )
        raise
    duration_ms = (time.monotonic() - t0) * 1000

    url: str = blob.generate_signed_url(
        version="v4",
        expiration=timedelta(seconds=self.presign_expiry_seconds),
        method="GET",
    )
    _logger.debug(
        "GCS upload completed: bucket=%s key=%s (%d bytes, %.1fms)",
        self.bucket,
        blob_name,
        len(data),
        duration_ms,
        extra={
            "bucket": self.bucket,
            "key": blob_name,
            "size_bytes": len(data),
            "duration_ms": round(duration_ms, 2),
        },
    )
    return url

generate_upload_url

generate_upload_url(schema: Schema) -> UploadUrl

Generate signed PUT and GET URLs for client-side upload.

The created GCS object is not automatically deleted. Configure GCS Object Lifecycle Management on the bucket to expire objects after a suitable retention period.

PARAMETER DESCRIPTION
schema

The Arrow schema of the data to be uploaded (unused but available for metadata hints).

TYPE: Schema

RETURNS DESCRIPTION
UploadUrl

An UploadUrl with PUT and GET signed URLs for the same GCS object.

Source code in vgi_rpc/gcs.py
def generate_upload_url(self, schema: pa.Schema) -> UploadUrl:
    """Generate signed PUT and GET URLs for client-side upload.

    The created GCS object is not automatically deleted.  Configure
    GCS Object Lifecycle Management on the bucket to expire objects
    after a suitable retention period.

    Args:
        schema: The Arrow schema of the data to be uploaded
            (unused but available for metadata hints).

    Returns:
        An ``UploadUrl`` with PUT and GET signed URLs for the
        same GCS object.

    """
    client = self._get_client()
    bucket = client.bucket(self.bucket)
    blob_name = f"{self.prefix}{uuid.uuid4().hex}.arrow"
    blob = bucket.blob(blob_name)
    expiration = timedelta(seconds=self.presign_expiry_seconds)
    expires_at = datetime.now(UTC) + expiration

    put_url: str = blob.generate_signed_url(
        version="v4",
        expiration=expiration,
        method="PUT",
        content_type="application/octet-stream",
    )
    get_url: str = blob.generate_signed_url(
        version="v4",
        expiration=expiration,
        method="GET",
    )

    _logger.debug(
        "GCS upload URL generated: bucket=%s key=%s",
        self.bucket,
        blob_name,
        extra={"bucket": self.bucket, "key": blob_name},
    )
    return UploadUrl(upload_url=put_url, download_url=get_url, expires_at=expires_at)