Skip to content

API reference

Auto-generated from source docstrings. Regenerated on every docs build.

bqemulator

bqemulator — a local emulator for Google BigQuery.

This is the root package. Public re-exports here are intentionally minimal; consumers should import from dedicated submodules.

Typical uses

from bqemulator import __version__
from bqemulator.server import EmulatorServer
from bqemulator.config import Settings

bqemulator.config

bqemulator.config

Settings — the single source of runtime configuration.

Configuration sources in priority order (high to low):

  1. Explicit constructor kwargs (set by CLI flags in `bqemulator.cli`).
  2. Environment variables prefixed BQEMU_.
  3. .bqemu.toml file in the current working directory.
  4. Built-in defaults.

The settings object is constructed once at startup by the composition root (`bqemulator.server`) and injected into every subsystem that needs it. Never access settings via a module-level global.

PersistenceMode

Bases: StrEnum

How the emulator persists its DuckDB data.

Source code in src/bqemulator/config.py
class PersistenceMode(StrEnum):
    """How the emulator persists its DuckDB data."""

    EPHEMERAL = "ephemeral"  # :memory: — fastest start, no persistence
    PERSISTENT = "persistent"  # file on disk, survives restart
    IMPORT = "import"  # file + schema sync from real project

LogLevel

Bases: StrEnum

Supported log levels.

Source code in src/bqemulator/config.py
class LogLevel(StrEnum):
    """Supported log levels."""

    TRACE = "trace"
    DEBUG = "debug"
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

LogFormat

Bases: StrEnum

Log output format.

Source code in src/bqemulator/config.py
class LogFormat(StrEnum):
    """Log output format."""

    JSON = "json"
    CONSOLE = "console"

Settings

Bases: BaseSettings

Runtime settings for bqemulator.

Attributes map 1:1 to `BQEMU_*` env vars via the prefix and to CLI flags via the `bqemulator.cli` definitions.

Source code in src/bqemulator/config.py
class Settings(BaseSettings):
    """Runtime settings for bqemulator.

    Attributes map 1:1 to ``BQEMU_*`` env vars via the prefix and to CLI
    flags via the :mod:`bqemulator.cli` definitions.

    Optional ``Path`` fields (``data_dir``, ``gcs_local_root``,
    ``upload_staging_dir``) are normalised by a single validator: ``~`` is
    expanded and the path is resolved to an absolute path. Because
    ``validate_assignment=True``, this also happens on attribute assignment.

    NOTE(review): the module docs list a ``.bqemu.toml`` file as a
    configuration source, but this class configures no TOML source
    (``env_file=None``, no ``toml_file``, no ``settings_customise_sources``).
    Confirm the file is loaded elsewhere (e.g. by the CLI) or wire it here.
    """

    model_config = SettingsConfigDict(
        env_prefix="BQEMU_",
        env_file=None,
        case_sensitive=False,
        extra="ignore",
        frozen=False,
        validate_assignment=True,
    )

    # -- Network ------------------------------------------------------------
    rest_host: str = Field(default="127.0.0.1", description="REST bind host")
    rest_port: int = Field(
        default=9050,
        ge=0,
        le=65535,
        description="REST bind port (0 = random free port)",
    )
    grpc_host: str = Field(default="127.0.0.1", description="gRPC bind host")
    grpc_port: int = Field(
        default=9060,
        ge=0,
        le=65535,
        description="gRPC bind port (0 = random free port)",
    )

    # -- Persistence --------------------------------------------------------
    persistence_mode: PersistenceMode = Field(
        default=PersistenceMode.EPHEMERAL,
        description="How DuckDB data persists across restarts",
    )
    data_dir: Path | None = Field(
        default=None,
        description="Directory for persistent DuckDB file and working state",
    )

    # -- Emulation --------------------------------------------------------
    default_project_id: str = Field(
        default="test-project",
        description="Default project id used when a request omits it",
    )
    gcs_local_root: Path | None = Field(
        default=None,
        description="Local directory that 'gs://' URIs are resolved under",
    )
    max_concurrent_jobs: int = Field(
        default=8,
        ge=1,
        le=1024,
        description="Maximum concurrent query/load/extract/copy jobs",
    )
    query_cache_ttl_seconds: int = Field(
        default=86400,
        ge=0,
        description="Query result cache TTL (seconds). 0 disables the cache.",
    )
    time_travel_retention_days: int = Field(
        default=7,
        ge=0,
        le=90,
        description="How long table snapshots are retained for time travel",
    )
    write_api_max_request_bytes: int = Field(
        default=10 * 1024 * 1024,
        ge=1024,
        le=1024 * 1024 * 1024,
        description=(
            "Maximum serialized AppendRowsRequest size in bytes (BigQuery "
            "caps production at 10 MiB; reject anything larger with "
            "RESOURCE_EXHAUSTED)."
        ),
    )
    write_api_max_stream_rows: int = Field(
        default=10_000_000,
        ge=1,
        description=(
            "Maximum number of rows a PENDING/BUFFERED stream may buffer "
            "before the server returns RESOURCE_EXHAUSTED on further "
            "AppendRows. Protects the server from unbounded memory growth "
            "when a client never commits or flushes."
        ),
    )
    udf_js_timeout_ms: int = Field(
        default=5000,
        ge=1,
        le=600_000,
        description=(
            "Per-invocation CPU timeout (milliseconds) for JavaScript UDFs. "
            "BigQuery defaults to 5 s; the real service caps at 60 min. We "
            "cap at 10 min to keep the emulator responsive."
        ),
    )
    udf_js_memory_bytes: int = Field(
        default=256 * 1024 * 1024,
        ge=16 * 1024 * 1024,
        le=4 * 1024 * 1024 * 1024,
        description=(
            "Per-routine V8 heap cap (bytes) for JavaScript UDFs. "
            "256 MiB matches BigQuery's production cap."
        ),
    )
    scripting_max_statements: int = Field(
        default=10_000,
        ge=1,
        description=(
            "Maximum number of statements a single script may execute. "
            "Protects the server from run-away loops in user scripts."
        ),
    )
    scripting_max_loop_iterations: int = Field(
        default=1_000_000,
        ge=1,
        description=(
            "Maximum number of iterations a single loop may run before "
            "the interpreter raises a QuotaExceededError. Per-loop, not "
            "per-script."
        ),
    )
    enable_format_extensions: bool = Field(
        default=True,
        description=(
            "Install and load DuckDB's ``avro`` extension at engine boot "
            "(needed for Avro load/extract). Disable in constrained "
            "deployments that cannot reach the DuckDB extension repository "
            "at ``extensions.duckdb.org``. When disabled, ``AVRO`` load + "
            "extract jobs surface ``UnsupportedFeatureError``; ``ORC`` "
            "load continues to work because it uses the Python "
            "``pyorc`` package (optional ``[orc]`` extra) rather than a "
            "DuckDB extension."
        ),
    )

    # -- Upload host (multipart / resumable upload endpoints) -------------
    upload_max_bytes: int = Field(
        default=1024 * 1024 * 1024,
        ge=1024,
        le=10 * 1024 * 1024 * 1024,
        description=(
            "Maximum total bytes accepted on a single ``/upload/bigquery/"
            "v2`` request (media, multipart, or all resumable chunks "
            "combined). Requests exceeding the cap are rejected with HTTP "
            "413 before the bytes are materialised to disk. BigQuery's "
            "production cap is 5 TiB; the emulator default of 1 GiB keeps "
            "local CI runs bounded. See ADR 0029."
        ),
    )
    upload_session_ttl_seconds: int = Field(
        default=3600,
        ge=60,
        le=24 * 3600,
        description=(
            "How long a resumable upload session is retained after its "
            "last chunk before being evicted. Sessions older than this "
            "are pruned lazily on the next request that touches the "
            "manager (no background sweeper). See ADR 0029."
        ),
    )
    upload_staging_dir: Path | None = Field(
        default=None,
        description=(
            "Directory used to materialise inbound upload bodies before "
            "the load executor consumes them. Defaults to "
            "``<system tempdir>/bqemu_uploads`` when unset. Files are "
            "deleted in a ``finally`` arm whether the load succeeds or "
            "fails. See ADR 0029."
        ),
    )

    # -- Observability ------------------------------------------------------
    log_level: LogLevel = LogLevel.INFO
    log_format: LogFormat = LogFormat.JSON
    metrics_enabled: bool = True
    tracing_enabled: bool = False
    otlp_endpoint: str | None = Field(
        default=None,
        description="OTLP gRPC endpoint for trace export (enables tracing when set)",
    )

    # -- Admin --------------------------------------------------------------
    admin_enabled: bool = False

    # One validator for every optional Path field, replacing three identical
    # per-field copies so the normalisation rule cannot drift between fields.
    @field_validator("data_dir", "gcs_local_root", "upload_staging_dir", mode="after")
    @classmethod
    def _expand_paths(cls, value: Path | None) -> Path | None:
        """Expand ``~`` and resolve ``value`` to an absolute path.

        ``None`` (the "unset" default for these fields) passes through
        unchanged.
        """
        if value is None:
            return None
        return value.expanduser().resolve()

    def duckdb_path(self) -> str:
        """Return the DuckDB connection string for this configuration.

        ``':memory:'`` for ephemeral, otherwise the file path under
        ``data_dir``. Raises :class:`ValueError` if ``data_dir`` is required
        but missing.
        """
        if self.persistence_mode is PersistenceMode.EPHEMERAL:
            return ":memory:"
        if self.data_dir is None:
            raise ValueError(f"persistence_mode={self.persistence_mode.value} requires data_dir")
        return str(self.data_dir / "bqemulator.duckdb")

duckdb_path

duckdb_path() -> str

Return the DuckDB connection string for this configuration.

':memory:' for ephemeral, otherwise the file path under data_dir. Raises `ValueError` if data_dir is required but missing.

Source code in src/bqemulator/config.py
def duckdb_path(self) -> str:
    """Build the DuckDB connection target implied by these settings.

    Returns:
        ``":memory:"`` when ``persistence_mode`` is ``EPHEMERAL``; otherwise
        the string path of ``bqemulator.duckdb`` inside ``data_dir``.

    Raises:
        ValueError: a non-ephemeral mode is selected but ``data_dir`` is unset.
    """
    mode = self.persistence_mode
    if mode is not PersistenceMode.EPHEMERAL:
        directory = self.data_dir
        if directory is None:
            raise ValueError(f"persistence_mode={mode.value} requires data_dir")
        return str(directory / "bqemulator.duckdb")
    return ":memory:"

bqemulator.domain.errors

bqemulator.domain.errors

Domain-error hierarchy.

Every expected error in the emulator inherits from `DomainError` and maps to the exact JSON shape the real BigQuery service returns via `DomainError.to_bigquery_error`.

Reference: https://cloud.google.com/bigquery/docs/error-messages

The HTTP status code, BigQuery reason string, and gRPC canonical status code are attached at the subclass level and used by adapter layers to render responses consistently.

ErrorDetail dataclass

A single error entry in BigQuery's ErrorProto.errors array.

Source code in src/bqemulator/domain/errors.py
@dataclass(slots=True)
class ErrorDetail:
    """A single error entry in BigQuery's ``ErrorProto.errors`` array."""

    reason: str
    message: str
    domain: str = "global"
    location: str | None = None
    location_type: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Render as a dict matching BigQuery's JSON ``ErrorProto`` shape."""
        out: dict[str, Any] = {
            "domain": self.domain,
            "reason": self.reason,
            "message": self.message,
        }
        if self.location is not None:
            out["location"] = self.location
        if self.location_type is not None:
            out["locationType"] = self.location_type
        return out

to_dict

to_dict() -> dict[str, Any]

Render as a dict matching BigQuery's JSON ErrorProto shape.

Source code in src/bqemulator/domain/errors.py
def to_dict(self) -> dict[str, Any]:
    """Serialise to BigQuery's camelCase JSON shape, omitting unset optionals."""
    rendered: dict[str, Any] = dict(
        domain=self.domain,
        reason=self.reason,
        message=self.message,
    )
    # Optional keys are appended in a fixed order, only when present.
    optional_fields = (
        ("location", self.location),
        ("locationType", self.location_type),
    )
    for key, val in optional_fields:
        if val is not None:
            rendered[key] = val
    return rendered

DomainError

Bases: Exception

Base class for all expected domain errors.

Subclasses define three class variables that adapters use to render the error:

  • http_status — HTTP status code for REST responses.
  • bq_reason — BigQuery reason string. Matches the values the real service returns in ErrorProto.reason.
  • grpc_status_name — canonical gRPC status name (INVALID_ARGUMENT, NOT_FOUND, etc.).
Source code in src/bqemulator/domain/errors.py
class DomainError(Exception):
    """Root of the emulator's expected-error hierarchy.

    Subclasses override three class variables that adapter layers read when
    rendering the error:

    * ``http_status`` — status code used for REST responses.
    * ``bq_reason`` — the BigQuery ``reason`` string, matching what the real
      service puts in ``ErrorProto.reason``.
    * ``grpc_status_name`` — canonical gRPC status name such as
      ``INVALID_ARGUMENT`` or ``NOT_FOUND``.

    The base class itself renders as HTTP 500 / ``internalError`` /
    ``INTERNAL``.
    """

    http_status: ClassVar[int] = 500
    bq_reason: ClassVar[str] = "internalError"
    grpc_status_name: ClassVar[str] = "INTERNAL"

    def __init__(
        self,
        message: str,
        *,
        details: list[ErrorDetail] | None = None,
        location: str | None = None,
    ) -> None:
        """Store the message plus optional structured details and location.

        Args:
            message: Human-readable text; also the exception's ``str()``.
            details: Pre-built error entries. When empty or omitted,
                :meth:`to_bigquery_error` synthesises a single entry.
            location: Attached to that synthesised entry only; it is not
                used when ``details`` is non-empty.
        """
        super().__init__(message)
        self.message = message
        self.details: list[ErrorDetail] = details if details else []
        self.location = location

    def to_bigquery_error(self) -> dict[str, Any]:
        """Build the JSON error envelope BigQuery returns.

        Shape::

            {
                "error": {
                    "code": 400,
                    "message": "...",
                    "errors": [{"domain": "global", "reason": "...", "message": "..."}],
                    "status": "INVALID_ARGUMENT",
                }
            }

        Explicit ``details`` are rendered verbatim; otherwise one entry is
        built from ``bq_reason``, ``message`` and ``location``.
        """
        if self.details:
            entries: list[dict[str, Any]] = [detail.to_dict() for detail in self.details]
        else:
            fallback = ErrorDetail(
                reason=self.bq_reason,
                message=self.message,
                location=self.location,
            )
            entries = [fallback.to_dict()]
        envelope: dict[str, Any] = {
            "code": self.http_status,
            "message": self.message,
            "errors": entries,
            "status": self.grpc_status_name,
        }
        return {"error": envelope}

to_bigquery_error

to_bigquery_error() -> dict[str, Any]

Render as BigQuery's JSON error shape.

Matches::

{
    "error": {
        "code": 400,
        "message": "...",
        "errors": [{"domain": "global", "reason": "...", "message": "..."}],
        "status": "INVALID_ARGUMENT",
    }
}
Source code in src/bqemulator/domain/errors.py
def to_bigquery_error(self) -> dict[str, Any]:
    """Build the JSON error envelope BigQuery returns.

    Shape::

        {
            "error": {
                "code": 400,
                "message": "...",
                "errors": [{"domain": "global", "reason": "...", "message": "..."}],
                "status": "INVALID_ARGUMENT",
            }
        }

    Explicit ``details`` are rendered verbatim; otherwise one entry is built
    from ``bq_reason``, ``message`` and ``location``.
    """
    if self.details:
        entries: list[dict[str, Any]] = [detail.to_dict() for detail in self.details]
    else:
        fallback = ErrorDetail(
            reason=self.bq_reason,
            message=self.message,
            location=self.location,
        )
        entries = [fallback.to_dict()]
    envelope: dict[str, Any] = {
        "code": self.http_status,
        "message": self.message,
        "errors": entries,
        "status": self.grpc_status_name,
    }
    return {"error": envelope}

InvalidQueryError

Bases: DomainError

Malformed SQL, unknown function, or semantic analysis failure.

Source code in src/bqemulator/domain/errors.py
class InvalidQueryError(DomainError):
    """Raised for malformed SQL, an unknown function, or a semantic-analysis failure.

    Rendered as HTTP 400 / ``invalidQuery`` / gRPC ``INVALID_ARGUMENT``.
    """

    bq_reason = "invalidQuery"
    grpc_status_name = "INVALID_ARGUMENT"
    http_status = 400

ValidationError

Bases: DomainError

Request failed schema or semantic validation (non-SQL).

Source code in src/bqemulator/domain/errors.py
class ValidationError(DomainError):
    """Raised when a (non-SQL) request fails schema or semantic validation.

    Rendered as HTTP 400 / ``invalid`` / gRPC ``INVALID_ARGUMENT``.
    """

    bq_reason = "invalid"
    grpc_status_name = "INVALID_ARGUMENT"
    http_status = 400

NotFoundError

Bases: DomainError

A resource (dataset, table, job, routine, model) was not found.

Source code in src/bqemulator/domain/errors.py
class NotFoundError(DomainError):
    """Raised when a dataset, table, job, routine, or model does not exist.

    Rendered as HTTP 404 / ``notFound`` / gRPC ``NOT_FOUND``.
    """

    bq_reason = "notFound"
    grpc_status_name = "NOT_FOUND"
    http_status = 404

AlreadyExistsError

Bases: DomainError

Create request targeted a resource that already exists.

Source code in src/bqemulator/domain/errors.py
class AlreadyExistsError(DomainError):
    """Raised when a create request targets a resource that already exists.

    Rendered as HTTP 409 / ``duplicate`` / gRPC ``ALREADY_EXISTS``.
    """

    bq_reason = "duplicate"
    grpc_status_name = "ALREADY_EXISTS"
    http_status = 409

PermissionDeniedError

Bases: DomainError

Row-access policy, authorized view, or other policy check failed.

The emulator does not enforce IAM, but it does enforce row-access policies on configured tables. This is raised when a query would return rows the caller is not permitted to see.

Source code in src/bqemulator/domain/errors.py
class PermissionDeniedError(DomainError):
    """Raised when a row-access policy, authorized view, or other policy check fails.

    IAM is not enforced by the emulator, but row-access policies on
    configured tables are; this error signals that a query would expose rows
    the caller may not see.

    Rendered as HTTP 403 / ``accessDenied`` / gRPC ``PERMISSION_DENIED``.
    """

    bq_reason = "accessDenied"
    grpc_status_name = "PERMISSION_DENIED"
    http_status = 403

QuotaExceededError

Bases: DomainError

A configurable emulator quota was exceeded (e.g. max concurrent jobs).

Source code in src/bqemulator/domain/errors.py
class QuotaExceededError(DomainError):
    """Raised when a configurable emulator quota (e.g. max concurrent jobs) is exceeded.

    Rendered as HTTP 429 / ``quotaExceeded`` / gRPC ``RESOURCE_EXHAUSTED``.

    NOTE(review): BigQuery's public error-message reference lists
    ``quotaExceeded`` with HTTP 403 rather than 429; confirm that 429 is an
    intentional divergence before relying on status-code fidelity.
    """

    bq_reason = "quotaExceeded"
    grpc_status_name = "RESOURCE_EXHAUSTED"
    http_status = 429

UnsupportedFeatureError

Bases: DomainError

A feature explicitly out of scope for v1 was invoked.

Raised for BigQuery ML statements, scheduled queries, Data Transfer Service operations, and any other feature enumerated in docs/reference/out-of-scope.md.

Source code in src/bqemulator/domain/errors.py
class UnsupportedFeatureError(DomainError):
    """Raised when a feature that is explicitly out of scope for v1 is invoked.

    Covers BigQuery ML statements, scheduled queries, Data Transfer Service
    operations, and everything else listed in
    ``docs/reference/out-of-scope.md``.

    Rendered as HTTP 501 / ``notImplemented`` / gRPC ``UNIMPLEMENTED``.
    """

    bq_reason = "notImplemented"
    grpc_status_name = "UNIMPLEMENTED"
    http_status = 501

InternalError

Bases: DomainError

Unexpected condition that should never happen in a healthy build.

Source code in src/bqemulator/domain/errors.py
class InternalError(DomainError):
    """Raised for a condition that a healthy build should never reach.

    Rendered as HTTP 500 / ``internalError`` / gRPC ``INTERNAL``.
    """

    bq_reason = "internalError"
    grpc_status_name = "INTERNAL"
    http_status = 500

OutOfRangeError

Bases: DomainError

A requested position/time fell outside the valid range.

BigQuery returns outOfRange when FOR SYSTEM_TIME AS OF is called with a timestamp outside the time-travel retention window or before the table existed.

Source code in src/bqemulator/domain/errors.py
class OutOfRangeError(DomainError):
    """Raised when a requested position or time lies outside the valid range.

    BigQuery reports ``outOfRange`` when ``FOR SYSTEM_TIME AS OF`` names a
    timestamp outside the time-travel retention window, or one earlier than
    the table's creation.

    Rendered as HTTP 400 / ``outOfRange`` / gRPC ``OUT_OF_RANGE``.
    """

    bq_reason = "outOfRange"
    grpc_status_name = "OUT_OF_RANGE"
    http_status = 400

ResourceRef dataclass

Reference to a BigQuery-style resource (for error messages).

Source code in src/bqemulator/domain/errors.py
@dataclass(slots=True, frozen=True)
class ResourceRef:
    """Reference to a BigQuery-style resource (for error messages)."""

    kind: str  # "dataset" | "table" | "job" | "routine" | "model"
    project_id: str
    dataset_id: str | None = None
    resource_id: str | None = None

    def format(self) -> str:
        """Human-readable ``project.dataset.resource`` form."""
        parts: list[str] = [self.project_id]
        if self.dataset_id is not None:
            parts.append(self.dataset_id)
        if self.resource_id is not None:
            parts.append(self.resource_id)
        return f"{self.kind}:{'.'.join(parts)}"

format

format() -> str

Human-readable project.dataset.resource form.

Source code in src/bqemulator/domain/errors.py
def format(self) -> str:
    """Render as ``kind:project[.dataset][.resource]``.

    Components that are ``None`` are left out of the dotted path.
    """
    qualifiers = [part for part in (self.dataset_id, self.resource_id) if part is not None]
    dotted = ".".join([self.project_id, *qualifiers])
    return f"{self.kind}:{dotted}"

resource_not_found

resource_not_found(ref: ResourceRef) -> NotFoundError

Build a consistent 'not found' error for any resource; the caller is responsible for raising it.

Source code in src/bqemulator/domain/errors.py
def resource_not_found(ref: ResourceRef) -> NotFoundError:
    """Build a uniform ``NotFoundError`` for ``ref``; the caller raises it.

    The top-level message and the single ``notFound`` detail entry share the
    same text, ``"Not found: <ref.format()>"``.
    """
    text = f"Not found: {ref.format()}"
    detail = ErrorDetail(reason="notFound", message=text)
    return NotFoundError(text, details=[detail])

resource_already_exists

resource_already_exists(ref: ResourceRef) -> AlreadyExistsError

Build a consistent 'already exists' error for any resource; the caller is responsible for raising it.

Source code in src/bqemulator/domain/errors.py
def resource_already_exists(ref: ResourceRef) -> AlreadyExistsError:
    """Build a uniform ``AlreadyExistsError`` for ``ref``; the caller raises it.

    The top-level message and the single ``duplicate`` detail entry share
    the same text, ``"Already Exists: <ref.format()>"``.
    """
    text = f"Already Exists: {ref.format()}"
    detail = ErrorDetail(reason="duplicate", message=text)
    return AlreadyExistsError(text, details=[detail])

bqemulator.domain.result

bqemulator.domain.result

Result type — explicit success/failure for expected domain outcomes.

Exceptions are reserved for unexpected failures. For expected outcomes (SQL parse error, catalog miss, validation failure), use `Result`.

Example::

def translate(sql: str) -> Result[str, InvalidQueryError]:
    # Expected failure (a parse error) is returned as an Err value, not raised.
    try:
        return Ok(_translate(sql))
    except sqlglot.errors.ParseError as exc:
        return Err(InvalidQueryError(str(exc)))


# Callers branch on the variant with structural pattern matching.
match translate(user_sql):
    case Ok(duckdb_sql):
        ...  # success branch: duckdb_sql is the value wrapped by Ok
    case Err(error):
        ...  # failure branch: error is the InvalidQueryError wrapped by Err

Result module-attribute

Result = Ok[T] | Err[E]

A disjoint union of `Ok` and `Err`.

Use with Python 3.11+ match statements for exhaustive handling.

Ok dataclass

Bases: Generic[T]

Success case of `Result`.

Source code in src/bqemulator/domain/result.py
@final
@dataclass(slots=True, frozen=True)
class Ok(Generic[T]):
    """The success variant of :class:`Result`, wrapping ``value``."""

    value: T

    def is_ok(self) -> bool:
        """Report success; always ``True`` for this variant."""
        return True

    def is_err(self) -> bool:
        """Report failure; always ``False`` for this variant."""
        return False

    def unwrap(self) -> T:
        """Hand back the wrapped value."""
        return self.value

    def map(self, fn: object) -> Ok[object]:
        """Transform the wrapped value with ``fn`` and wrap the result in a fresh ``Ok``."""
        transformed = fn(self.value)  # type: ignore[operator]
        return Ok(transformed)

is_ok

is_ok() -> bool

Return True.

Source code in src/bqemulator/domain/result.py
def is_ok(self) -> bool:
    """Report success; always ``True`` on the ``Ok`` variant."""
    return True

is_err

is_err() -> bool

Return False.

Source code in src/bqemulator/domain/result.py
def is_err(self) -> bool:
    """Report failure; always ``False`` on the ``Ok`` variant."""
    return False

unwrap

unwrap() -> T

Return the contained value.

Source code in src/bqemulator/domain/result.py
def unwrap(self) -> T:
    """Hand back the value held by this ``Ok``."""
    return self.value

map

map(fn: object) -> Ok[object]

Apply fn to the contained value, returning a new Ok.

Source code in src/bqemulator/domain/result.py
def map(self, fn: object) -> Ok[object]:
    """Transform the wrapped value with ``fn`` and wrap the result in a fresh ``Ok``."""
    transformed = fn(self.value)  # type: ignore[operator]
    return Ok(transformed)

Err dataclass

Bases: Generic[E]

Failure case of `Result`.

Source code in src/bqemulator/domain/result.py
@final
@dataclass(slots=True, frozen=True)
class Err(Generic[E]):
    """The failure variant of :class:`Result`, wrapping ``error``."""

    error: E

    def is_ok(self) -> bool:
        """Report success; always ``False`` for this variant."""
        return False

    def is_err(self) -> bool:
        """Report failure; always ``True`` for this variant."""
        return True

    def unwrap(self) -> object:
        """Raise the wrapped error instead of returning a value.

        Shares its name with ``Ok.unwrap`` so either variant can be unwrapped
        the same way; on this branch it never returns normally.
        """
        raise self.error

    def map(self, fn: object) -> Err[E]:  # noqa: ARG002
        """Ignore ``fn`` and return this same instance unchanged."""
        return self

is_ok

is_ok() -> bool

Return False.

Source code in src/bqemulator/domain/result.py
def is_ok(self) -> bool:
    """Report success; always ``False`` on the ``Err`` variant."""
    return False

is_err

is_err() -> bool

Return True.

Source code in src/bqemulator/domain/result.py
def is_err(self) -> bool:
    """Report failure; always ``True`` on the ``Err`` variant."""
    return True

unwrap

unwrap() -> object

Raise the contained `DomainError`.

Named unwrap (rather than raise) to mirror the Ok variant.

Source code in src/bqemulator/domain/result.py
def unwrap(self) -> object:
    """Raise the wrapped error instead of returning a value.

    Shares its name with ``Ok.unwrap`` so either variant can be unwrapped the
    same way; on this branch it never returns normally.
    """
    raise self.error

map

map(fn: object) -> Err[E]

Return self — mapping is a no-op on the failure branch.

Source code in src/bqemulator/domain/result.py
def map(self, fn: object) -> Err[E]:  # noqa: ARG002
    """Ignore ``fn`` and return this same ``Err`` instance unchanged."""
    return self

bqemulator.domain.ids

bqemulator.domain.ids

Typed identifiers for BigQuery resources.

BigQuery's identifier rules (from the docs):

  • project_id: 6-30 characters; lowercase letters, digits, hyphens; must start with a letter and not end with a hyphen.
  • dataset_id: up to 1024 characters; letters (any case), digits, underscores. No hyphens or dots.
  • table_id: up to 1024 characters; same character set as dataset, plus hyphens and some Unicode letters in certain contexts.
  • job_id: up to 1024 characters; letters, digits, dashes, underscores.
  • routine_id: same as dataset.

We model each as a frozen dataclass with a validating constructor. The raw string is exposed via the value attribute; equality and hashing work as expected.

ProjectId dataclass

A validated BigQuery project id.

Source code in src/bqemulator/domain/ids.py
@dataclass(slots=True, frozen=True)
class ProjectId:
    """Project id that has passed the module's project-id validation."""

    value: str

    def __str__(self) -> str:
        # The raw id is the canonical string form.
        return self.value

    def __post_init__(self) -> None:
        # Validate via the shared helper; invalid ids are rejected here.
        _validate(_PROJECT_RE, self.value, "project")

DatasetId dataclass

A validated BigQuery dataset id (without project qualification).

Source code in src/bqemulator/domain/ids.py
@dataclass(slots=True, frozen=True)
class DatasetId:
    """Dataset id (no project prefix) that has passed dataset-id validation."""

    value: str

    def __str__(self) -> str:
        # The raw id is the canonical string form.
        return self.value

    def __post_init__(self) -> None:
        # Validate via the shared helper; invalid ids are rejected here.
        _validate(_DATASET_RE, self.value, "dataset")

TableId dataclass

A validated BigQuery table id (without dataset qualification).

Source code in src/bqemulator/domain/ids.py
@dataclass(slots=True, frozen=True)
class TableId:
    """Table id (no dataset prefix) that has passed table-id validation."""

    value: str

    def __str__(self) -> str:
        # The raw id is the canonical string form.
        return self.value

    def __post_init__(self) -> None:
        # Validate via the shared helper; invalid ids are rejected here.
        _validate(_TABLE_RE, self.value, "table")

JobId dataclass

A validated BigQuery job id.

Source code in src/bqemulator/domain/ids.py
@dataclass(slots=True, frozen=True)
class JobId:
    """Job id that has passed job-id validation."""

    value: str

    def __str__(self) -> str:
        # The raw id is the canonical string form.
        return self.value

    def __post_init__(self) -> None:
        # Validate via the shared helper; invalid ids are rejected here.
        _validate(_JOB_RE, self.value, "job")

RoutineId dataclass

A validated BigQuery routine id (without dataset qualification).

Source code in src/bqemulator/domain/ids.py
@dataclass(slots=True, frozen=True)
class RoutineId:
    """Routine id (no dataset prefix) that has passed routine-id validation."""

    value: str

    def __str__(self) -> str:
        # The raw id is the canonical string form.
        return self.value

    def __post_init__(self) -> None:
        # Validate via the shared helper; invalid ids are rejected here.
        _validate(_ROUTINE_RE, self.value, "routine")

validate_project_id

validate_project_id(value: str) -> str

Return value unchanged if it is a valid project id; else raise.

Source code in src/bqemulator/domain/ids.py
def validate_project_id(value: str) -> str:
    """Check ``value`` against the project-id rules and hand it back as-is.

    Propagates whatever ``_validate`` raises for a malformed id.
    """
    _validate(_PROJECT_RE, value, "project")
    return value

validate_dataset_id

validate_dataset_id(value: str) -> str

Return value unchanged if it is a valid dataset id; else raise.

Source code in src/bqemulator/domain/ids.py
def validate_dataset_id(value: str) -> str:
    """Check ``value`` against the dataset-id rules and hand it back as-is.

    Propagates whatever ``_validate`` raises for a malformed id.
    """
    _validate(_DATASET_RE, value, "dataset")
    return value

validate_table_id

validate_table_id(value: str) -> str

Return value unchanged if it is a valid table id; else raise.

Source code in src/bqemulator/domain/ids.py
def validate_table_id(value: str) -> str:
    """Check ``value`` against the table-id rules and hand it back as-is.

    Propagates whatever ``_validate`` raises for a malformed id.
    """
    _validate(_TABLE_RE, value, "table")
    return value

validate_routine_id

validate_routine_id(value: str) -> str

Return value unchanged if it is a valid routine id; else raise.

Source code in src/bqemulator/domain/ids.py
def validate_routine_id(value: str) -> str:
    """Check ``value`` against the routine-id rules and hand it back as-is.

    Propagates whatever ``_validate`` raises for a malformed id.
    """
    _validate(_ROUTINE_RE, value, "routine")
    return value

validate_job_id

validate_job_id(value: str) -> str

Return value unchanged if it is a valid job id; else raise.

Source code in src/bqemulator/domain/ids.py
def validate_job_id(value: str) -> str:
    """Check ``value`` against the job-id rules and hand it back as-is.

    Propagates whatever ``_validate`` raises for a malformed id.
    """
    _validate(_JOB_RE, value, "job")
    return value

validate_table_ref

validate_table_ref(
    project_id: str, dataset_id: str, table_id: str
) -> tuple[str, str, str]

Validate a (project, dataset, table) triple in one call.

Source code in src/bqemulator/domain/ids.py
def validate_table_ref(
    project_id: str,
    dataset_id: str,
    table_id: str,
) -> tuple[str, str, str]:
    """Validate every part of a table reference and return them unchanged.

    Components are checked in order (project, then dataset, then table), so
    the first invalid one determines the error raised.
    """
    project = validate_project_id(project_id)
    dataset = validate_dataset_id(dataset_id)
    table = validate_table_id(table_id)
    return project, dataset, table

bqemulator.domain.clock

bqemulator.domain.clock

Clock protocol — injectable time source for deterministic tests.

Production code uses `SystemClock`. Tests inject `FrozenClock` to make timestamps predictable.

Every timestamp emitted by the emulator (job start/end, table creation, row insert time) flows through a `Clock`; no code should call `datetime.now` directly outside of this module.

Clock

Bases: Protocol

Protocol for injectable time sources.

Source code in src/bqemulator/domain/clock.py
class Clock(Protocol):
    """Protocol for injectable time sources.

    Structural interface: any object providing matching ``now`` and
    ``now_ms`` methods satisfies it — ``SystemClock`` in production and
    ``FrozenClock`` in tests.
    """

    def now(self) -> datetime:
        """Return the current UTC datetime."""
        ...

    def now_ms(self) -> int:
        """Return the current time as integer milliseconds since the Unix epoch."""
        ...

now

now() -> datetime

Return the current UTC datetime.

Source code in src/bqemulator/domain/clock.py
def now(self) -> datetime:
    """Return the current UTC datetime (protocol stub; implementations supply the body)."""
    ...

now_ms

now_ms() -> int

Return the current UTC epoch milliseconds.

Source code in src/bqemulator/domain/clock.py
def now_ms(self) -> int:
    """Return the current time as integer milliseconds since the Unix epoch.

    Protocol stub; implementations supply the body.
    """
    ...

SystemClock

Wall-clock implementation of `Clock`.

Source code in src/bqemulator/domain/clock.py
class SystemClock:
    """Wall-clock implementation of :class:`Clock`."""

    def now(self) -> datetime:
        """Return current UTC time from the system clock."""
        return datetime.now(tz=UTC)

    def now_ms(self) -> int:
        """Return current UTC epoch milliseconds."""
        return int(self.now().timestamp() * 1000)

now

now() -> datetime

Return current UTC time from the system clock.

Source code in src/bqemulator/domain/clock.py
def now(self) -> datetime:
    """Return current UTC time from the system clock."""
    return datetime.now(tz=UTC)

now_ms

now_ms() -> int

Return current UTC epoch milliseconds.

Source code in src/bqemulator/domain/clock.py
def now_ms(self) -> int:
    """Return current UTC epoch milliseconds."""
    return int(self.now().timestamp() * 1000)

FrozenClock dataclass

Test clock that advances only when `advance` is called.

Example::

clock = FrozenClock(datetime(2026, 4, 15, tzinfo=UTC))
clock.now()  # datetime(2026, 4, 15, 0, 0, tzinfo=UTC)
clock.advance(seconds=60)
clock.now()  # datetime(2026, 4, 15, 0, 1, tzinfo=UTC)
Source code in src/bqemulator/domain/clock.py
@dataclass(slots=True)
class FrozenClock:
    """Test clock that advances only when :meth:`advance` is called.

    Example::

        clock = FrozenClock(datetime(2026, 4, 15, tzinfo=UTC))
        clock.now()  # datetime(2026, 4, 15, 0, 0, tzinfo=UTC)
        clock.advance(seconds=60)
        clock.now()  # datetime(2026, 4, 15, 0, 1, tzinfo=UTC)
    """

    current: datetime = field(default_factory=lambda: datetime(2026, 1, 1, tzinfo=UTC))

    def now(self) -> datetime:
        """Return the frozen current time."""
        return self.current

    def now_ms(self) -> int:
        """Return the frozen current time in epoch milliseconds."""
        return int(self.current.timestamp() * 1000)

    def advance(
        self,
        *,
        seconds: float = 0,
        milliseconds: float = 0,
        minutes: float = 0,
        hours: float = 0,
        days: float = 0,
    ) -> None:
        """Advance the clock by the given amount."""
        from datetime import timedelta

        delta = timedelta(
            days=days,
            hours=hours,
            minutes=minutes,
            seconds=seconds,
            milliseconds=milliseconds,
        )
        self.current = self.current + delta

now

now() -> datetime

Return the frozen current time.

Source code in src/bqemulator/domain/clock.py
def now(self) -> datetime:
    """Return the frozen current time."""
    return self.current

now_ms

now_ms() -> int

Return the frozen current time in epoch milliseconds.

Source code in src/bqemulator/domain/clock.py
def now_ms(self) -> int:
    """Return the frozen current time in epoch milliseconds."""
    return int(self.current.timestamp() * 1000)

advance

advance(
    *,
    seconds: float = 0,
    milliseconds: float = 0,
    minutes: float = 0,
    hours: float = 0,
    days: float = 0,
) -> None

Advance the clock by the given amount.

Source code in src/bqemulator/domain/clock.py
def advance(
    self,
    *,
    seconds: float = 0,
    milliseconds: float = 0,
    minutes: float = 0,
    hours: float = 0,
    days: float = 0,
) -> None:
    """Advance the clock by the given amount."""
    from datetime import timedelta

    delta = timedelta(
        days=days,
        hours=hours,
        minutes=minutes,
        seconds=seconds,
        milliseconds=milliseconds,
    )
    self.current = self.current + delta

bqemulator.domain.events

bqemulator.domain.events

Internal domain events.

Events are emitted when observable state changes. They power:

  • Materialized view auto-refresh (base-table DML invalidates cached MVs).
  • Query result cache invalidation.
  • Structured audit logging for debugging.

Events are consumed in-process via a simple synchronous bus. No persistent event log; events never cross process boundaries.

DomainEvent dataclass

Base class for all domain events.

Source code in src/bqemulator/domain/events.py
@dataclass(slots=True, frozen=True)
class DomainEvent:
    """Base class for all domain events.

    Declares no fields of its own; concrete subclasses add the identifying
    fields. Frozen, so event instances cannot be mutated after construction.
    """

DatasetCreated dataclass

Bases: DomainEvent

A dataset has been created.

Source code in src/bqemulator/domain/events.py
@dataclass(slots=True, frozen=True)
class DatasetCreated(DomainEvent):
    """A dataset has been created.

    Carries only the dataset's identity, not its metadata.
    """

    project_id: str  # project that owns the dataset
    dataset_id: str  # dataset id within ``project_id``

DatasetDeleted dataclass

Bases: DomainEvent

A dataset has been deleted.

Source code in src/bqemulator/domain/events.py
@dataclass(slots=True, frozen=True)
class DatasetDeleted(DomainEvent):
    """A dataset has been deleted.

    Carries only the identity of the removed dataset.
    """

    project_id: str  # project that owned the dataset
    dataset_id: str  # dataset id within ``project_id``

TableCreated dataclass

Bases: DomainEvent

A table has been created.

Source code in src/bqemulator/domain/events.py
@dataclass(slots=True, frozen=True)
class TableCreated(DomainEvent):
    """A table has been created.

    Carries only the table's fully-qualified identity.
    """

    project_id: str  # owning project
    dataset_id: str  # owning dataset
    table_id: str  # table id within the dataset

TableSchemaChanged dataclass

Bases: DomainEvent

A table's schema (columns, modes) has changed.

Source code in src/bqemulator/domain/events.py
@dataclass(slots=True, frozen=True)
class TableSchemaChanged(DomainEvent):
    """A table's schema (columns, modes) has changed.

    Identifies the table only; the new schema is not included.
    """

    project_id: str  # owning project
    dataset_id: str  # owning dataset
    table_id: str  # table whose schema changed

TableDataChanged dataclass

Bases: DomainEvent

Rows in a table have been inserted, updated, or deleted.

Triggers query-cache invalidation for anything that depends on the table and materialized-view refresh for dependent MVs.

Source code in src/bqemulator/domain/events.py
@dataclass(slots=True, frozen=True)
class TableDataChanged(DomainEvent):
    """Rows in a table have been inserted, updated, or deleted.

    Triggers query-cache invalidation for anything that depends on the table
    and materialized-view refresh for dependent MVs. The event identifies
    the table only; it does not describe which rows changed.
    """

    project_id: str  # owning project
    dataset_id: str  # owning dataset
    table_id: str  # table whose rows changed

TableDeleted dataclass

Bases: DomainEvent

A table has been deleted.

Source code in src/bqemulator/domain/events.py
@dataclass(slots=True, frozen=True)
class TableDeleted(DomainEvent):
    """A table has been deleted.

    Carries only the identity of the removed table.
    """

    project_id: str  # owning project
    dataset_id: str  # owning dataset
    table_id: str  # removed table id

JobStarted dataclass

Bases: DomainEvent

A job has transitioned from PENDING to RUNNING.

Source code in src/bqemulator/domain/events.py
@dataclass(slots=True, frozen=True)
class JobStarted(DomainEvent):
    """A job has transitioned from PENDING to RUNNING."""

    project_id: str  # project the job runs in
    job_id: str  # job id within ``project_id``

JobCompleted dataclass

Bases: DomainEvent

A job has transitioned to DONE (success or failure).

Source code in src/bqemulator/domain/events.py
@dataclass(slots=True, frozen=True)
class JobCompleted(DomainEvent):
    """A job has transitioned to DONE (success or failure)."""

    project_id: str  # project the job ran in
    job_id: str  # job id within ``project_id``
    successful: bool  # True when the job succeeded, False when it failed

EventBus

Synchronous, type-dispatched event bus.

Intentionally minimal — no persistence, no async, no ordering guarantees beyond registration order. Adequate for in-process fan-out.

Usage::

bus = EventBus()
bus.subscribe(TableDataChanged, invalidate_query_cache)
bus.publish(TableDataChanged("proj", "sales", "orders"))
Source code in src/bqemulator/domain/events.py
class EventBus:
    """Synchronous, type-dispatched event bus.

    Intentionally minimal — no persistence, no async, no ordering guarantees
    beyond registration order. Adequate for in-process fan-out.

    Dispatch is keyed on the event's exact type (``type(event)``): handlers
    registered for a base class are not called for subclass instances.

    Usage::

        bus = EventBus()
        bus.subscribe(TableDataChanged, invalidate_query_cache)
        bus.publish(TableDataChanged("proj", "sales", "orders"))
    """

    # Event type -> handlers, in registration order.
    _handlers: dict[type[DomainEvent], list[Handler]]

    # NOTE(review): not read or written by this class; presumably a
    # process-wide default assigned elsewhere — confirm.
    DEFAULT_BUS: ClassVar[EventBus | None] = None

    def __init__(self) -> None:
        self._handlers = {}

    def subscribe(self, event_type: type[DomainEvent], handler: Handler) -> None:
        """Register ``handler`` to be called for every event of ``event_type``.

        Registering the same handler twice makes it run twice per event.
        """
        self._handlers.setdefault(event_type, []).append(handler)

    def unsubscribe(self, event_type: type[DomainEvent], handler: Handler) -> None:
        """Remove a previously registered ``handler``. Silently skip if absent.

        Only the first registration is removed if ``handler`` was added
        more than once.
        """
        import contextlib

        handlers = self._handlers.get(event_type)
        if handlers is None:
            return
        with contextlib.suppress(ValueError):
            handlers.remove(handler)

    def publish(self, event: DomainEvent) -> None:
        """Invoke every handler registered for the event's type.

        Handlers run synchronously in registration order. An exception from
        a handler propagates to the caller and the remaining handlers are
        not run. The handler list is snapshotted before dispatch, so a
        handler that unsubscribes itself (or another handler) cannot cause
        a sibling to be skipped, and handlers subscribed during dispatch
        only see subsequent events.
        """
        for handler in tuple(self._handlers.get(type(event), ())):
            handler(event)

subscribe

subscribe(event_type: type[DomainEvent], handler: Handler) -> None

Register handler to be called for every event of event_type.

Source code in src/bqemulator/domain/events.py
def subscribe(self, event_type: type[DomainEvent], handler: Handler) -> None:
    """Append ``handler`` to the handlers for ``event_type``.

    Creates the per-type handler list on first use; duplicates are kept.
    """
    bucket = self._handlers.get(event_type)
    if bucket is None:
        bucket = self._handlers[event_type] = []
    bucket.append(handler)

unsubscribe

unsubscribe(event_type: type[DomainEvent], handler: Handler) -> None

Remove a previously registered handler. Silently skip if absent.

Source code in src/bqemulator/domain/events.py
def unsubscribe(self, event_type: type[DomainEvent], handler: Handler) -> None:
    """Drop one registration of ``handler`` for ``event_type``, if present.

    Unknown event types and unregistered handlers are ignored silently.
    """
    import contextlib

    # Unknown types fall back to a throwaway empty list whose remove()
    # raises ValueError, which is suppressed below.
    registered = self._handlers.get(event_type, [])
    with contextlib.suppress(ValueError):
        registered.remove(handler)

publish

publish(event: DomainEvent) -> None

Invoke every handler registered for the event's type.

Source code in src/bqemulator/domain/events.py
def publish(self, event: DomainEvent) -> None:
    """Invoke every handler registered for the event's type.

    Handlers run synchronously in registration order; an exception from a
    handler propagates and the remaining handlers are not run. The handler
    list is snapshotted before dispatch so a handler that unsubscribes
    itself (or another handler) cannot cause a sibling to be skipped, and
    handlers subscribed during dispatch only see subsequent events.
    """
    for handler in tuple(self._handlers.get(type(event), ())):
        handler(event)

bqemulator.storage.engine

bqemulator.storage.engine

DuckDB engine — single-writer connection with async-safe lifecycle.

The emulator uses exactly one `duckdb.DuckDBPyConnection` for the entire process. Writes serialize on an `asyncio.Lock`; reads do not take the lock (DuckDB provides internal read/write concurrency for the same connection object).

The engine also handles startup tasks:

  • Ensure the reserved _bqemulator_catalog schema exists.
  • Set the connection's time zone to UTC (BigQuery TIMESTAMP semantics).
  • Install and load the spatial extension. Required — startup fails fast with a clear error if the extension cannot be installed/loaded (e.g. offline build with no cached extension), because GEOGRAPHY queries depend on it.

DuckDBEngine

Async-friendly wrapper around a single DuckDB connection.

Usage::

engine = DuckDBEngine(settings)
await engine.start()
async with engine.write_lock():
    engine.execute("INSERT INTO ...")
await engine.stop()

`DuckDBEngine` is intentionally synchronous under the hood. DuckDB releases the GIL during query execution, so awaiting the write lock gives other tasks a chance to progress. Long queries should be run inside asyncio.to_thread by the caller.

Source code in src/bqemulator/storage/engine.py
class DuckDBEngine:
    """Async-friendly wrapper around a single DuckDB connection.

    Usage::

        engine = DuckDBEngine(settings)
        await engine.start()
        async with engine.write_lock():
            engine.execute("INSERT INTO ...")
        await engine.stop()

    :class:`DuckDBEngine` is intentionally synchronous-under-the-hood. DuckDB
    releases the GIL during query execution, so awaiting the write lock
    gives other tasks a chance to progress. Long queries should be run
    inside ``asyncio.to_thread`` by the caller.
    """

    def __init__(self, settings: Settings) -> None:
        self._settings = settings
        # Set by start(); cleared by stop() and by a failed start().
        self._connection: duckdb.DuckDBPyConnection | None = None
        # Serializes DDL/DML; exposed through write_lock().
        self._write_lock = asyncio.Lock()
        # True only once every startup hook has succeeded.
        self._started = False

    # -- Lifecycle ---------------------------------------------------------

    async def start(self) -> None:
        """Open the DuckDB connection and run startup hooks.

        A no-op once startup has succeeded. If any startup hook fails (for
        example the required ``spatial`` extension cannot be loaded), the
        partially initialised connection is closed and cleared before the
        error propagates. That way :attr:`connection` never hands out a
        connection missing its session settings, schemas, extensions or
        UDFs, and a retried :meth:`start` does not leak the old connection.
        """
        if self._started:
            return

        path = self._resolve_path()
        _log.info("duckdb.open", path=path)

        # Import here to keep module import cheap.
        import duckdb

        self._connection = duckdb.connect(path, read_only=False)
        try:
            self._apply_session_pragmas()
            self._ensure_catalog_schema()
            self._load_spatial()
            if self._settings.enable_format_extensions:
                self._load_format_extensions()
            self._register_builtin_udfs()
        except BaseException:
            # Roll back: stop() closes the half-initialised connection
            # (logging rather than raising any close failure) and resets
            # state; then the original startup error is re-raised.
            await self.stop()
            raise
        self._started = True

    async def stop(self) -> None:
        """Close the DuckDB connection (idempotent)."""
        if self._connection is not None:
            try:
                self._connection.close()
            except Exception as exc:  # noqa: BLE001
                _log.warning("duckdb.close_failed", error=str(exc))
            self._connection = None
        self._started = False

    # -- Execution ---------------------------------------------------------

    @property
    def connection(self) -> duckdb.DuckDBPyConnection:
        """Return the underlying DuckDB connection.

        Raises :class:`InternalError` if :meth:`start` has not been called.
        """
        if self._connection is None:
            raise InternalError("DuckDBEngine not started")
        return self._connection

    def execute(self, sql: str, parameters: list[Any] | None = None) -> duckdb.DuckDBPyConnection:
        """Execute a SQL statement. Returns the cursor-like connection.

        ``parameters`` is forwarded only when not ``None``.

        NOTE(review): DuckDB's Python API documents connection objects as
        not thread-safe (per-thread ``cursor()`` is recommended). If callers
        run this concurrently via ``asyncio.to_thread``, confirm access to
        the shared connection is serialized.
        """
        conn = self.connection
        return conn.execute(sql, parameters) if parameters is not None else conn.execute(sql)

    def fetch_arrow(self, sql: str, parameters: list[Any] | None = None) -> pa.Table:
        """Execute and fetch results as a pyarrow.Table.

        Uses ``to_arrow_table()`` (DuckDB >=1.4) with a fallback to the
        deprecated ``fetch_arrow_table()`` for older builds.

        Annotates each field with the original DuckDB column type as
        ``bqemu.duckdb_type`` metadata. DuckDB's JSON / DECIMAL /
        TIMESTAMP_TZ etc. flatten to their underlying physical Arrow
        type (``string`` / ``int64`` / …) at conversion time, so the
        REST schema renderer has no way to distinguish a JSON-typed
        column from a regular VARCHAR after the fact. Preserving the
        DuckDB-side type as field metadata lets
        :func:`bqemulator.jobs.executor._arrow_field_to_schema_entry`
        recover the BigQuery wire-format type for those columns.
        """
        result = self.execute(sql, parameters)
        # Capture the description before fetching the result set.
        description = result.description
        arrow_table = (
            result.to_arrow_table()
            if hasattr(result, "to_arrow_table")
            else result.fetch_arrow_table()
        )
        return _annotate_with_duckdb_types(arrow_table, description)

    @asynccontextmanager
    async def write_lock(self) -> AsyncIterator[None]:
        """Acquire the exclusive write lock for this engine.

        All DDL and DML must be wrapped in ``async with engine.write_lock()``.
        Concurrent readers may proceed without the lock.
        """
        async with self._write_lock:
            yield

    # -- Helpers ------------------------------------------------------------

    def _resolve_path(self) -> str:
        """Return the DuckDB database path for the configured persistence mode.

        ``:memory:`` for EPHEMERAL; otherwise ``<data_dir>/bqemulator.duckdb``,
        creating ``data_dir`` if needed. Raises :class:`InternalError` when a
        non-ephemeral mode has no ``data_dir``.
        """
        mode = self._settings.persistence_mode
        if mode is PersistenceMode.EPHEMERAL:
            return ":memory:"
        if self._settings.data_dir is None:
            raise InternalError(
                f"persistence_mode={mode.value} requires data_dir to be set",
            )
        data_dir = Path(self._settings.data_dir)
        data_dir.mkdir(parents=True, exist_ok=True)
        return str(data_dir / "bqemulator.duckdb")

    def _apply_session_pragmas(self) -> None:
        """Apply per-connection settings (currently: UTC session time zone)."""
        if self._connection is None:  # internal invariant
            raise InternalError("DuckDBEngine not started")
        # BigQuery TIMESTAMP is always UTC; align DuckDB.
        self._connection.execute("SET TimeZone = 'UTC'")

    def _ensure_catalog_schema(self) -> None:
        """Create the reserved catalog and snapshot schemas if missing."""
        if self._connection is None:  # internal invariant
            raise InternalError("DuckDBEngine not started")
        self._connection.execute(f'CREATE SCHEMA IF NOT EXISTS "{CATALOG_SCHEMA}"')
        self._connection.execute(f'CREATE SCHEMA IF NOT EXISTS "{SNAPSHOTS_SCHEMA}"')

    def _register_builtin_udfs(self) -> None:
        """Register Python-backed scalar UDFs that fill DuckDB gaps.

        See :mod:`bqemulator.sql.builtin_udfs` for the list — helpers
        cover BigQuery builtins DuckDB lacks (``JSON_REMOVE``,
        ``JSON_SET``, ``JSON_STRIP_NULLS``, ``NORMALIZE``,
        ``NORMALIZE_AND_CASEFOLD``, ``FARM_FINGERPRINT``).
        """
        if self._connection is None:  # internal invariant
            raise InternalError("DuckDBEngine not started")
        # Import here so the engine module remains importable even if
        # the SQL package has an unrelated load error.
        from bqemulator.sql.builtin_udfs import register_builtin_udfs

        register_builtin_udfs(self._connection)
        _log.debug("duckdb.builtin_udfs_registered")

    def _load_spatial(self) -> None:
        """Load DuckDB's spatial extension; fail fast if unavailable.

        GEOGRAPHY support is backed by DuckDB's spatial extension. The
        extension powers every ``ST_*`` function the emulator translates
        BigQuery GEOGRAPHY queries into. Without it the emulator cannot
        honour its GEOGRAPHY contract, so we surface the failure at
        startup rather than letting query-time ``ST_*`` calls fail with
        confusing catalog errors.
        """
        if self._connection is None:  # internal invariant
            raise InternalError("DuckDBEngine not started")
        try:
            self._connection.execute("INSTALL spatial")
            self._connection.execute("LOAD spatial")
        except Exception as exc:
            _log.error("duckdb.spatial_unavailable", error=str(exc))
            raise InternalError(
                "DuckDB spatial extension is required for bqemulator (GEOGRAPHY "
                "support). INSTALL/LOAD spatial failed — check network access "
                "for the DuckDB extension repository or pre-bundle the extension "
                f"in the image. Underlying error: {exc}",
            ) from exc
        _log.debug("duckdb.spatial_loaded")

    def _load_format_extensions(self) -> None:
        """Best-effort load of DuckDB's ``avro`` extension (G1).

        Unlike ``spatial`` this is best-effort: the load/extract path
        gracefully reports ``UnsupportedFeatureError`` to the client if
        the extension is absent, rather than failing startup. This lets
        offline / air-gapped deployments keep all non-Avro functionality
        even when ``extensions.duckdb.org`` is unreachable. ORC support
        is provided by the Python ``pyorc`` package (optional ``[orc]``
        extra) and does not touch the DuckDB extension repo.
        """
        if self._connection is None:  # internal invariant
            raise InternalError("DuckDBEngine not started")
        try:
            self._connection.execute("INSTALL avro")
            self._connection.execute("LOAD avro")
        except Exception as exc:  # noqa: BLE001 — best-effort
            _log.warning("duckdb.avro_unavailable", error=str(exc))
            return
        _log.debug("duckdb.avro_loaded")

connection property

connection: DuckDBPyConnection

Return the underlying DuckDB connection.

Raises `InternalError` if `start` has not been called.

start async

start() -> None

Open the DuckDB connection and run startup hooks.

Source code in src/bqemulator/storage/engine.py
async def start(self) -> None:
    """Open the DuckDB connection and run startup hooks.

    A no-op once startup has succeeded. If any startup hook fails (for
    example the required ``spatial`` extension cannot be loaded), the
    partially initialised connection is closed and cleared before the error
    propagates. That way ``connection`` never hands out a connection missing
    its session settings, schemas, extensions or UDFs, and a retried
    ``start()`` does not leak the old connection.
    """
    if self._started:
        return

    path = self._resolve_path()
    _log.info("duckdb.open", path=path)

    # Import here to keep module import cheap.
    import duckdb

    self._connection = duckdb.connect(path, read_only=False)
    try:
        self._apply_session_pragmas()
        self._ensure_catalog_schema()
        self._load_spatial()
        if self._settings.enable_format_extensions:
            self._load_format_extensions()
        self._register_builtin_udfs()
    except BaseException:
        # Roll back: stop() closes the half-initialised connection (logging
        # rather than raising any close failure) and resets state; then the
        # original startup error is re-raised.
        await self.stop()
        raise
    self._started = True

stop async

stop() -> None

Close the DuckDB connection (idempotent).

Source code in src/bqemulator/storage/engine.py
async def stop(self) -> None:
    """Release the DuckDB connection; safe to call repeatedly.

    A failure while closing is logged as a warning rather than raised, and
    the engine is marked not-started in every case.
    """
    conn = self._connection
    if conn is not None:
        try:
            conn.close()
        except Exception as exc:  # noqa: BLE001
            _log.warning("duckdb.close_failed", error=str(exc))
        self._connection = None
    self._started = False

execute

execute(sql: str, parameters: list[Any] | None = None) -> duckdb.DuckDBPyConnection

Execute a SQL statement. Returns the cursor-like connection.

Source code in src/bqemulator/storage/engine.py
def execute(self, sql: str, parameters: list[Any] | None = None) -> duckdb.DuckDBPyConnection:
    """Run ``sql`` on the engine's connection and return the cursor-like result.

    ``parameters`` is passed through only when it is not ``None``, so a
    bare statement is executed without a parameter argument.
    """
    conn = self.connection
    if parameters is None:
        return conn.execute(sql)
    return conn.execute(sql, parameters)

fetch_arrow

fetch_arrow(sql: str, parameters: list[Any] | None = None) -> pa.Table

Execute and fetch results as a pyarrow.Table.

Uses to_arrow_table() (DuckDB >=1.4) with a fallback to the deprecated fetch_arrow_table() for older builds.

Annotates each field with the original DuckDB column type as bqemu.duckdb_type metadata. DuckDB's JSON / DECIMAL / TIMESTAMP_TZ etc. flatten to their underlying physical Arrow type (string / int64 / …) at conversion time, so the REST schema renderer has no way to distinguish a JSON-typed column from a regular VARCHAR after the fact. Preserving the DuckDB-side type as field metadata lets `bqemulator.jobs.executor._arrow_field_to_schema_entry` recover the BigQuery wire-format type for those columns.

Source code in src/bqemulator/storage/engine.py
def fetch_arrow(self, sql: str, parameters: list[Any] | None = None) -> pa.Table:
    """Run ``sql`` and return the full result as a ``pyarrow.Table``.

    Prefers ``to_arrow_table()`` (DuckDB >= 1.4) and falls back to the
    deprecated ``fetch_arrow_table()`` when that method is missing.

    Each field is tagged with its original DuckDB column type under the
    ``bqemu.duckdb_type`` metadata key. Arrow conversion collapses types
    such as JSON, DECIMAL and TIMESTAMP_TZ into their physical Arrow
    representation (``string``, ``int64``, …), so without this tag the REST
    schema renderer could not tell, say, a JSON column from a VARCHAR.
    :func:`bqemulator.jobs.executor._arrow_field_to_schema_entry` reads the
    tag to recover the BigQuery wire-format type.
    """
    cursor = self.execute(sql, parameters)
    # Read the column description before the result set is consumed.
    column_description = cursor.description
    if hasattr(cursor, "to_arrow_table"):
        table = cursor.to_arrow_table()
    else:
        table = cursor.fetch_arrow_table()
    return _annotate_with_duckdb_types(table, column_description)

write_lock async

write_lock() -> AsyncIterator[None]

Acquire the exclusive write lock for this engine.

All DDL and DML must be wrapped in async with engine.write_lock(). Concurrent readers may proceed without the lock.

Source code in src/bqemulator/storage/engine.py
@asynccontextmanager
async def write_lock(self) -> AsyncIterator[None]:
    """Hold the engine's exclusive write lock for the duration of the block.

    Wrap every DDL and DML statement in ``async with engine.write_lock()``;
    readers are not required to take this lock.
    """
    lock = self._write_lock
    async with lock:
        yield

bqemulator.catalog.repository

bqemulator.catalog.repository

CatalogRepository protocol.

Any implementation must honor the contracts documented on each method:

  • get_* returns None when the resource does not exist. The API adapter is responsible for converting None to `NotFoundError`.
  • create_* raises `AlreadyExistsError` when a resource with the same identity already exists.
  • update_* raises `NotFoundError` when the resource is absent.
  • delete_* is idempotent when called with not_found_ok=True; with not_found_ok=False (the default) it raises `NotFoundError` when the resource is absent.
  • list_* returns an empty tuple when no resources match; never None.

Implementations must be safe to call from a single asyncio task at a time. Concurrent writes should be gated by the caller (the storage engine's write lock) — the repository itself does not serialize writes.

CatalogRepository

Bases: Protocol

Repository protocol for BigQuery-style metadata.

Source code in src/bqemulator/catalog/repository.py
@runtime_checkable
class CatalogRepository(Protocol):
    """Repository protocol for BigQuery-style metadata.

    Runtime-checkable: ``isinstance(obj, CatalogRepository)`` verifies only
    that the methods below exist, not their signatures. Implementations do
    not serialize writes; callers hold the storage engine's write lock
    around mutations.
    """

    # -- Datasets ---------------------------------------------------------

    def list_datasets(self, project_id: str) -> tuple[DatasetMeta, ...]:
        """Return all datasets in ``project_id`` (possibly empty, never ``None``)."""
        ...

    def list_all_datasets(self) -> tuple[DatasetMeta, ...]:
        """Return every dataset across every project (possibly empty).

        Used by the admin / export / seed paths that need a full catalog
        walk without knowing the project ids in advance. Order is
        implementation-defined.
        """
        ...

    def get_dataset(self, project_id: str, dataset_id: str) -> DatasetMeta | None:
        """Return the dataset, or ``None`` if it does not exist."""
        ...

    def create_dataset(self, dataset: DatasetMeta) -> DatasetMeta:
        """Insert a new dataset. Raises AlreadyExistsError on conflict."""
        ...

    def update_dataset(self, dataset: DatasetMeta) -> DatasetMeta:
        """Replace an existing dataset. Raises NotFoundError if absent."""
        ...

    def delete_dataset(
        self,
        project_id: str,
        dataset_id: str,
        *,
        not_found_ok: bool = False,
        delete_contents: bool = False,
    ) -> None:
        """Delete a dataset. ``delete_contents`` cascades to tables/routines.

        Raises NotFoundError if the dataset is absent, unless
        ``not_found_ok`` is true (then the call is a no-op).
        """
        ...

    # -- Tables -----------------------------------------------------------

    def list_tables(self, project_id: str, dataset_id: str) -> tuple[TableMeta, ...]:
        """Return all tables in the dataset (possibly empty, never ``None``)."""
        ...

    def list_storage_tables(self, project_id: str, dataset_id: str) -> tuple[str, ...]:
        """Return table IDs physically present in storage for this dataset.

        Unlike :meth:`list_tables` (which returns BigQuery-level
        :class:`TableMeta` for catalog-registered tables only), this
        method also surfaces tables created directly via SQL DDL
        (``CREATE TABLE … AS SELECT``). The wildcard-table expander
        uses it so wildcard references engage on DDL-created shards
        the catalog cache hasn't been notified about.

        Returns table IDs without metadata; order is
        implementation-defined. Returns an empty tuple if no tables
        exist in the dataset.
        """
        ...

    def get_table(
        self,
        project_id: str,
        dataset_id: str,
        table_id: str,
    ) -> TableMeta | None:
        """Return the table, or ``None`` if it does not exist."""
        ...

    def create_table(self, table: TableMeta) -> TableMeta:
        """Insert a new table. Raises AlreadyExistsError on conflict."""
        ...

    def update_table(self, table: TableMeta) -> TableMeta:
        """Replace an existing table. Raises NotFoundError if absent."""
        ...

    def delete_table(
        self,
        project_id: str,
        dataset_id: str,
        table_id: str,
        *,
        not_found_ok: bool = False,
    ) -> None:
        """Delete a table.

        Raises NotFoundError if the table is absent, unless
        ``not_found_ok`` is true (then the call is a no-op).
        """
        ...

    def list_views(self, project_id: str, dataset_id: str) -> tuple[TableMeta, ...]:
        """Return all VIEW-typed tables in the dataset (possibly empty).

        Backs ``INFORMATION_SCHEMA.VIEWS``. The returned
        :class:`TableMeta` instances carry ``table_type='VIEW'`` and
        ``view_query`` populated with the BigQuery SQL view definition.
        """
        ...

    def list_partitions(
        self,
        project_id: str,
        dataset_id: str,
        table_id: str,
    ) -> tuple[PartitionMeta, ...]:
        """Return the distinct partitions for a (possibly partitioned) table.

        For a time-partitioned table this enumerates the distinct
        partition-grain values (``YYYYMMDD`` for DAY, ``YYYYMMDDHH`` for
        HOUR, etc.) plus row counts per partition. For an integer-range
        partitioned table the bucket starts are stringified
        (``"0"``, ``"100"``, …). For an unpartitioned table the entire
        table is treated as a single partition with
        ``partition_id='__NULL__'`` (BigQuery's documented sentinel).

        Implementations that have a live DuckDB engine query the
        physical storage; in-memory unit tests without a wired engine
        return an empty tuple.
        """
        ...

    # -- Routines ---------------------------------------------------------

    def list_routines(self, project_id: str, dataset_id: str) -> tuple[RoutineMeta, ...]:
        """Return all routines in the dataset (possibly empty, never ``None``)."""
        ...

    def get_routine(
        self,
        project_id: str,
        dataset_id: str,
        routine_id: str,
    ) -> RoutineMeta | None:
        """Return the routine, or ``None`` if it does not exist."""
        ...

    def create_routine(self, routine: RoutineMeta) -> RoutineMeta:
        """Insert a new routine. Raises AlreadyExistsError on conflict."""
        ...

    def update_routine(self, routine: RoutineMeta) -> RoutineMeta:
        """Replace an existing routine. Raises NotFoundError if absent."""
        ...

    def delete_routine(
        self,
        project_id: str,
        dataset_id: str,
        routine_id: str,
        *,
        not_found_ok: bool = False,
    ) -> None:
        """Delete a routine.

        Raises NotFoundError if the routine is absent, unless
        ``not_found_ok`` is true (then the call is a no-op).
        """
        ...

    # -- Jobs -------------------------------------------------------------

    def list_jobs(
        self,
        project_id: str,
        *,
        state_filter: str | None = None,
        max_results: int = 100,
    ) -> tuple[JobMeta, ...]:
        """Return recent jobs for the project (possibly empty).

        NOTE(review): ``state_filter`` presumably restricts results to one
        job state and ``max_results`` presumably caps the count — confirm
        against implementations.
        """
        ...

    def get_job(self, project_id: str, job_id: str) -> JobMeta | None:
        """Return the job, or ``None`` if it does not exist."""
        ...

    def upsert_job(self, job: JobMeta) -> JobMeta:
        """Insert a new job or replace the existing state for the same id."""
        ...

    def delete_job(
        self,
        project_id: str,
        job_id: str,
        *,
        not_found_ok: bool = False,
    ) -> None:
        """Delete a job record (metadata only; job results handled separately).

        Raises NotFoundError if the job is absent, unless ``not_found_ok``
        is true (then the call is a no-op).
        """
        ...

    # -- Snapshots ---------------------------------------------

    def list_snapshots_for_table(
        self,
        project_id: str,
        dataset_id: str,
        table_id: str,
    ) -> tuple[SnapshotMeta, ...]:
        """Return all snapshots for a base table ordered by ``snapshot_time``.

        Returns an empty tuple when the table has no snapshots.
        """
        ...

    def list_all_snapshots(self) -> tuple[SnapshotMeta, ...]:
        """Return every snapshot known to the catalog (possibly empty)."""
        ...

    def create_snapshot(self, snapshot: SnapshotMeta) -> SnapshotMeta:
        """Insert a new snapshot metadata entry. Raises AlreadyExistsError on conflict."""
        ...

    def delete_snapshot(
        self,
        snapshot_id: str,
        *,
        not_found_ok: bool = False,
    ) -> None:
        """Delete a snapshot metadata entry by id.

        Raises NotFoundError if the snapshot is absent, unless
        ``not_found_ok`` is true (then the call is a no-op).
        """
        ...

    # -- Materialized views ------------------------------------

    def list_materialized_views(
        self,
        project_id: str,
        dataset_id: str,
    ) -> tuple[MaterializedViewMeta, ...]:
        """Return all materialized views in a dataset (possibly empty)."""
        ...

    def list_all_materialized_views(self) -> tuple[MaterializedViewMeta, ...]:
        """Return every materialized view known to the catalog (possibly empty)."""
        ...

    def get_materialized_view(
        self,
        project_id: str,
        dataset_id: str,
        table_id: str,
    ) -> MaterializedViewMeta | None:
        """Return the MV entry, or ``None`` if it does not exist."""
        ...

    def upsert_materialized_view(
        self,
        view: MaterializedViewMeta,
    ) -> MaterializedViewMeta:
        """Insert or replace a materialized view entry."""
        ...

    def delete_materialized_view(
        self,
        project_id: str,
        dataset_id: str,
        table_id: str,
        *,
        not_found_ok: bool = False,
    ) -> None:
        """Delete a materialized view entry.

        Raises NotFoundError if the entry is absent, unless
        ``not_found_ok`` is true (then the call is a no-op).
        """
        ...

    # -- Row access policies ----------------------------------

    def list_row_access_policies(
        self,
        project_id: str,
        dataset_id: str,
        table_id: str,
    ) -> tuple[RowAccessPolicyMeta, ...]:
        """Return all row access policies on the table (possibly empty)."""
        ...

    def list_all_row_access_policies(self) -> tuple[RowAccessPolicyMeta, ...]:
        """Return every row access policy known to the catalog (possibly empty)."""
        ...

    def get_row_access_policy(
        self,
        project_id: str,
        dataset_id: str,
        table_id: str,
        policy_id: str,
    ) -> RowAccessPolicyMeta | None:
        """Return a single policy, or ``None`` if it does not exist."""
        ...

    def create_row_access_policy(
        self,
        policy: RowAccessPolicyMeta,
    ) -> RowAccessPolicyMeta:
        """Insert a new row access policy. Raises AlreadyExistsError on conflict."""
        ...

    def update_row_access_policy(
        self,
        policy: RowAccessPolicyMeta,
    ) -> RowAccessPolicyMeta:
        """Replace an existing row access policy. Raises NotFoundError if absent."""
        ...

    def delete_row_access_policy(
        self,
        project_id: str,
        dataset_id: str,
        table_id: str,
        policy_id: str,
        *,
        not_found_ok: bool = False,
    ) -> None:
        """Delete a row access policy.

        Raises NotFoundError if the policy is absent, unless
        ``not_found_ok`` is true (then the call is a no-op).
        """
        ...

list_datasets

list_datasets(project_id: str) -> tuple[DatasetMeta, ...]

Return all datasets in project_id (possibly empty).

Source code in src/bqemulator/catalog/repository.py
def list_datasets(self, project_id: str) -> tuple[DatasetMeta, ...]:
    """Return all datasets in ``project_id`` (possibly empty).

    Scoped to a single project; use :meth:`list_all_datasets` for a
    cross-project walk.
    """
    ...

list_all_datasets

list_all_datasets() -> tuple[DatasetMeta, ...]

Return every dataset across every project (possibly empty).

Used by the admin / export / seed paths that need a full catalog walk without knowing the project ids in advance. Order is implementation-defined.

Source code in src/bqemulator/catalog/repository.py
def list_all_datasets(self) -> tuple[DatasetMeta, ...]:
    """Return every dataset across every project (possibly empty).

    Used by the admin / export / seed paths that need a full catalog
    walk without knowing the project ids in advance. Order is
    implementation-defined. Prefer :meth:`list_datasets` when the
    project is known.
    """
    ...

get_dataset

get_dataset(project_id: str, dataset_id: str) -> DatasetMeta | None

Return the dataset, or None if it does not exist.

Source code in src/bqemulator/catalog/repository.py
def get_dataset(self, project_id: str, dataset_id: str) -> DatasetMeta | None:
    """Return the dataset, or ``None`` if it does not exist.

    Absence is reported through the return value rather than an
    exception.
    """
    ...

create_dataset

create_dataset(dataset: DatasetMeta) -> DatasetMeta

Insert a new dataset. Raises AlreadyExistsError on conflict.

Source code in src/bqemulator/catalog/repository.py
def create_dataset(self, dataset: DatasetMeta) -> DatasetMeta:
    """Insert a new dataset. Raises AlreadyExistsError on conflict.

    A conflict is presumably an existing ``(project_id, dataset_id)``
    pair (TODO confirm). Use :meth:`update_dataset` to replace an
    existing dataset.
    """
    ...

update_dataset

update_dataset(dataset: DatasetMeta) -> DatasetMeta

Replace an existing dataset. Raises NotFoundError if absent.

Source code in src/bqemulator/catalog/repository.py
def update_dataset(self, dataset: DatasetMeta) -> DatasetMeta:
    """Replace an existing dataset. Raises NotFoundError if absent.

    The stored metadata is replaced by ``dataset``; use
    :meth:`create_dataset` to add a new one.
    """
    ...

delete_dataset

delete_dataset(
    project_id: str,
    dataset_id: str,
    *,
    not_found_ok: bool = False,
    delete_contents: bool = False,
) -> None

Delete a dataset. delete_contents cascades to tables/routines.

Source code in src/bqemulator/catalog/repository.py
def delete_dataset(
    self,
    project_id: str,
    dataset_id: str,
    *,
    not_found_ok: bool = False,
    delete_contents: bool = False,
) -> None:
    """Delete a dataset. ``delete_contents`` cascades to tables/routines.

    ``not_found_ok`` presumably makes deleting a missing dataset a
    no-op instead of an error (TODO confirm). Behavior for a non-empty
    dataset with ``delete_contents`` false is not specified here; real
    BigQuery rejects that request, so confirm that implementations
    match.
    """
    ...

list_tables

list_tables(project_id: str, dataset_id: str) -> tuple[TableMeta, ...]

Return all tables in the dataset (possibly empty).

Source code in src/bqemulator/catalog/repository.py
def list_tables(self, project_id: str, dataset_id: str) -> tuple[TableMeta, ...]:
    """Return all tables in the dataset (possibly empty).

    Only catalog-registered tables are returned; tables created
    directly via SQL DDL are surfaced by :meth:`list_storage_tables`
    instead.
    """
    ...

list_storage_tables

list_storage_tables(project_id: str, dataset_id: str) -> tuple[str, ...]

Return table IDs physically present in storage for this dataset.

Unlike list_tables (which returns BigQuery-level TableMeta for catalog-registered tables only), this method also surfaces tables created directly via SQL DDL (CREATE TABLE … AS SELECT). The wildcard-table expander uses it so that wildcard references also match DDL-created shards the catalog cache hasn't been notified about.

Returns table IDs without metadata; order is implementation-defined. Returns an empty tuple if no tables exist in the dataset.

Source code in src/bqemulator/catalog/repository.py
def list_storage_tables(self, project_id: str, dataset_id: str) -> tuple[str, ...]:
    """Return table IDs physically present in storage for this dataset.

    Unlike :meth:`list_tables` (which returns BigQuery-level
    :class:`TableMeta` for catalog-registered tables only), this
    method also surfaces tables created directly via SQL DDL
    (``CREATE TABLE … AS SELECT``). The wildcard-table expander
    uses it so that wildcard references also match DDL-created
    shards the catalog cache hasn't been notified about.

    Returns table IDs without metadata; order is
    implementation-defined. Returns an empty tuple if no tables
    exist in the dataset.
    """
    ...

get_table

get_table(project_id: str, dataset_id: str, table_id: str) -> TableMeta | None

Return the table, or None if it does not exist.

Source code in src/bqemulator/catalog/repository.py
def get_table(
    self,
    project_id: str,
    dataset_id: str,
    table_id: str,
) -> TableMeta | None:
    """Return the table, or ``None`` if it does not exist.

    Presumably consults catalog-registered metadata only, as
    :meth:`list_tables` does, so a DDL-only table visible to
    :meth:`list_storage_tables` may still yield ``None`` here —
    TODO confirm.
    """
    ...

create_table

create_table(table: TableMeta) -> TableMeta

Insert a new table. Raises AlreadyExistsError on conflict.

Source code in src/bqemulator/catalog/repository.py
def create_table(self, table: TableMeta) -> TableMeta:
    """Insert a new table. Raises AlreadyExistsError on conflict.

    A conflict is presumably an existing entry for the same
    ``(project_id, dataset_id, table_id)`` (TODO confirm). Use
    :meth:`update_table` to replace an existing table.
    """
    ...

update_table

update_table(table: TableMeta) -> TableMeta

Replace an existing table. Raises NotFoundError if absent.

Source code in src/bqemulator/catalog/repository.py
def update_table(self, table: TableMeta) -> TableMeta:
    """Replace an existing table. Raises NotFoundError if absent.

    The stored metadata is replaced by ``table``; use
    :meth:`create_table` to add a new one.
    """
    ...

delete_table

delete_table(
    project_id: str, dataset_id: str, table_id: str, *, not_found_ok: bool = False
) -> None

Delete a table.

Source code in src/bqemulator/catalog/repository.py
def delete_table(
    self,
    project_id: str,
    dataset_id: str,
    table_id: str,
    *,
    not_found_ok: bool = False,
) -> None:
    """Delete a table.

    ``not_found_ok`` presumably makes deleting a missing table a no-op
    instead of an error (TODO confirm against implementations).
    """
    ...

list_views

list_views(project_id: str, dataset_id: str) -> tuple[TableMeta, ...]

Return all VIEW-typed tables in the dataset (possibly empty).

Backs INFORMATION_SCHEMA.VIEWS. The returned TableMeta instances carry table_type='VIEW' and view_query populated with the BigQuery SQL view definition.

Source code in src/bqemulator/catalog/repository.py
def list_views(self, project_id: str, dataset_id: str) -> tuple[TableMeta, ...]:
    """Return all VIEW-typed tables in the dataset (possibly empty).

    Backs ``INFORMATION_SCHEMA.VIEWS``. The returned
    :class:`TableMeta` instances carry ``table_type='VIEW'`` and
    ``view_query`` populated with the BigQuery SQL view definition.
    Materialized-view refresh metadata is tracked separately via
    :meth:`list_materialized_views`.
    """
    ...

list_partitions

list_partitions(
    project_id: str, dataset_id: str, table_id: str
) -> tuple[PartitionMeta, ...]

Return the distinct partitions for a (possibly partitioned) table.

For a time-partitioned table this enumerates the distinct partition-grain values (YYYYMMDD for DAY, YYYYMMDDHH for HOUR, etc.) plus row counts per partition. For an integer-range partitioned table the bucket starts are stringified ("0", "100", …). For an unpartitioned table the entire table is treated as a single partition with partition_id='__NULL__' (BigQuery's documented sentinel).

Implementations that have a live DuckDB engine query the physical storage; implementations without a wired engine (e.g. an in-memory repository used in unit tests) return an empty tuple.

Source code in src/bqemulator/catalog/repository.py
def list_partitions(
    self,
    project_id: str,
    dataset_id: str,
    table_id: str,
) -> tuple[PartitionMeta, ...]:
    """Return the distinct partitions for a (possibly partitioned) table.

    For a time-partitioned table this enumerates the distinct
    partition-grain values (``YYYYMMDD`` for DAY, ``YYYYMMDDHH`` for
    HOUR, etc.) plus row counts per partition. For an integer-range
    partitioned table the bucket starts are stringified
    (``"0"``, ``"100"``, …). For an unpartitioned table the entire
    table is treated as a single partition with
    ``partition_id='__NULL__'`` (BigQuery's documented sentinel).

    Implementations that have a live DuckDB engine query the
    physical storage; implementations without a wired engine (e.g.
    an in-memory repository used in unit tests) return an empty
    tuple.
    """
    ...

list_routines

list_routines(project_id: str, dataset_id: str) -> tuple[RoutineMeta, ...]

Return all routines in the dataset.

Source code in src/bqemulator/catalog/repository.py
def list_routines(self, project_id: str, dataset_id: str) -> tuple[RoutineMeta, ...]:
    """Return all routines in the dataset.

    Possibly empty. Routines cover UDFs, procedures and TVFs alike
    (see :class:`RoutineMeta`).
    """
    ...

get_routine

get_routine(project_id: str, dataset_id: str, routine_id: str) -> RoutineMeta | None

Return the routine or None.

Source code in src/bqemulator/catalog/repository.py
def get_routine(
    self,
    project_id: str,
    dataset_id: str,
    routine_id: str,
) -> RoutineMeta | None:
    """Return the routine or ``None``.

    ``None`` signals that no routine with this id exists in the
    dataset.
    """
    ...

create_routine

create_routine(routine: RoutineMeta) -> RoutineMeta

Insert a new routine.

Source code in src/bqemulator/catalog/repository.py
def create_routine(self, routine: RoutineMeta) -> RoutineMeta:
    """Insert a new routine.

    Conflict behavior is not stated here, whereas :meth:`create_table`
    and :meth:`create_dataset` raise ``AlreadyExistsError``. TODO:
    confirm whether routines behave the same way.
    """
    ...

update_routine

update_routine(routine: RoutineMeta) -> RoutineMeta

Replace an existing routine.

Source code in src/bqemulator/catalog/repository.py
def update_routine(self, routine: RoutineMeta) -> RoutineMeta:
    """Replace an existing routine.

    Behavior when the routine is absent is not stated here, whereas
    :meth:`update_table` raises ``NotFoundError``. TODO: confirm
    whether routines behave the same way.
    """
    ...

delete_routine

delete_routine(
    project_id: str, dataset_id: str, routine_id: str, *, not_found_ok: bool = False
) -> None

Delete a routine.

Source code in src/bqemulator/catalog/repository.py
def delete_routine(
    self,
    project_id: str,
    dataset_id: str,
    routine_id: str,
    *,
    not_found_ok: bool = False,
) -> None:
    """Delete a routine.

    ``not_found_ok`` presumably makes deleting a missing routine a
    no-op instead of an error (TODO confirm against implementations).
    """
    ...

list_jobs

list_jobs(
    project_id: str, *, state_filter: str | None = None, max_results: int = 100
) -> tuple[JobMeta, ...]

Return recent jobs for the project.

Source code in src/bqemulator/catalog/repository.py
def list_jobs(
    self,
    project_id: str,
    *,
    state_filter: str | None = None,
    max_results: int = 100,
) -> tuple[JobMeta, ...]:
    """Return recent jobs for the project.

    ``state_filter`` presumably restricts the result to jobs in that
    state (``None`` meaning no filter), and ``max_results`` presumably
    caps how many jobs are returned (default 100). The recency
    ordering is not pinned down here — TODO confirm newest-first.
    """
    ...

get_job

get_job(project_id: str, job_id: str) -> JobMeta | None

Return the job or None.

Source code in src/bqemulator/catalog/repository.py
def get_job(self, project_id: str, job_id: str) -> JobMeta | None:
    """Return the job or ``None``.

    ``None`` signals that no job with ``job_id`` is recorded for
    ``project_id``.
    """
    ...

upsert_job

upsert_job(job: JobMeta) -> JobMeta

Insert a new job or replace the existing state for the same id.

Source code in src/bqemulator/catalog/repository.py
def upsert_job(self, job: JobMeta) -> JobMeta:
    """Insert a new job or replace the existing state for the same id.

    Unlike the ``create_*`` methods, an existing record is overwritten
    rather than rejected. The id is presumably ``(project_id, job_id)``,
    matching :meth:`get_job` — TODO confirm.
    """
    ...

delete_job

delete_job(project_id: str, job_id: str, *, not_found_ok: bool = False) -> None

Delete a job record (metadata only; job results handled separately).

Source code in src/bqemulator/catalog/repository.py
def delete_job(
    self,
    project_id: str,
    job_id: str,
    *,
    not_found_ok: bool = False,
) -> None:
    """Delete a job record (metadata only; job results handled separately).

    ``not_found_ok`` presumably makes deleting a missing job a no-op
    instead of an error (TODO confirm against implementations).
    """
    ...

list_snapshots_for_table

list_snapshots_for_table(
    project_id: str, dataset_id: str, table_id: str
) -> tuple[SnapshotMeta, ...]

Return all snapshots for a base table ordered by snapshot_time.

Source code in src/bqemulator/catalog/repository.py
def list_snapshots_for_table(
    self,
    project_id: str,
    dataset_id: str,
    table_id: str,
) -> tuple[SnapshotMeta, ...]:
    """Return all snapshots for a base table ordered by ``snapshot_time``.

    The triple names the *source* table (``SnapshotMeta.table_id``), not
    the snapshot's own DuckDB table. The sort direction is not stated
    here — presumably oldest first; TODO confirm.
    """
    ...

list_all_snapshots

list_all_snapshots() -> tuple[SnapshotMeta, ...]

Return every snapshot known to the catalog.

Source code in src/bqemulator/catalog/repository.py
def list_all_snapshots(self) -> tuple[SnapshotMeta, ...]:
    """Return every snapshot known to the catalog.

    Spans all projects and both kinds: ``AUTO`` time-travel snapshots
    and ``USER`` snapshot tables. Possibly empty.
    """
    ...

create_snapshot

create_snapshot(snapshot: SnapshotMeta) -> SnapshotMeta

Insert a new snapshot metadata entry.

Source code in src/bqemulator/catalog/repository.py
def create_snapshot(self, snapshot: SnapshotMeta) -> SnapshotMeta:
    """Insert a new snapshot metadata entry.

    Records metadata only. The physical copy lives at
    ``snapshot.duckdb_schema`` / ``snapshot.duckdb_table`` and is
    presumably materialized by the caller (TODO confirm).
    """
    ...

delete_snapshot

delete_snapshot(snapshot_id: str, *, not_found_ok: bool = False) -> None

Delete a snapshot metadata entry by id.

Source code in src/bqemulator/catalog/repository.py
def delete_snapshot(
    self,
    snapshot_id: str,
    *,
    not_found_ok: bool = False,
) -> None:
    """Delete a snapshot metadata entry by id.

    ``snapshot_id`` is the DuckDB identifier recorded on
    :class:`SnapshotMeta`. Whether the physical snapshot table is
    dropped here is not specified. ``not_found_ok`` presumably makes
    deleting a missing entry a no-op (TODO confirm).
    """
    ...

list_materialized_views

list_materialized_views(
    project_id: str, dataset_id: str
) -> tuple[MaterializedViewMeta, ...]

Return all materialized views in a dataset.

Source code in src/bqemulator/catalog/repository.py
def list_materialized_views(
    self,
    project_id: str,
    dataset_id: str,
) -> tuple[MaterializedViewMeta, ...]:
    """Return all materialized views in a dataset.

    Possibly empty. Use :meth:`list_all_materialized_views` for a
    catalog-wide walk.
    """
    ...

list_all_materialized_views

list_all_materialized_views() -> tuple[MaterializedViewMeta, ...]

Return every materialized view known to the catalog.

Source code in src/bqemulator/catalog/repository.py
def list_all_materialized_views(self) -> tuple[MaterializedViewMeta, ...]:
    """Return every materialized view known to the catalog.

    Spans all projects and datasets; possibly empty.
    """
    ...

get_materialized_view

get_materialized_view(
    project_id: str, dataset_id: str, table_id: str
) -> MaterializedViewMeta | None

Return the MV entry or None.

Source code in src/bqemulator/catalog/repository.py
def get_materialized_view(
    self,
    project_id: str,
    dataset_id: str,
    table_id: str,
) -> MaterializedViewMeta | None:
    """Return the MV entry or ``None``.

    ``None`` signals that no materialized view is registered under this
    table triple.
    """
    ...

upsert_materialized_view

upsert_materialized_view(view: MaterializedViewMeta) -> MaterializedViewMeta

Insert or replace a materialized view entry.

Source code in src/bqemulator/catalog/repository.py
def upsert_materialized_view(
    self,
    view: MaterializedViewMeta,
) -> MaterializedViewMeta:
    """Insert or replace a materialized view entry.

    Unlike the ``create_*`` methods, an existing entry is overwritten
    rather than rejected. The entry is presumably keyed by the
    ``(project_id, dataset_id, table_id)`` triple that
    :meth:`get_materialized_view` looks up — TODO confirm.
    """
    ...

delete_materialized_view

delete_materialized_view(
    project_id: str, dataset_id: str, table_id: str, *, not_found_ok: bool = False
) -> None

Delete a materialized view entry.

Source code in src/bqemulator/catalog/repository.py
def delete_materialized_view(
    self,
    project_id: str,
    dataset_id: str,
    table_id: str,
    *,
    not_found_ok: bool = False,
) -> None:
    """Delete a materialized view entry.

    Only the MV catalog entry is named by this interface; whether the
    backing table and its ``TableMeta`` are removed as well is not
    specified here. ``not_found_ok`` presumably turns deletion of a
    missing entry into a no-op instead of an error (TODO confirm
    against implementations).
    """
    ...

list_row_access_policies

list_row_access_policies(
    project_id: str, dataset_id: str, table_id: str
) -> tuple[RowAccessPolicyMeta, ...]

Return all row access policies on the table (possibly empty).

Source code in src/bqemulator/catalog/repository.py
def list_row_access_policies(
    self,
    project_id: str,
    dataset_id: str,
    table_id: str,
) -> tuple[RowAccessPolicyMeta, ...]:
    """Return all row access policies on the table (possibly empty).

    The table is addressed by ``project_id``/``dataset_id``/``table_id``.
    Order is not specified by this interface.
    """
    ...

list_all_row_access_policies

list_all_row_access_policies() -> tuple[RowAccessPolicyMeta, ...]

Return every row access policy known to the catalog.

Source code in src/bqemulator/catalog/repository.py
def list_all_row_access_policies(self) -> tuple[RowAccessPolicyMeta, ...]:
    """Return every row access policy known to the catalog.

    Spans all projects, datasets and tables (possibly empty). Order
    is not specified by this interface.
    """
    ...

get_row_access_policy

get_row_access_policy(
    project_id: str, dataset_id: str, table_id: str, policy_id: str
) -> RowAccessPolicyMeta | None

Return a single policy or None.

Source code in src/bqemulator/catalog/repository.py
def get_row_access_policy(
    self,
    project_id: str,
    dataset_id: str,
    table_id: str,
    policy_id: str,
) -> RowAccessPolicyMeta | None:
    """Return a single policy or ``None``.

    The policy is addressed by its table triple plus ``policy_id``;
    ``None`` signals that no such policy exists.
    """
    ...

create_row_access_policy

create_row_access_policy(policy: RowAccessPolicyMeta) -> RowAccessPolicyMeta

Insert a new row access policy. Raises AlreadyExistsError on conflict.

Source code in src/bqemulator/catalog/repository.py
def create_row_access_policy(
    self,
    policy: RowAccessPolicyMeta,
) -> RowAccessPolicyMeta:
    """Insert a new row access policy. Raises AlreadyExistsError on conflict.

    A conflict is presumably an existing policy with the same table
    triple and ``policy_id`` (TODO confirm). Use
    :meth:`update_row_access_policy` to replace an existing policy.
    """
    ...

update_row_access_policy

update_row_access_policy(policy: RowAccessPolicyMeta) -> RowAccessPolicyMeta

Replace an existing row access policy. Raises NotFoundError if absent.

Source code in src/bqemulator/catalog/repository.py
def update_row_access_policy(
    self,
    policy: RowAccessPolicyMeta,
) -> RowAccessPolicyMeta:
    """Replace an existing row access policy. Raises NotFoundError if absent.

    The stored policy is replaced by ``policy``; use
    :meth:`create_row_access_policy` to add a new one.
    """
    ...

delete_row_access_policy

delete_row_access_policy(
    project_id: str,
    dataset_id: str,
    table_id: str,
    policy_id: str,
    *,
    not_found_ok: bool = False,
) -> None

Delete a row access policy.

Source code in src/bqemulator/catalog/repository.py
def delete_row_access_policy(
    self,
    project_id: str,
    dataset_id: str,
    table_id: str,
    policy_id: str,
    *,
    not_found_ok: bool = False,
) -> None:
    """Delete a row access policy.

    The policy is addressed by its table triple plus ``policy_id``.
    ``not_found_ok`` presumably makes deleting a missing policy a
    no-op instead of an error (TODO confirm against implementations).
    """
    ...

bqemulator.catalog.models

bqemulator.catalog.models

Frozen Pydantic models for catalog entities.

These models are the single in-memory shape for metadata. They are serialized to JSON for storage (in DuckDB catalog tables) and to REST responses.

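Models are frozen (frozen=True): to change a field, create a modified copy with .model_copy(update=...), which returns a new instance. Copying is cheap (shallow), and immutability makes catalog entries safe to share across threads and tasks. Container fields such as labels are plain dicts that a shallow copy shares between instances, so they must never be mutated in place.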

TableFieldSchema

Bases: _Frozen

A single column in a BigQuery table schema.

Matches the TableFieldSchema REST resource.

Source code in src/bqemulator/catalog/models.py
class TableFieldSchema(_Frozen):
    """A single column in a BigQuery table schema.

    Matches the ``TableFieldSchema`` REST resource. The model is
    recursive: ``fields`` holds the children of a STRUCT column and
    ``range_element_type`` holds the element type of a RANGE column.
    """

    name: str
    type: str  # INT64 | FLOAT64 | ... | GEOGRAPHY | RANGE | INTERVAL | ...
    mode: FieldMode = "NULLABLE"  # REST modes: NULLABLE / REQUIRED / REPEATED
    fields: tuple[TableFieldSchema, ...] = ()  # nested STRUCT fields
    description: str | None = None
    # Parameterized-type limits from the REST resource: max_length for
    # STRING/BYTES, precision/scale for NUMERIC/BIGNUMERIC.
    max_length: int | None = None
    precision: int | None = None
    scale: int | None = None
    default_value_expression: str | None = None  # SQL text, stored verbatim
    collation: str | None = None
    rounding_mode: str | None = None
    # RANGE field subtype. Required when ``type == "RANGE"``; the inner
    # type is one of DATE / DATETIME / TIMESTAMP. Matches BigQuery's
    # REST ``rangeElementType`` shape.
    # NOTE(review): the RANGE requirement is not validated in this model.
    range_element_type: TableFieldSchema | None = None

TableSchema

Bases: _Frozen

Ordered collection of TableFieldSchema.

Source code in src/bqemulator/catalog/models.py
class TableSchema(_Frozen):
    """Ordered collection of :class:`TableFieldSchema`.

    Mirrors the REST ``TableSchema`` resource; the default is an empty
    schema with no columns.
    """

    # Top-level columns, in schema order.
    fields: tuple[TableFieldSchema, ...] = ()

TimePartitioning

Bases: _Frozen

Time-unit partitioning configuration.

Source code in src/bqemulator/catalog/models.py
class TimePartitioning(_Frozen):
    """Time-unit partitioning configuration.

    Mirrors BigQuery's REST ``TimePartitioning`` resource.
    """

    # Partition grain; presumably DAY / HOUR / MONTH / YEAR, matching the
    # grains PartitionMeta formats partition ids for.
    type: PartitionType = "DAY"
    field: str | None = None  # None => ingestion-time partitioned
    expiration_ms: int | None = None  # per-partition lifetime in milliseconds
    # When true, BigQuery rejects queries lacking a partition filter;
    # emulator enforcement is not visible in this model.
    require_partition_filter: bool = False

RangePartitioning

Bases: _Frozen

Integer-range partitioning configuration.

Source code in src/bqemulator/catalog/models.py
class RangePartitioning(_Frozen):
    """Integer-range partitioning configuration.

    Follows BigQuery's ``RangePartitioning`` semantics: buckets run from
    ``start`` (inclusive) to ``end`` (exclusive) in steps of
    ``interval``; values outside that range go to BigQuery's
    ``__UNPARTITIONED__`` partition.
    """

    field: str  # INTEGER column the table is partitioned on
    # NOTE(review): nothing here validates interval > 0 or start < end.
    start: int
    end: int
    interval: int

Clustering

Bases: _Frozen

Clustering configuration.

Source code in src/bqemulator/catalog/models.py
class Clustering(_Frozen):
    """Clustering configuration.

    Mirrors BigQuery's REST ``Clustering`` resource.
    """

    # Clustering column names in priority order. BigQuery allows at most
    # four; that limit is not enforced by this model.
    fields: tuple[str, ...]

AccessEntry

Bases: _Frozen

A single entry in a dataset's access array.

Mirrors the BigQuery REST Dataset.access shape. The fields are mutually exclusive — exactly one of role + (user_by_email | group_by_email | domain | special_group | iam_member), view, dataset, or routine should be populated. The model does not enforce mutual exclusion (the REST adapter does), so callers can deserialize a heterogeneous array uniformly.

Source code in src/bqemulator/catalog/models.py
class AccessEntry(_Frozen):
    """A single entry in a dataset's ``access`` array.

    Mirrors the BigQuery REST ``Dataset.access`` shape. The fields are
    mutually exclusive — exactly one of ``role + (user_by_email |
    group_by_email | domain | special_group | iam_member)``, ``view``,
    ``dataset``, or ``routine`` should be populated. The model does
    not enforce mutual exclusion (the REST adapter does), so callers
    can deserialize a heterogeneous array uniformly.
    """

    role: str | None = None  # OWNER / WRITER / READER, or roles/...
    # Principal selectors, used together with ``role``.
    user_by_email: str | None = None
    group_by_email: str | None = None
    domain: str | None = None
    special_group: str | None = None
    iam_member: str | None = None
    # Authorized-resource grants (no ``role``).
    view: tuple[str, str, str] | None = None  # (project, dataset, view-id)
    routine: tuple[str, str, str] | None = None  # (project, dataset, routine-id)
    dataset: tuple[str, str] | None = None  # (project, dataset)

DatasetMeta

Bases: _Frozen

Metadata for a BigQuery dataset.

Source code in src/bqemulator/catalog/models.py
class DatasetMeta(_Frozen):
    """Metadata for a BigQuery dataset.

    Identified by ``(project_id, dataset_id)``.
    """

    project_id: str
    dataset_id: str
    friendly_name: str | None = None
    description: str | None = None
    # A plain dict inside a frozen model: attribute reassignment is
    # blocked, but the dict itself is mutable and shared by shallow
    # ``model_copy`` — never mutate it in place.
    labels: dict[str, str] = Field(default_factory=dict)
    location: str = "US"
    # Defaults inherited by new tables/partitions, in milliseconds.
    default_table_expiration_ms: int | None = None
    default_partition_expiration_ms: int | None = None
    default_collation: str | None = None
    creation_time: datetime
    last_modified_time: datetime
    etag: str
    is_case_insensitive: bool = False
    access_entries: tuple[AccessEntry, ...] = ()

TableMeta

Bases: _Frozen

Metadata for a BigQuery table, view, or related entity.

Source code in src/bqemulator/catalog/models.py
class TableMeta(_Frozen):
    """Metadata for a BigQuery table, view, or related entity.

    Identified by ``(project_id, dataset_id, table_id)``; ``table_type``
    selects which of the optional groups below are meaningful.
    """

    project_id: str
    dataset_id: str
    table_id: str
    table_type: TableType = "TABLE"
    # Trailing underscore avoids shadowing BaseModel's ``schema`` attribute;
    # the alias keeps the external name ``schema``.
    schema_: TableSchema = Field(default_factory=TableSchema, alias="schema")
    friendly_name: str | None = None
    description: str | None = None
    # Mutable dict inside a frozen model — do not mutate in place.
    labels: dict[str, str] = Field(default_factory=dict)
    time_partitioning: TimePartitioning | None = None
    range_partitioning: RangePartitioning | None = None
    clustering: Clustering | None = None
    expiration_time: datetime | None = None
    creation_time: datetime
    last_modified_time: datetime
    # Size statistics; how they are kept in sync with storage is not
    # visible in this model.
    num_rows: int = 0
    num_bytes: int = 0
    etag: str

    # View-specific
    view_query: str | None = None
    use_legacy_sql: bool = False

    # Snapshot / Clone / Materialized-view specific
    base_table: str | None = None  # project.dataset.table of source
    snapshot_time: datetime | None = None

SnapshotMeta

Bases: _Frozen

Metadata for a captured snapshot.

AUTO snapshots power FOR SYSTEM_TIME AS OF time travel. They live in the reserved _bqemulator_snapshots DuckDB schema and expire after the configured retention window.

USER snapshots back CREATE SNAPSHOT TABLE statements. They live in the regular project__dataset schema, appear in the tables catalog with table_type=SNAPSHOT, and never expire under the retention policy — they are only removed by an explicit DROP SNAPSHOT TABLE.

Source code in src/bqemulator/catalog/models.py
class SnapshotMeta(_Frozen):
    """Metadata for a captured snapshot.

    ``AUTO`` snapshots power ``FOR SYSTEM_TIME AS OF`` time travel. They
    live in the reserved ``_bqemulator_snapshots`` DuckDB schema and
    expire after the configured retention window.

    ``USER`` snapshots back ``CREATE SNAPSHOT TABLE`` statements. They
    live in the regular ``project__dataset`` schema, appear in the
    ``tables`` catalog with ``table_type=SNAPSHOT``, and never expire
    under the retention policy — they are only removed by an explicit
    ``DROP SNAPSHOT TABLE``.
    """

    snapshot_id: str  # DuckDB identifier in the snapshots schema
    # Source (base) table the snapshot was taken from.
    project_id: str
    dataset_id: str
    table_id: str  # source table id
    snapshot_time: datetime
    kind: SnapshotKind = "AUTO"
    duckdb_schema: str  # where the snapshot physically lives
    duckdb_table: str  # snapshot table name within the schema
    # Presumably None for USER snapshots, which never expire — TODO confirm.
    expires_at: datetime | None = None

MaterializedViewMeta

Bases: _Frozen

Metadata for a materialized view.

The view's physical rows live in a regular DuckDB table in the dataset's schema, so TableMeta still carries the schema and identity. MaterializedViewMeta captures the additional data the refresh subsystem needs: the BigQuery source query, its base table dependencies, staleness, and refresh bookkeeping.

Source code in src/bqemulator/catalog/models.py
class MaterializedViewMeta(_Frozen):
    """Metadata for a materialized view.

    The view's physical rows live in a regular DuckDB table in the
    dataset's schema, so ``TableMeta`` still carries the schema and
    identity. ``MaterializedViewMeta`` captures the additional data
    the refresh subsystem needs: the BigQuery source query, its base
    table dependencies, staleness, and refresh bookkeeping.
    """

    # Same identity triple as the MV's TableMeta.
    project_id: str
    dataset_id: str
    table_id: str
    view_query: str  # BigQuery SQL stored verbatim
    base_tables: tuple[tuple[str, str, str], ...]  # (project, dataset, table)
    last_refresh_time: datetime
    # Presumably set when a base table changes after last_refresh_time —
    # TODO confirm against the refresh subsystem.
    is_stale: bool = False

PartitionMeta

Bases: _Frozen

Derived metadata for a single partition slice of a partitioned table.

Not a persisted catalog entity — synthesised on demand from the underlying DuckDB rows when INFORMATION_SCHEMA.PARTITIONS is queried. partition_id follows BigQuery's documented format:

  • DAY-partitioned: "YYYYMMDD" (e.g. "20260520").
  • HOUR-partitioned: "YYYYMMDDHH".
  • MONTH-partitioned: "YYYYMM".
  • YEAR-partitioned: "YYYY".
  • Integer-range partitioned: stringified bucket start (e.g. "100").
  • Unpartitioned tables: "__NULL__" (BigQuery's documented sentinel).
Source code in src/bqemulator/catalog/models.py
class PartitionMeta(_Frozen):
    """Derived metadata for a single partition slice of a partitioned table.

    Not a persisted catalog entity — synthesised on demand from the
    underlying DuckDB rows when ``INFORMATION_SCHEMA.PARTITIONS`` is
    queried. ``partition_id`` follows BigQuery's documented format:

    * DAY-partitioned: ``"YYYYMMDD"`` (e.g. ``"20260520"``).
    * HOUR-partitioned: ``"YYYYMMDDHH"``.
    * MONTH-partitioned: ``"YYYYMM"``.
    * YEAR-partitioned: ``"YYYY"``.
    * Integer-range partitioned: stringified bucket start (e.g. ``"100"``).
    * Unpartitioned tables: ``"__NULL__"`` (BigQuery's documented sentinel).
    """

    # Field names follow the INFORMATION_SCHEMA.PARTITIONS column names:
    # table_catalog = project, table_schema = dataset, table_name = table.
    table_catalog: str
    table_schema: str
    table_name: str
    partition_id: str
    total_rows: int = 0
    total_logical_bytes: int = 0
    last_modified_time: datetime
    storage_tier: Literal["ACTIVE", "LONG_TERM"] = "ACTIVE"

RowAccessPolicyMeta

Bases: _Frozen

Metadata for a BigQuery row access policy.

A row access policy restricts a SELECT against project.dataset.table to the rows for which filter_predicate evaluates to TRUE and where the caller's IAM-member identity matches one of grantees. See ADR 0018 for the enforcement model and matching rules.

Source code in src/bqemulator/catalog/models.py
class RowAccessPolicyMeta(_Frozen):
    """Metadata for a BigQuery row access policy.

    A row access policy restricts a SELECT against ``project.dataset.table``
    to the rows for which ``filter_predicate`` evaluates to TRUE *and*
    where the caller's IAM-member identity matches one of ``grantees``.
    See ADR 0018 for the enforcement model and matching rules.
    """

    # Protected table plus the policy's own id within it.
    project_id: str
    dataset_id: str
    table_id: str
    policy_id: str
    filter_predicate: str  # SQL boolean expression text
    # IAM member strings (presumably "user:...", "group:...", etc. —
    # see ADR 0018 for the exact matching rules).
    grantees: tuple[str, ...] = ()
    creation_time: datetime
    last_modified_time: datetime
    etag: str

RoutineArgument

Bases: _Frozen

A single argument to a routine.

Source code in src/bqemulator/catalog/models.py
class RoutineArgument(_Frozen):
    """A single argument to a routine.

    Mirrors the BigQuery REST ``Routine.arguments`` entry.
    """

    name: str
    # ANY_TYPE arguments (templated SQL UDFs) carry no data_type.
    argument_kind: Literal["FIXED_TYPE", "ANY_TYPE"] = "FIXED_TYPE"
    mode: Literal["IN", "OUT", "INOUT"] = "IN"  # OUT/INOUT apply to procedures
    data_type: dict[str, Any] | None = None  # BigQuery-typed structure

RoutineMeta

Bases: _Frozen

Metadata for a BigQuery routine (UDF, procedure, or TVF).

Source code in src/bqemulator/catalog/models.py
class RoutineMeta(_Frozen):
    """Metadata for a BigQuery routine (UDF, procedure, or TVF).

    Identified by ``(project_id, dataset_id, routine_id)``.
    """

    project_id: str
    dataset_id: str
    routine_id: str
    routine_type: RoutineType
    language: RoutineLanguage = "SQL"
    definition_body: str  # routine body source, stored verbatim
    arguments: tuple[RoutineArgument, ...] = ()
    # Presumably the REST StandardSqlDataType shape, like
    # RoutineArgument.data_type — TODO confirm.
    return_type: dict[str, Any] | None = None
    imported_libraries: tuple[str, ...] = ()  # library URIs (JavaScript UDFs)
    description: str | None = None
    determinism_level: Literal["DETERMINISTIC", "NOT_DETERMINISTIC"] | None = None
    creation_time: datetime
    last_modified_time: datetime
    etag: str

JobMeta

Bases: _Frozen

Metadata for a BigQuery job.

Source code in src/bqemulator/catalog/models.py
class JobMeta(_Frozen):
    """Metadata for a BigQuery job.

    Identified by ``(project_id, job_id)``; stored and replaced as a
    whole via ``upsert_job`` as the job progresses.
    """

    project_id: str
    job_id: str
    job_type: JobType
    state: JobState
    # Presumably the REST JobConfiguration / JobStatistics payloads kept as
    # raw dicts — TODO confirm key casing (camelCase vs snake_case).
    configuration: dict[str, Any]
    statistics: dict[str, Any] = Field(default_factory=dict)
    error_result: dict[str, Any] | None = None  # set only for failed jobs (presumably)
    creation_time: datetime
    start_time: datetime | None = None
    end_time: datetime | None = None
    user_email: str | None = None
    etag: str

bqemulator.server

bqemulator.server

Composition root — wires every subsystem into a running server.

This module is the ONLY place that constructs top-level objects. Every other subsystem takes its collaborators via constructor injection.

Public entry points:

  • run_forever — blocking, used by the CLI's start command.
  • EmulatorServer — programmatic, used by the pytest fixture and for embedding into other Python processes.

EmulatorServer

Programmatic lifecycle for the emulator.

Example:

    server = EmulatorServer(Settings())
    await server.start()
    # ... use the emulator ...
    await server.stop()

Or synchronously via run_forever, used by the CLI.

Source code in src/bqemulator/server.py
class EmulatorServer:
    """Programmatic lifecycle for the emulator.

    Example::

        server = EmulatorServer(Settings())
        await server.start()
        # ... use the emulator ...
        await server.stop()

    Or synchronously via :meth:`run_forever`, used by the CLI.
    """

    def __init__(self, settings: Settings, *, clock: Clock | None = None) -> None:
        """Construct the long-lived collaborators; nothing is started yet.

        Args:
            settings: Runtime configuration shared by every subsystem.
            clock: Time source; defaults to :class:`SystemClock`.
        """
        self._settings = settings
        self._clock: Clock = clock or SystemClock()
        self._engine = DuckDBEngine(settings)
        self._metrics = MetricsRegistry()
        self._events = EventBus()
        self._catalog: CatalogRepository
        if settings.persistence_mode is PersistenceMode.EPHEMERAL:
            # Ephemeral mode: memory catalog is sufficient and avoids DuckDB
            # catalog startup cost in tight CI loops. The engine is
            # threaded through so storage introspection (used by the
            # wildcard expander) still sees DDL-created tables — REST
            # CRUD doesn't see them but the catalog cache always
            # mirrors REST, so the only "missing" surface is SQL DDL.
            self._catalog = MemoryCatalogRepository(self._engine)
        else:
            self._catalog = DuckDBCatalogRepository(self._engine)

        # Populated by start(); None until then.
        self._context: AppContext | None = None
        self._fastapi_server: uvicorn.Server | None = None
        self._fastapi_task: asyncio.Task[None] | None = None
        self._grpc_server: object | None = None  # grpc.aio.Server, typed loosely
        self._grpc_port: int | None = None
        self._rest_port: int | None = None
        self._gc_task: asyncio.Task[None] | None = None

    # -- Properties (post-start) -------------------------------------------

    @property
    def rest_port(self) -> int:
        """The actual REST port (useful when the user passed 0)."""
        if self._rest_port is None:
            raise RuntimeError("EmulatorServer not started")
        return self._rest_port

    @property
    def grpc_port(self) -> int:
        """The actual gRPC port (useful when the user passed 0)."""
        if self._grpc_port is None:
            raise RuntimeError("EmulatorServer not started")
        return self._grpc_port

    @property
    def rest_url(self) -> str:
        """Base URL for the REST endpoint."""
        return f"http://{self._settings.rest_host}:{self.rest_port}"

    @property
    def grpc_endpoint(self) -> str:
        """``host:port`` form of the gRPC endpoint."""
        return f"{self._settings.grpc_host}:{self.grpc_port}"

    # -- Lifecycle ---------------------------------------------------------

    async def start(self) -> None:
        """Start all subsystems. Returns once the servers are accepting traffic.

        Startup order: DuckDB engine (plus catalog migrations when the
        catalog is DuckDB-backed), shared managers and the ``AppContext``,
        routine and materialized-view hydration, the snapshot-GC background
        task, the REST server (uvicorn), then the gRPC server.

        If any step raises, :meth:`stop` is awaited to tear down whatever
        was already brought up, and then the original exception is
        re-raised. A failed start therefore does not leave a background
        task, a listening socket or an open engine behind.
        """
        configure_logging(level=self._settings.log_level, fmt=self._settings.log_format)
        configure_tracing(self._settings)
        _log.info(
            "bqemulator.start",
            version=__version__,
            persistence_mode=self._settings.persistence_mode.value,
        )

        try:
            await self._engine.start()

            # For the DuckDB-backed catalog, run migrations up-front so readiness
            # probes succeed immediately.
            if isinstance(self._catalog, DuckDBCatalogRepository):
                self._catalog.ensure_ready()

            udf_registry = UDFRegistry(self._settings)
            snapshot_manager = SnapshotManager(
                engine=self._engine,
                catalog=self._catalog,
                clock=self._clock,
                events=self._events,
                retention_days=self._settings.time_travel_retention_days,
            )
            row_access_manager = RowAccessPolicyManager(
                catalog=self._catalog,
                clock=self._clock,
            )
            # Storage Write API stream registry shared between the gRPC servicer
            # (which owns lifecycle) and the admin /admin/streams endpoint
            # (which reads it for diagnostics). The servicer wires the
            # metric-cleanup callback after AppContext is built (see
            # ``BigQueryWriteHandler.__init__``).
            write_stream_manager = WriteStreamManager()
            # Upload host (resumable / multipart). The manager owns the
            # in-memory session map and the temp staging directory under
            # ``Settings.upload_staging_dir`` (or the system tempdir when
            # unset). See ADR 0029.
            from bqemulator.jobs.upload_session_manager import UploadSessionManager

            upload_session_manager = UploadSessionManager(
                staging_dir=self._settings.upload_staging_dir,
                max_bytes=self._settings.upload_max_bytes,
                ttl_seconds=self._settings.upload_session_ttl_seconds,
                clock=self._clock,
            )
            self._context = AppContext(
                settings=self._settings,
                clock=self._clock,
                engine=self._engine,
                catalog=self._catalog,
                metrics=self._metrics,
                events=self._events,
                udf_registry=udf_registry,
                snapshots=snapshot_manager,
                row_access=row_access_manager,
                write_streams=write_stream_manager,
                upload_sessions=upload_session_manager,
            )
            # Hydrate routines from the catalog into DuckDB. Best-effort —
            # a broken routine in the catalog should not block startup.
            udf_registry.hydrate(self._catalog, self._engine)
            # Rebuild materialized-view event subscriptions from the catalog.
            hydrate_subscriptions(self._context)
            # Kick off snapshot GC in the background: every retention/48
            # (i.e. 48 passes per retention window), but never more often
            # than once a minute.
            gc_interval = max(
                60.0,
                self._settings.time_travel_retention_days * 86400.0 / 48.0,
            )
            self._gc_task = asyncio.create_task(
                snapshot_manager.run_gc_loop(interval_seconds=gc_interval),
                name="bqemulator.snapshot-gc",
            )

            # REST (uvicorn).
            # Lifespan events are disabled because we manage all resources
            # (DuckDB, gRPC) from the composition root, not from FastAPI's
            # lifespan. This avoids an async deadlock when embedding uvicorn
            # in an already-running event loop.
            app = create_app(self._context)
            uvicorn_config = uvicorn.Config(
                app,
                host=self._settings.rest_host,
                port=self._settings.rest_port,
                log_config=None,  # structlog handles logging
                access_log=False,
                lifespan="off",
                loop="none",  # reuse the caller's event loop
            )
            self._fastapi_server = uvicorn.Server(uvicorn_config)
            # uvicorn's default signal handlers conflict with our own. We use
            # setattr to bypass mypy's attribute-defined check since uvicorn's
            # type stubs don't declare install_signal_handlers (though it is
            # a real instance method on uvicorn.Server).
            setattr(self._fastapi_server, "install_signal_handlers", lambda: None)  # noqa: B010

            self._fastapi_task = asyncio.create_task(
                self._fastapi_server.serve(),
                name="bqemulator.rest",
            )
            # Wait for uvicorn to bind the socket, then pull the actual port.
            await self._wait_for_uvicorn_started(timeout=10.0)
            servers = getattr(self._fastapi_server, "servers", ())
            if servers and servers[0].sockets:
                self._rest_port = servers[0].sockets[0].getsockname()[1]
            else:  # pragma: no cover
                self._rest_port = self._settings.rest_port
            _log.info("rest.listen", host=self._settings.rest_host, port=self._rest_port)

            # gRPC. Record the server before starting it so a failing
            # start() is still covered by the rollback below.
            grpc_server, grpc_port = build_grpc_server(self._context)
            self._grpc_server = grpc_server
            await grpc_server.start()
            self._grpc_port = grpc_port
        except BaseException:
            # Roll back the partial start. Cleanup errors are suppressed so
            # the caller sees the original failure, not a secondary one.
            with contextlib.suppress(Exception):
                await self.stop()
            raise

    async def _wait_for_uvicorn_started(self, *, timeout: float) -> None:  # noqa: ASYNC109 — deliberate timeout parameter
        """Poll ``uvicorn.Server.started`` until True or raise.

        Raises:
            InternalError: if the server was never created, or if the serve
                task finished (cleanly or cancelled) before signalling
                ``started``.
            TimeoutError: if ``started`` is still False after *timeout*
                seconds.
            Exception: whatever the serve task raised, if it failed before
                signalling ``started`` — surfaced immediately instead of
                waiting out the timeout.
        """
        if self._fastapi_server is None:
            raise InternalError("FastAPI server not initialized")
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        while not self._fastapi_server.started:
            task = self._fastapi_task
            if task is not None and task.done():
                # serve() already ended without binding; polling further
                # cannot succeed, so report the real cause right away.
                exc = None if task.cancelled() else task.exception()
                if exc is not None:
                    raise exc
                raise InternalError("uvicorn exited before signalling 'started'")
            if loop.time() > deadline:
                raise TimeoutError("uvicorn did not signal 'started' in time")
            await asyncio.sleep(0.02)

    async def stop(self) -> None:
        """Stop all subsystems. Idempotent.

        Shuts down, in this order: the snapshot-GC task, materialized-view
        event subscriptions, the gRPC server (2 s grace), the REST server,
        and finally the DuckDB engine. Errors raised while tearing down the
        background tasks and the gRPC server are suppressed.
        """
        _log.info("bqemulator.stop")

        async def _drain(task: asyncio.Task[None]) -> None:
            # Await a finishing task; cancellation or any error it ended
            # with is ignored during shutdown.
            with contextlib.suppress(asyncio.CancelledError, Exception):
                await task

        # Snapshot GC: request cancellation, then wait for it to unwind.
        gc_task = self._gc_task
        if gc_task is not None:
            gc_task.cancel()
            await _drain(gc_task)
            self._gc_task = None

        # MV subscriptions — unwire from the event bus.
        context = self._context
        if context is not None:
            clear_subscriptions_for_context(context)

        # gRPC
        grpc_server = self._grpc_server
        if grpc_server is not None:
            with contextlib.suppress(Exception):
                await grpc_server.stop(grace=2.0)  # type: ignore[attr-defined]
            self._grpc_server = None

        # REST: ask uvicorn to exit, then wait for its serve() task to return.
        fastapi_server = self._fastapi_server
        if fastapi_server is not None:
            fastapi_server.should_exit = True
        rest_task = self._fastapi_task
        if rest_task is not None:
            await _drain(rest_task)
            self._fastapi_task = None
        self._fastapi_server = None

        # Storage is torn down last, after both servers.
        await self._engine.stop()

    # -- Synchronous entry --------------------------------------------------

    def run_forever(self) -> None:
        """Start the server and block on SIGINT/SIGTERM."""
        asyncio.run(self._run_forever_async())

    async def _run_forever_async(self) -> None:
        """Start, wait for SIGINT/SIGTERM, then stop (stop always runs)."""
        await self.start()
        stop_event = asyncio.Event()
        loop = asyncio.get_running_loop()

        def _on_signal(_signum: int, _frame: FrameType | None) -> None:
            # Handlers installed with ``signal.signal`` run outside the
            # loop's control and asyncio.Event is not thread-safe; hand the
            # set() to the loop so it is woken up and runs it safely.
            loop.call_soon_threadsafe(stop_event.set)

        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, stop_event.set)
            except NotImplementedError:  # pragma: no cover — Windows
                signal.signal(sig, _on_signal)

        try:
            await stop_event.wait()
        finally:
            await self.stop()

rest_port property

rest_port: int

The actual REST port (useful when the user passed 0).

grpc_port property

grpc_port: int

The actual gRPC port (useful when the user passed 0).

rest_url property

rest_url: str

Base URL for the REST endpoint.

grpc_endpoint property

grpc_endpoint: str

host:port form of the gRPC endpoint.

start async

start() -> None

Start all subsystems. Returns once the servers are accepting traffic.

Source code in src/bqemulator/server.py
async def start(self) -> None:
    """Start all subsystems. Returns once the servers are accepting traffic.

    Startup order: DuckDB engine (plus catalog migrations when the
    catalog is DuckDB-backed), shared managers and the ``AppContext``,
    routine and materialized-view hydration, the snapshot-GC background
    task, the REST server (uvicorn), then the gRPC server.

    If any step raises, :meth:`stop` is awaited to tear down whatever
    was already brought up, and then the original exception is
    re-raised. A failed start therefore does not leave a background
    task, a listening socket or an open engine behind.
    """
    configure_logging(level=self._settings.log_level, fmt=self._settings.log_format)
    configure_tracing(self._settings)
    _log.info(
        "bqemulator.start",
        version=__version__,
        persistence_mode=self._settings.persistence_mode.value,
    )

    try:
        await self._engine.start()

        # For the DuckDB-backed catalog, run migrations up-front so readiness
        # probes succeed immediately.
        if isinstance(self._catalog, DuckDBCatalogRepository):
            self._catalog.ensure_ready()

        udf_registry = UDFRegistry(self._settings)
        snapshot_manager = SnapshotManager(
            engine=self._engine,
            catalog=self._catalog,
            clock=self._clock,
            events=self._events,
            retention_days=self._settings.time_travel_retention_days,
        )
        row_access_manager = RowAccessPolicyManager(
            catalog=self._catalog,
            clock=self._clock,
        )
        # Storage Write API stream registry shared between the gRPC servicer
        # (which owns lifecycle) and the admin /admin/streams endpoint
        # (which reads it for diagnostics). The servicer wires the
        # metric-cleanup callback after AppContext is built (see
        # ``BigQueryWriteHandler.__init__``).
        write_stream_manager = WriteStreamManager()
        # Upload host (resumable / multipart). The manager owns the
        # in-memory session map and the temp staging directory under
        # ``Settings.upload_staging_dir`` (or the system tempdir when
        # unset). See ADR 0029.
        from bqemulator.jobs.upload_session_manager import UploadSessionManager

        upload_session_manager = UploadSessionManager(
            staging_dir=self._settings.upload_staging_dir,
            max_bytes=self._settings.upload_max_bytes,
            ttl_seconds=self._settings.upload_session_ttl_seconds,
            clock=self._clock,
        )
        self._context = AppContext(
            settings=self._settings,
            clock=self._clock,
            engine=self._engine,
            catalog=self._catalog,
            metrics=self._metrics,
            events=self._events,
            udf_registry=udf_registry,
            snapshots=snapshot_manager,
            row_access=row_access_manager,
            write_streams=write_stream_manager,
            upload_sessions=upload_session_manager,
        )
        # Hydrate routines from the catalog into DuckDB. Best-effort —
        # a broken routine in the catalog should not block startup.
        udf_registry.hydrate(self._catalog, self._engine)
        # Rebuild materialized-view event subscriptions from the catalog.
        hydrate_subscriptions(self._context)
        # Kick off snapshot GC in the background: every retention/48
        # (i.e. 48 passes per retention window), but never more often
        # than once a minute.
        gc_interval = max(
            60.0,
            self._settings.time_travel_retention_days * 86400.0 / 48.0,
        )
        self._gc_task = asyncio.create_task(
            snapshot_manager.run_gc_loop(interval_seconds=gc_interval),
            name="bqemulator.snapshot-gc",
        )

        # REST (uvicorn).
        # Lifespan events are disabled because we manage all resources
        # (DuckDB, gRPC) from the composition root, not from FastAPI's
        # lifespan. This avoids an async deadlock when embedding uvicorn
        # in an already-running event loop.
        app = create_app(self._context)
        uvicorn_config = uvicorn.Config(
            app,
            host=self._settings.rest_host,
            port=self._settings.rest_port,
            log_config=None,  # structlog handles logging
            access_log=False,
            lifespan="off",
            loop="none",  # reuse the caller's event loop
        )
        self._fastapi_server = uvicorn.Server(uvicorn_config)
        # uvicorn's default signal handlers conflict with our own. We use
        # setattr to bypass mypy's attribute-defined check since uvicorn's
        # type stubs don't declare install_signal_handlers (though it is
        # a real instance method on uvicorn.Server).
        setattr(self._fastapi_server, "install_signal_handlers", lambda: None)  # noqa: B010

        self._fastapi_task = asyncio.create_task(
            self._fastapi_server.serve(),
            name="bqemulator.rest",
        )
        # Wait for uvicorn to bind the socket, then pull the actual port.
        await self._wait_for_uvicorn_started(timeout=10.0)
        servers = getattr(self._fastapi_server, "servers", ())
        if servers and servers[0].sockets:
            self._rest_port = servers[0].sockets[0].getsockname()[1]
        else:  # pragma: no cover
            self._rest_port = self._settings.rest_port
        _log.info("rest.listen", host=self._settings.rest_host, port=self._rest_port)

        # gRPC. Record the server before starting it so a failing
        # start() is still covered by the rollback below.
        grpc_server, grpc_port = build_grpc_server(self._context)
        self._grpc_server = grpc_server
        await grpc_server.start()
        self._grpc_port = grpc_port
    except BaseException:
        # Roll back the partial start. Cleanup errors are suppressed so
        # the caller sees the original failure, not a secondary one.
        with contextlib.suppress(Exception):
            await self.stop()
        raise

stop async

stop() -> None

Stop all subsystems. Idempotent.

Source code in src/bqemulator/server.py
async def stop(self) -> None:
    """Stop all subsystems. Idempotent.

    Shuts down, in this order: the snapshot-GC task, materialized-view
    event subscriptions, the gRPC server (2 s grace), the REST server,
    and finally the DuckDB engine. Errors raised while tearing down the
    background tasks and the gRPC server are suppressed.
    """
    _log.info("bqemulator.stop")

    async def _drain(task: asyncio.Task[None]) -> None:
        # Await a finishing task; cancellation or any error it ended
        # with is ignored during shutdown.
        with contextlib.suppress(asyncio.CancelledError, Exception):
            await task

    # Snapshot GC: request cancellation, then wait for it to unwind.
    gc_task = self._gc_task
    if gc_task is not None:
        gc_task.cancel()
        await _drain(gc_task)
        self._gc_task = None

    # MV subscriptions — unwire from the event bus.
    context = self._context
    if context is not None:
        clear_subscriptions_for_context(context)

    # gRPC
    grpc_server = self._grpc_server
    if grpc_server is not None:
        with contextlib.suppress(Exception):
            await grpc_server.stop(grace=2.0)  # type: ignore[attr-defined]
        self._grpc_server = None

    # REST: ask uvicorn to exit, then wait for its serve() task to return.
    fastapi_server = self._fastapi_server
    if fastapi_server is not None:
        fastapi_server.should_exit = True
    rest_task = self._fastapi_task
    if rest_task is not None:
        await _drain(rest_task)
        self._fastapi_task = None
    self._fastapi_server = None

    # Storage is torn down last, after both servers.
    await self._engine.stop()

run_forever

run_forever() -> None

Start the server and block on SIGINT/SIGTERM.

Source code in src/bqemulator/server.py
def run_forever(self) -> None:
    """Run the emulator on a fresh event loop until SIGINT or SIGTERM arrives."""
    main = self._run_forever_async()
    asyncio.run(main)

run_forever

run_forever(settings: Settings) -> None

Module-level entry point used by the CLI.

Source code in src/bqemulator/server.py
def run_forever(settings: Settings) -> None:
    """Module-level entry point used by the CLI.

    Builds an :class:`EmulatorServer` from *settings* and blocks until the
    process receives SIGINT or SIGTERM.
    """
    server = EmulatorServer(settings)
    server.run_forever()

bqemulator.testing.fixtures

bqemulator.testing.fixtures

Pytest fixtures for consumers of bqemulator.

Registered as a pytest plugin via the pytest11 entry point declared in pyproject.toml. Installing bqemulator automatically makes these fixtures available — no conftest.py wiring required.

Fixtures

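bqemu_settings Session-scoped. Returns a `Settings` configured for ephemeral in-memory use on free ports chosen by the OS. To customize it, define a fixture with the same name in your own conftest.py; this fixture takes no `request` argument, so indirect parametrization cannot change it.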

bqemu_server Session-scoped. A running `EmulatorServer`. Sets the BIGQUERY_EMULATOR_HOST env var for the session and, on teardown, restores its previous value (or unsets it if it was not set before).

bqemu_endpoint Session-scoped. An `EmulatorEndpoint` dataclass with `rest_url`, `grpc_endpoint` and `project_id` fields.

bqemu_client Function-scoped. A configured google.cloud.bigquery.Client pointing at the emulator. Tests using it are skipped if google-cloud-bigquery is not installed.

EmulatorEndpoint dataclass

Connection info for fixtures and tests.

Source code in src/bqemulator/testing/fixtures.py
@dataclass(slots=True, frozen=True)
class EmulatorEndpoint:
    """Connection info for fixtures and tests.

    Immutable (``frozen=True``). The ``bqemu_endpoint`` fixture builds it
    from a running ``EmulatorServer`` plus the session settings.
    """

    rest_url: str  # REST base URL, "http://<host>:<port>"
    grpc_endpoint: str  # gRPC address in "host:port" form
    project_id: str  # project the fixtures target (settings.default_project_id)

bqemu_settings

bqemu_settings() -> Settings

Default settings for session-scoped fixtures.

Ephemeral mode, random ports, INFO logging.

Source code in src/bqemulator/testing/fixtures.py
@pytest.fixture(scope="session")
def bqemu_settings() -> Settings:
    """Build the :class:`Settings` shared by the session-scoped fixtures.

    The emulator runs in ephemeral (in-memory) mode, with REST and gRPC
    both bound to ``127.0.0.1`` on port 0 so the OS picks free ports.
    Every other option (log level included) comes from the regular
    Settings sources.
    """
    from bqemulator.config import PersistenceMode, Settings

    loopback = "127.0.0.1"
    any_free_port = 0  # 0 = let the OS assign an unused port
    return Settings(
        grpc_host=loopback,
        grpc_port=any_free_port,
        persistence_mode=PersistenceMode.EPHEMERAL,
        rest_host=loopback,
        rest_port=any_free_port,
    )

bqemu_server

bqemu_server(bqemu_settings: Settings) -> Iterator[EmulatorServer]

Start an in-process emulator for the entire test session.

Runs the server on a background asyncio loop to avoid conflicting with test event loops.

Source code in src/bqemulator/testing/fixtures.py
@pytest.fixture(scope="session")
def bqemu_server(
    bqemu_settings: Settings,
) -> Iterator[EmulatorServer]:
    """Start an in-process emulator for the entire test session.

    Runs the server on a background asyncio loop to avoid conflicting with
    test event loops.

    While the fixture is active, ``BIGQUERY_EMULATOR_HOST`` is set to
    ``<rest_host>:<rest_port>``. On teardown the previous value is restored
    (or the variable removed if it was unset), and the emulator is stopped.
    Teardown also runs if anything fails after the emulator was started,
    so a setup error does not leak the background server.
    """
    # Deferred import so the plugin module itself stays cheap.
    from bqemulator.testing._thread_runner import ThreadedEmulator

    threaded = ThreadedEmulator(bqemu_settings)
    threaded.start()

    previous = os.environ.get("BIGQUERY_EMULATOR_HOST")
    try:
        # Inside the try: ``rest_port`` raises RuntimeError if the server
        # never recorded a port, and the emulator must still be stopped then.
        os.environ["BIGQUERY_EMULATOR_HOST"] = f"{bqemu_settings.rest_host}:{threaded.server.rest_port}"
        yield threaded.server
    finally:
        if previous is None:
            os.environ.pop("BIGQUERY_EMULATOR_HOST", None)
        else:
            os.environ["BIGQUERY_EMULATOR_HOST"] = previous
        threaded.stop()

bqemu_endpoint

bqemu_endpoint(
    bqemu_settings: Settings, bqemu_server: EmulatorServer
) -> EmulatorEndpoint

Session-scoped :class:EmulatorEndpoint.

Source code in src/bqemulator/testing/fixtures.py
@pytest.fixture(scope="session")
def bqemu_endpoint(
    bqemu_settings: Settings,
    bqemu_server: EmulatorServer,
) -> EmulatorEndpoint:
    """Session-scoped :class:`EmulatorEndpoint`.

    Bundles the running server's REST URL and gRPC ``host:port`` with the
    default project id taken from the session settings.
    """
    rest_url = bqemu_server.rest_url
    grpc_endpoint = bqemu_server.grpc_endpoint
    project_id = bqemu_settings.default_project_id
    return EmulatorEndpoint(
        rest_url=rest_url,
        grpc_endpoint=grpc_endpoint,
        project_id=project_id,
    )

bqemu_client

bqemu_client(bqemu_endpoint: EmulatorEndpoint) -> object

Return a google.cloud.bigquery.Client pointed at the emulator.

Raises :class:pytest.skip.Exception if google-cloud-bigquery is not installed.

Source code in src/bqemulator/testing/fixtures.py
@pytest.fixture
def bqemu_client(bqemu_endpoint: EmulatorEndpoint) -> Iterator[object]:
    """Provide a ``google.cloud.bigquery.Client`` pointed at the emulator.

    The client uses anonymous credentials, the endpoint's project id and
    the emulator's REST URL as its API endpoint. It is closed on teardown,
    so each test does not leak the client's HTTP transport.

    Raises :class:`pytest.skip.Exception` if ``google-cloud-bigquery`` is
    not installed.
    """
    try:
        from google.api_core.client_options import ClientOptions
        from google.auth.credentials import AnonymousCredentials
        from google.cloud import bigquery
    except ImportError:  # pragma: no cover
        pytest.skip("google-cloud-bigquery not installed")

    client = bigquery.Client(
        project=bqemu_endpoint.project_id,
        credentials=AnonymousCredentials(),  # type: ignore[no-untyped-call]
        client_options=ClientOptions(api_endpoint=bqemu_endpoint.rest_url),
    )
    try:
        yield client
    finally:
        # Release the client's transport/session resources after the test.
        client.close()

bqemulator.testing.testcontainers

bqemulator.testing.testcontainers

Testcontainers wrapper for the published Docker image.

Use this when:

  • You want a real subprocess (matches CI/CD more closely than in-process).
  • You are testing clients in languages other than Python.
  • You want persistence across test functions.

Example::

with BigQueryEmulatorContainer() as emu:
    rest_url = emu.get_rest_url()
    grpc_endpoint = emu.get_grpc_endpoint()
    # ... run tests ...

BigQueryEmulatorContainer

Bases: DockerContainer

Manages a bqemulator Docker container for testing.

Source code in src/bqemulator/testing/testcontainers.py
class BigQueryEmulatorContainer(DockerContainer):
    """Manages a bqemulator Docker container for testing.

    Exposes the REST and gRPC ports, binds both listeners to all
    interfaces inside the container, enables the admin endpoints, and
    optionally bind-mounts a host directory as the local root used for
    ``gs://`` URIs.
    """

    def __init__(
        self,
        image: str | None = None,
        *,
        rest_port: int = DEFAULT_REST_PORT,
        grpc_port: int = DEFAULT_GRPC_PORT,
        gcs_local_root_host: str | None = None,
    ) -> None:
        """Configure (but do not start) the container.

        Args:
            image: Image to run; falls back to ``$BQEMU_IMAGE``, then
                ``DEFAULT_IMAGE`` (an empty string also falls back).
            rest_port: REST port inside the container.
            grpc_port: gRPC port inside the container.
            gcs_local_root_host: Optional host directory, mounted read-write
                at ``/var/lib/bqemu-gcs`` and exported as
                ``BQEMU_GCS_LOCAL_ROOT``.
        """
        # Allow overriding via env var — useful in CI where an in-tree
        # build is loaded as `ghcr.io/jjviscomi/bqemulator:ci-<sha>`.
        effective_image = image or os.environ.get("BQEMU_IMAGE", DEFAULT_IMAGE)
        super().__init__(effective_image)
        self._rest_internal = rest_port
        self._grpc_internal = grpc_port
        self.with_exposed_ports(rest_port, grpc_port)
        # Inside the container, bind on all interfaces so the exposed
        # ports are reachable from the host. The container's network
        # scope already restricts access.
        self.with_env("BQEMU_REST_HOST", "0.0.0.0")  # noqa: S104
        self.with_env("BQEMU_GRPC_HOST", "0.0.0.0")  # noqa: S104
        self.with_env("BQEMU_REST_PORT", str(rest_port))
        self.with_env("BQEMU_GRPC_PORT", str(grpc_port))
        # Admin endpoints are off by default in the published image.
        # The testcontainer wrapper always opts them in so the Python /
        # Node / Go / Java E2E suites can exercise the ``/admin/*``
        # surface without a custom image build.
        self.with_env("BQEMU_ADMIN_ENABLED", "1")
        # Load/extract jobs that reference ``gs://`` URIs need a
        # host→container bind mount so the file the test writes on the
        # host is visible to the executor inside the container. The
        # caller passes a host directory; the wrapper mounts it at
        # ``/var/lib/bqemu-gcs`` and points ``BQEMU_GCS_LOCAL_ROOT``
        # at the same path. The test writes its Avro/ORC files under
        # the host dir and references them via ``gs://anybucket/<file>``
        # — the executor's ``_resolve_uri`` strips the ``gs://``
        # prefix and joins the remaining path under the local root.
        if gcs_local_root_host is not None:
            self.with_volume_mapping(
                gcs_local_root_host,
                "/var/lib/bqemu-gcs",
                mode="rw",
            )
            self.with_env("BQEMU_GCS_LOCAL_ROOT", "/var/lib/bqemu-gcs")

    def start(self) -> Self:
        """Start the container and wait until the REST listener is ready.

        ``testcontainers`` emits a ``DeprecationWarning`` when
        ``wait_for_logs`` is passed a plain string / regex because the
        library plans to replace it with a structured wait strategy.
        We silence that warning at the call site so consumers of this
        wrapper don't need to edit their own pytest filter list.

        If the readiness wait fails (e.g. the 30 s timeout expires), the
        container is stopped before the error propagates. A ``with`` block
        never reaches ``__exit__`` when ``__enter__`` raises, so nothing
        else would clean it up. A cleanup failure is attached to the
        original exception as a note rather than replacing it.
        """
        super().start()
        try:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", DeprecationWarning)
                wait_for_logs(self, _READY_RE.pattern, timeout=30)
        except BaseException as exc:
            try:
                self.stop()
            except Exception as cleanup_exc:  # noqa: BLE001 — keep the original failure primary
                exc.add_note(f"container cleanup also failed: {cleanup_exc!r}")
            raise
        return self

    def get_rest_url(self) -> str:
        """Return the externally-reachable REST URL."""
        host = self.get_container_host_ip()
        port = self.get_exposed_port(self._rest_internal)
        return f"http://{host}:{port}"

    def get_grpc_endpoint(self) -> str:
        """Return the externally-reachable gRPC endpoint (host:port)."""
        host = self.get_container_host_ip()
        port = self.get_exposed_port(self._grpc_internal)
        return f"{host}:{port}"

start

start() -> Self

Start the container and wait until the REST listener is ready.

testcontainers emits a DeprecationWarning when wait_for_logs is passed a plain string / regex because the library plans to replace it with a structured wait strategy. We silence that warning at the call site so consumers of this wrapper don't need to edit their own pytest filter list.

Source code in src/bqemulator/testing/testcontainers.py
def start(self) -> Self:
    """Start the container and wait until the REST listener is ready.

    ``testcontainers`` emits a ``DeprecationWarning`` when
    ``wait_for_logs`` is passed a plain string / regex because the
    library plans to replace it with a structured wait strategy.
    We silence that warning at the call site so consumers of this
    wrapper don't need to edit their own pytest filter list.

    If the readiness wait fails (e.g. the 30 s timeout expires), the
    container is stopped before the error propagates. A ``with`` block
    never reaches ``__exit__`` when ``__enter__`` raises, so nothing
    else would clean it up. A cleanup failure is attached to the
    original exception as a note rather than replacing it.
    """
    super().start()
    try:
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", DeprecationWarning)
            wait_for_logs(self, _READY_RE.pattern, timeout=30)
    except BaseException as exc:
        try:
            self.stop()
        except Exception as cleanup_exc:  # noqa: BLE001 — keep the original failure primary
            exc.add_note(f"container cleanup also failed: {cleanup_exc!r}")
        raise
    return self

get_rest_url

get_rest_url() -> str

Return the externally-reachable REST URL.

Source code in src/bqemulator/testing/testcontainers.py
def get_rest_url(self) -> str:
    """Build the REST base URL as seen from the machine running the tests.

    Combines the container's host address with the host-side port that
    is mapped to the container's internal REST port.
    """
    host_ip, mapped_port = (
        self.get_container_host_ip(),
        self.get_exposed_port(self._rest_internal),
    )
    return f"http://{host_ip}:{mapped_port}"

get_grpc_endpoint

get_grpc_endpoint() -> str

Return the externally-reachable gRPC endpoint (host:port).

Source code in src/bqemulator/testing/testcontainers.py
def get_grpc_endpoint(self) -> str:
    """Build the gRPC ``host:port`` address as seen from the test machine.

    Uses the container's host address and the host-side port that is
    mapped to the container's internal gRPC port.
    """
    host_ip, mapped_port = (
        self.get_container_host_ip(),
        self.get_exposed_port(self._grpc_internal),
    )
    return f"{host_ip}:{mapped_port}"