mirror of
https://github.com/aljazceru/nutshell.git
synced 2025-12-20 10:34:20 +01:00
* refactor mint verification * test with lightning=true * rename proofs_used to secrets_used and refactor * test with lightning * spelling fixes
164 lines
6.3 KiB
Python
164 lines
6.3 KiB
Python
from typing import List, Literal, Optional, Set, Union
|
|
|
|
from loguru import logger
|
|
|
|
from ..core.base import (
|
|
BlindedMessage,
|
|
BlindedSignature,
|
|
MintKeyset,
|
|
MintKeysets,
|
|
Proof,
|
|
)
|
|
from ..core.crypto import b_dhke
|
|
from ..core.crypto.secp import PublicKey
|
|
from ..core.errors import (
|
|
NoSecretInProofsError,
|
|
NotAllowedError,
|
|
SecretTooLongError,
|
|
TokenAlreadySpentError,
|
|
TransactionError,
|
|
)
|
|
from ..core.settings import settings
|
|
from .conditions import LedgerSpendingConditions
|
|
from .protocols import SupportsKeysets
|
|
|
|
|
|
class LedgerVerification(LedgerSpendingConditions, SupportsKeysets):
    """Verification functions for the ledger.

    Bundles the checks applied to proofs (inputs) and blinded messages
    (outputs) before the ledger acts on them.
    """

    # Keyset used when a proof does not carry a keyset id.
    keyset: MintKeyset
    # Known keysets, looked up by proof.id via `keysets.keysets`.
    keysets: MintKeysets
    # Secrets of proofs that were already spent.
    secrets_used: Set[str]

    async def verify_inputs_and_outputs(
        self, proofs: List[Proof], outputs: Optional[List[BlindedMessage]] = None
    ):
        """Checks all proofs and outputs for validity.

        Inputs are always checked. Outputs, and the relation between inputs
        and outputs, are only checked when `outputs` is non-empty: both `None`
        and an empty list skip the output checks.

        Args:
            proofs (List[Proof]): List of proofs to check.
            outputs (Optional[List[BlindedMessage]], optional): List of outputs to check.
                Must be provided for /split but not for /melt. Defaults to None.

        Raises:
            TokenAlreadySpentError: A proof's secret is already in `secrets_used`.
            NotAllowedError: An input or output amount is out of range.
            NoSecretInProofsError: A proof has no secret.
            SecretTooLongError: A proof's secret is longer than 512 characters.
            TransactionError: Duplicate proofs or outputs, failed BDHKE
                verification, failed spending conditions, or input amounts
                smaller than output amounts.
            AssertionError: A proof references an unknown keyset id.
        """
        # Verify inputs
        # Verify proofs are spendable
        self._check_proofs_spendable(proofs)
        # NOTE: the list comprehensions below are intentionally eager so that
        # every proof is evaluated (and may raise its own error) before the
        # aggregate result is inspected.
        # Verify amounts of inputs
        if not all([self._verify_amount(p.amount) for p in proofs]):
            raise TransactionError("invalid amount.")
        # Verify secret criteria
        if not all([self._verify_secret_criteria(p) for p in proofs]):
            raise TransactionError("secrets do not match criteria.")
        # Verify that only unique proofs were used
        if not self._verify_no_duplicate_proofs(proofs):
            raise TransactionError("duplicate proofs.")
        # Verify ecash signatures
        if not all([self._verify_proof_bdhke(p) for p in proofs]):
            raise TransactionError("could not verify proofs.")
        # Verify input spending conditions
        if not all([self._verify_input_spending_conditions(p) for p in proofs]):
            raise TransactionError("validation of input spending conditions failed.")

        if not outputs:
            return

        # Verify outputs
        self._verify_outputs(outputs)

        # Verify inputs and outputs together
        if not self._verify_input_output_amounts(proofs, outputs):
            raise TransactionError("input amounts less than output.")
        # Verify output spending conditions (outputs is known to be non-empty here)
        if not self._verify_output_spending_conditions(proofs, outputs):
            raise TransactionError("validation of output spending conditions failed.")

    def _verify_outputs(self, outputs: List[BlindedMessage]):
        """Verify that the outputs are valid.

        Raises:
            NotAllowedError: An output amount is out of range.
            TransactionError: Two outputs share the same `B_`.
        """
        # Verify amounts of outputs
        if not all([self._verify_amount(o.amount) for o in outputs]):
            raise TransactionError("invalid amount.")
        # Verify that only unique outputs were used
        if not self._verify_no_duplicate_outputs(outputs):
            raise TransactionError("duplicate outputs.")

    def _check_proofs_spendable(self, proofs: List[Proof]):
        """Checks whether the proofs were already spent.

        Raises:
            TokenAlreadySpentError: At least one secret is in `secrets_used`.
        """
        if any(p.secret in self.secrets_used for p in proofs):
            raise TokenAlreadySpentError()

    def _verify_secret_criteria(self, proof: Proof) -> Literal[True]:
        """Verifies that a secret is present and is not too long (DOS prevention).

        Raises:
            NoSecretInProofsError: The secret is None or empty.
            SecretTooLongError: The secret is longer than 512 characters.
        """
        if proof.secret is None or proof.secret == "":
            raise NoSecretInProofsError()
        if len(proof.secret) > 512:
            raise SecretTooLongError()
        return True

    def _verify_proof_bdhke(self, proof: Proof):
        """Verifies that the proof of promise was issued by this ledger.

        Returns:
            The result of `b_dhke.verify` for the proof's `C` and secret under
            the private key for the proof's amount.

        Raises:
            AssertionError: The proof names a keyset id that is not known.
        """
        # if no keyset id is given in proof, assume the current one
        if not proof.id:
            private_key_amount = self.keyset.private_keys[proof.amount]
        else:
            # Raised explicitly (instead of via `assert`) so the check is not
            # stripped when Python runs with -O.
            if proof.id not in self.keysets.keysets:
                raise AssertionError(f"keyset {proof.id} unknown")
            proof_keyset = self.keysets.keysets[proof.id]
            logger.trace(f"Validating proof with keyset {proof_keyset.id}.")
            # use the appropriate active keyset for this proof.id
            private_key_amount = proof_keyset.private_keys[proof.amount]

        C = PublicKey(bytes.fromhex(proof.C), raw=True)
        return b_dhke.verify(private_key_amount, C, proof.secret)

    def _verify_input_output_amounts(
        self, inputs: List[Proof], outputs: List[BlindedMessage]
    ) -> bool:
        """Verifies that inputs have at least the same amount as outputs"""
        input_amount = sum(p.amount for p in inputs)
        output_amount = sum(o.amount for o in outputs)
        return input_amount >= output_amount

    def _verify_no_duplicate_proofs(self, proofs: List[Proof]) -> bool:
        """Returns True if no two proofs share the same secret."""
        secrets = [p.secret for p in proofs]
        return len(secrets) == len(set(secrets))

    def _verify_no_duplicate_outputs(self, outputs: List[BlindedMessage]) -> bool:
        """Returns True if no two outputs share the same `B_`."""
        B_s = [od.B_ for od in outputs]
        return len(B_s) == len(set(B_s))

    def _verify_amount(self, amount: int) -> int:
        """Any amount used should be a positive integer smaller than 2^MAX_ORDER.

        Returns:
            The amount itself if it is valid.

        Raises:
            NotAllowedError: The amount is not an int, is not positive, or is
                not smaller than 2**settings.max_order.
        """
        valid = (
            isinstance(amount, int) and amount > 0 and amount < 2**settings.max_order
        )
        logger.trace(f"Verifying amount {amount} is valid: {valid}")
        if not valid:
            raise NotAllowedError("invalid amount: " + str(amount))
        return amount

    def _verify_equation_balanced(
        self,
        proofs: List[Proof],
        outs: Union[List[BlindedSignature], List[BlindedMessage]],
    ) -> None:
        """Verify that Σinputs - Σoutputs = 0.

        Outputs can be BlindedSignature or BlindedMessage.

        Raises:
            NotAllowedError: Any individual amount is out of range.
            AssertionError: The sums of inputs and outputs differ.
        """
        sum_inputs = sum(self._verify_amount(p.amount) for p in proofs)
        sum_outputs = sum(self._verify_amount(p.amount) for p in outs)
        # Raised explicitly (instead of via `assert`) so this balance check
        # cannot be stripped when Python runs with -O.
        if sum_outputs != sum_inputs:
            raise AssertionError("inputs do not have same amount as outputs")
|