Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hevm.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,10 @@ library
pretty-hex >= 1.1 && < 1.2,
rosezipper >= 0.2 && < 0.3,
wreq >= 0.5.3 && < 0.6,
random >= 1.2 && < 1.3,
regex-tdfa >= 1.2.3 && < 1.4,
base >= 4.9 && < 5,
time >= 1.9 && < 1.15,
smt2-parser >= 0.1.0 && < 0.2,
spool >= 0.1 && < 0.2,
stm >= 2.5.0 && < 2.6.0,
Expand Down
78 changes: 72 additions & 6 deletions src/EVM/Fetch.hs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import Data.Aeson.Encode.Pretty (encodePretty)
import qualified Data.ByteString.Lazy as BSL
import Data.Bifunctor (first)
import Control.Exception (try, SomeException)
import Control.Exception (SomeException, SomeAsyncException(..), try, catches, Handler(..), throwIO)

import Data.Aeson hiding (Error)
import Data.Aeson.Optics
Expand All @@ -69,7 +69,11 @@
import Control.Monad (when)
import EVM.Effects
import qualified EVM.Expr as Expr
import Control.Concurrent (threadDelay)
import Control.Concurrent.MVar (MVar, newMVar, readMVar, modifyMVar_)
import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef')
import Data.Time.Clock (UTCTime, getCurrentTime, diffUTCTime, addUTCTime)
import System.Random (randomRIO)

type Fetcher t m = App m => Query t -> m (EVM t ())

Expand All @@ -91,6 +95,9 @@
-- These are NOT persisted to disk
, failedContracts :: MVar (Set Addr)
, failedSlots :: MVar (Set (Addr, W256))
-- Shared rate limit cooldown across workers. When any worker hits a
-- rate limit, it sets a deadline; other workers wait before retrying.
, rpcThrottle :: IORef (Maybe UTCTime)
}

data FetchCache = FetchCache
Expand Down Expand Up @@ -276,6 +283,64 @@
<*> v .: "maxCodeSize"
<*> pure feeSchedule

-- | Wait if there's an active cooldown from a recent rate limit.
waitForCooldown :: IORef (Maybe UTCTime) -> IO ()
waitForCooldown ref = do
deadline <- readIORef ref
case deadline of
Nothing -> pure ()
Just until -> do

Check warning on line 292 in src/EVM/Fetch.hs

View workflow job for this annotation

GitHub Actions / build (windows-latest)

This binding for ‘until’ shadows the existing binding
now <- getCurrentTime
let diff = diffUTCTime until now
when (diff > 0) $
threadDelay (ceiling (realToFrac diff * 1_000_000 :: Double))

-- | Set a cooldown deadline. Keeps the longer of existing vs new deadline.
setCooldown :: IORef (Maybe UTCTime) -> Int -> IO ()
setCooldown ref delayUs = do
now <- getCurrentTime
let newDeadline = addUTCTime (realToFrac delayUs / 1_000_000) now
atomicModifyIORef' ref $ \existing ->
let deadline = case existing of
Just d | d > newDeadline -> d
_ -> newDeadline
in (Just deadline, ())

-- | Retry an IO action on exception with exponential backoff + jitter.
-- Uses a shared cooldown so all workers back off together.
-- Only retries synchronous exceptions (network errors, HTTP 429, etc.).
-- Re-throws all async exceptions (Timeout, ThreadKilled).
-- RPC-level errors (Left) are NOT retried — they pass through.
withRetry :: Bool -> IORef (Maybe UTCTime) -> IO (Either Text a) -> IO (Either Text a)
withRetry debug throttle action = go 1
where
maxRetries = 5
go attempt = do
waitForCooldown throttle
result <- catches (Right <$> action)
[ Handler $ \(SomeAsyncException e) -> throwIO e
, Handler $ \(e :: SomeException) -> pure $ Left e
]
case result of
Right r -> pure r
Left e
| attempt <= maxRetries -> do

Check warning on line 327 in src/EVM/Fetch.hs

View workflow job for this annotation

GitHub Actions / build (windows-latest)

• Defaulting the type variable ‘t0’ to type ‘Integer’ in the following constraints
let baseDelayUs = 500_000 * (2 ^ (attempt - 1)) :: Int
jitter <- randomRIO (0, baseDelayUs)
let delayUs = baseDelayUs + jitter
setCooldown throttle delayUs
when debug $ putStrLn $
"RPC retry " <> show attempt <> "/" <> show maxRetries
<> " in " <> show (delayUs `div` 1000) <> "ms"
threadDelay delayUs
go (attempt + 1)
| otherwise ->
pure $ Left $ pack $ show e

-- | Like fetchWithSession but with retry + shared cooldown from Session.
fetchWithRetry :: Bool -> Text -> Session -> Value -> IO (Either Text Value)
fetchWithRetry debug url session = withRetry debug session.rpcThrottle . fetchWithSession url session.sess

fetchWithSession :: Text -> NetSession.Session -> Value -> IO (Either Text Value)
fetchWithSession url sess x = do
r <- asValue =<< NetSession.post sess (unpack url) x
Expand Down Expand Up @@ -306,7 +371,7 @@
-- Attempt fetch
when (conf.debug) $ putStrLn $ "-> Fetching contract at " ++ show addr
let fetch :: Show a => RpcQuery a -> IO (Either Text a)
fetch = fetchQuery n (fetchWithSession url sess.sess)
fetch = fetchQuery n (fetchWithRetry conf.debug url sess)

codeRes <- fetch (QueryCode addr)
nonceRes <- fetch (QueryNonce addr)
Expand Down Expand Up @@ -373,7 +438,7 @@
else do
-- Attempt fetch
when (conf.debug) $ putStrLn $ "-> Fetching slot " <> show slot <> " at " <> show addr
ret <- fetchSlotWithSession sess.sess n url addr slot
ret <- fetchQuery n (fetchWithRetry conf.debug url sess) (QuerySlot addr slot)
case ret of
Right val -> do
-- Success: cache it
Expand Down Expand Up @@ -436,7 +501,7 @@
internalBlockFetch :: Config -> Session -> BlockNumber -> Text -> IO (Maybe Block)
internalBlockFetch conf sess n url = do
when (conf.debug) $ putStrLn $ "Fetching block " ++ show n ++ " from " ++ unpack url
ret <- fetchQuery n (fetchWithSession url sess.sess) QueryBlock
ret <- fetchQuery n (fetchWithRetry conf.debug url sess) QueryBlock
case ret of
Left _ -> pure Nothing
Right b -> do
Expand Down Expand Up @@ -489,7 +554,8 @@
-- Initialize ephemeral failure tracking
failedContracts <- liftIO $ newMVar Set.empty
failedSlots <- liftIO $ newMVar Set.empty
pure $ Session sess latestBlockNum cache cacheDir failedContracts failedSlots
rpcThrottle <- liftIO $ newIORef Nothing
pure $ Session sess latestBlockNum cache cacheDir failedContracts failedSlots rpcThrottle

mkSessionWithoutCache :: App m => m Session
mkSessionWithoutCache = mkSession Nothing Nothing
Expand Down Expand Up @@ -569,7 +635,7 @@
when (conf.debug) $ liftIO $ putStrLn $ "Fetching slot " <> (show slot) <> " at " <> (show addr)
let (block, url) = fromJust rpcInfo.blockNumURL
n <- liftIO $ getLatestBlockNum conf sess block url
ret <- liftIO $ fetchSlotWithSession sess.sess n url addr slot
ret <- liftIO $ fetchQuery n (fetchWithRetry conf.debug url sess) (QuerySlot addr slot)
case ret of
Right val -> do
liftIO $ modifyMVar_ sess.sharedCache $ \c ->
Expand Down
Loading