Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions electrum/gui/qt/main_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -1387,6 +1387,12 @@ def on_event_request_status(self, wallet, key, status):
def on_event_invoice_status(self, wallet, key, status):
if wallet != self.wallet:
return
if not wallet.is_up_to_date():
# while syncing, these events arrive in bulk (per touched invoice per tx), and
# refresh_item recomputes the status, scanning prevouts on the GUI thread.
# update_wallet() rebuilds the list in one batch once we are up_to_date again.
self.need_update.set()
return
if status == PR_PAID:
self.send_tab.invoice_list.delete_item(key)
else:
Expand Down
93 changes: 62 additions & 31 deletions electrum/wallet.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,8 @@ def __init__(self, db: WalletDB, *, config: SimpleConfig):
self._last_full_history = None
self._tx_parents_cache = {}
self._paid_invoice_keys_cache = set() # type: Set[str]
# only set while an _update_onchain_invoice_paid_detection pass runs, see there
self._invoice_prevouts_cache = None # type: Optional[Dict[bytes, Sequence[Tuple[str, int, int, int]]]]
self._coin_price_cache = {}
self._default_labels = {}
self._accounting_addresses = set() # addresses counted as ours after successful sweep
Expand Down Expand Up @@ -631,7 +633,7 @@ def on_event_adb_added_tx(self, adb, tx_hash: str, tx: Transaction):
self.clear_tx_parents_cache()
if self.lnworker:
self.lnworker.maybe_add_backup_from_tx(tx)
self._update_invoices_and_reqs_touched_by_tx(tx)
self._update_invoices_and_reqs_touched_by_tx(tx, skip_cached_paid=True)
util.trigger_callback('new_transaction', self, tx)

@event_listener
Expand All @@ -649,7 +651,7 @@ def on_event_adb_added_verified_tx(self, adb, tx_hash):
if adb != self.adb:
return
if tx := self.db.get_transaction(tx_hash):
self._update_invoices_and_reqs_touched_by_tx(tx)
self._update_invoices_and_reqs_touched_by_tx(tx, skip_cached_paid=True)
tx_mined_status = self.adb.get_tx_height(tx_hash)
util.trigger_callback('verified', self, tx_hash, tx_mined_status)

Expand Down Expand Up @@ -1376,28 +1378,39 @@ def _prepare_onchain_invoice_paid_detection(self):
self._update_onchain_invoice_paid_detection(self._invoices.keys())

def _update_onchain_invoice_paid_detection(self, invoice_keys: Iterable[str]) -> None:
for invoice_key in invoice_keys:
invoice = self._invoices.get(invoice_key)
if not invoice:
continue
# clear the cache first so self.get_invoice_status takes the slow path,
# which is needed to detect paid->unpaid transitions (e.g. reorgs)
self._paid_invoice_keys_cache.discard(invoice_key)
if invoice.is_lightning():
is_paid_lightning = bool(self.lnworker and self.lnworker.get_invoice_status(invoice) == PR_PAID)
if is_paid_lightning:
self._paid_invoice_keys_cache.add(invoice_key)
if is_paid_lightning or not invoice.get_address():
# invoices often share scriptpubkeys; share the prevout lookups across this pass,
# including the ones done by self.get_invoice_status below
self._invoice_prevouts_cache = {}
try:
for invoice_key in invoice_keys:
invoice = self._invoices.get(invoice_key)
if not invoice:
continue
is_paid, conf_needed, relevant_txs = self._is_onchain_invoice_paid(invoice)
if is_paid:
for txid in relevant_txs:
self._invoices_from_txid_map[txid].add(invoice_key)
for txout in invoice.get_outputs():
self._invoices_from_scriptpubkey_map[txout.scriptpubkey].add(invoice_key)
# update invoice status
status = self.get_invoice_status(invoice)
util.trigger_callback('invoice_status', self, invoice_key, status)
# clear the cache first so self.get_invoice_status takes the slow path,
# which is needed to detect paid->unpaid transitions (e.g. reorgs)
self._paid_invoice_keys_cache.discard(invoice_key)
if invoice.is_lightning():
is_paid_lightning = bool(self.lnworker and self.lnworker.get_invoice_status(invoice) == PR_PAID)
if is_paid_lightning:
self._paid_invoice_keys_cache.add(invoice_key)
if is_paid_lightning or not invoice.get_address():
continue
is_paid, conf_needed, relevant_txs = self._is_onchain_invoice_paid(invoice)
if is_paid:
for txid in relevant_txs:
self._invoices_from_txid_map[txid].add(invoice_key)
# re-add to the cache, so that self.get_invoice_status below short-circuits
# instead of redoing the prevout scan. not for lightning invoices:
# their LN status takes precedence there (e.g. an in-flight LN payment)
if not invoice.is_lightning() and conf_needed:
self._paid_invoice_keys_cache.add(invoice_key)
for txout in invoice.get_outputs():
self._invoices_from_scriptpubkey_map[txout.scriptpubkey].add(invoice_key)
# update invoice status
status = self.get_invoice_status(invoice)
util.trigger_callback('invoice_status', self, invoice_key, status)
finally:
self._invoice_prevouts_cache = None

def _is_onchain_invoice_paid(self, invoice: BaseInvoice) -> Tuple[bool, Optional[int], Sequence[str]]:
"""Returns whether on-chain invoice/request is satisfied, num confs required txs have,
Expand All @@ -1414,15 +1427,12 @@ def _is_onchain_invoice_paid(self, invoice: BaseInvoice) -> Tuple[bool, Optional
conf_needed = None # type: Optional[int]
with self.lock:
for invoice_scriptpubkey, invoice_amt in invoice_amounts.items():
scripthash = bitcoin.script_to_scripthash(invoice_scriptpubkey)
prevouts_and_values = self.db.get_prevouts_by_scripthash(scripthash)
confs_and_values = []
for prevout, v in prevouts_and_values:
relevant_txs.add(prevout.txid.hex())
tx_height = self.adb.get_tx_height(prevout.txid.hex())
if 0 < tx_height.height() <= invoice.height: # exclude txs older than invoice
for txid, height, conf, v in self._get_prevout_infos_for_scriptpubkey(invoice_scriptpubkey):
relevant_txs.add(txid)
if 0 < height <= invoice.height: # exclude txs older than invoice
continue
confs_and_values.append((tx_height.conf or 0, v))
confs_and_values.append((conf, v))
# check that there is at least one TXO, and that they pay enough.
# note: "at least one TXO" check is needed for zero amount invoice (e.g. OP_RETURN)
vsum = 0
Expand All @@ -1435,6 +1445,23 @@ def _is_onchain_invoice_paid(self, invoice: BaseInvoice) -> Tuple[bool, Optional
is_paid = False
return is_paid, conf_needed, list(relevant_txs)

def _get_prevout_infos_for_scriptpubkey(self, scriptpubkey: bytes) -> Sequence[Tuple[str, int, int, int]]:
"""Returns (txid, height, conf, value) for each prevout paying scriptpubkey.
Memoized while an _update_onchain_invoice_paid_detection pass runs (see there).
"""
cache = self._invoice_prevouts_cache
infos = cache.get(scriptpubkey) if cache is not None else None
if infos is None:
scripthash = bitcoin.script_to_scripthash(scriptpubkey)
infos = []
for prevout, v in self.db.get_prevouts_by_scripthash(scripthash):
txid = prevout.txid.hex()
tx_height = self.adb.get_tx_height(txid)
infos.append((txid, tx_height.height(), tx_height.conf or 0, v))
if cache is not None:
cache[scriptpubkey] = infos
return infos

def is_onchain_invoice_paid(self, invoice: BaseInvoice) -> Tuple[bool, Optional[int]]:
is_paid, conf_needed, relevant_txs = self._is_onchain_invoice_paid(invoice)
return is_paid, conf_needed
Expand Down Expand Up @@ -3004,7 +3031,7 @@ def get_invoices_and_requests_touched_by_tx(self, tx):
invoice_keys.add(invoice_key)
return request_keys, invoice_keys

def _update_invoices_and_reqs_touched_by_tx(self, tx: Transaction) -> None:
def _update_invoices_and_reqs_touched_by_tx(self, tx: Transaction, *, skip_cached_paid: bool = False) -> None:
# FIXME in some cases if tx2 replaces unconfirmed tx1 in the mempool, we are not called.
# For a given receive request, if tx1 touches it but tx2 does not, then
# we were called when tx1 was added, but we will not get called when tx2 replaces tx1.
Expand All @@ -3015,6 +3042,10 @@ def _update_invoices_and_reqs_touched_by_tx(self, tx: Transaction) -> None:
continue
status = self.get_invoice_status(request)
util.trigger_callback('request_status', self, request.get_id(), status)
if skip_cached_paid:
# PR_PAID is terminal for tx additions: only a tx/verification removal
# (e.g. reorg) can demote a paid invoice, and those events rescan unconditionally
invoice_keys -= self._paid_invoice_keys_cache
self._update_onchain_invoice_paid_detection(invoice_keys)

def set_broadcasting(self, tx: Transaction, *, broadcasting_status: Optional[int]):
Expand Down
119 changes: 117 additions & 2 deletions tests/test_invoices.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,15 +275,15 @@ def _make_wallet(self):
wallet.adb.receive_tx_callback(funding_tx, tx_height=TX_HEIGHT_UNCONFIRMED)
return wallet

def _make_outgoing_invoice(self, dest_addr: str, amount_sat: int, *, t: int = 1700000000):
def _make_outgoing_invoice(self, dest_addr: str, amount_sat: int, *, t: int = 1700000000, height: int = 0):
outputs = [PartialTxOutput.from_address_and_value(dest_addr, amount_sat)]
return Invoice(
amount_msat=amount_sat * 1000,
message="",
time=t,
exp=LN_EXPIRY_NEVER,
outputs=outputs,
height=0,
height=height,
lightning_invoice=None,
)

Expand Down Expand Up @@ -428,6 +428,121 @@ async def test_paid_keys_populated_on_wallet_load(self):
wallet._prepare_onchain_invoice_paid_detection()
self.assertIn(inv.get_id(), wallet._paid_invoice_keys_cache)

async def test_update_detection_scans_each_onchain_invoice_once(self):
"""_update_onchain_invoice_paid_detection needs the prevout scan for relevant_txs;
it must reuse that result instead of scanning a second time via get_invoice_status."""
wallet = self._make_wallet()
dest = "tb1qmjzmg8nd4z56ar4fpngzsr6euktrhnjg9td385"
amount_sat = 5_000
inv = self._make_outgoing_invoice(dest, amount_sat)
wallet.save_invoice(inv, write_to_disk=False)
# Pay it (confirmed).
outputs = [PartialTxOutput.from_address_and_value(dest, amount_sat)]
tx = wallet.make_unsigned_transaction(outputs=outputs, fee_policy=FixedFeePolicy(1000))
wallet.sign_transaction(tx, password=None)
wallet.adb.receive_tx_callback(tx, tx_height=TX_HEIGHT_UNCONFIRMED)
wallet.db.put('stored_height', 1010)
wallet.adb.add_verified_tx(tx.txid(), TxMinedInfo(_height=1001, timestamp=1700000001, txpos=1, header_hash="01"*32))

# Track the expensive prevout scan.
scanned = []
orig = wallet._is_onchain_invoice_paid
def spy(invoice):
scanned.append(invoice.get_id())
return orig(invoice)
wallet._is_onchain_invoice_paid = spy

wallet._paid_invoice_keys_cache.clear() # force the slow path
wallet._prepare_onchain_invoice_paid_detection()

self.assertEqual(1, scanned.count(inv.get_id()),
"onchain invoice must be scanned exactly once during load")
# The dedup must not change the observable outcome.
self.assertIn(inv.get_id(), wallet._paid_invoice_keys_cache)
self.assertEqual(PR_PAID, wallet.get_invoice_status(inv))

async def test_new_tx_does_not_rescan_cached_paid_invoices(self):
"""A tx addition can only promote an invoice towards paid; demotion needs a
tx/verification removal (e.g. reorg), which forces the rescan. So tx additions
must not rescan invoices already cached as PR_PAID - on wallets with many
invoices sharing a destination, that rescan storm starves the network loop
during sync."""
wallet = self._make_wallet()
dest = "tb1qmjzmg8nd4z56ar4fpngzsr6euktrhnjg9td385"
amount_sat = 5_000
inv = self._make_outgoing_invoice(dest, amount_sat)
wallet.save_invoice(inv, write_to_disk=False)
# Pay it (confirmed) -> cached as PR_PAID.
outputs = [PartialTxOutput.from_address_and_value(dest, amount_sat)]
tx = wallet.make_unsigned_transaction(outputs=outputs, fee_policy=FixedFeePolicy(1000))
wallet.sign_transaction(tx, password=None)
wallet.adb.receive_tx_callback(tx, tx_height=TX_HEIGHT_UNCONFIRMED)
wallet.db.put('stored_height', 1010)
wallet.adb.add_verified_tx(tx.txid(), TxMinedInfo(_height=1001, timestamp=1700000001, txpos=1, header_hash="01"*32))
self.assertEqual(PR_PAID, wallet.get_invoice_status(inv))
self.assertIn(inv.get_id(), wallet._paid_invoice_keys_cache)

# Track the expensive prevout scan.
scanned = []
orig = wallet._is_onchain_invoice_paid
def spy(invoice):
scanned.append(invoice.get_id())
return orig(invoice)
wallet._is_onchain_invoice_paid = spy

# Another tx paying the same destination arrives.
tx2 = wallet.make_unsigned_transaction(
outputs=[PartialTxOutput.from_address_and_value(dest, 1_000)],
fee_policy=FixedFeePolicy(1000),
)
wallet.sign_transaction(tx2, password=None)
wallet.adb.receive_tx_callback(tx2, tx_height=TX_HEIGHT_UNCONFIRMED)
self.assertNotIn(inv.get_id(), scanned,
"tx addition must not rescan an invoice already cached as PR_PAID")
# Verifying the new tx must not rescan either.
wallet.adb.add_verified_tx(tx2.txid(), TxMinedInfo(_height=1005, timestamp=1700000002, txpos=1, header_hash="02"*32))
self.assertNotIn(inv.get_id(), scanned,
"tx verification must not rescan an invoice already cached as PR_PAID")
self.assertEqual(PR_PAID, wallet.get_invoice_status(inv))

async def test_detection_pass_looks_up_each_scripthash_once(self):
"""Invoices sharing a destination scriptpubkey must not each redo the same
prevout lookup within one detection pass; the per-invoice height filtering
must still apply on top of the shared lookup."""
wallet = self._make_wallet()
dest = "tb1qmjzmg8nd4z56ar4fpngzsr6euktrhnjg9td385"
invs = [self._make_outgoing_invoice(dest, 1_000 + i, t=1700000000 + i) for i in range(5)]
# This one was created above the paying tx's height, so the tx must not count for it.
inv_late = self._make_outgoing_invoice(dest, 1_000, t=1700000010, height=1005)
for inv in invs + [inv_late]:
wallet.save_invoice(inv, write_to_disk=False)
# Pay the shared destination (confirmed at height 1001).
outputs = [PartialTxOutput.from_address_and_value(dest, 50_000)]
tx = wallet.make_unsigned_transaction(outputs=outputs, fee_policy=FixedFeePolicy(1000))
wallet.sign_transaction(tx, password=None)
wallet.adb.receive_tx_callback(tx, tx_height=TX_HEIGHT_UNCONFIRMED)
wallet.db.put('stored_height', 1010)
wallet.adb.add_verified_tx(tx.txid(), TxMinedInfo(_height=1001, timestamp=1700000001, txpos=1, header_hash="01"*32))

# Track the prevout lookups.
looked_up = []
orig = wallet.db.get_prevouts_by_scripthash
def spy(scripthash):
looked_up.append(scripthash)
return orig(scripthash)
wallet.db.get_prevouts_by_scripthash = spy

wallet._paid_invoice_keys_cache.clear() # force the slow path
wallet._prepare_onchain_invoice_paid_detection()

self.assertEqual(1, len(looked_up),
"one shared scriptpubkey must be looked up exactly once per pass")
# The shared lookup must not change the per-invoice outcomes.
for inv in invs:
self.assertIn(inv.get_id(), wallet._paid_invoice_keys_cache)
self.assertNotIn(inv_late.get_id(), wallet._paid_invoice_keys_cache)
self.assertEqual(PR_UNPAID, wallet.get_invoice_status(inv_late))

async def test_paid_keys_demoted_on_reorg(self):
"""A reorg unverifying the paying tx must remove the invoice from the cache,
otherwise get_invoice_status would keep returning a stale PR_PAID."""
Expand Down