Skip to content
Open
37 changes: 32 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@ concurrency:
cancel-in-progress: true

permissions:
contents: write
contents: read
pull-requests: write

jobs:
check-rust:
if: (github.event.action != 'closed' || github.event.pull_request.merged == true)
name: Check Rust
check-rust-pr:
if: github.event_name == 'pull_request' && (github.event.action != 'closed' || github.event.pull_request.merged == true)
name: Check Rust (PR)
runs-on: ubuntu-latest
permissions:
contents: write # needed for commit-changes push
contents: write
pull-requests: write # needed for PR comments (e.g., Codecov)
steps:
- name: Checkout
Expand All @@ -36,3 +36,30 @@ jobs:
with:
run-coverage: true
codecov-token: ${{ secrets.CODECOV_TOKEN }}

check-rust-push:
if: github.event_name == 'push'
name: Check Rust (Push)
runs-on: ubuntu-latest
permissions:
contents: read
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Install Rust
uses: dtolnay/rust-toolchain@stable
with:
components: rustfmt, clippy

- name: Cache Cargo
uses: Swatinem/rust-cache@v2

- name: Format
run: cargo fmt --all -- --check

- name: Clippy
run: cargo clippy --all-targets --all-features -D warnings

- name: Test
run: cargo test --workspace
164 changes: 124 additions & 40 deletions src/adapter/solana.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
//! Solana adapter — TODO: implement Solana support behind the `solana` feature.
//! Solana adapter — partial Solana support behind the `solana` feature.
//!
//! [`Sol`] is the zero-sized tag type. Instantiate a [`Sender<Sol>`] once the
//! Solana adapter is implemented.
//!
//! [`Sender<Sol>`]: crate::Sender

use crate::adapter::{
BlockReceiver, ChainAdapter, ChainClient, FeeManager, PendingTransaction, ReplayProtection,
RetryDecision, RetryStrategy, SendOutcome, TransactionStatus,
use crate::{
Error,
adapter::{
BlockReceiver, ChainAdapter, ChainClient, FeeManager, PendingTransaction, ReplayProtection,
RetryDecision, RetryStrategy, SendOutcome, TransactionStatus,
},
};
use crate::Error;
use async_trait::async_trait;
use solana_client::{
nonblocking::rpc_client::RpcClient,
pubsub_client::PubsubClient,
};
use log::warn;
use solana_client::{nonblocking::rpc_client::RpcClient, pubsub_client::PubsubClient};
use solana_sdk::{
commitment_config::CommitmentConfig,
hash::Hash, instruction::Instruction, signature::Signature, transaction::VersionedTransaction,
compute_budget::ComputeBudgetInstruction,
hash::Hash,
message::Message as SolanaMessage,
signature::{Keypair, Signature, Signer},
transaction::{TransactionError, VersionedTransaction},
};
use solana_transaction_status::TransactionConfirmationStatus;
use std::sync::Arc;
use std::time::Duration;
use std::{cmp::min, sync::Arc, time::Duration};
use tokio::sync::Mutex;

/// Solana fee parameters (placeholder).
Expand All @@ -33,6 +36,15 @@ pub struct SolFeeParams {
pub compute_unit_limit: u32,
}

impl Default for SolFeeParams {
fn default() -> Self {
Self {
compute_unit_price: DEFAULT_COMPUTE_UNIT_PRICE,
compute_unit_limit: DEFAULT_COMPUTE_UNIT_LIMIT,
}
}
}

/// Solana adapter tag type. All SOL-specific types are associated here.
pub struct Sol;

Expand All @@ -50,25 +62,25 @@ impl ChainAdapter for Sol {
pub struct SolClient {
rpc: Arc<RpcClient>,
pubsub_url: String,
payer: Arc<Keypair>,
}

impl SolClient {
#[allow(dead_code)]
pub fn new(rpc: Arc<RpcClient>, pubsub_url: impl Into<String>) -> Self {
pub fn new(rpc: Arc<RpcClient>, pubsub_url: impl Into<String>, payer: Arc<Keypair>) -> Self {
Self {
rpc,
pubsub_url: pubsub_url.into(),
payer,
}
}
}

#[async_trait]
impl ChainClient<Sol> for SolClient {
async fn subscribe_new_blocks(&self) -> Result<BlockReceiver, Error> {
let (subscription, receiver) =
PubsubClient::slot_subscribe(&self.pubsub_url).map_err(|err| {
Error::SolanaError(format!("slot_subscribe failed: {err}"))
})?;
let (subscription, receiver) = PubsubClient::slot_subscribe(&self.pubsub_url)
.map_err(|err| Error::SolanaError(format!("slot_subscribe failed: {err}")))?;

let (tx, rx) = tokio::sync::mpsc::channel(32);
std::thread::spawn(move || {
Expand All @@ -93,13 +105,58 @@ impl ChainClient<Sol> for SolClient {

async fn send_transaction(
&self,
_msg: &crate::Message,
_fee: &SolFeeParams,
_replay_token: &Hash,
msg: &crate::Message,
fee: &SolFeeParams,
replay_token: &Hash,
) -> Result<SendOutcome<Sol>, Error> {
Err(Error::SolanaError(
"send_transaction not implemented for Solana yet".to_string(),
))
let payload = msg
.solana
.as_ref()
.ok_or_else(|| Error::SolanaError("missing solana payload".to_string()))?;

let mut instructions = Vec::with_capacity(payload.instructions.len() + 2);
if fee.compute_unit_limit > 0 {
instructions.push(ComputeBudgetInstruction::set_compute_unit_limit(
fee.compute_unit_limit,
));
}
if fee.compute_unit_price > 0 {
instructions.push(ComputeBudgetInstruction::set_compute_unit_price(
fee.compute_unit_price,
));
}
instructions.extend(payload.instructions.iter().cloned());

let payer = self.payer.pubkey();
let message = SolanaMessage::new_with_blockhash(&instructions, Some(&payer), replay_token);
let tx = VersionedTransaction::try_new(message, &[self.payer.as_ref()])
.map_err(|err| Error::SolanaError(format!("failed to sign transaction: {err}")))?;

let signature = self
.rpc
.send_transaction(&tx)
.await
.map_err(|err| Error::SolanaError(format!("send_transaction failed: {err}")))?;

let start = std::time::Instant::now();
loop {
let status = self.get_transaction_status(&signature).await?;
match status {
TransactionStatus::Confirmed { .. } | TransactionStatus::Finalized { .. } => {
return Ok(SendOutcome::Confirmed { tx_id: signature });
}
TransactionStatus::Failed { .. } => {
return Ok(SendOutcome::Reverted { tx_id: signature });
}
TransactionStatus::Expired => return Ok(SendOutcome::Dropped { tx_id: signature }),
TransactionStatus::Pending => {
if start.elapsed() >= Duration::from_secs(SOLANA_TX_TIMEOUT_SECS) {
return Ok(SendOutcome::Dropped { tx_id: signature });
}
tokio::time::sleep(Duration::from_millis(SOLANA_STATUS_POLL_MS)).await;
}
}
}
}

async fn get_transaction_status(&self, _id: &Signature) -> Result<TransactionStatus, Error> {
Expand All @@ -119,6 +176,9 @@ impl ChainClient<Sol> for SolClient {
};

if let Some(err) = status.err {
if matches!(err, TransactionError::BlockhashNotFound) {
return Ok(TransactionStatus::Expired);
}
return Ok(TransactionStatus::Failed {
reason: format!("{err:?}"),
});
Expand All @@ -142,23 +202,32 @@ pub struct SolFeeManager;

#[async_trait]
impl FeeManager<Sol> for SolFeeManager {
async fn get_fee_params(&self, _priority: u32) -> Result<SolFeeParams, Error> {
todo!("Use getRecentPrioritizationFees to derive CU price/limit");
async fn get_fee_params(&self, priority: u32) -> Result<SolFeeParams, Error> {
let priority = min(priority, crate::message::MAX_PRIORITY) as u64;
let price_range = MAX_COMPUTE_UNIT_PRICE.saturating_sub(DEFAULT_COMPUTE_UNIT_PRICE);
let price = DEFAULT_COMPUTE_UNIT_PRICE.saturating_add(
price_range.saturating_mul(priority) / crate::message::MAX_PRIORITY as u64,
);

Ok(SolFeeParams {
compute_unit_price: price,
compute_unit_limit: DEFAULT_COMPUTE_UNIT_LIMIT,
})
}

async fn update_on_confirmation(&self, _confirmation_time: Duration, _fee_paid: &SolFeeParams) {
todo!("Update internal fee model from confirmation latency");
// Placeholder for future fee model updates.
}

fn bump_fee(&self, current: &SolFeeParams) -> SolFeeParams {
SolFeeParams {
compute_unit_price: current.compute_unit_price.saturating_add(1),
compute_unit_price: bump_by_percent(current.compute_unit_price, FEE_BUMP_PERCENT),
compute_unit_limit: current.compute_unit_limit,
}
}

async fn get_base_fee(&self) -> SolFeeParams {
todo!("Return baseline fee params");
Ok(SolFeeParams::default())
}
}

Expand Down Expand Up @@ -217,9 +286,7 @@ impl ReplayProtection<Sol> for SolReplayProtection {
.rpc
.get_latest_blockhash_with_commitment(CommitmentConfig::processed())
.await
.map_err(|err| {
Error::SolanaError(format!("get_latest_blockhash failed: {err}"))
})?;
.map_err(|err| Error::SolanaError(format!("get_latest_blockhash failed: {err}")))?;

let mut state = self.state.lock().await;
state.hash = hash;
Expand All @@ -242,7 +309,17 @@ impl RetryStrategy<Sol> for SolRetryStrategy {
_fees: &SolFeeManager,
_replay: &SolReplayProtection,
) -> RetryDecision<Sol> {
todo!("Resubmit with fresh blockhash + bumped CU price");
if let Err(err) = _replay.sync().await {
warn!("failed to refresh blockhash after drop: {err:?}");
return RetryDecision::Requeue;
}

let new_fee = _fees.bump_fee(&_pending.fee);
let new_replay = _replay.next().await;
RetryDecision::Resubmit {
fee: new_fee,
replay_token: new_replay,
}
}

async fn handle_confirmed(
Expand All @@ -252,16 +329,23 @@ impl RetryStrategy<Sol> for SolRetryStrategy {
_replay: &SolReplayProtection,
_confirmation_time: Duration,
) {
todo!("Update fee model / replay protection state");
_fees
.update_on_confirmation(_confirmation_time, &_pending.fee)
.await;
if let Err(err) = _replay.sync().await {
warn!("failed to refresh blockhash after confirmation: {err:?}");
}
}
}

// --- Solana payload helpers (placeholder) -----------------------------------
const DEFAULT_COMPUTE_UNIT_LIMIT: u32 = 200_000;
const DEFAULT_COMPUTE_UNIT_PRICE: u64 = 0;
const MAX_COMPUTE_UNIT_PRICE: u64 = 10_000;
const FEE_BUMP_PERCENT: u64 = 20;
const SOLANA_TX_TIMEOUT_SECS: u64 = 30;
const SOLANA_STATUS_POLL_MS: u64 = 500;

#[allow(dead_code)]
fn build_transaction(
_instructions: Vec<Instruction>,
_blockhash: Hash,
) -> VersionedTransaction {
todo!("Construct VersionedTransaction with payer + instructions");
fn bump_by_percent(value: u64, percent: u64) -> u64 {
let bump = value.saturating_mul(percent) / 100;
value.saturating_add(bump)
}
39 changes: 39 additions & 0 deletions src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
//! [`Message`]: Message

use alloy::primitives::{Address, Bytes, U256};
#[cfg(feature = "solana")]
use solana_sdk::instruction::Instruction;
use std::fmt::Display;

pub(crate) const MAX_PRIORITY: u32 = 100;
Expand Down Expand Up @@ -87,6 +89,18 @@ pub struct Message {

/// The number of times this message has been retried after a dropped transaction.
retry_count: u32,

/// Optional Solana payload when the `solana` feature is enabled.
#[cfg(feature = "solana")]
pub solana: Option<SolanaPayload>,
}

/// Solana-specific transaction payload.
#[cfg(feature = "solana")]
#[derive(Debug, Clone)]
pub struct SolanaPayload {
/// Ordered Solana instructions to execute.
pub instructions: Vec<Instruction>,
}

impl Message {
Expand Down Expand Up @@ -115,6 +129,29 @@ impl Message {
deadline_ms,
created_at_ms: now_ms,
retry_count: 0,
#[cfg(feature = "solana")]
solana: None,
}
}

/// Creates a Solana [`Message`] with the given instructions.
#[cfg(feature = "solana")]
pub fn solana(
instructions: Vec<Instruction>,
priority: u32,
deadline_ms: Option<u32>,
now_ms: u64,
) -> Self {
Self {
to: None,
value: U256::ZERO,
data: Bytes::default(),
gas: 0,
priority: priority.min(MAX_PRIORITY),
deadline_ms,
created_at_ms: now_ms,
retry_count: 0,
solana: Some(SolanaPayload { instructions }),
}
}

Expand Down Expand Up @@ -215,6 +252,8 @@ impl Default for Message {
deadline_ms: None,
created_at_ms: SystemClock.now_ms(),
retry_count: 0,
#[cfg(feature = "solana")]
solana: None,
}
}
}
Expand Down
Loading