Skip to content

Airflow integration

Status: shipped (runnable example in docs/examples/python/airflow-dag-test/).

Airflow's BigQueryInsertJobOperator accepts a custom BigQueryHook configuration. Point the hook at the emulator:

from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

task = BigQueryInsertJobOperator(
    task_id="daily_totals",
    configuration={
        "query": {
            "query": "SELECT * FROM sales.orders",
            "useLegacySql": False,
        },
    },
    gcp_conn_id="google_cloud_default",
)

Wiring the hook to the emulator

Two env vars + one monkey-patch is the full surface in test code.

import os
import google.auth
import google.auth._default
import google.auth.credentials

# 1. Tell the BQ client to talk to bqemulator instead of Google.
#    The ``http://`` prefix is required — the Airflow Google provider
#    forwards this verbatim into ``client_options.api_endpoint`` and
#    ``requests`` aborts without a scheme.
os.environ["BIGQUERY_EMULATOR_HOST"] = "http://localhost:9050"

# 2. Configure the Airflow connection. The extras carry the project
#    and scope; ``key_path`` is left empty because we replace the
#    whole credential-resolution path in step 3.
os.environ["AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT"] = (
    'google-cloud-platform://?{"project":"bqemu-demo","key_path":""'
    ',"scope":"https://www.googleapis.com/auth/bigquery"}'
)

# 3. Make ``google.auth.default()`` return AnonymousCredentials so
#    the hook never tries to exchange a JWT against
#    ``oauth2.googleapis.com/token`` (a synthetic SA fails there
#    with ``invalid_grant: account not found`` — bqemulator doesn't
#    serve the OAuth token endpoint).
anon = google.auth.credentials.AnonymousCredentials()
def _emu_default(scopes=None, request=None, quota_project_id=None,
                 default_scopes=None):
    return anon, "bqemu-demo"
google.auth.default = _emu_default
google.auth._default.default = _emu_default

Test DAGs using Airflow's dag.test() / TaskInstance.run(test_mode=True).