Skip to content
Draft
4 changes: 2 additions & 2 deletions crates/cdk-sql-common/src/keyvalue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ where
// Validate parameters according to KV store requirements
validate_kvstore_params(primary_namespace, secondary_namespace, Some(key))?;

let conn = pool.get().map_err(|e| Error::Database(Box::new(e)))?;
let conn = pool.get().await.map_err(|e| Error::Database(Box::new(e)))?;
Ok(query(
r#"
SELECT value
Expand Down Expand Up @@ -195,7 +195,7 @@ where
// Validate namespace parameters according to KV store requirements
validate_kvstore_params(primary_namespace, secondary_namespace, None)?;

let conn = pool.get().map_err(|e| Error::Database(Box::new(e)))?;
let conn = pool.get().await.map_err(|e| Error::Database(Box::new(e)))?;
query(
r#"
SELECT key
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- Migrate amount columns to TEXT
-- u64 amounts cannot be faithfully represented as PostgreSQL INTEGER (signed i64)

-- blind_signature
ALTER TABLE blind_signature ALTER COLUMN amount TYPE TEXT USING amount::TEXT;
ALTER TABLE blind_signature ADD CONSTRAINT chk_blind_signature_amount_numeric CHECK (amount ~ '^\d+$');
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- Migrate amount columns to TEXT
-- u64 amounts cannot be faithfully represented as SQLite INTEGER (signed i64)

-- ============================================================================
-- blind_signature
-- ============================================================================
CREATE TABLE blind_signature_new (
blinded_message BLOB PRIMARY KEY,
amount TEXT NOT NULL,
keyset_id TEXT NOT NULL,
c BLOB NOT NULL
);

INSERT INTO blind_signature_new (blinded_message, amount, keyset_id, c)
SELECT blinded_message, CAST(amount AS TEXT), keyset_id, c
FROM blind_signature;

DROP TABLE blind_signature;
ALTER TABLE blind_signature_new RENAME TO blind_signature;

CREATE INDEX IF NOT EXISTS keyset_id_index ON blind_signature(keyset_id);
51 changes: 41 additions & 10 deletions crates/cdk-sql-common/src/mint/auth/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ where
X: Into<RM::Config>,
{
let pool = Pool::new(db.into());
Self::migrate(pool.get().map_err(|e| Error::Database(Box::new(e)))?).await?;
Self::migrate(pool.get().await.map_err(|e| Error::Database(Box::new(e)))?).await?;
Ok(Self { pool })
}

Expand Down Expand Up @@ -182,7 +182,7 @@ where
"#,
)?
.bind("blinded_message", message.to_bytes().to_vec())
.bind("amount", u64::from(signature.amount) as i64)
.bind("amount", u64::from(signature.amount))
.bind("keyset_id", signature.keyset_id.to_string())
.bind("c", signature.c.to_bytes().to_vec())
.execute(&self.inner)
Expand Down Expand Up @@ -251,14 +251,21 @@ where
{
Ok(Box::new(SQLTransaction {
inner: ConnectionWithTransaction::new(
self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
self.pool
.get()
.await
.map_err(|e| Error::Database(Box::new(e)))?,
)
.await?,
}))
}

async fn get_active_keyset_id(&self) -> Result<Option<Id>, Self::Err> {
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
let conn = self
.pool
.get()
.await
.map_err(|e| Error::Database(Box::new(e)))?;
Ok(query(
r#"
SELECT
Expand All @@ -277,7 +284,11 @@ where
}

async fn get_keyset_info(&self, id: &Id) -> Result<Option<MintKeySetInfo>, Self::Err> {
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
let conn = self
.pool
.get()
.await
.map_err(|e| Error::Database(Box::new(e)))?;
Ok(query(
r#"SELECT
id,
Expand All @@ -301,7 +312,11 @@ where
}

async fn get_keyset_infos(&self) -> Result<Vec<MintKeySetInfo>, Self::Err> {
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
let conn = self
.pool
.get()
.await
.map_err(|e| Error::Database(Box::new(e)))?;
Ok(query(
r#"SELECT
id,
Expand All @@ -325,7 +340,11 @@ where
}

async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result<Vec<Option<State>>, Self::Err> {
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
let conn = self
.pool
.get()
.await
.map_err(|e| Error::Database(Box::new(e)))?;
let mut current_states = query(r#"SELECT y, state FROM proof WHERE y IN (:ys)"#)?
.bind_vec("ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())?
.fetch_all(&*conn)
Expand All @@ -346,7 +365,11 @@ where
&self,
blinded_messages: &[PublicKey],
) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
let conn = self
.pool
.get()
.await
.map_err(|e| Error::Database(Box::new(e)))?;
let mut blinded_signatures = query(
r#"SELECT
keyset_id,
Expand Down Expand Up @@ -391,7 +414,11 @@ where
&self,
protected_endpoint: ProtectedEndpoint,
) -> Result<Option<AuthRequired>, Self::Err> {
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
let conn = self
.pool
.get()
.await
.map_err(|e| Error::Database(Box::new(e)))?;
Ok(
query(r#"SELECT auth FROM protected_endpoints WHERE endpoint = :endpoint"#)?
.bind("endpoint", serde_json::to_string(&protected_endpoint)?)
Expand All @@ -411,7 +438,11 @@ where
async fn get_auth_for_endpoints(
&self,
) -> Result<HashMap<ProtectedEndpoint, Option<AuthRequired>>, Self::Err> {
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
let conn = self
.pool
.get()
.await
.map_err(|e| Error::Database(Box::new(e)))?;
Ok(query(r#"SELECT endpoint, auth FROM protected_endpoints"#)?
.fetch_all(&*conn)
.await?
Expand Down
63 changes: 37 additions & 26 deletions crates/cdk-sql-common/src/mint/completed_operations.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Completed operations database implementation

use std::collections::HashMap;
use std::str::FromStr;

use async_trait::async_trait;
Expand Down Expand Up @@ -69,7 +70,7 @@ where
async fn add_completed_operation(
&mut self,
operation: &mint::Operation,
fee_by_keyset: &std::collections::HashMap<cdk_common::nuts::Id, cdk_common::Amount>,
fee_by_keyset: &HashMap<cdk_common::nuts::Id, cdk_common::Amount>,
) -> Result<(), Self::Err> {
query(
r#"
Expand All @@ -82,32 +83,30 @@ where
.bind("operation_id", operation.id().to_string())
.bind("operation_kind", operation.kind().to_string())
.bind("completed_at", operation.completed_at().unwrap_or(unix_time()) as i64)
.bind("total_issued", operation.total_issued().to_u64() as i64)
.bind("total_redeemed", operation.total_redeemed().to_u64() as i64)
.bind("fee_collected", operation.fee_collected().to_u64() as i64)
.bind("payment_amount", operation.payment_amount().map(|a| a.to_u64() as i64))
.bind("payment_fee", operation.payment_fee().map(|a| a.to_u64() as i64))
.bind("total_issued", operation.total_issued().to_u64())
.bind("total_redeemed", operation.total_redeemed().to_u64())
.bind("fee_collected", operation.fee_collected().to_u64())
.bind("payment_amount", operation.payment_amount().map(|a| a.to_u64()))
.bind("payment_fee", operation.payment_fee().map(|a| a.to_u64()))
.bind("payment_method", operation.payment_method().map(|m| m.to_string()))
.execute(&self.inner)
.await?;

// Update keyset_amounts with fee_collected from the breakdown
for (keyset_id, fee) in fee_by_keyset {
if fee.to_u64() > 0 {
query(
r#"
INSERT INTO keyset_amounts (keyset_id, total_issued, total_redeemed, fee_collected)
VALUES (:keyset_id, 0, 0, :fee)
ON CONFLICT (keyset_id)
DO UPDATE SET fee_collected = keyset_amounts.fee_collected + EXCLUDED.fee_collected
"#,
)?
.bind("keyset_id", keyset_id.to_string())
.bind("fee", fee.to_u64() as i64)
.execute(&self.inner)
.await?;
}
}
// Bulk-update keyset_amounts with fee_collected from the breakdown
let fee_deltas: HashMap<cdk_common::nuts::Id, u64> = fee_by_keyset
.iter()
.filter(|(_, fee)| fee.to_u64() > 0)
.map(|(id, fee)| (*id, fee.to_u64()))
.collect();

super::keyset_amounts::increment(
&self.inner,
fee_deltas,
"fee_collected",
|a| a.fee_collected,
|a, v| a.fee_collected = v,
)
.await?;

Ok(())
}
Expand All @@ -124,7 +123,11 @@ where
&self,
operation_id: &uuid::Uuid,
) -> Result<Option<mint::Operation>, Self::Err> {
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
let conn = self
.pool
.get()
.await
.map_err(|e| Error::Database(Box::new(e)))?;
Ok(query(
r#"
SELECT
Expand Down Expand Up @@ -152,7 +155,11 @@ where
&self,
operation_kind: mint::OperationKind,
) -> Result<Vec<mint::Operation>, Self::Err> {
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
let conn = self
.pool
.get()
.await
.map_err(|e| Error::Database(Box::new(e)))?;
Ok(query(
r#"
SELECT
Expand All @@ -179,7 +186,11 @@ where
}

async fn get_completed_operations(&self) -> Result<Vec<mint::Operation>, Self::Err> {
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
let conn = self
.pool
.get()
.await
.map_err(|e| Error::Database(Box::new(e)))?;
Ok(query(
r#"
SELECT
Expand Down
29 changes: 24 additions & 5 deletions crates/cdk-sql-common/src/mint/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,10 @@ where
) -> Result<Box<dyn MintKeyDatabaseTransaction<'a, Error> + Send + Sync + 'a>, Error> {
let tx = SQLTransaction {
inner: ConnectionWithTransaction::new(
self.pool.get().map_err(|e| Error::Database(Box::new(e)))?,
self.pool
.get()
.await
.map_err(|e| Error::Database(Box::new(e)))?,
)
.await?,
};
Expand All @@ -150,7 +153,11 @@ where
}

async fn get_active_keyset_id(&self, unit: &CurrencyUnit) -> Result<Option<Id>, Self::Err> {
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
let conn = self
.pool
.get()
.await
.map_err(|e| Error::Database(Box::new(e)))?;
Ok(
query(r#" SELECT id FROM keyset WHERE active = :active AND unit = :unit"#)?
.bind("active", true)
Expand All @@ -167,7 +174,11 @@ where
}

async fn get_active_keysets(&self) -> Result<HashMap<CurrencyUnit, Id>, Self::Err> {
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
let conn = self
.pool
.get()
.await
.map_err(|e| Error::Database(Box::new(e)))?;
Ok(
query(r#"SELECT id, unit FROM keyset WHERE active = :active"#)?
.bind("active", true)
Expand All @@ -185,7 +196,11 @@ where
}

async fn get_keyset_info(&self, id: &Id) -> Result<Option<MintKeySetInfo>, Self::Err> {
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
let conn = self
.pool
.get()
.await
.map_err(|e| Error::Database(Box::new(e)))?;
Ok(query(
r#"SELECT
id,
Expand All @@ -210,7 +225,11 @@ where
}

async fn get_keyset_infos(&self) -> Result<Vec<MintKeySetInfo>, Self::Err> {
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
let conn = self
.pool
.get()
.await
.map_err(|e| Error::Database(Box::new(e)))?;
Ok(query(
r#"SELECT
id,
Expand Down
Loading
Loading