Spotify Scio pipeline against bqemulator¶
A minimal Scio pipeline that
writes a customers table to BigQuery and reads it back. The Scala
source under src/main/scala/ is production-shaped — what you'd run
against real BigQuery or Dataflow today, with a single optional flag
for the emulator-test path.
Why this matters¶
Scio is the canonical Scala-on-Beam idiom — most Beam-on-JVM workloads
on Google's stack run through it. Targeting it against bqemulator
means engineering teams can run the same job locally that they run in
Dataflow, without spinning up a real BigQuery dataset.
How the pipeline runs end-to-end against bqemulator (v1.0.2+)¶
Beam 2.55.1's Java SDK has no built-in BIGQUERY_EMULATOR_HOST
support (the Go SDK does — see
apache/beam#34037 — but
the Java side never adopted it). v1.0.2 closes
#17 by wiring a
test-services hook directly into the scio example, with no
bqemulator-side code change:
CustomersPipeline.runaccepts an optional--bqEmulatorEndpoint. Production callers (real BigQuery / Dataflow) don't pass the flag and take scio's idiomaticsaveAsBigQueryTable(...)branch unchanged. When the flag IS present, the pipeline drops to rawBigQueryIO.writeTableRows().withTestServices( EmulatorBigQueryServices(endpoint))for the BQ-traffic redirect.EmulatorBigQueryServices(in this example, atsrc/main/scala/org/apache/beam/sdk/io/gcp/bigquery/) extends Beam'sBigQueryServicesImpland overridesgetJobService/getDatasetServiceto construct the ApiaryBigqueryclient withsetRootUrl(emulator). The class lives in Beam's package so it can reach the@VisibleForTestingconstructors onJobServiceImpl/DatasetServiceImplthat accept a pre-built client.- A
fake-gcs-serversidecar (fsouza/fake-gcs-server) handles GCS staging for Beam's defaultBATCH_LOADSwrite method. The spec brings both containers up on a shared testcontainers network and bind-mounts a single host directory into both at their respective storage roots — so files Beam stages togs://bqemu-staging/...materialise on disk where bqemulator's existingBQEMU_GCS_LOCAL_ROOTshim resolves them. - Auth suppression via Beam's
NoopCredentialFactory—--gcpCredentialFactoryClass=org.apache.beam.sdk.extensions.gcp.auth.NoopCredentialFactoryshort-circuitsOAuth2Credentials.refresh()so no token grant fires againstoauth2.googleapis.com. The example'sbuild.sbtsetsCLOUDSDK_CONFIGandGOOGLE_APPLICATION_CREDENTIALSto empty values as belt-and-suspenders defence on developer laptops with stalegcloud auth application-default loginstate.
See ADR 0034 for the full design — alternatives considered, source-of-truth references in Beam, and consequences.
What the spec exercises¶
The ScalaTest spec
(CustomersPipelineSpec)
drives the real CustomersPipeline.run and asserts:
- bqemulator + fake-gcs-server both start.
- Dataset creation via the REST surface succeeds (idempotent —
{200, 201, 409}). CustomersPipeline.runreturns 3 (the rows it intended to write).- A REST
SELECT COUNT(*)against\bqemu-demo`.scio_demo.customers` returns 3 — confirming the LOAD job actually landed the rows on disk.
The pipeline source itself stays production-ready: pointed at real
BigQuery (Dataflow) with no --bqEmulatorEndpoint, the same code
takes the scio path and writes via the real GCS staging bucket.
Layout¶
build.sbt — sbt build definition
project/build.properties — sbt version pin
src/main/scala/com/example/bqemu/CustomersPipeline.scala
src/main/scala/org/apache/beam/sdk/io/gcp/bigquery/EmulatorBigQueryServices.scala
src/test/scala/com/example/bqemu/CustomersPipelineSpec.scala
Run¶
make test runs sbt test. Requires Docker (for Testcontainers to
spin up the bqemulator + fake-gcs-server pair).
What to look for¶
build.sbtholds the Jackson stack at2.14.3viadependencyOverrides. Scio 0.14.4's Scala Jackson module is pinned to 2.14.x; transitive deps drag in jackson-databind 2.16.x and the runtime guard refuses to load against the mismatch.testcontainersis at 1.21.4 — Docker 29+ rejects docker-java clients announcing API < 1.40, and the 1.20.x line still ships an older shaded docker-java that trips that check on modern Docker Desktop. Bumping to 1.21.x fixes it.- The
EmulatorBigQueryServicesScala file lives in Beam's package (org.apache.beam.sdk.io.gcp.bigquery) — Java's package scoping is by package, not JAR, so the split-package access compiles and runs cleanly. This is the same idiom Beam's own test suite uses for theBigQueryServicesImplsubclasses. BqemuContainerandFakeGcsContainerare one-line subclasses that resolve Scala 2'sNothingself-type collapse on Java'sGenericContainer<SELF>.- The spec's
runPipelineOrDumpDetailwrapper exists because Beam's BATCH_LOADSfinishBundlecollapses every per-writer close failure into a singleIOException("Failed to close some writers")and stashes the real causes as suppressed exceptions — the wrapper prints the full chain so a debug session starting from a green test failure doesn't lose the underlying GCS error.
Adapting this to your own pipeline¶
The EmulatorBigQueryServices class and the fake-gcs-server sidecar
recipe both generalise. Copy the file under your own
src/main/scala/org/apache/beam/sdk/io/gcp/bigquery/ and call
BigQueryIO.Write.withTestServices(new
EmulatorBigQueryServices(emulatorEndpoint)) from your test-mode
branch. The sidecar pattern (shared bind mount + --gcsEndpoint=...storage/v1/)
is documented in the spec.