Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions src/sql-parser/src/ast/defs/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1550,8 +1550,11 @@ pub enum CreateSinkConnection<T: AstInfo> {
headers: Option<Ident>,
},
Iceberg {
connection: T::ItemName,
aws_connection: T::ItemName,
catalog_connection: T::ItemName,

/// AWS creds for the storage layer.
aws_connection: Option<T::ItemName>,

key: Option<SinkKey>,
options: Vec<IcebergSinkConfigOption<T>>,
},
Expand Down Expand Up @@ -1583,20 +1586,22 @@ impl<T: AstInfo> AstDisplay for CreateSinkConnection<T> {
}
}
CreateSinkConnection::Iceberg {
connection,
catalog_connection,
aws_connection,
key,
options,
} => {
f.write_str("ICEBERG CATALOG CONNECTION ");
f.write_node(connection);
f.write_node(catalog_connection);
if !options.is_empty() {
f.write_str(" (");
f.write_node(&display::comma_separated(options));
f.write_str(")");
}
f.write_str(" USING AWS CONNECTION ");
f.write_node(aws_connection);
if let Some(aws_connection) = aws_connection {
f.write_str(" USING AWS CONNECTION ");
f.write_node(aws_connection);
}
if let Some(key) = key.as_ref() {
f.write_str(" ");
f.write_node(key);
Expand Down
11 changes: 7 additions & 4 deletions src/sql-parser/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3861,7 +3861,7 @@ impl<'a> Parser<'a> {
&mut self,
) -> Result<CreateSinkConnection<Raw>, ParserError> {
self.expect_keyword(CONNECTION)?;
let connection = self.parse_raw_name()?;
let catalog_connection = self.parse_raw_name()?;

let options = if self.consume_token(&Token::LParen) {
let options = self.parse_comma_separated(Parser::parse_iceberg_sink_config_option)?;
Expand All @@ -3871,8 +3871,11 @@ impl<'a> Parser<'a> {
vec![]
};

self.expect_keywords(&[USING, AWS, CONNECTION])?;
let aws_connection = self.parse_raw_name()?;
let aws_connection = if self.parse_keywords(&[USING, AWS, CONNECTION]) {
Some(self.parse_raw_name()?)
} else {
None
};

let key = if self.parse_keyword(KEY) {
let key_columns = self.parse_parenthesized_column_list(Mandatory)?;
Expand All @@ -3887,7 +3890,7 @@ impl<'a> Parser<'a> {
};

Ok(CreateSinkConnection::Iceberg {
connection,
catalog_connection,
aws_connection,
key,
options,
Expand Down
13 changes: 10 additions & 3 deletions src/sql-parser/tests/testdata/ddl
Original file line number Diff line number Diff line change
Expand Up @@ -1054,19 +1054,26 @@ CREATE SINK foo FROM bar INTO KAFKA CONNECTION baz (TOPIC = 'topic') KEY FORMAT
=>
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("foo")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("bar")])), connection: Kafka { connection: Name(UnresolvedItemName([Ident("baz")])), options: [KafkaSinkConfigOption { name: Topic, value: Some(Value(String("topic"))) }], key: None, headers: None }, format: Some(KeyValue { key: Avro(Csr { csr_connection: CsrConnectionAvro { connection: CsrConnection { connection: Name(UnresolvedItemName([Ident("conn2")])), options: [] }, key_strategy: None, value_strategy: None, seed: None } }), value: Json { array: false } }), envelope: None, mode: None, with_options: [] })

parse-statement
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') KEY (a) NOT ENFORCED MODE UPSERT;
----
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') KEY (a) NOT ENFORCED MODE UPSERT
=>
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { catalog_connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: None, key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: true }), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] })

parse-statement
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn KEY (a) NOT ENFORCED MODE UPSERT;
----
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn KEY (a) NOT ENFORCED MODE UPSERT
=>
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: true }), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] })
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { catalog_connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Some(Name(UnresolvedItemName([Ident("aws_conn")]))), key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: true }), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] })

parse-statement
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn KEY (a) MODE UPSERT;
----
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn KEY (a) MODE UPSERT
=>
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: false }), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] })
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { catalog_connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Some(Name(UnresolvedItemName([Ident("aws_conn")]))), key: Some(SinkKey { key_columns: [Ident("a")], not_enforced: false }), options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Upsert), with_options: [] })

parse-statement
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (BLAH = 'boo!') USING AWS CONNECTION aws_conn MODE UPSERT;
Expand All @@ -1080,7 +1087,7 @@ CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = '
----
CREATE SINK bar FROM foo INTO ICEBERG CATALOG CONNECTION s3tables (NAMESPACE = 'testnamespace', TABLE = 'daily_sales') USING AWS CONNECTION aws_conn MODE APPEND
=>
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Name(UnresolvedItemName([Ident("aws_conn")])), key: None, options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Append), with_options: [] })
CreateSink(CreateSinkStatement { name: Some(UnresolvedItemName([Ident("bar")])), in_cluster: None, if_not_exists: false, from: Name(UnresolvedItemName([Ident("foo")])), connection: Iceberg { catalog_connection: Name(UnresolvedItemName([Ident("s3tables")])), aws_connection: Some(Name(UnresolvedItemName([Ident("aws_conn")]))), key: None, options: [IcebergSinkConfigOption { name: Namespace, value: Some(Value(String("testnamespace"))) }, IcebergSinkConfigOption { name: Table, value: Some(Value(String("daily_sales"))) }] }, format: None, envelope: None, mode: Some(Append), with_options: [] })

parse-statement
CREATE INDEX foo ON myschema.bar (a, b)
Expand Down
26 changes: 14 additions & 12 deletions src/sql/src/plan/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3482,13 +3482,13 @@ fn plan_sink(
commit_interval,
)?,
CreateSinkConnection::Iceberg {
connection,
catalog_connection,
aws_connection,
options,
..
} => iceberg_sink_builder(
scx,
connection,
catalog_connection,
aws_connection,
options,
relation_key_indices,
Expand Down Expand Up @@ -3660,7 +3660,7 @@ impl std::convert::TryFrom<Vec<CsrConfigOption<Aug>>> for CsrConfigOptionExtract
fn iceberg_sink_builder(
scx: &StatementContext,
catalog_connection: ResolvedItemName,
aws_connection: ResolvedItemName,
storage_connection: Option<ResolvedItemName>,
options: Vec<IcebergSinkConfigOption<Aug>>,
relation_key_indices: Option<Vec<usize>>,
key_desc_and_indices: Option<(RelationDesc, Vec<usize>)>,
Expand All @@ -3672,10 +3672,9 @@ fn iceberg_sink_builder(
// (e.g. interval -> string) don't trip the check.
ArrowBuilder::validate_desc_for_parquet(desc, iceberg_type_overrides)
.map_err(|e| sql_err!("{}", e))?;

let catalog_connection_item = scx.get_item_by_resolved_name(&catalog_connection)?;
let catalog_connection_id = catalog_connection_item.id();
let aws_connection_item = scx.get_item_by_resolved_name(&aws_connection)?;
let aws_connection_id = aws_connection_item.id();
if !matches!(
catalog_connection_item.connection()?,
Connection::IcebergCatalog(_)
Expand All @@ -3689,13 +3688,16 @@ fn iceberg_sink_builder(
);
};

if !matches!(aws_connection_item.connection()?, Connection::Aws(_)) {
let storage_connection_item = storage_connection
.map(|c| scx.get_item_by_resolved_name(&c))
.transpose()?;
let storage_connection_id = storage_connection_item.as_ref().map(|c| c.id());
if let Some(c) = &storage_connection_item
&& !matches!(c.connection()?, Connection::Aws(_))
{
sql_bail!(
"{} is not an AWS connection",
scx.catalog
.resolve_full_name(aws_connection_item.name())
.to_string()
.quoted()
scx.catalog.resolve_full_name(c.name()).to_string().quoted()
);
}

Expand All @@ -3718,8 +3720,8 @@ fn iceberg_sink_builder(
Ok(StorageSinkConnection::Iceberg(IcebergSinkConnection {
catalog_connection_id,
catalog_connection: catalog_connection_id,
aws_connection_id,
aws_connection: aws_connection_id,
storage_connection_id,
storage_connection: storage_connection_id,
table,
namespace,
relation_key_indices,
Expand Down
63 changes: 36 additions & 27 deletions src/sql/src/pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,13 +542,13 @@ async fn purify_create_sink(
}
}
CreateSinkConnection::Iceberg {
connection,
catalog_connection,
aws_connection,
..
} => {
let scx = StatementContext::new(None, &catalog);
let connection = {
let item = scx.get_item_by_resolved_name(connection)?;
let item = scx.get_item_by_resolved_name(catalog_connection)?;
// Get Iceberg connection
match item.connection()? {
Connection::IcebergCatalog(connection) => {
Expand All @@ -561,20 +561,6 @@ async fn purify_create_sink(
}
};

let aws_conn_id = aws_connection.item_id();

let aws_connection = {
let item = scx.get_item_by_resolved_name(aws_connection)?;
// Get AWS connection
match item.connection()? {
Connection::Aws(aws_connection) => aws_connection.clone(),
_ => sql_bail!(
"{} is not an aws connection",
scx.catalog.resolve_full_name(item.name())
),
}
};

// For S3 Tables connections in the Materialize Cloud product, verify the
// AWS region matches the environment's region. This check only applies when
// the enable_s3_tables_region_check dyncfg is set.
Expand Down Expand Up @@ -603,21 +589,44 @@ async fn purify_create_sink(
}
}

// Validate the sink's (optional) AWS connection even though we never use it.
// TODO(kynan): If we do start using the sink's creds, check again that this validation
// accurately reflects what we need.
// Consider rolling the storage creds validation into the catalog connection's "connect" fn,
// which already validates the catalog creds (currently also used for the storage layer).
if let Some(aws_connection) = aws_connection {
let aws_conn_id = aws_connection.item_id();
let aws_connection = {
let item = scx.get_item_by_resolved_name(aws_connection)?;
// Get AWS connection
match item.connection()? {
Connection::Aws(aws_connection) => aws_connection.clone(),
_ => sql_bail!(
"{} is not an aws connection",
scx.catalog.resolve_full_name(item.name())
),
}
};

let _sdk_config = aws_connection
.load_sdk_config(
&storage_configuration.connection_context,
aws_conn_id.clone(),
InTask::No,
mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
.get(storage_configuration.config_set()),
)
.await
.map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
}

// Now that we've validated the sink's storage creds (if they exist)
// we _could_ use them to build a complete Iceberg client (both catalog and storage).
// TODO(kynan): Actually use those sink-specific creds here instead of ignoring them.
let _catalog = connection
.connect(storage_configuration, InTask::No)
.await
.map_err(|e| IcebergSinkPurificationError::CatalogError(Arc::new(e)))?;

let _sdk_config = aws_connection
.load_sdk_config(
&storage_configuration.connection_context,
aws_conn_id.clone(),
InTask::No,
mz_storage_types::dyncfgs::ENFORCE_EXTERNAL_ADDRESSES
.get(storage_configuration.config_set()),
)
.await
.map_err(|e| IcebergSinkPurificationError::AwsSdkContextError(Arc::new(e)))?;
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/storage-types/src/connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,8 @@ impl IcebergCatalogConnection<InlinedConnection> {
signing_name: "s3tables".to_string(),
});

// N.B. We're using the AWS credentials from the catalog connection for the storage layer
// even though the sink comes with its own (unused) AWS credentials for storage.
let customized_credential_load = if matches!(aws_auth, AwsAuth::AssumeRole(_)) {
Some(CustomAwsCredentialLoader::new(Arc::new(
AwsSdkCredentialLoader::new(credentials_provider),
Expand Down
Loading
Loading