ADR 0033: Storage Read API — Arrow IPC bare-message contract¶
- Status: Accepted
Context¶
Issue #15 surfaced
a wire-format mismatch in the v1.0.0 BigQueryRead/ReadRows gRPC
servicer. Real BigQuery's
Storage Read API proto
documents two distinct Arrow byte fields:
| Field | Documented content |
|---|---|
ReadSession.arrow_schema.serialized_schema |
IPC-serialized Arrow schema (single Schema message). |
ArrowRecordBatch.serialized_record_batch |
IPC-serialized Arrow record batch (single RecordBatch message). |
bqemulator v1.0.0 packed a full Arrow IPC stream
([schema-message, batch-message, EOS-marker]) into
serialized_record_batch. Real Storage Read clients trip on the
format mismatch: google-cloud-bigquery-storage's
reader.to_arrow(session) internally calls
pyarrow.ipc.read_record_batch(bytes, schema) which refuses anything
that isn't a bare RecordBatch message — it raises
OSError: Expected IPC message of type record batch but got schema.
Decision¶
The producer-side helper
bqemulator.streaming.read_session.serialize_arrow_record_batch(batch)
emits only the bare RecordBatch IPC message bytes for the
batch passed in. Schema travels separately on
ReadSession.arrow_schema.serialized_schema (set once per
session) and the first ReadRowsResponse.arrow_schema (mirrored
for streaming readers that join mid-stream).
Contract¶
| Aspect | Behaviour |
|---|---|
| Return value | Exactly the IPC bytes for one RecordBatch message (continuation marker + metadata length + flatbuffer metadata + padding + body buffers). No schema-message prefix, no EOS-marker suffix. |
| Dictionary-encoded fields | Rejected at the producer boundary with ValueError. The wire format has only two slots (schema + batch); pyarrow's read_record_batch requires a populated DictionaryMemo to decode dict frames, which a bare-message format can't provide. The check recurses into nested types (struct, list, map, union, …) so dict children in any container also fail loudly. |
| Non-batch IPC messages | Skipped while walking the transient stream — defensive against future pyarrow message types (compression headers, etc.) that may interleave between the schema and the batch. |
| Empty input | A pyarrow stream with no RecordBatch message raises RuntimeError (writer-side defect, not a runtime case). |
Implementation sketch¶
def serialize_arrow_record_batch(batch: pa.RecordBatch) -> bytes:
for field in batch.schema:
if _type_contains_dictionary(field.type):
raise ValueError(...)
sink = pa.BufferOutputStream()
writer = pa.ipc.new_stream(sink, batch.schema)
writer.write_batch(batch)
writer.close()
reader = pa.ipc.MessageReader.open_stream(pa.BufferReader(sink.getvalue()))
while True:
msg = reader.read_next_message()
if msg.type == "record batch":
return _serialize_one_message_to_bytes(msg)
The implementation uses pyarrow's high-level new_stream writer
then peels off the schema-message prefix and EOS-marker suffix via a
MessageReader. This is portable across pyarrow versions; the
low-level pa.ipc.write_message API has signature drift between
14.x and 17.x+ that the helper avoids.
Rationale¶
Why bare-message, not full-stream¶
The bare-message format is the wire contract real BigQuery
uses. Emitting a full stream made the v1.0.0 emulator
non-interoperable with the canonical
google-cloud-bigquery-storage reader path. Bytes that "work"
with pa.ipc.open_stream(payload).read_all() (the v1.0.0
implementation's de-facto consumer) are silent compatibility with
nothing — the goal is silent compatibility with the real client.
Why reject dict-encoded batches, not preserve them¶
Three options were considered:
- Preserve dict frames by extending the proto with a
dictionary_batchesfield. Out of scope — diverges from real BigQuery's wire format; any real client would ignore the extra field and fail to decode. - Concatenate schema-message + dict-messages + batch-message
into the same
serialized_record_batchpayload. Breaks theArrowRecordBatchproto contract (which carries one record-batch message); any conforming consumer that callsread_record_batch(bytes, schema)still fails. - Reject at the producer. ✅ Real BigQuery doesn't surface
dict-encoded columns through Storage Read either — Avro for
low-cardinality, Arrow for the rest, both as plain types.
Producer-side rejection at the boundary matches the actual wire
contract. A clear
ValueErrorwith the offending column name beats a silent corrupt-payload bug.
Why recurse into nested types¶
pa.types.is_dictionary(t) only inspects the top-level type
(pyarrow 14 source).
Arrow's IPC format emits DictionaryBatch messages for
dict-encoded children inside structs / unions / lists / maps too
(arrow-rs commit 85402148c3af03d).
A flat check at the top level would silently allow nested dict
fields and produce the same corrupt-payload bug the rejection was
meant to prevent. The serializer recurses through
struct/list/large_list/fixed_size_list/map/union
children to catch every dict-encoded leaf.
Consequences¶
- The canonical
google-cloud-bigquery-storagereader path now works against bqemulator unchanged — the v1.0.0 pyspark-bigquery example's inlineopen_streamworkaround was dropped in the same commit. serialize_arrow_ipc(table)was removed from__all__(the full-stream helper was the v1.0.0 wrong path, no callers).- Tables containing dict-encoded columns at any nesting depth
must be flattened before going through the Storage Read API.
This is consistent with real BigQuery's behaviour but should be
documented in the per-language quickstart guides as users
encounter the
ValueError.
References¶
- Issue #15 — the consumer-side
read_record_batchfailure that surfaced the v1.0.0 mismatch. - Apache Arrow IPC format
- PyArrow IPC docs
- BigQuery Storage Read API proto
- ADR 0030 — sibling decision documenting the Avro alternative for the same RPC.