Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 59 additions & 8 deletions src/price/calculator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,18 @@ impl RawSwapResult {
}
}

/// Outcome of scanning a single gap.
///
/// `degraded` is `true` when at least one relevant swap was dropped because
/// its token decimals could not be fetched. A degraded scan yields a zeroed
/// or partial aggregate that reflects a transient RPC failure rather than the
/// real on-chain activity, so the orchestration must not cache it as
/// authoritative.
struct GapScan {
result: TokenPriceResult,
degraded: bool,
}

/// Calculates token prices from blockchain swap events using a configurable price source.
///
/// This calculator fetches swap events from the blockchain, extracts price information,
Expand Down Expand Up @@ -238,7 +250,7 @@ impl<P: Provider + Clone> PriceCalculator<P> {
token_address: Address,
gap_start: BlockNumber,
gap_end: BlockNumber,
) -> Result<TokenPriceResult, PriceCalculationError> {
) -> Result<GapScan, PriceCalculationError> {
let scanner = SwapLogScanner::new(
&self.provider,
self.chain,
Expand Down Expand Up @@ -268,7 +280,10 @@ impl<P: Provider + Clone> PriceCalculator<P> {

let mut aggregator = PriceAggregator::new(token_address);
if relevant.is_empty() {
return Ok(aggregator.finish());
return Ok(GapScan {
result: aggregator.finish(),
degraded: false,
});
}

let metadata = TokenMetadataProvider::new(&self.provider);
Expand All @@ -279,16 +294,21 @@ impl<P: Provider + Clone> PriceCalculator<P> {
)
.await;

let mut degraded = false;
for swap in relevant {
// A decimals fetch failure skips the swap rather than aborting the
// whole range, so one unreadable token can't void an entire scan.
// The drop is recorded so the orchestration declines to cache this
// gap as authoritative — otherwise a transient failure would freeze
// a zeroed aggregate in and defeat retries.
let target_decimals = match metadata
.get_or_fetch(&mut self.decimals_cache, token_address)
.await
{
Ok(d) => d,
Err(e) => {
error!(error = ?e, "Error processing swap data");
degraded = true;
continue;
}
};
Expand All @@ -299,6 +319,7 @@ impl<P: Provider + Clone> PriceCalculator<P> {
Ok(d) => d,
Err(e) => {
error!(error = ?e, "Error processing swap data");
degraded = true;
continue;
}
};
Expand All @@ -314,7 +335,10 @@ impl<P: Provider + Clone> PriceCalculator<P> {
}
}

Ok(aggregator.finish())
Ok(GapScan {
result: aggregator.finish(),
degraded,
})
}

/// Calculate aggregated price totals for `token_address` between
Expand Down Expand Up @@ -357,6 +381,13 @@ impl<P: Provider + Clone> PriceCalculator<P> {
// Initialize with any cached data or create new result
let mut price_data = cached_result.unwrap_or_else(|| TokenPriceResult::new(token_address));

// A gap degraded by a decimals fetch failure must not be cached: doing
// so would freeze a transient failure in and return the zeroed
// aggregate on every later call. Track whether any gap degraded so the
// full-range write-back below is skipped too — it would otherwise
// re-cache the same degraded data under the whole query window.
let mut any_degraded = false;

// Process each gap
for gap in gaps {
info!(
Expand All @@ -367,12 +398,23 @@ impl<P: Provider + Clone> PriceCalculator<P> {
);

// Process the gap by fetching logs in chunks with rate limiting
let gap_result = self
let GapScan {
result: gap_result,
degraded,
} = self
.process_gap_for_price(token_address, gap.start, gap.end)
.await?;

// Cache the gap result
{
if degraded {
any_degraded = true;
warn!(
token_address = ?token_address,
gap_start = gap.start,
gap_end = gap.end,
"Gap scan degraded by a token-decimals fetch failure; not caching so the range is rescanned on a later call"
);
} else {
// Cache the gap result
let mut cache = self.price_cache.lock()
.expect("Price cache mutex poisoned - indicates a panic occurred while holding the lock");
cache.insert(token_address, gap.start, gap.end, gap_result.clone());
Expand All @@ -382,8 +424,17 @@ impl<P: Provider + Clone> PriceCalculator<P> {
price_data.merge(&gap_result);
}

// Cache the complete result
{
// Cache the complete result, unless a gap degraded — a covering
// write-back would collapse the disjoint segments into one entry that
// includes the degraded gap and serve it for the whole window.
if any_degraded {
warn!(
token_address = ?token_address,
start_block = start_block,
end_block = end_block,
"Range scan degraded by a token-decimals fetch failure; skipping full-range cache write-back so a later call rescans the affected gap"
);
} else {
let mut cache = self.price_cache.lock().expect(
"Price cache mutex poisoned - indicates a panic occurred while holding the lock",
);
Expand Down
Loading