Skip to content

Commit 41cc423

Browse files
committed
Kynan: Iceberg sink's AWS connection isn't used.
1 parent 16edb20 commit 41cc423

13 files changed

Lines changed: 137 additions & 140 deletions

File tree

src/sql-parser/src/ast/defs/ddl.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1542,8 +1542,11 @@ pub enum CreateSinkConnection<T: AstInfo> {
15421542
headers: Option<Ident>,
15431543
},
15441544
Iceberg {
1545-
connection: T::ItemName,
1546-
aws_connection: T::ItemName,
1545+
catalog_connection: T::ItemName,
1546+
1547+
/// AWS creds for the storage layer.
1548+
aws_connection: Option<T::ItemName>,
1549+
15471550
key: Option<SinkKey>,
15481551
options: Vec<IcebergSinkConfigOption<T>>,
15491552
},
@@ -1575,20 +1578,22 @@ impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
15751578
}
15761579
}
15771580
CreateSinkConnection::Iceberg {
1578-
connection,
1581+
catalog_connection,
15791582
aws_connection,
15801583
key,
15811584
options,
15821585
} => {
15831586
f.write_str("ICEBERG CATALOG CONNECTION ");
1584-
f.write_node(connection);
1587+
f.write_node(catalog_connection);
15851588
if !options.is_empty() {
15861589
f.write_str(" (");
15871590
f.write_node(&display::comma_separated(options));
15881591
f.write_str(")");
15891592
}
1590-
f.write_str(" USING AWS CONNECTION ");
1591-
f.write_node(aws_connection);
1593+
if let Some(aws_connection) = aws_connection {
1594+
f.write_str(" USING AWS CONNECTION ");
1595+
f.write_node(aws_connection);
1596+
}
15921597
if let Some(key) = key.as_ref() {
15931598
f.write_str(" ");
15941599
f.write_node(key);

src/sql-parser/src/parser.rs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3856,7 +3856,7 @@ impl<'a> Parser<'a> {
38563856
&mut self,
38573857
) -> Result<CreateSinkConnection<Raw>, ParserError> {
38583858
self.expect_keyword(CONNECTION)?;
3859-
let connection = self.parse_raw_name()?;
3859+
let catalog_connection = self.parse_raw_name()?;
38603860

38613861
let options = if self.consume_token(&Token::LParen) {
38623862
let options = self.parse_comma_separated(Parser::parse_iceberg_sink_config_option)?;
@@ -3866,8 +3866,11 @@ impl<'a> Parser<'a> {
38663866
vec![]
38673867
};
38683868

3869-
self.expect_keywords(&[USING, AWS, CONNECTION])?;
3870-
let aws_connection = self.parse_raw_name()?;
3869+
let aws_connection = if self.parse_keywords(&[USING, AWS, CONNECTION]) {
3870+
Some(self.parse_raw_name()?)
3871+
} else {
3872+
None
3873+
};
38713874

38723875
let key = if self.parse_keyword(KEY) {
38733876
let key_columns = self.parse_parenthesized_column_list(Mandatory)?;
@@ -3882,7 +3885,7 @@ impl<'a> Parser<'a> {
38823885
};
38833886

38843887
Ok(CreateSinkConnection::Iceberg {
3885-
connection,
3888+
catalog_connection,
38863889
aws_connection,
38873890
key,
38883891
options,

src/sql-parser/tests/testdata/ddl

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,19 +1009,26 @@ CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC = 'topic') KEY FORMAT
10091009
=>
10101010
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("foo")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("bar")])), connection: Kafka { connection: Name(UnresolvedItemName([Ident("baz")])), options: [KafkaSinkConfigOption { name: Topic, value: Some(Value(String("topic"))) }], key: None, headers: None }, format: Some(KeyValue { key: Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [] }, key_strategy: None, value_strategy: None, seed: None } }), value: Json { array: false } }), envelope: None, mode: None, with_options: [] })
10111011

1012+
parse-statement
1013+
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') KEY (a) NOT ENFORCED MODE UPSERT;
1014+
----
1015+
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') KEY (a) NOT ENFORCED MODE UPSERT
1016+
=>
1017+
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { catalog_connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: None, key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: true }), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] })
1018+
10121019
parse-statement
10131020
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn KEY (a) NOT ENFORCED MODE UPSERT;
10141021
----
10151022
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn KEY (a) NOT ENFORCED MODE UPSERT
10161023
=>
1017-
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: true }), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] })
1024+
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { catalog_connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Some(Name(UnresolvedItemName([Ident("aws_conn")]))), key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: true }), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] })
10181025

10191026
parse-statement
10201027
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn KEY (a) MODE UPSERT;
10211028
----
10221029
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn KEY (a) MODE UPSERT
10231030
=>
1024-
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: false }), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] })
1031+
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { catalog_connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Some(Name(UnresolvedItemName([Ident("aws_conn")]))), key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: false }), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] })
10251032

10261033
parse-statement
10271034
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (BLAH = 'boo!') USING AWS CONNECTION aws_conn MODE UPSERT;
@@ -1035,7 +1042,7 @@ CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = '
10351042
----
10361043
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn MODE APPEND
10371044
=>
1038-
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: None, options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Append), with_options: [] })
1045+
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { catalog_connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Some(Name(UnresolvedItemName([Ident("aws_conn")]))), key: None, options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Append), with_options: [] })
10391046

10401047
parse-statement
10411048
CREATE INDEX foo ON myschema.bar (a, b)

src/sql/src/plan/statement/ddl.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3466,13 +3466,13 @@ fn plan_sink(
34663466
commit_interval,
34673467
)?,
34683468
CreateSinkConnection::Iceberg {
3469-
connection,
3469+
catalog_connection,
34703470
aws_connection,
34713471
options,
34723472
..
34733473
} => iceberg_sink_builder(
34743474
scx,
3475-
connection,
3475+
catalog_connection,
34763476
aws_connection,
34773477
options,
34783478
relation_key_indices,
@@ -3644,7 +3644,7 @@ impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtract
36443644
fn iceberg_sink_builder(
36453645
scx: &StatementContext,
36463646
catalog_connection: ResolvedItemName,
3647-
aws_connection: ResolvedItemName,
3647+
storage_connection: Option<ResolvedItemName>,
36483648
options: Vec<IcebergSinkConfigOption<Aug>>,
36493649
relation_key_indices: Option<Vec<usize>>,
36503650
key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
@@ -3658,10 +3658,9 @@ fn iceberg_sink_builder(
36583658
// (e.g. interval -> string) don't trip the check.
36593659
ArrowBuilder::validate_desc_for_parquet(desc, iceberg_type_overrides)
36603660
.map_err(|e| sql_err!("{}", e))?;
3661+
36613662
let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
36623663
let catalog_connection_id = catalog_connection_item.id();
3663-
let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
3664-
let aws_connection_id = aws_connection_item.id();
36653664
if !matches!(
36663665
catalog_connection_item.connection()?,
36673666
Connection::IcebergCatalog(_)
@@ -3675,13 +3674,16 @@ fn iceberg_sink_builder(
36753674
);
36763675
};
36773676

3678-
if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
3677+
let storage_connection_item = storage_connection
3678+
.map(|c| scx.get_item_by_resolved_name(&c))
3679+
.transpose()?;
3680+
let storage_connection_id = storage_connection_item.as_ref().map(|c| c.id());
3681+
if let Some(c) = &storage_connection_item
3682+
&& !matches!(c.connection()?, Connection::Aws(_))
3683+
{
36793684
sql_bail!(
36803685
"{} is not an AWS connection",
3681-
scx.catalog
3682-
.resolve_full_name(aws_connection_item.name())
3683-
.to_string()
3684-
.quoted()
3686+
scx.catalog.resolve_full_name(c.name()).to_string().quoted()
36853687
);
36863688
}
36873689

@@ -3704,8 +3706,8 @@ fn iceberg_sink_builder(
37043706
Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
37053707
catalog_connection_id,
37063708
catalog_connection: catalog_connection_id,
3707-
aws_connection_id,
3708-
aws_connection: aws_connection_id,
3709+
storage_connection_id,
3710+
storage_connection: storage_connection_id,
37093711
table,
37103712
namespace,
37113713
relation_key_indices,

src/sql/src/pure.rs

Lines changed: 58 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -544,13 +544,13 @@ async fn purify_create_sink(
544544
}
545545
}
546546
CreateSinkConnection::Iceberg {
547-
connection,
547+
catalog_connection,
548548
aws_connection,
549549
..
550550
} => {
551551
let scx = StatementContext::new(None, &catalog);
552552
let connection = {
553-
let item = scx.get_item_by_resolved_name(connection)?;
553+
let item = scx.get_item_by_resolved_name(catalog_connection)?;
554554
// Get Iceberg connection
555555
match item.connection()? {
556556
Connection::IcebergCatalog(connection) => {
@@ -563,63 +563,72 @@ async fn purify_create_sink(
563563
}
564564
};
565565

566-
let aws_conn_id = aws_connection.item_id();
567-
568-
let aws_connection = {
569-
let item = scx.get_item_by_resolved_name(aws_connection)?;
570-
// Get AWS connection
571-
match item.connection()? {
572-
Connection::Aws(aws_connection) => aws_connection.clone(),
573-
_ => sql_bail!(
574-
"{} is not an aws connection",
575-
scx.catalog.resolve_full_name(item.name())
576-
),
577-
}
578-
};
566+
// Validate the sink's (optional) AWS connection even though we never use it.
567+
// TODO(kynan): If we do start using the sink's creds, check again that this validation
568+
// accurately reflects what we need.
569+
// Consider rolling the storage creds validation into the catalog connection's "connect" fn,
570+
// which already validates the catalog creds (currently also used for the storage layer).
571+
if let Some(aws_connection) = aws_connection {
572+
let aws_conn_id = aws_connection.item_id();
573+
let aws_connection = {
574+
let item = scx.get_item_by_resolved_name(aws_connection)?;
575+
// Get AWS connection
576+
match item.connection()? {
577+
Connection::Aws(aws_connection) => aws_connection.clone(),
578+
_ => sql_bail!(
579+
"{} is not an aws connection",
580+
scx.catalog.resolve_full_name(item.name())
581+
),
582+
}
583+
};
579584

580-
// For S3 Tables connections in the Materialize Cloud product, verify the
581-
// AWS region matches the environment's region. This check only applies when
582-
// the enable_s3_tables_region_check dyncfg is set.
583-
if let Some(s3tables) = connection.s3tables_catalog() {
584-
let enable_region_check =
585-
ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs());
586-
if enable_region_check {
587-
let env_id = &catalog.config().environment_id;
588-
if matches!(env_id.cloud_provider(), CloudProvider::Aws) {
589-
let env_region = env_id.cloud_provider_region();
590-
// Later on we default to "us-east-1" if the region is not set on the S3 Tables
591-
// connection, so we need to do the same check here.
592-
let s3_tables_region = s3tables
593-
.aws_connection
594-
.connection
595-
.region
596-
.clone()
597-
.unwrap_or_else(|| "us-east-1".to_string());
598-
if s3_tables_region != env_region {
599-
Err(IcebergSinkPurificationError::S3TablesRegionMismatch {
600-
s3_tables_region,
601-
environment_region: env_region.to_string(),
602-
})?;
585+
// For S3 Tables connections in the Materialize Cloud product, verify the
586+
// AWS region matches the environment's region. This check only applies when
587+
// the enable_s3_tables_region_check dyncfg is set.
588+
if let Some(s3tables) = connection.s3tables_catalog() {
589+
let enable_region_check =
590+
ENABLE_S3_TABLES_REGION_CHECK.get(scx.catalog.system_vars().dyncfgs());
591+
if enable_region_check {
592+
let env_id = &catalog.config().environment_id;
593+
if matches!(env_id.cloud_provider(), CloudProvider::Aws) {
594+
let env_region = env_id.cloud_provider_region();
595+
// Later on we default to "us-east-1" if the region is not set on the S3 Tables
596+
// connection, so we need to do the same check here.
597+
let s3_tables_region = s3tables
598+
.aws_connection
599+
.connection
600+
.region
601+
.clone()
602+
.unwrap_or_else(|| "us-east-1".to_string());
603+
if s3_tables_region != env_region {
604+
Err(IcebergSinkPurificationError::S3TablesRegionMismatch {
605+
s3_tables_region,
606+
environment_region: env_region.to_string(),
607+
})?;
608+
}
603609
}
604610
}
605611
}
612+
613+
let _sdk_config = aws_connection
614+
.load_sdk_config(
615+
&storage_configuration.connection_context,
616+
aws_conn_id.clone(),
617+
InTask::No,
618+
mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
619+
.get(storage_configuration.config_set()),
620+
)
621+
.await
622+
.map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
606623
}
607624

625+
// Now that we've validated the sink's storage creds (if they exist)
626+
// we _could_ use them to build a complete Iceberg client (both catalog and storage).
627+
// TODO(kynan): Actually use those sink-specific creds here instead of ignoring them.
608628
let _catalog = connection
609629
.connect(storage_configuration, InTask::No)
610630
.await
611631
.map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;
612-
613-
let _sdk_config = aws_connection
614-
.load_sdk_config(
615-
&storage_configuration.connection_context,
616-
aws_conn_id.clone(),
617-
InTask::No,
618-
mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
619-
.get(storage_configuration.config_set()),
620-
)
621-
.await
622-
.map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
623632
}
624633
}
625634

src/storage-types/src/connections.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -757,6 +757,8 @@ impl IcebergCatalogConnection<InlinedConnection> {
757757
signing_name: "s3tables".to_string(),
758758
});
759759

760+
// N.B. We're using the AWS credentials from the catalog connection for the storage layer
761+
// even though the sink comes with its own (unused) AWS credentials for storage.
760762
let customized_credential_load = if matches!(aws_auth, AwsAuth::AssumeRole(_)) {
761763
Some(CustomAwsCredentialLoader::new(Arc::new(
762764
AwsSdkCredentialLoader::new(credentials_provider),

0 commit comments

Comments
 (0)