Use m̶u̶l̶t̶i̶p̶r̶o̶c̶e̶s̶s̶i̶n̶g̶ asyncio locks instead of db locks (#256)

* use mp locks instead of db locks

* replace mp.Lock with asyncio.Lock

* make format

* push

* remove db connection (and lock) and delete asyncio locks
This commit is contained in:
callebtc
2023-06-23 19:10:32 +02:00
committed by GitHub
parent 62a6ec34b0
commit a3e67d21aa
2 changed files with 56 additions and 74 deletions

View File

@@ -1,3 +1,4 @@
import asyncio
import math import math
from typing import Dict, List, Literal, Optional, Set, Union from typing import Dict, List, Literal, Optional, Set, Union
@@ -15,7 +16,7 @@ from ..core.base import (
from ..core.crypto import b_dhke from ..core.crypto import b_dhke
from ..core.crypto.keys import derive_pubkey, random_hash from ..core.crypto.keys import derive_pubkey, random_hash
from ..core.crypto.secp import PublicKey from ..core.crypto.secp import PublicKey
from ..core.db import Connection, Database, lock_table from ..core.db import Connection, Database
from ..core.helpers import fee_reserve, sum_proofs from ..core.helpers import fee_reserve, sum_proofs
from ..core.script import verify_script from ..core.script import verify_script
from ..core.settings import settings from ..core.settings import settings
@@ -25,6 +26,11 @@ from ..mint.crud import LedgerCrud
class Ledger: class Ledger:
locks: Dict[str, asyncio.Lock] = {} # holds multiprocessing locks
proofs_pending_lock: asyncio.Lock = (
asyncio.Lock()
) # holds locks for proofs_pending database
def __init__( def __init__(
self, self,
db: Database, db: Database,
@@ -433,18 +439,19 @@ class Ledger:
Exception: At least one proof already in pending table. Exception: At least one proof already in pending table.
""" """
# first we check whether these proofs are pending aready # first we check whether these proofs are pending aready
await self._validate_proofs_pending(proofs, conn) async with self.proofs_pending_lock:
for p in proofs: await self._validate_proofs_pending(proofs, conn)
try: for p in proofs:
logger.trace( try:
f"crud: _set_proofs_pending setting proof {p.secret} as pending" logger.trace(
) f"crud: _set_proofs_pending setting proof {p.secret} as pending"
await self.crud.set_proof_pending(proof=p, db=self.db, conn=conn) )
logger.trace( await self.crud.set_proof_pending(proof=p, db=self.db, conn=conn)
f"crud: _set_proofs_pending proof {p.secret} set as pending" logger.trace(
) f"crud: _set_proofs_pending proof {p.secret} set as pending"
except: )
raise Exception("proofs already pending.") except:
raise Exception("proofs already pending.")
async def _unset_proofs_pending( async def _unset_proofs_pending(
self, proofs: List[Proof], conn: Optional[Connection] = None self, proofs: List[Proof], conn: Optional[Connection] = None
@@ -456,20 +463,23 @@ class Ledger:
""" """
# we try: except: this block in order to avoid that any errors here # we try: except: this block in order to avoid that any errors here
# could block the _invalidate_proofs() call that happens afterwards. # could block the _invalidate_proofs() call that happens afterwards.
try: async with self.proofs_pending_lock:
for p in proofs: try:
logger.trace( for p in proofs:
f"crud: _unset_proofs_pending unsetting proof {p.secret} as pending" logger.trace(
) f"crud: _unset_proofs_pending unsetting proof {p.secret} as pending"
await self.crud.unset_proof_pending(proof=p, db=self.db, conn=conn) )
logger.trace( await self.crud.unset_proof_pending(proof=p, db=self.db, conn=conn)
f"crud: _unset_proofs_pending proof {p.secret} unset as pending" logger.trace(
) f"crud: _unset_proofs_pending proof {p.secret} unset as pending"
except Exception as e: )
print(e) except Exception as e:
pass print(e)
pass
async def _validate_proofs_pending(self, proofs: List[Proof], conn): async def _validate_proofs_pending(
self, proofs: List[Proof], conn: Optional[Connection] = None
):
"""Checks if any of the provided proofs is in the pending proofs table. """Checks if any of the provided proofs is in the pending proofs table.
Args: Args:
@@ -633,8 +643,9 @@ class Ledger:
keyset (Optional[MintKeyset], optional): Keyset to use. If not provided, uses active keyset. Defaults to None. keyset (Optional[MintKeyset], optional): Keyset to use. If not provided, uses active keyset. Defaults to None.
Raises: Raises:
Exception: Lightning invvoice is not paid.
Exception: Lightning is turned on but no payment hash is provided. Exception: Lightning is turned on but no payment hash is provided.
e: Something went wrong with the invoice check. Exception: Something went wrong with the invoice check.
Exception: Amount too large. Exception: Amount too large.
Returns: Returns:
@@ -643,20 +654,17 @@ class Ledger:
logger.trace("called mint") logger.trace("called mint")
amounts = [b.amount for b in B_s] amounts = [b.amount for b in B_s]
amount = sum(amounts) amount = sum(amounts)
async with self.db.connect() as conn:
logger.trace("attempting lock table invoice") if settings.lightning:
await conn.execute(lock_table(self.db, "invoices")) if not hash:
logger.trace("locked table invoice") raise Exception("no hash provided.")
# check if lightning invoice was paid self.locks[hash] = (
if settings.lightning: self.locks.get(hash) or asyncio.Lock()
if not hash: ) # create a new lock if it doesn't exist
raise Exception("no hash provided.") async with self.locks[hash]:
try: # will raise an exception if the invoice is not paid or tokens are already issued
logger.trace("checking lightning invoice") await self._check_lightning_invoice(amount, hash)
paid = await self._check_lightning_invoice(amount, hash, conn) del self.locks[hash]
logger.trace(f"invoice paid: {paid}")
except Exception as e:
raise e
for amount in amounts: for amount in amounts:
if amount not in [2**i for i in range(settings.max_order)]: if amount not in [2**i for i in range(settings.max_order)]:
@@ -687,15 +695,7 @@ class Ledger:
logger.trace("melt called") logger.trace("melt called")
async with self.db.connect() as conn: await self._set_proofs_pending(proofs)
logger.trace("attempting lock table proofs_pending")
await conn.execute(lock_table(self.db, "proofs_pending"))
logger.trace("locked table proofs_pending")
# validate and set proofs as pending
logger.trace("setting proofs pending")
await self._set_proofs_pending(proofs, conn)
logger.trace(f"set proofs as pending")
logger.trace("unlocked table proofs_pending")
try: try:
await self._verify_proofs(proofs) await self._verify_proofs(proofs)
@@ -751,14 +751,7 @@ class Ledger:
raise e raise e
finally: finally:
# delete proofs from pending list # delete proofs from pending list
async with self.db.connect() as conn: await self._unset_proofs_pending(proofs)
logger.trace("attempting lock table proofs_pending")
await conn.execute(lock_table(self.db, "proofs_pending"))
logger.trace("locked table proofs_pending")
logger.trace("unsetting proofs as pending")
await self._unset_proofs_pending(proofs, conn)
logger.trace(f"unset proofs as pending")
logger.trace("unlocked table proofs_pending")
return status, preimage, return_promises return status, preimage, return_promises
@@ -829,15 +822,9 @@ class Ledger:
Tuple[List[BlindSignature],List[BlindSignature]]: Promises on both sides of the split. Tuple[List[BlindSignature],List[BlindSignature]]: Promises on both sides of the split.
""" """
logger.trace(f"split called") logger.trace(f"split called")
# set proofs as pending
async with self.db.connect() as conn: await self._set_proofs_pending(proofs)
logger.trace("attempting lock table proofs_pending")
await conn.execute(lock_table(self.db, "proofs_pending"))
logger.trace("locked table proofs_pending")
# validate and set proofs as pending
logger.trace("setting proofs pending")
await self._set_proofs_pending(proofs, conn)
logger.trace(f"set proofs as pending")
total = sum_proofs(proofs) total = sum_proofs(proofs)
try: try:
@@ -863,13 +850,7 @@ class Ledger:
raise e raise e
finally: finally:
# delete proofs from pending list # delete proofs from pending list
async with self.db.connect() as conn: await self._unset_proofs_pending(proofs)
logger.trace("attempting lock table proofs_pending")
await conn.execute(lock_table(self.db, "proofs_pending"))
logger.trace("locked table proofs_pending")
logger.trace("unsetting proofs as pending")
await self._unset_proofs_pending(proofs, conn)
logger.trace(f"unset proofs as pending")
# Mark proofs as used and prepare new promises # Mark proofs as used and prepare new promises
logger.trace(f"invalidating proofs") logger.trace(f"invalidating proofs")

View File

@@ -453,6 +453,7 @@ async def pending(ctx: Context, legacy, number: int, offset: int):
tokenObj = deserialize_token_from_string(token) tokenObj = deserialize_token_from_string(token)
mint = [t.mint for t in tokenObj.token][0] mint = [t.mint for t in tokenObj.token][0]
# token_hidden_secret = await wallet.serialize_proofs(grouped_proofs) # token_hidden_secret = await wallet.serialize_proofs(grouped_proofs)
assert grouped_proofs[0].time_reserved
reserved_date = datetime.utcfromtimestamp( reserved_date = datetime.utcfromtimestamp(
int(grouped_proofs[0].time_reserved) int(grouped_proofs[0].time_reserved)
).strftime("%Y-%m-%d %H:%M:%S") ).strftime("%Y-%m-%d %H:%M:%S")