Skip to content

Commit f8f6e58

Browse files
committed
End-to-end Glue source tests + kafka-ingest glue-wire-format
1 parent 4640628 commit f8f6e58

11 files changed

Lines changed: 861 additions & 12 deletions

File tree

ci/nightly/pipeline.template.yml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -949,6 +949,23 @@ steps:
949949
- ./ci/plugins/mzcompose:
950950
composition: aws-localstack
951951

952+
- id: aws-glue-schema-registry-real
953+
label: AWS Glue Schema Registry (Real)
954+
depends_on: build-aarch64
955+
timeout_in_minutes: 30
956+
retry:
957+
automatic:
958+
- exit_status: 1
959+
limit: 1
960+
agents:
961+
# This queue is selected because the step uses the scratch-aws-access plugin
962+
queue: linux-aarch64-small
963+
plugins:
964+
- ./ci/plugins/scratch-aws-access: ~
965+
- ./ci/plugins/mzcompose:
966+
composition: aws-glue-schema-registry
967+
run: aws
968+
952969
- id: secrets-local-file
953970
label: "Secrets Local File"
954971
depends_on: build-aarch64

ci/test/lint-main/checks/check-mzcompose-files.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ check_default_workflow_references_others() {
5050
-not -wholename "./test/cluster-spec-sheet/mzcompose.py" `# Handled differently` \
5151
-not -wholename "./test/orchestratord/mzcompose.py" `# Handled differently` \
5252
-not -wholename "./test/workload-replay/mzcompose.py" `# Handled differently` \
53+
-not -wholename "./test/aws-glue-schema-registry/mzcompose.py" `# 'aws' workflow runs against real AWS, opt-in via nightly only` \
5354
)
5455

5556
for file in "${MZCOMPOSE_TEST_FILES[@]}"; do

ci/test/pipeline.template.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,17 @@ steps:
467467
agents:
468468
queue: hetzner-aarch64-4cpu-8gb
469469

470+
- id: aws-glue-schema-registry
471+
label: AWS Glue Schema Registry (Moto)
472+
depends_on: build-aarch64
473+
timeout_in_minutes: 30
474+
artifact_paths: junit_*.xml
475+
plugins:
476+
- ./ci/plugins/mzcompose:
477+
composition: aws-glue-schema-registry
478+
agents:
479+
queue: hetzner-aarch64-4cpu-8gb
480+
470481
- id: zippy-kafka-sources-short
471482
label: "Short Zippy"
472483
depends_on: build-aarch64
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
# Copyright Materialize, Inc. and contributors. All rights reserved.
2+
#
3+
# Use of this software is governed by the Business Source License
4+
# included in the LICENSE file at the root of this repository.
5+
#
6+
# As of the Change Date specified in that file, in accordance with
7+
# the Business Source License, use of this software will be governed
8+
# by the Apache License, Version 2.0.
9+
10+
11+
from materialize.mzcompose.service import (
12+
Service,
13+
)
14+
15+
16+
class Moto(Service):
17+
"""A `moto_server` container that mocks AWS services.
18+
19+
Used for testing AWS surfaces that LocalStack Community doesn't cover
20+
(notably Glue / Glue Schema Registry).
21+
"""
22+
23+
def __init__(
24+
self,
25+
name: str = "moto",
26+
image: str = "motoserver/moto:5.2.1",
27+
port: int = 5000,
28+
) -> None:
29+
super().__init__(
30+
name=name,
31+
config={
32+
"image": image,
33+
"init": True,
34+
"ports": [port],
35+
"healthcheck": {
36+
"test": [
37+
"CMD-SHELL",
38+
f"python -c 'import urllib.request; urllib.request.urlopen(\"http://localhost:{port}/moto-api/\")'",
39+
],
40+
"interval": "1s",
41+
"start_period": "30s",
42+
},
43+
},
44+
)

src/interchange/src/avro/schema.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -549,8 +549,24 @@ impl fmt::Debug for AvroSchemaResolver {
549549
///
550550
/// Differences from the CSR cache:
551551
/// * Keys are UUIDs (Glue schema-version IDs), not `i32`s.
552-
/// * Glue schemas are single definitions — no `references` field on
553-
/// `GetSchemaVersion`, so the cache does not chase a dependency graph.
552+
/// * **No cross-schema references.** AWS Glue Schema Registry does not have
553+
/// a CSR-style "schema references" concept: `GetSchemaVersion` returns a
554+
/// single self-contained `SchemaDefinition` JSON blob with no `references`
555+
/// field, so the cache does not chase a dependency graph (compare
556+
/// [`SchemaCache::get`], which fetches transitive references). Avro's
557+
/// *intra-document* named-type references still work — they are resolved
558+
/// by the Avro parser from the single JSON document.
559+
/// * **Registry name enforcement.** Glue schema-version UUIDs are globally
560+
/// unique within an AWS account, and `GetSchemaVersion(uuid)` is not
561+
/// scoped to a registry. The AWS API forces a choice between
562+
/// `SchemaVersionId` (UUID) and `SchemaId(RegistryName, SchemaName) +
563+
/// SchemaVersionNumber`; combining them is not allowed (see
564+
/// <https://docs.aws.amazon.com/glue/latest/webapi/API_GetSchemaVersion.html>).
565+
/// At runtime we only have the UUID, so the cache validates each
566+
/// fetched version's `SchemaArn` against `expected_registry` and
567+
/// treats a mismatch as a permanent decode error — otherwise a record
568+
/// framed against the wrong registry would silently succeed as long as
569+
/// the credentials could see it.
554570
/// * No outer retry layer. `aws-sdk-glue` ships a "standard" retry policy
555571
/// by default that handles transient errors; layering our own
556572
/// `Retry::default()` on top would amplify backoff without adding

src/sql-parser/src/ast/defs/ddl.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,15 @@ pub enum AvroSchema<T: AstInfo> {
174174
///
175175
/// Parallel to the `Csr` variant. See
176176
/// `doc/developer/design/20260512_aws_glue_schema_registry.md`.
177+
///
178+
/// **Cross-schema references are intentionally unsupported.** Unlike the
179+
/// `Csr` variant — whose `key_reference_schemas` / `value_reference_schemas`
180+
/// fields are resolved transitively against the Confluent registry — AWS
181+
/// Glue Schema Registry has no analogue: `RegisterSchemaVersion` takes a
182+
/// single self-contained `SchemaDefinition` JSON blob with no
183+
/// cross-schema-reference field. Avro's *intra-document* named-type
184+
/// references still work because they are resolved by the Avro parser
185+
/// from a single JSON document.
177186
Glue {
178187
connection: T::ItemName,
179188
with_options: Vec<GlueAvroOption<T>>,

src/sql/src/plan/statement/ddl.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2415,6 +2415,12 @@ fn get_encoding_inner(
24152415
sql_bail!("Avro Glue seed resolution has not been performed");
24162416
};
24172417

2418+
// A Glue seed carries a single `value_schema` (unlike a
2419+
// CSR seed, which can hold both key and value), so a bare
2420+
// `FORMAT AVRO USING AWS GLUE` is always value-only. A
2421+
// key is still expressible via the separate `KEY FORMAT
2422+
// ... VALUE FORMAT ...` clause, where `get_encoding`
2423+
// promotes this value encoding into the key slot.
24182424
Schema {
24192425
key_schema: None,
24202426
value_schema: seed.value_schema.clone(),

src/testdrive/src/action/kafka/ingest.rs

Lines changed: 75 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use rdkafka::message::{Header, OwnedHeaders};
2121
use rdkafka::producer::FutureRecord;
2222
use serde::de::DeserializeOwned;
2323
use tokio::fs;
24+
use uuid::Uuid;
2425

2526
use crate::action::{self, ControlFlow, State};
2627
use crate::format::avro::{self, Schema};
@@ -160,6 +161,10 @@ enum Format {
160161
Avro {
161162
schema: String,
162163
confluent_wire_format: bool,
164+
/// If set, override the wire format with AWS Glue Schema Registry framing
165+
/// using the given schema-version UUID. Mutually exclusive with
166+
/// `confluent_wire_format=true`.
167+
glue_schema_version_id: Option<Uuid>,
163168
/// Schema references (subject names) for Confluent Schema Registry
164169
references: Vec<String>,
165170
},
@@ -183,6 +188,10 @@ enum Transcoder {
183188
schema: Schema,
184189
schema_id: i32,
185190
},
191+
GlueAvro {
192+
schema: Schema,
193+
schema_version_id: Uuid,
194+
},
186195
Protobuf {
187196
message: MessageDescriptor,
188197
confluent_wire_format: bool,
@@ -230,6 +239,26 @@ impl Transcoder {
230239
Ok(None)
231240
}
232241
}
242+
Transcoder::GlueAvro {
243+
schema,
244+
schema_version_id,
245+
} => {
246+
if let Some(val) = Self::decode_json(row)? {
247+
let val = avro::from_json(&val, schema.top_node())?;
248+
let mut out = vec![];
249+
// AWS Glue Schema Registry wire format:
250+
// byte 0 = 0x03 (header version)
251+
// byte 1 = compression byte (0x00 = none)
252+
// bytes 2..18 = 16-byte schema-version UUID
253+
out.write_u8(0x03).unwrap();
254+
out.write_u8(0x00).unwrap();
255+
out.extend_from_slice(schema_version_id.as_bytes());
256+
out.extend(avro::to_avro_datum(schema, val)?);
257+
Ok(Some(out))
258+
} else {
259+
Ok(None)
260+
}
261+
}
233262
Transcoder::PlainAvro { schema } => {
234263
if let Some(val) = Self::decode_json(row)? {
235264
let val = avro::from_json(&val, schema.top_node())?;
@@ -303,16 +332,31 @@ pub async fn run_ingest(
303332
let schema_id_var = cmd.args.opt_parse("set-schema-id-var")?;
304333
let key_schema_id_var = cmd.args.opt_parse("set-key-schema-id-var")?;
305334
let format = match cmd.args.string("format")?.as_str() {
306-
"avro" => Format::Avro {
307-
schema: cmd.args.string("schema")?,
308-
confluent_wire_format: cmd.args.opt_bool("confluent-wire-format")?.unwrap_or(true),
309-
// TODO (maz): update README!
310-
references: cmd
335+
"avro" => {
336+
let glue_schema_version_id = cmd
311337
.args
312-
.opt_string("references")
313-
.map(|s| s.split(',').map(|s| s.to_string()).collect())
314-
.unwrap_or_default(),
315-
},
338+
.opt_string("glue-schema-version-id")
339+
.map(|s| Uuid::parse_str(&s).context("parsing glue-schema-version-id as UUID"))
340+
.transpose()?;
341+
let confluent_wire_format_explicit = cmd.args.opt_bool("confluent-wire-format")?;
342+
if glue_schema_version_id.is_some() && confluent_wire_format_explicit == Some(true) {
343+
bail!("confluent-wire-format=true is incompatible with glue-schema-version-id");
344+
}
345+
// Default to Confluent framing unless a Glue schema-version ID was supplied.
346+
let confluent_wire_format =
347+
confluent_wire_format_explicit.unwrap_or_else(|| glue_schema_version_id.is_none());
348+
Format::Avro {
349+
schema: cmd.args.string("schema")?,
350+
confluent_wire_format,
351+
glue_schema_version_id,
352+
// TODO (maz): update README!
353+
references: cmd
354+
.args
355+
.opt_string("references")
356+
.map(|s| s.split(',').map(|s| s.to_string()).collect())
357+
.unwrap_or_default(),
358+
}
359+
}
316360
"protobuf" => {
317361
let descriptor_file = cmd.args.string("descriptor-file")?;
318362
let message = cmd.args.string("message")?;
@@ -336,6 +380,7 @@ pub async fn run_ingest(
336380
anyhow!("key-schema parameter required when key-format is present")
337381
})?,
338382
confluent_wire_format: cmd.args.opt_bool("confluent-wire-format")?.unwrap_or(true),
383+
glue_schema_version_id: None,
339384
references: cmd
340385
.args
341386
.opt_string("key-references")
@@ -425,8 +470,16 @@ pub async fn run_ingest(
425470
match fmt {
426471
Format::Avro {
427472
confluent_wire_format,
473+
glue_schema_version_id,
428474
..
429-
} => Some(*confluent_wire_format),
475+
} => {
476+
// Glue framing is a separate wire format: return `None` so it is not compared against the Confluent wire-format setting.
477+
if glue_schema_version_id.is_some() {
478+
None
479+
} else {
480+
Some(*confluent_wire_format)
481+
}
482+
}
430483
Format::Protobuf {
431484
confluent_wire_format,
432485
..
@@ -550,8 +603,20 @@ async fn make_transcoder(
550603
Format::Avro {
551604
schema,
552605
confluent_wire_format,
606+
glue_schema_version_id,
553607
references,
554608
} => {
609+
if let Some(schema_version_id) = glue_schema_version_id {
610+
if !references.is_empty() {
611+
bail!("schema references are not supported with glue-schema-version-id");
612+
}
613+
let schema = avro::parse_schema(&schema, &[])
614+
.with_context(|| format!("parsing avro schema: {}", schema))?;
615+
return Ok(Transcoder::GlueAvro {
616+
schema,
617+
schema_version_id,
618+
});
619+
}
555620
if confluent_wire_format {
556621
// Build references list by fetching each subject from the registry.
557622
// Start with immediate references and automatically resolve transitive ones.

0 commit comments

Comments
 (0)