From d4c7a15e89e577326fc640d630eb9cf366069de0 Mon Sep 17 00:00:00 2001 From: Semisol <45574030+Semisol@users.noreply.github.com> Date: Mon, 15 May 2023 18:09:28 +0300 Subject: [PATCH] add transactions and locking for certain operations (#217) --- cashu/core/db.py | 9 +++++ cashu/mint/ledger.py | 91 +++++++++++++++++++++++++++----------------- 2 files changed, 66 insertions(+), 34 deletions(-) diff --git a/cashu/core/db.py b/cashu/core/db.py index d877b2d..71c3ae3 100644 --- a/cashu/core/db.py +++ b/cashu/core/db.py @@ -48,6 +48,15 @@ class Compat: elif self.type == SQLITE: return "" return "" + + def lock_table(self, table: str) -> str: + if self.type == POSTGRES: + return f"LOCK TABLE {table} IN EXCLUSIVE MODE;" + elif self.type == COCKROACH: + return f"LOCK TABLE {table};" + elif self.type == SQLITE: + return "BEGIN EXCLUSIVE TRANSACTION;" + return "" class Connection(Compat): diff --git a/cashu/mint/ledger.py b/cashu/mint/ledger.py index 76b20f0..774f084 100644 --- a/cashu/mint/ledger.py +++ b/cashu/mint/ledger.py @@ -311,7 +311,7 @@ class Ledger: ) return payment_request, checking_id - async def _check_lightning_invoice(self, amount: int, hash: str) -> Literal[True]: + async def _check_lightning_invoice(self, amount: int, hash: str, conn) -> Literal[True]: """Checks with the Lightning backend whether an invoice stored with `hash` was paid. Args: @@ -330,7 +330,7 @@ class Ledger: """ logger.trace(f"crud: _check_lightning_invoice: checking invoice {hash}") invoice: Union[Invoice, None] = await self.crud.get_lightning_invoice( - hash=hash, db=self.db + hash=hash, db=self.db, conn=conn ) logger.trace(f"crud: _check_lightning_invoice: invoice: {invoice}") if invoice is None: @@ -341,7 +341,7 @@ class Ledger: # set this invoice as issued logger.trace(f"crud: setting invoice {invoice.payment_hash} as issued") - await self.crud.update_lightning_invoice(hash=hash, issued=True, db=self.db) + await self.crud.update_lightning_invoice(hash=hash, issued=True, db=self.db, conn=conn) logger.trace(f"crud: invoice {invoice.payment_hash} set as issued") try: @@ -364,7 +364,7 @@ class Ledger: # unset issued logger.trace(f"crud: unsetting invoice {invoice.payment_hash} as issued") await self.crud.update_lightning_invoice( - hash=hash, issued=False, db=self.db + hash=hash, issued=False, db=self.db, conn=conn ) logger.trace(f"crud: invoice {invoice.payment_hash} unset as issued") raise e @@ -415,7 +415,7 @@ class Ledger: await self.crud.invalidate_proof(proof=p, db=self.db) logger.trace(f"crud: stored proofs") - async def _set_proofs_pending(self, proofs: List[Proof]): + async def _set_proofs_pending(self, proofs: List[Proof], conn): """If none of the proofs is in the pending table (_validate_proofs_pending), adds proofs to the list of pending proofs or removes them. Used as a mutex for proofs. @@ -426,20 +426,20 @@ class Ledger: Exception: At least one proof already in pending table. """ # first we check whether these proofs are pending aready - await self._validate_proofs_pending(proofs) + await self._validate_proofs_pending(proofs, conn) for p in proofs: try: logger.trace( f"crud: _set_proofs_pending setting proof {p.secret} as pending" ) - await self.crud.set_proof_pending(proof=p, db=self.db) + await self.crud.set_proof_pending(proof=p, db=self.db, conn=conn) logger.trace( f"crud: _set_proofs_pending proof {p.secret} set as pending" ) except: raise Exception("proofs already pending.") - async def _unset_proofs_pending(self, proofs: List[Proof]): + async def _unset_proofs_pending(self, proofs: List[Proof], conn): """Deletes proofs from pending table. Args: @@ -452,7 +452,7 @@ class Ledger: logger.trace( f"crud: _unset_proofs_pending unsetting proof {p.secret} as pending" ) - await self.crud.unset_proof_pending(proof=p, db=self.db) + await self.crud.unset_proof_pending(proof=p, db=self.db, conn=conn) logger.trace( f"crud: _unset_proofs_pending proof {p.secret} unset as pending" ) @@ -460,7 +460,7 @@ class Ledger: print(e) pass - async def _validate_proofs_pending(self, proofs: List[Proof]): + async def _validate_proofs_pending(self, proofs: List[Proof], conn): """Checks if any of the provided proofs is in the pending proofs table. Args: @@ -470,7 +470,7 @@ class Ledger: Exception: At least one of the proofs is in the pending table. """ logger.trace(f"crud: _validate_proofs_pending validating proofs") - proofs_pending = await self.crud.get_proofs_pending(db=self.db) + proofs_pending = await self.crud.get_proofs_pending(db=self.db, conn=conn) logger.trace(f"crud: _validate_proofs_pending got proofs pending") for p in proofs: for pp in proofs_pending: @@ -632,16 +632,20 @@ class Ledger: logger.trace("called mint") amounts = [b.amount for b in B_s] amount = sum(amounts) - # check if lightning invoice was paid - if settings.lightning: - if not hash: - raise Exception("no hash provided.") - try: - logger.trace("checking lightning invoice") - paid = await self._check_lightning_invoice(amount, hash) - logger.trace(f"invoice paid: {paid}") - except Exception as e: - raise e + async with self.db.connect() as conn: + logger.trace("trying to lock table invoice") + await conn.execute(self.db.lock_table("invoices")) + logger.trace("locked table invoice") + # check if lightning invoice was paid + if settings.lightning: + if not hash: + raise Exception("no hash provided.") + try: + logger.trace("checking lightning invoice") + paid = await self._check_lightning_invoice(amount, hash, conn) + logger.trace(f"invoice paid: {paid}") + except Exception as e: + raise e for amount in amounts: if amount not in [2**i for i in range(settings.max_order)]: @@ -669,11 +673,17 @@ class Ledger: Returns: List[BlindedMessage]: Signed outputs for returning overpaid fees to wallet. """ + logger.trace("melt called") - # validate and set proofs as pending - logger.trace("setting proofs pending") - await self._set_proofs_pending(proofs) - logger.trace(f"set proofs as pending") + + async with self.db.connect() as conn: + logger.trace("trying to lock table proofs_pending") + await conn.execute(self.db.lock_table("proofs_pending")) + logger.trace("locked table proofs_pending") + # validate and set proofs as pending + logger.trace("setting proofs pending") + await self._set_proofs_pending(proofs, conn) + logger.trace(f"set proofs as pending") try: await self._verify_proofs(proofs) @@ -729,9 +739,13 @@ class Ledger: raise e finally: # delete proofs from pending list - logger.trace("unsetting proofs pending") - await self._unset_proofs_pending(proofs) - logger.trace("unset proofs pending") + async with self.db.connect() as conn: + logger.trace("trying to lock table proofs_pending") + await conn.execute(self.db.lock_table("proofs_pending")) + logger.trace("locked table proofs_pending") + logger.trace("unsetting proofs as pending") + await self._unset_proofs_pending(proofs, conn) + logger.trace(f"unset proofs as pending") return status, preimage, return_promises @@ -803,9 +817,14 @@ class Ledger: """ logger.trace(f"split called") # set proofs as pending - logger.trace(f"setting proofs as pending") - await self._set_proofs_pending(proofs) - logger.trace(f"set proofs as pending") + async with self.db.connect() as conn: + logger.trace("trying to lock table proofs_pending") + await conn.execute(self.db.lock_table("proofs_pending")) + logger.trace("locked table proofs_pending") + # validate and set proofs as pending + logger.trace("setting proofs pending") + await self._set_proofs_pending(proofs, conn) + logger.trace(f"set proofs as pending") total = sum_proofs(proofs) try: @@ -831,9 +850,13 @@ class Ledger: raise e finally: # delete proofs from pending list - logger.trace(f"unsetting proofs as pending") - await self._unset_proofs_pending(proofs) - logger.trace(f"unset proofs as pending") + async with self.db.connect() as conn: + logger.trace("trying to lock table proofs_pending") + await conn.execute(self.db.lock_table("proofs_pending")) + logger.trace("locked table proofs_pending") + logger.trace("unsetting proofs as pending") + await self._unset_proofs_pending(proofs, conn) + logger.trace(f"unset proofs as pending") # Mark proofs as used and prepare new promises logger.trace(f"invalidating proofs")