add transactions and locking for certain operations (#217)

This commit is contained in:
Semisol
2023-05-15 18:09:28 +03:00
committed by GitHub
parent ddd8cebdb0
commit d4c7a15e89
2 changed files with 66 additions and 34 deletions

View File

@@ -49,6 +49,15 @@ class Compat:
return "" return ""
return "<nothing>" return "<nothing>"
def lock_table(self, table: str) -> str:
if self.type == POSTGRES:
return f"LOCK TABLE {table} IN EXCLUSIVE MODE;"
elif self.type == COCKROACH:
return f"LOCK TABLE {table};"
elif self.type == SQLITE:
return "BEGIN EXCLUSIVE TRANSACTION;"
return "<nothing>"
class Connection(Compat): class Connection(Compat):
def __init__(self, conn: AsyncConnection, txn, typ, name, schema): def __init__(self, conn: AsyncConnection, txn, typ, name, schema):

View File

@@ -311,7 +311,7 @@ class Ledger:
) )
return payment_request, checking_id return payment_request, checking_id
async def _check_lightning_invoice(self, amount: int, hash: str) -> Literal[True]: async def _check_lightning_invoice(self, amount: int, hash: str, conn) -> Literal[True]:
"""Checks with the Lightning backend whether an invoice stored with `hash` was paid. """Checks with the Lightning backend whether an invoice stored with `hash` was paid.
Args: Args:
@@ -330,7 +330,7 @@ class Ledger:
""" """
logger.trace(f"crud: _check_lightning_invoice: checking invoice {hash}") logger.trace(f"crud: _check_lightning_invoice: checking invoice {hash}")
invoice: Union[Invoice, None] = await self.crud.get_lightning_invoice( invoice: Union[Invoice, None] = await self.crud.get_lightning_invoice(
hash=hash, db=self.db hash=hash, db=self.db, conn=conn
) )
logger.trace(f"crud: _check_lightning_invoice: invoice: {invoice}") logger.trace(f"crud: _check_lightning_invoice: invoice: {invoice}")
if invoice is None: if invoice is None:
@@ -341,7 +341,7 @@ class Ledger:
# set this invoice as issued # set this invoice as issued
logger.trace(f"crud: setting invoice {invoice.payment_hash} as issued") logger.trace(f"crud: setting invoice {invoice.payment_hash} as issued")
await self.crud.update_lightning_invoice(hash=hash, issued=True, db=self.db) await self.crud.update_lightning_invoice(hash=hash, issued=True, db=self.db, conn=conn)
logger.trace(f"crud: invoice {invoice.payment_hash} set as issued") logger.trace(f"crud: invoice {invoice.payment_hash} set as issued")
try: try:
@@ -364,7 +364,7 @@ class Ledger:
# unset issued # unset issued
logger.trace(f"crud: unsetting invoice {invoice.payment_hash} as issued") logger.trace(f"crud: unsetting invoice {invoice.payment_hash} as issued")
await self.crud.update_lightning_invoice( await self.crud.update_lightning_invoice(
hash=hash, issued=False, db=self.db hash=hash, issued=False, db=self.db, conn=conn
) )
logger.trace(f"crud: invoice {invoice.payment_hash} unset as issued") logger.trace(f"crud: invoice {invoice.payment_hash} unset as issued")
raise e raise e
@@ -415,7 +415,7 @@ class Ledger:
await self.crud.invalidate_proof(proof=p, db=self.db) await self.crud.invalidate_proof(proof=p, db=self.db)
logger.trace(f"crud: stored proofs") logger.trace(f"crud: stored proofs")
async def _set_proofs_pending(self, proofs: List[Proof]): async def _set_proofs_pending(self, proofs: List[Proof], conn):
"""If none of the proofs is in the pending table (_validate_proofs_pending), adds proofs to """If none of the proofs is in the pending table (_validate_proofs_pending), adds proofs to
the list of pending proofs or removes them. Used as a mutex for proofs. the list of pending proofs or removes them. Used as a mutex for proofs.
@@ -426,20 +426,20 @@ class Ledger:
Exception: At least one proof already in pending table. Exception: At least one proof already in pending table.
""" """
# first we check whether these proofs are pending aready # first we check whether these proofs are pending aready
await self._validate_proofs_pending(proofs) await self._validate_proofs_pending(proofs, conn)
for p in proofs: for p in proofs:
try: try:
logger.trace( logger.trace(
f"crud: _set_proofs_pending setting proof {p.secret} as pending" f"crud: _set_proofs_pending setting proof {p.secret} as pending"
) )
await self.crud.set_proof_pending(proof=p, db=self.db) await self.crud.set_proof_pending(proof=p, db=self.db, conn=conn)
logger.trace( logger.trace(
f"crud: _set_proofs_pending proof {p.secret} set as pending" f"crud: _set_proofs_pending proof {p.secret} set as pending"
) )
except: except:
raise Exception("proofs already pending.") raise Exception("proofs already pending.")
async def _unset_proofs_pending(self, proofs: List[Proof]): async def _unset_proofs_pending(self, proofs: List[Proof], conn):
"""Deletes proofs from pending table. """Deletes proofs from pending table.
Args: Args:
@@ -452,7 +452,7 @@ class Ledger:
logger.trace( logger.trace(
f"crud: _unset_proofs_pending unsetting proof {p.secret} as pending" f"crud: _unset_proofs_pending unsetting proof {p.secret} as pending"
) )
await self.crud.unset_proof_pending(proof=p, db=self.db) await self.crud.unset_proof_pending(proof=p, db=self.db, conn=conn)
logger.trace( logger.trace(
f"crud: _unset_proofs_pending proof {p.secret} unset as pending" f"crud: _unset_proofs_pending proof {p.secret} unset as pending"
) )
@@ -460,7 +460,7 @@ class Ledger:
print(e) print(e)
pass pass
async def _validate_proofs_pending(self, proofs: List[Proof]): async def _validate_proofs_pending(self, proofs: List[Proof], conn):
"""Checks if any of the provided proofs is in the pending proofs table. """Checks if any of the provided proofs is in the pending proofs table.
Args: Args:
@@ -470,7 +470,7 @@ class Ledger:
Exception: At least one of the proofs is in the pending table. Exception: At least one of the proofs is in the pending table.
""" """
logger.trace(f"crud: _validate_proofs_pending validating proofs") logger.trace(f"crud: _validate_proofs_pending validating proofs")
proofs_pending = await self.crud.get_proofs_pending(db=self.db) proofs_pending = await self.crud.get_proofs_pending(db=self.db, conn=conn)
logger.trace(f"crud: _validate_proofs_pending got proofs pending") logger.trace(f"crud: _validate_proofs_pending got proofs pending")
for p in proofs: for p in proofs:
for pp in proofs_pending: for pp in proofs_pending:
@@ -632,13 +632,17 @@ class Ledger:
logger.trace("called mint") logger.trace("called mint")
amounts = [b.amount for b in B_s] amounts = [b.amount for b in B_s]
amount = sum(amounts) amount = sum(amounts)
async with self.db.connect() as conn:
logger.trace("trying to lock table invoice")
await conn.execute(self.db.lock_table("invoices"))
logger.trace("locked table invoice")
# check if lightning invoice was paid # check if lightning invoice was paid
if settings.lightning: if settings.lightning:
if not hash: if not hash:
raise Exception("no hash provided.") raise Exception("no hash provided.")
try: try:
logger.trace("checking lightning invoice") logger.trace("checking lightning invoice")
paid = await self._check_lightning_invoice(amount, hash) paid = await self._check_lightning_invoice(amount, hash, conn)
logger.trace(f"invoice paid: {paid}") logger.trace(f"invoice paid: {paid}")
except Exception as e: except Exception as e:
raise e raise e
@@ -669,10 +673,16 @@ class Ledger:
Returns: Returns:
List[BlindedMessage]: Signed outputs for returning overpaid fees to wallet. List[BlindedMessage]: Signed outputs for returning overpaid fees to wallet.
""" """
logger.trace("melt called") logger.trace("melt called")
async with self.db.connect() as conn:
logger.trace("trying to lock table proofs_pending")
await conn.execute(self.db.lock_table("proofs_pending"))
logger.trace("locked table proofs_pending")
# validate and set proofs as pending # validate and set proofs as pending
logger.trace("setting proofs pending") logger.trace("setting proofs pending")
await self._set_proofs_pending(proofs) await self._set_proofs_pending(proofs, conn)
logger.trace(f"set proofs as pending") logger.trace(f"set proofs as pending")
try: try:
@@ -729,9 +739,13 @@ class Ledger:
raise e raise e
finally: finally:
# delete proofs from pending list # delete proofs from pending list
logger.trace("unsetting proofs pending") async with self.db.connect() as conn:
await self._unset_proofs_pending(proofs) logger.trace("trying to lock table proofs_pending")
logger.trace("unset proofs pending") await conn.execute(self.db.lock_table("proofs_pending"))
logger.trace("locked table proofs_pending")
logger.trace("unsetting proofs as pending")
await self._unset_proofs_pending(proofs, conn)
logger.trace(f"unset proofs as pending")
return status, preimage, return_promises return status, preimage, return_promises
@@ -803,8 +817,13 @@ class Ledger:
""" """
logger.trace(f"split called") logger.trace(f"split called")
# set proofs as pending # set proofs as pending
logger.trace(f"setting proofs as pending") async with self.db.connect() as conn:
await self._set_proofs_pending(proofs) logger.trace("trying to lock table proofs_pending")
await conn.execute(self.db.lock_table("proofs_pending"))
logger.trace("locked table proofs_pending")
# validate and set proofs as pending
logger.trace("setting proofs pending")
await self._set_proofs_pending(proofs, conn)
logger.trace(f"set proofs as pending") logger.trace(f"set proofs as pending")
total = sum_proofs(proofs) total = sum_proofs(proofs)
@@ -831,8 +850,12 @@ class Ledger:
raise e raise e
finally: finally:
# delete proofs from pending list # delete proofs from pending list
logger.trace(f"unsetting proofs as pending") async with self.db.connect() as conn:
await self._unset_proofs_pending(proofs) logger.trace("trying to lock table proofs_pending")
await conn.execute(self.db.lock_table("proofs_pending"))
logger.trace("locked table proofs_pending")
logger.trace("unsetting proofs as pending")
await self._unset_proofs_pending(proofs, conn)
logger.trace(f"unset proofs as pending") logger.trace(f"unset proofs as pending")
# Mark proofs as used and prepare new promises # Mark proofs as used and prepare new promises