mirror of
https://github.com/aljazceru/nutshell.git
synced 2026-01-08 19:24:21 +01:00
Fix race condition (#586)
* `_set_proofs_pending` performs DB related "proofs are spendable" check inside the lock. * move _verify_spent_proofs_and_set_pending to write.py * edit logging --------- Co-authored-by: callebtc <93376500+callebtc@users.noreply.github.com>
This commit is contained in:
@@ -2,6 +2,7 @@ from typing import Dict, List, Optional
|
||||
|
||||
from ...core.base import Proof, ProofSpentState, ProofState
|
||||
from ...core.db import Connection, Database
|
||||
from ...core.errors import TokenAlreadySpentError
|
||||
from ..crud import LedgerCrud
|
||||
|
||||
|
||||
@@ -74,3 +75,19 @@ class DbReadHelper:
|
||||
)
|
||||
)
|
||||
return states
|
||||
|
||||
async def _verify_proofs_spendable(
|
||||
self, proofs: List[Proof], conn: Optional[Connection] = None
|
||||
):
|
||||
"""Checks the database to see if any of the proofs are already spent.
|
||||
|
||||
Args:
|
||||
proofs (List[Proof]): Proofs to verify
|
||||
conn (Optional[Connection]): Database connection to use. Defaults to None.
|
||||
|
||||
Raises:
|
||||
TokenAlreadySpentError: If any of the proofs are already spent
|
||||
"""
|
||||
async with self.db.get_connection(conn) as conn:
|
||||
if not len(await self._get_proofs_spent([p.Y for p in proofs], conn)) == 0:
|
||||
raise TokenAlreadySpentError()
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import random
|
||||
from typing import List, Optional, Union
|
||||
|
||||
from loguru import logger
|
||||
@@ -18,43 +17,49 @@ from ...core.errors import (
|
||||
)
|
||||
from ..crud import LedgerCrud
|
||||
from ..events.events import LedgerEventManager
|
||||
from .read import DbReadHelper
|
||||
|
||||
|
||||
class DbWriteHelper:
|
||||
db: Database
|
||||
crud: LedgerCrud
|
||||
events: LedgerEventManager
|
||||
db_read: DbReadHelper
|
||||
|
||||
def __init__(
|
||||
self, db: Database, crud: LedgerCrud, events: LedgerEventManager
|
||||
self,
|
||||
db: Database,
|
||||
crud: LedgerCrud,
|
||||
events: LedgerEventManager,
|
||||
db_read: DbReadHelper,
|
||||
) -> None:
|
||||
self.db = db
|
||||
self.crud = crud
|
||||
self.events = events
|
||||
self.db_read = db_read
|
||||
|
||||
async def _set_proofs_pending(
|
||||
async def _verify_spent_proofs_and_set_pending(
|
||||
self, proofs: List[Proof], quote_id: Optional[str] = None
|
||||
) -> None:
|
||||
"""If none of the proofs is in the pending table (_validate_proofs_pending), adds proofs to
|
||||
the list of pending proofs or removes them. Used as a mutex for proofs.
|
||||
|
||||
"""
|
||||
Method to check if proofs are already spent. If they are not spent, we check if they are pending.
|
||||
If they are not pending, we set them as pending.
|
||||
Args:
|
||||
proofs (List[Proof]): Proofs to add to pending table.
|
||||
quote_id (Optional[str]): Melt quote ID. If it is not set, we assume the pending tokens to be from a swap.
|
||||
|
||||
Raises:
|
||||
Exception: At least one proof already in pending table.
|
||||
TransactionError: If any one of the proofs is already spent or pending.
|
||||
"""
|
||||
# first we check whether these proofs are pending already
|
||||
random_id = random.randint(0, 1000000)
|
||||
try:
|
||||
logger.debug("trying to set proofs pending")
|
||||
logger.trace(f"get_connection: random_id: {random_id}")
|
||||
logger.trace("_verify_spent_proofs_and_set_pending acquiring lock")
|
||||
async with self.db.get_connection(
|
||||
lock_table="proofs_pending",
|
||||
lock_timeout=1,
|
||||
) as conn:
|
||||
logger.trace(f"get_connection: got connection {random_id}")
|
||||
logger.trace("checking whether proofs are already spent")
|
||||
await self.db_read._verify_proofs_spendable(proofs, conn)
|
||||
logger.trace("checking whether proofs are already pending")
|
||||
await self._validate_proofs_pending(proofs, conn)
|
||||
for p in proofs:
|
||||
logger.trace(f"crud: setting proof {p.Y} as PENDING")
|
||||
@@ -62,10 +67,10 @@ class DbWriteHelper:
|
||||
proof=p, db=self.db, quote_id=quote_id, conn=conn
|
||||
)
|
||||
logger.trace(f"crud: set proof {p.Y} as PENDING")
|
||||
logger.trace("_verify_spent_proofs_and_set_pending released lock")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to set proofs pending: {e}")
|
||||
raise TransactionError(f"Failed to set proofs pending: {str(e)}")
|
||||
logger.trace("_set_proofs_pending released lock")
|
||||
raise e
|
||||
for p in proofs:
|
||||
await self.events.submit(ProofState(Y=p.Y, state=ProofSpentState.pending))
|
||||
|
||||
|
||||
@@ -100,7 +100,7 @@ class Ledger(LedgerVerification, LedgerSpendingConditions, LedgerTasks, LedgerFe
|
||||
self.backends = backends
|
||||
self.pubkey = derive_pubkey(self.seed)
|
||||
self.db_read = DbReadHelper(self.db, self.crud)
|
||||
self.db_write = DbWriteHelper(self.db, self.crud, self.events)
|
||||
self.db_write = DbWriteHelper(self.db, self.crud, self.events, self.db_read)
|
||||
|
||||
# ------- STARTUP -------
|
||||
|
||||
@@ -905,7 +905,9 @@ class Ledger(LedgerVerification, LedgerSpendingConditions, LedgerTasks, LedgerFe
|
||||
await self.verify_inputs_and_outputs(proofs=proofs)
|
||||
|
||||
# set proofs to pending to avoid race conditions
|
||||
await self.db_write._set_proofs_pending(proofs, quote_id=melt_quote.quote)
|
||||
await self.db_write._verify_spent_proofs_and_set_pending(
|
||||
proofs, quote_id=melt_quote.quote
|
||||
)
|
||||
try:
|
||||
# settle the transaction internally if there is a mint quote with the same payment request
|
||||
melt_quote = await self.melt_mint_settle_internally(melt_quote, proofs)
|
||||
@@ -985,7 +987,7 @@ class Ledger(LedgerVerification, LedgerSpendingConditions, LedgerTasks, LedgerFe
|
||||
logger.trace("swap called")
|
||||
# verify spending inputs, outputs, and spending conditions
|
||||
await self.verify_inputs_and_outputs(proofs=proofs, outputs=outputs)
|
||||
await self.db_write._set_proofs_pending(proofs)
|
||||
await self.db_write._verify_spent_proofs_and_set_pending(proofs)
|
||||
try:
|
||||
async with self.db.get_connection(lock_table="proofs_pending") as conn:
|
||||
await self._invalidate_proofs(proofs=proofs, conn=conn)
|
||||
|
||||
@@ -17,7 +17,6 @@ from ..core.errors import (
|
||||
NoSecretInProofsError,
|
||||
NotAllowedError,
|
||||
SecretTooLongError,
|
||||
TokenAlreadySpentError,
|
||||
TransactionError,
|
||||
TransactionUnitError,
|
||||
)
|
||||
@@ -67,12 +66,6 @@ class LedgerVerification(
|
||||
# Verify inputs
|
||||
if not proofs:
|
||||
raise TransactionError("no proofs provided.")
|
||||
# Verify proofs are spendable
|
||||
if (
|
||||
not len(await self.db_read._get_proofs_spent([p.Y for p in proofs], conn))
|
||||
== 0
|
||||
):
|
||||
raise TokenAlreadySpentError()
|
||||
# Verify amounts of inputs
|
||||
if not all([self._verify_amount(p.amount) for p in proofs]):
|
||||
raise TransactionError("invalid amount.")
|
||||
|
||||
@@ -184,7 +184,9 @@ async def test_db_get_connection_lock_row(wallet: Wallet, ledger: Ledger):
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_db_set_proofs_pending_race_condition(wallet: Wallet, ledger: Ledger):
|
||||
async def test_db_verify_spent_proofs_and_set_pending_race_condition(
|
||||
wallet: Wallet, ledger: Ledger
|
||||
):
|
||||
# fill wallet
|
||||
invoice = await wallet.request_mint(64)
|
||||
await pay_if_regtest(invoice.bolt11)
|
||||
@@ -193,8 +195,8 @@ async def test_db_set_proofs_pending_race_condition(wallet: Wallet, ledger: Ledg
|
||||
|
||||
await assert_err_multiple(
|
||||
asyncio.gather(
|
||||
ledger.db_write._set_proofs_pending(wallet.proofs),
|
||||
ledger.db_write._set_proofs_pending(wallet.proofs),
|
||||
ledger.db_write._verify_spent_proofs_and_set_pending(wallet.proofs),
|
||||
ledger.db_write._verify_spent_proofs_and_set_pending(wallet.proofs),
|
||||
),
|
||||
[
|
||||
"failed to acquire database lock",
|
||||
@@ -204,7 +206,7 @@ async def test_db_set_proofs_pending_race_condition(wallet: Wallet, ledger: Ledg
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_db_set_proofs_pending_delayed_no_race_condition(
|
||||
async def test_db_verify_spent_proofs_and_set_pending_delayed_no_race_condition(
|
||||
wallet: Wallet, ledger: Ledger
|
||||
):
|
||||
# fill wallet
|
||||
@@ -213,21 +215,21 @@ async def test_db_set_proofs_pending_delayed_no_race_condition(
|
||||
await wallet.mint(64, id=invoice.id)
|
||||
assert wallet.balance == 64
|
||||
|
||||
async def delayed_set_proofs_pending():
|
||||
async def delayed_verify_spent_proofs_and_set_pending():
|
||||
await asyncio.sleep(0.1)
|
||||
await ledger.db_write._set_proofs_pending(wallet.proofs)
|
||||
await ledger.db_write._verify_spent_proofs_and_set_pending(wallet.proofs)
|
||||
|
||||
await assert_err(
|
||||
asyncio.gather(
|
||||
ledger.db_write._set_proofs_pending(wallet.proofs),
|
||||
delayed_set_proofs_pending(),
|
||||
ledger.db_write._verify_spent_proofs_and_set_pending(wallet.proofs),
|
||||
delayed_verify_spent_proofs_and_set_pending(),
|
||||
),
|
||||
"proofs are pending",
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_db_set_proofs_pending_no_race_condition_different_proofs(
|
||||
async def test_db_verify_spent_proofs_and_set_pending_no_race_condition_different_proofs(
|
||||
wallet: Wallet, ledger: Ledger
|
||||
):
|
||||
# fill wallet
|
||||
@@ -238,8 +240,8 @@ async def test_db_set_proofs_pending_no_race_condition_different_proofs(
|
||||
assert len(wallet.proofs) == 2
|
||||
|
||||
asyncio.gather(
|
||||
ledger.db_write._set_proofs_pending(wallet.proofs[:1]),
|
||||
ledger.db_write._set_proofs_pending(wallet.proofs[1:]),
|
||||
ledger.db_write._verify_spent_proofs_and_set_pending(wallet.proofs[:1]),
|
||||
ledger.db_write._verify_spent_proofs_and_set_pending(wallet.proofs[1:]),
|
||||
)
|
||||
|
||||
|
||||
@@ -300,6 +302,6 @@ async def test_db_lock_table(wallet: Wallet, ledger: Ledger):
|
||||
async with ledger.db.connect(lock_table="proofs_pending", lock_timeout=0.1) as conn:
|
||||
assert isinstance(conn, Connection)
|
||||
await assert_err(
|
||||
ledger.db_write._set_proofs_pending(wallet.proofs),
|
||||
ledger.db_write._verify_spent_proofs_and_set_pending(wallet.proofs),
|
||||
"failed to acquire database lock",
|
||||
)
|
||||
|
||||
@@ -39,7 +39,7 @@ async def test_mint_proofs_pending(wallet1: Wallet, ledger: Ledger):
|
||||
[s.state == ProofSpentState.unspent for s in proofs_states_before_split.states]
|
||||
)
|
||||
|
||||
await ledger.db_write._set_proofs_pending(proofs)
|
||||
await ledger.db_write._verify_spent_proofs_and_set_pending(proofs)
|
||||
|
||||
proof_states = await wallet1.check_proof_state(proofs)
|
||||
assert all([s.state == ProofSpentState.pending for s in proof_states.states])
|
||||
|
||||
@@ -363,7 +363,7 @@ async def test_double_spend(wallet1: Wallet):
|
||||
await wallet1.split(wallet1.proofs, 20)
|
||||
await assert_err(
|
||||
wallet1.split(doublespend, 20),
|
||||
"Mint Error: Token already spent.",
|
||||
"Token already spent.",
|
||||
)
|
||||
assert wallet1.balance == 64
|
||||
assert wallet1.available_balance == 64
|
||||
|
||||
Reference in New Issue
Block a user