diff --git a/cashu/core/base.py b/cashu/core/base.py index 02c789b..2814bb2 100644 --- a/cashu/core/base.py +++ b/cashu/core/base.py @@ -193,17 +193,26 @@ class Proof(BaseModel): @property def p2pksigs(self) -> List[str]: assert self.witness, "Witness is missing for p2pk signature" - return P2PKWitness.from_witness(self.witness).signatures + try: + return P2PKWitness.from_witness(self.witness).signatures + except Exception: + return [] @property def htlcpreimage(self) -> str | None: assert self.witness, "Witness is missing for htlc preimage" - return HTLCWitness.from_witness(self.witness).preimage + try: + return HTLCWitness.from_witness(self.witness).preimage + except Exception: + return None @property def htlcsigs(self) -> List[str] | None: assert self.witness, "Witness is missing for htlc signatures" - return HTLCWitness.from_witness(self.witness).signatures + try: + return HTLCWitness.from_witness(self.witness).signatures + except Exception: + return None class Proofs(BaseModel): @@ -219,12 +228,6 @@ class BlindedMessage(BaseModel): amount: int id: str # Keyset id B_: str # Hex-encoded blinded message - witness: Union[str, None] = None # witnesses (used for P2PK with SIG_ALL) - - @property - def p2pksigs(self) -> List[str]: - assert self.witness, "Witness missing in output" - return P2PKWitness.from_witness(self.witness).signatures class BlindedMessage_Deprecated(BaseModel): diff --git a/cashu/core/htlc.py b/cashu/core/htlc.py index e2bcea2..bb4968b 100644 --- a/cashu/core/htlc.py +++ b/cashu/core/htlc.py @@ -1,33 +1,12 @@ -from enum import Enum -from typing import Union - +from .p2pk import P2PKSecret from .secret import Secret, SecretKind -class SigFlags(Enum): - # require signatures only on the inputs (default signature flag) - SIG_INPUTS = "SIG_INPUTS" - - -class HTLCSecret(Secret): +# HTLCSecret inherits properties from P2PKSecret +class HTLCSecret(P2PKSecret, Secret): @classmethod def from_secret(cls, secret: Secret): assert SecretKind(secret.kind) == SecretKind.HTLC, "Secret is not a HTLC secret" # NOTE: exclude tags in .dict() because it doesn't deserialize it properly # need to add it back in manually with tags=secret.tags return cls(**secret.dict(exclude={"tags"}), tags=secret.tags) - - @property - def locktime(self) -> Union[None, int]: - locktime = self.tags.get_tag("locktime") - return int(locktime) if locktime else None - - @property - def sigflag(self) -> Union[None, SigFlags]: - sigflag = self.tags.get_tag("sigflag") - return SigFlags(sigflag) if sigflag else None - - @property - def n_sigs(self) -> Union[None, int]: - n_sigs = self.tags.get_tag("n_sigs") - return int(n_sigs) if n_sigs else None diff --git a/cashu/core/p2pk.py b/cashu/core/p2pk.py index eb07a03..79afab1 100644 --- a/cashu/core/p2pk.py +++ b/cashu/core/p2pk.py @@ -27,14 +27,19 @@ class P2PKSecret(Secret): return int(locktime) if locktime else None @property - def sigflag(self) -> Union[None, SigFlags]: + def sigflag(self) -> SigFlags: sigflag = self.tags.get_tag("sigflag") - return SigFlags(sigflag) if sigflag else None + return SigFlags(sigflag) if sigflag else SigFlags.SIG_INPUTS @property - def n_sigs(self) -> Union[None, int]: - n_sigs = self.tags.get_tag("n_sigs") - return int(n_sigs) if n_sigs else None + def n_sigs(self) -> int: + n_sigs = self.tags.get_tag_int("n_sigs") + return int(n_sigs) if n_sigs else 1 + + @property + def n_sigs_refund(self) -> Union[None, int]: + n_sigs_refund = self.tags.get_tag_int("n_sigs_refund") + return n_sigs_refund def schnorr_sign(message: bytes, private_key: PrivateKey) -> bytes: diff --git a/cashu/core/secret.py b/cashu/core/secret.py index 663f2be..5cf9f4b 100644 --- a/cashu/core/secret.py +++ b/cashu/core/secret.py @@ -39,6 +39,15 @@ class Tags(BaseModel): return tag[1] return None + def get_tag_int(self, tag_name: str) -> Union[int, None]: + tag = self.get_tag(tag_name) + if tag is not None: + try: + return int(tag) + except ValueError: + logger.warning(f"Tag {tag_name} is not an integer") + return None + def get_tag_all(self, tag_name: str) -> List[str]: all_tags = [] for tag in self.__root__: @@ -77,3 +86,19 @@ class Secret(BaseModel): tags = Tags(tags=tags_list) logger.debug(f"Deserialized Secret: {kind}, {data}, {nonce}, {tags}") return cls(kind=kind, data=data, nonce=nonce, tags=tags) + + def __eq__(self, value: object) -> bool: + # two secrets are equal if they have the same kind, data and tags (ignoring nonce) + if not isinstance(value, Secret): + return False + return ( + self.kind == value.kind + and self.data == value.data + and self.tags.__root__ == value.tags.__root__ + ) + + def __hash__(self) -> int: + # everything except nonce + return hash( + (self.kind, self.data, tuple(s for xs in self.tags.__root__ for s in xs)) + ) diff --git a/cashu/core/settings.py b/cashu/core/settings.py index 5f999a5..ece06e1 100644 --- a/cashu/core/settings.py +++ b/cashu/core/settings.py @@ -59,7 +59,7 @@ class MintSettings(CashuSettings): mint_database: str = Field(default="data/mint") mint_test_database: str = Field(default="test_data/test_mint") - mint_max_secret_length: int = Field(default=512) + mint_max_secret_length: int = Field(default=1024) mint_input_fee_ppk: int = Field(default=0) mint_disable_melt_on_error: bool = Field(default=False) diff --git a/cashu/mint/conditions.py b/cashu/mint/conditions.py index ea7c53a..705cc15 100644 --- a/cashu/mint/conditions.py +++ b/cashu/mint/conditions.py @@ -1,10 +1,10 @@ import hashlib import time -from typing import List +from typing import List, Optional, Union from loguru import logger -from ..core.base import BlindedMessage, Proof +from ..core.base import BlindedMessage, P2PKWitness, Proof from ..core.crypto.secp import PublicKey from ..core.errors import ( TransactionError, @@ -19,7 +19,12 @@ from ..core.secret import Secret, SecretKind class LedgerSpendingConditions: - def _verify_p2pk_spending_conditions(self, proof: Proof, secret: Secret) -> bool: + def _verify_p2pk_sig_inputs( + self, + proof: Proof, + secret: P2PKSecret | HTLCSecret, + message_to_sign: Optional[str] = None, + ) -> bool: """ Verify P2PK spending condition for a single input. @@ -36,42 +41,46 @@ class LedgerSpendingConditions: - if no valid signatures are present - if the signature threshold is not met """ - if SecretKind(secret.kind) != SecretKind.P2PK: - # not a P2PK secret - return True - p2pk_secret = P2PKSecret.from_secret(secret) + p2pk_secret = secret + message_to_sign = message_to_sign or proof.secret + + # if a sigflag other than SIG_INPUTS is present, we return True + if ( + secret.tags.get_tag("sigflag") + and secret.tags.get_tag("sigflag") != SigFlags.SIG_INPUTS.value + ): + return True # extract pubkeys that we require signatures from depending on whether the # locktime has passed (refund) or not (pubkeys in secret.data and in tags) - - # the pubkey in the data field is the pubkey to use for P2PK - pubkeys: List[str] = [p2pk_secret.data] - + # for P2PK, we use the data field as a pubkey + pubkeys: List[str] = [] + if SecretKind(p2pk_secret.kind) == SecretKind.P2PK: + pubkeys = [p2pk_secret.data] # get all additional pubkeys from tags for multisig pubkeys += p2pk_secret.tags.get_tag_all("pubkeys") - + n_sigs = p2pk_secret.n_sigs or 1 # check if locktime is passed and if so, only consider refund pubkeys now = time.time() if p2pk_secret.locktime and p2pk_secret.locktime < now: logger.trace(f"p2pk locktime ran out ({p2pk_secret.locktime}<{now}).") - # If a refund pubkey is present, we demand the signature to be from it - refund_pubkeys = p2pk_secret.tags.get_tag_all("refund") - if not refund_pubkeys: - # no refund pubkey is present, anyone can spend - return True - return self._verify_secret_signatures( - proof, - refund_pubkeys, - proof.p2pksigs, - 1, # only 1 sig required for refund - ) - - return self._verify_secret_signatures( - proof, pubkeys, proof.p2pksigs, p2pk_secret.n_sigs + pubkeys = p2pk_secret.tags.get_tag_all("refund") + n_sigs = p2pk_secret.n_sigs_refund or 1 + if not pubkeys: + # no pubkeys are present, anyone can spend + return True + # require signatures from pubkeys + return self._verify_p2pk_signatures( + message_to_sign, pubkeys, proof.p2pksigs, n_sigs ) - def _verify_htlc_spending_conditions(self, proof: Proof, secret: Secret) -> bool: + def _verify_htlc_spending_conditions( + self, + proof: Proof, + secret: HTLCSecret, + message_to_sign: Optional[str] = None, + ) -> bool: """ Verify HTLC spending condition for a single input. @@ -103,58 +112,34 @@ class LedgerSpendingConditions: - if 'pubkeys' are present but no valid signature is provided """ - if SecretKind(secret.kind) != SecretKind.HTLC: - # not a P2PK secret - return True - htlc_secret = HTLCSecret.from_secret(secret) - - # time lock - # check if locktime is in the past - if htlc_secret.locktime and htlc_secret.locktime < time.time(): - refund_pubkeys = htlc_secret.tags.get_tag_all("refund") - if refund_pubkeys: - return self._verify_secret_signatures( - proof, - refund_pubkeys, - proof.p2pksigs, - 1, # only one refund signature required - ) - # no pubkeys given in secret, anyone can spend - return True - + htlc_secret = secret # hash lock - assert proof.htlcpreimage, TransactionError("no HTLC preimage provided") + if not proof.htlcpreimage: + raise TransactionError("no HTLC preimage provided") + # verify correct preimage (the hashlock) if the locktime hasn't passed + now = time.time() + if not htlc_secret.locktime or htlc_secret.locktime > now: + if not hashlib.sha256( + bytes.fromhex(proof.htlcpreimage) + ).digest() == bytes.fromhex(htlc_secret.data): + raise TransactionError("HTLC preimage does not match.") + return True - # first we check whether a correct preimage was included - if not hashlib.sha256( - bytes.fromhex(proof.htlcpreimage) - ).digest() == bytes.fromhex(htlc_secret.data): - raise TransactionError("HTLC preimage does not match.") - - # then we check whether signatures are required - hashlock_pubkeys = htlc_secret.tags.get_tag_all("pubkeys") - if not hashlock_pubkeys: - # no pubkeys given in secret, anyone can spend - return True - - return self._verify_secret_signatures( - proof, hashlock_pubkeys, proof.htlcsigs or [], htlc_secret.n_sigs - ) - - def _verify_secret_signatures( + def _verify_p2pk_signatures( self, - proof: Proof, + message_to_sign: str, pubkeys: List[str], signatures: List[str], - n_sigs_required: int | None = 1, + n_sigs_required: int, ) -> bool: - assert len(set(pubkeys)) == len(pubkeys), "pubkeys must be unique." + if len(set(pubkeys)) != len(pubkeys): + raise TransactionError("pubkeys must be unique.") logger.trace(f"pubkeys: {pubkeys}") + unique_pubkeys = set(pubkeys) # verify that signatures are present if not signatures: # no signature present although secret indicates one - logger.error(f"no signatures in proof: {proof}") raise TransactionError("no signatures in proof.") # we make sure that there are no duplicate signatures @@ -164,38 +149,42 @@ class LedgerSpendingConditions: # INPUTS: check signatures against pubkey # we expect the signature to be on the pubkey (=message) itself n_sigs_required = n_sigs_required or 1 - assert n_sigs_required > 0, "n_sigs must be positive." + if not n_sigs_required > 0: + raise TransactionError("n_sigs must be positive.") - # check if enough signatures are present - assert ( - len(signatures) >= n_sigs_required - ), f"not enough signatures provided: {len(signatures)} < {n_sigs_required}." + # check if enough pubkeys or signatures are present + if len(pubkeys) < n_sigs_required or len(signatures) < n_sigs_required: + raise TransactionError( + f"not enough pubkeys ({len(pubkeys)}) or signatures ({len(signatures)}) present for n_sigs ({n_sigs_required})." + ) - n_valid_sigs_per_output = 0 - # loop over all signatures in input - for input_sig in signatures: - for pubkey in pubkeys: + n_pubkeys_with_valid_sigs = 0 + # loop over all unique pubkeys in input + for pubkey in unique_pubkeys: + for i, input_sig in enumerate(signatures): logger.trace(f"verifying signature {input_sig} by pubkey {pubkey}.") - logger.trace(f"Message: {proof.secret}") + logger.trace(f"Message: {message_to_sign}") if verify_schnorr_signature( - message=proof.secret.encode("utf-8"), + message=message_to_sign.encode("utf-8"), pubkey=PublicKey(bytes.fromhex(pubkey), raw=True), signature=bytes.fromhex(input_sig), ): - n_valid_sigs_per_output += 1 + n_pubkeys_with_valid_sigs += 1 logger.trace( f"signature on input is valid: {input_sig} on {pubkey}." ) + signatures.pop(i) + break # check if we have enough valid signatures - assert n_valid_sigs_per_output, "no valid signature provided for input." - assert n_valid_sigs_per_output >= n_sigs_required, ( - f"signature threshold not met. {n_valid_sigs_per_output} <" - f" {n_sigs_required}." - ) + if not n_pubkeys_with_valid_sigs >= n_sigs_required: + raise TransactionError( + f"signature threshold not met. {n_pubkeys_with_valid_sigs} <" + f" {n_sigs_required}." + ) logger.trace( - f"{n_valid_sigs_per_output} of {n_sigs_required} valid signatures found." + f"{n_pubkeys_with_valid_sigs} of {n_sigs_required} valid signatures found." ) logger.trace("p2pk signature on inputs is valid.") @@ -218,30 +207,75 @@ class LedgerSpendingConditions: # P2PK if SecretKind(secret.kind) == SecretKind.P2PK: - return self._verify_p2pk_spending_conditions(proof, secret) + p2pk_secret = P2PKSecret.from_secret(secret) + return self._verify_p2pk_sig_inputs(proof, p2pk_secret) # HTLC if SecretKind(secret.kind) == SecretKind.HTLC: - return self._verify_htlc_spending_conditions(proof, secret) + htlc_secret = HTLCSecret.from_secret(secret) + self._verify_htlc_spending_conditions(proof, htlc_secret) + return self._verify_p2pk_sig_inputs(proof, htlc_secret) # no spending condition present return True # ------ output spending conditions ------ - def _verify_output_p2pk_spending_conditions( - self, proofs: List[Proof], outputs: List[BlindedMessage] + def _inputs_require_sigall(self, proofs: List[Proof]) -> bool: + """ + Check if any input requires sigall spending condition. + """ + for proof in proofs: + try: + secret = Secret.deserialize(proof.secret) + try: + p2pk_secret = P2PKSecret.from_secret(secret) + if p2pk_secret.sigflag == SigFlags.SIG_ALL: + return True + except Exception: + pass + try: + htlc_secret = HTLCSecret.from_secret(secret) + if htlc_secret.sigflag == SigFlags.SIG_ALL: + return True + except Exception: + pass + except Exception: + # secret is not a spending condition so we treat is a normal secret + pass + return False + + def _verify_all_secrets_equal_and_return(self, proofs: List[Proof]) -> Secret: + """ + Verify that all secrets are equal (kind, data, tags) and return them + """ + secrets = set() + for proof in proofs: + secrets.add(Secret.deserialize(proof.secret)) + + if len(secrets) != 1: + raise TransactionError("not all secrets are equal.") + + return secrets.pop() + + def _verify_sigall_spending_conditions( + self, + proofs: List[Proof], + outputs: List[BlindedMessage], + message_to_sign: Optional[str] = None, ) -> bool: """ - If sigflag==SIG_ALL in proof.secret, check if outputs - contain valid signatures for pubkeys in proof.secret. + If sigflag==SIG_ALL in any proof.secret, perform a signature check on all + inputs (proofs) and outputs (outputs) together. - We return True - - if not all proof.secret are Secret spending condition - - if not all secrets are P2PKSecret spending condition - - if not all signature.sigflag are SIG_ALL + # We return True + # - if not all proof.secret are Secret spending condition + # - if not all secrets are P2PKSecret spending condition + # - if not all signature.sigflag are SIG_ALL We raise an exception: + - if one input is SIG_ALL but not all inputs are SIG_ALL + - if not all secret kinds are the same - if not all pubkeys in all secrets are the same - if not all n_sigs in all secrets are the same - if not all signatures in all outputs are unique @@ -252,103 +286,108 @@ class LedgerSpendingConditions: We return True if we successfully validated the spending condition. """ + # verify that all secrets are of the same kind try: - secrets_generic = [Secret.deserialize(p.secret) for p in proofs] - p2pk_secrets = [ - P2PKSecret.from_secret(secret) for secret in secrets_generic - ] + secret = self._verify_all_secrets_equal_and_return(proofs) except Exception: - # secret is not a spending condition so we treat is a normal secret - return True + # not all secrets are equal, we fail + return False - # check if all secrets are P2PK - # NOTE: This is redundant, because P2PKSecret.from_secret() already checks for the kind - # Leaving it in for explicitness - if not all( - [SecretKind(secret.kind) == SecretKind.P2PK for secret in p2pk_secrets] - ): - # not all secrets are P2PK - return True + # now we can enforce that all inputs are SIG_ALL + secret_lock: Union[P2PKSecret, HTLCSecret] + if SecretKind(secret.kind) == SecretKind.P2PK: + secret_lock = P2PKSecret.from_secret(secret) + pubkeys = [secret_lock.data] + secret_lock.tags.get_tag_all("pubkeys") + n_sigs_required = secret_lock.n_sigs or 1 + elif SecretKind(secret.kind) == SecretKind.HTLC: + secret_lock = HTLCSecret.from_secret(secret) + pubkeys = secret_lock.tags.get_tag_all("pubkeys") + n_sigs_required = secret_lock.n_sigs or 1 + else: + # not a P2PK or HTLC secret + return False - # check if all secrets are sigflag==SIG_ALL - if not all([secret.sigflag == SigFlags.SIG_ALL for secret in p2pk_secrets]): - # not all secrets have sigflag==SIG_ALL - return True - - # extract all pubkeys and n_sigs from secrets - pubkeys_per_proof = [ - [p2pk_secret.data] + p2pk_secret.tags.get_tag_all("pubkeys") - for p2pk_secret in p2pk_secrets - ] - n_sigs_per_proof = [p2pk_secret.n_sigs for p2pk_secret in p2pk_secrets] - - # if locktime passed, we only require the refund pubkeys and 1 signature - for p2pk_secret in p2pk_secrets: - now = time.time() - if p2pk_secret.locktime and p2pk_secret.locktime < now: - refund_pubkeys = p2pk_secret.tags.get_tag_all("refund") - if refund_pubkeys: - pubkeys_per_proof.append(refund_pubkeys) - n_sigs_per_proof.append(1) # only 1 sig required for refund + now = time.time() + if secret_lock.locktime and secret_lock.locktime < now: + # locktime has passed, we only require the refund pubkeys and n_sigs_refund + pubkeys = secret_lock.tags.get_tag_all("refund") + n_sigs_required = secret_lock.n_sigs_refund or 1 # if no pubkeys are present, anyone can spend - if not pubkeys_per_proof: + if not pubkeys: return True - # all pubkeys and n_sigs must be the same - assert ( - len({tuple(pubs_output) for pubs_output in pubkeys_per_proof}) == 1 - ), "pubkeys in all proofs must match." - assert len(set(n_sigs_per_proof)) == 1, "n_sigs in all proofs must match." + message_to_sign = message_to_sign or "".join( + [p.secret for p in proofs] + [o.B_ for o in outputs] + ) - # validation successful + # validation + if len(set(pubkeys)) != len(pubkeys): + raise TransactionError("pubkeys must be unique.") + logger.trace(f"pubkeys: {pubkeys}") + unique_pubkeys = set(pubkeys) - pubkeys: List[str] = pubkeys_per_proof[0] - # if n_sigs is None, we set it to 1 - n_sigs: int = n_sigs_per_proof[0] or 1 + if not n_sigs_required > 0: + raise TransactionError("n_sigs must be positive.") + + first_proof = proofs[0] + if not first_proof.witness: + raise TransactionError("no witness in proof.") + signatures = P2PKWitness.from_witness(first_proof.witness).signatures + + # verify that signatures are present + if not signatures: + # no signature present although secret indicates one + raise TransactionError("no signatures in proof.") + + # we make sure that there are no duplicate signatures + if len(set(signatures)) != len(signatures): + raise TransactionError("signatures must be unique.") + + # check if enough pubkeys or signatures are present + if len(pubkeys) < n_sigs_required or len(signatures) < n_sigs_required: + raise TransactionError( + f"not enough pubkeys ({len(pubkeys)}) or signatures ({len(signatures)}) present for n_sigs ({n_sigs_required})." + ) logger.trace(f"pubkeys: {pubkeys}") - # loop over all outputs and check if the signatures are valid for pubkeys with a threshold of n_sig - for output in outputs: - # we expect the signature to be on the pubkey (=message) itself - p2pksigs = output.p2pksigs - assert p2pksigs, "no signatures in output." - # TODO: add limit for maximum number of signatures - - # we check whether any signature is duplicate - assert len(set(p2pksigs)) == len( - p2pksigs - ), "duplicate signatures in output." - - n_valid_sigs_per_output = 0 - # loop over all signatures in output - for sig in p2pksigs: - for pubkey in pubkeys: - if verify_schnorr_signature( - message=bytes.fromhex(output.B_), - pubkey=PublicKey(bytes.fromhex(pubkey), raw=True), - signature=bytes.fromhex(sig), - ): - n_valid_sigs_per_output += 1 - assert n_valid_sigs_per_output, "no valid signature provided for output." - assert ( - n_valid_sigs_per_output >= n_sigs - ), f"signature threshold not met. {n_valid_sigs_per_output} < {n_sigs}." - - logger.trace( - f"{n_valid_sigs_per_output} of {n_sigs} valid signatures found." + n_valid_sigs = 0 + for p in unique_pubkeys: + for i, s in enumerate(signatures): + if verify_schnorr_signature( + message=message_to_sign.encode("utf-8"), + pubkey=PublicKey(bytes.fromhex(p), raw=True), + signature=bytes.fromhex(s), + ): + n_valid_sigs += 1 + signatures.pop(i) + break + if n_valid_sigs < n_sigs_required: + raise TransactionError( + f"signature threshold not met. {n_valid_sigs} < {n_sigs_required}." ) - logger.trace(p2pksigs) - logger.trace("p2pk signatures on output is valid.") return True - def _verify_output_spending_conditions( - self, proofs: List[Proof], outputs: List[BlindedMessage] + def _verify_input_output_spending_conditions( + self, + proofs: List[Proof], + outputs: List[BlindedMessage], + message_to_sign: Optional[str] = None, ) -> bool: """ Verify spending conditions: - Condition: P2PK - If sigflag==SIG_ALL in proof.secret, check if outputs contain valid signatures for pubkeys in proof.secret. - """ + Condition: If sigflag==SIG_ALL in any proof.secret of the kind P2PK or HTLC + we require signatures on all inputs and outputs together. - return self._verify_output_p2pk_spending_conditions(proofs, outputs) + Implicitly enforces many other conditions such as all input Secrets + being the same except for the nonce (see verify_same_kinds_and_return()). + """ + if not self._inputs_require_sigall(proofs): + # no input requires sigall spending condition + return True + + # verify that all secrets are of the same kind, raise an error if not + _ = self._verify_all_secrets_equal_and_return(proofs) + + return self._verify_sigall_spending_conditions(proofs, outputs, message_to_sign) diff --git a/cashu/mint/ledger.py b/cashu/mint/ledger.py index 72b9231..b3f21c6 100644 --- a/cashu/mint/ledger.py +++ b/cashu/mint/ledger.py @@ -987,6 +987,12 @@ class Ledger(LedgerVerification, LedgerSpendingConditions, LedgerTasks, LedgerFe # we don't need to set it here, _set_melt_quote_pending will set it in the db melt_quote.outputs = outputs + # verify SIG_ALL signatures + message_to_sign = ( + "".join([p.secret for p in proofs] + [o.B_ for o in outputs or []]) + quote + ) + self._verify_sigall_spending_conditions(proofs, outputs or [], message_to_sign) + # verify that the amount of the input proofs is equal to the amount of the quote total_provided = sum_proofs(proofs) input_fees = self.get_fees_for_proofs(proofs) diff --git a/cashu/mint/verification.py b/cashu/mint/verification.py index dd73e5d..3fdc9df 100644 --- a/cashu/mint/verification.py +++ b/cashu/mint/verification.py @@ -75,7 +75,7 @@ class LedgerVerification( # Verify ecash signatures if not all([self._verify_proof_bdhke(p) for p in proofs]): raise InvalidProofsError() - # Verify input spending conditions + # Verify SIG_INPUTS spending conditions if not all([self._verify_input_spending_conditions(p) for p in proofs]): raise TransactionError("validation of input spending conditions failed.") @@ -103,9 +103,8 @@ class LedgerVerification( ): raise TransactionError("input and output keysets have different units.") - # Verify output spending conditions - if outputs and not self._verify_output_spending_conditions(proofs, outputs): - raise TransactionError("validation of output spending conditions failed.") + # Verify SIG_ALL spending conditions + self._verify_input_output_spending_conditions(proofs, outputs) async def _verify_outputs( self, diff --git a/cashu/wallet/cli/cli.py b/cashu/wallet/cli/cli.py index 90ea949..4b81fbe 100644 --- a/cashu/wallet/cli/cli.py +++ b/cashu/wallet/cli/cli.py @@ -283,16 +283,16 @@ async def pay( abort=True, default=True, ) + + if wallet.available_balance < total_amount + ecash_fees: + print(" Error: Balance too low.") + return + assert total_amount > 0, "amount is not positive" # we need to include fees so we can use the proofs for melting the `total_amount` send_proofs, _ = await wallet.select_to_send( wallet.proofs, total_amount, include_fees=True, set_reserved=True ) print("Paying Lightning invoice ...", end="", flush=True) - assert total_amount > 0, "amount is not positive" - if wallet.available_balance < total_amount: - print(" Error: Balance too low.") - return - try: melt_response = await wallet.melt( send_proofs, invoice, quote.fee_reserve, quote.quote @@ -764,12 +764,17 @@ async def receive_cli( return await print_balance(ctx) + @cli.command("decode", help="Decode a cashu token and print in JSON format.") @click.option( "--no-dleq", default=False, is_flag=True, help="Do not include DLEQ proofs." ) @click.option( - "--indent", "-i", default=2, is_flag=False, help="Number of spaces to indent JSON with." + "--indent", + "-i", + default=2, + is_flag=False, + help="Number of spaces to indent JSON with.", ) @click.argument("token", type=str, default="") def decode_to_json(token: str, no_dleq: bool, indent: int): @@ -785,6 +790,7 @@ def decode_to_json(token: str, no_dleq: bool, indent: int): else: print("Error: enter a token") + @cli.command("burn", help="Burn spent tokens.") @click.argument("token", required=False, type=str) @click.option("--all", "-a", default=False, is_flag=True, help="Burn all spent tokens.") diff --git a/cashu/wallet/helpers.py b/cashu/wallet/helpers.py index 1a33049..a574b91 100644 --- a/cashu/wallet/helpers.py +++ b/cashu/wallet/helpers.py @@ -130,10 +130,9 @@ async def send( assert len(lock) > 21, Exception( "Error: lock has to be at least 22 characters long." ) - if not lock.startswith("P2PK:"): - raise Exception("Error: lock has to start with P2PK:") # we add a time lock to the P2PK lock by appending the current unix time + 14 days - else: + if lock.startswith("P2PK:") or lock.startswith("P2PK-SIGALL:"): + sigall = lock.startswith("P2PK-SIGALL:") logger.debug(f"Locking token to: {lock}") logger.debug( f"Adding a time lock of {settings.locktime_delta_seconds} seconds." @@ -141,10 +140,12 @@ async def send( secret_lock = await wallet.create_p2pk_lock( lock.split(":")[1], locktime_seconds=settings.locktime_delta_seconds, - sig_all=False, + sig_all=sigall, n_sigs=1, ) logger.debug(f"Secret lock: {secret_lock}") + else: + raise Exception("Error: lock has to start with P2PK: or P2PK-SIGALL:") await wallet.load_proofs() diff --git a/cashu/wallet/htlc.py b/cashu/wallet/htlc.py index 12b1000..1c00323 100644 --- a/cashu/wallet/htlc.py +++ b/cashu/wallet/htlc.py @@ -23,6 +23,7 @@ class WalletHTLC(SupportsDb): hashlock_n_sigs: int | None = None, locktime_seconds: int | None = None, locktime_pubkeys: List[str] | None = None, + locktime_n_sigs: int | None = None, ) -> HTLCSecret: tags = Tags() if locktime_seconds: @@ -32,6 +33,9 @@ class WalletHTLC(SupportsDb): if locktime_pubkeys: tags["refund"] = locktime_pubkeys + if locktime_n_sigs: + tags["n_sigs_refund"] = str(locktime_n_sigs) + if not preimage_hash and preimage: preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest() diff --git a/cashu/wallet/p2pk.py b/cashu/wallet/p2pk.py index 971e9ae..89686ae 100644 --- a/cashu/wallet/p2pk.py +++ b/cashu/wallet/p2pk.py @@ -3,8 +3,11 @@ from typing import List, Optional from loguru import logger +from cashu.core.htlc import HTLCSecret + from ..core.base import ( BlindedMessage, + HTLCWitness, P2PKWitness, Proof, ) @@ -25,6 +28,7 @@ class WalletP2PK(SupportsPrivateKey, SupportsDb): # ---------- P2PK ---------- async def create_p2pk_pubkey(self): + """Create a P2PK public key from the private key.""" assert ( self.private_key ), "No private key set in settings. Set NOSTR_PRIVATE_KEY in .env" @@ -35,12 +39,24 @@ class WalletP2PK(SupportsPrivateKey, SupportsDb): async def create_p2pk_lock( self, - pubkey: str, + data: str, locktime_seconds: Optional[int] = None, tags: Optional[Tags] = None, sig_all: bool = False, n_sigs: int = 1, ) -> P2PKSecret: + """Generate a P2PK secret with the given pubkeys, locktime, tags, and signature flag. + + Args: + data (str): Public key to lock to. + locktime_seconds (Optional[int], optional): Locktime in seconds. Defaults to None. + tags (Optional[Tags], optional): Tags to add to the secret. Defaults to None. + sig_all (bool, optional): Whether to use SIG_ALL spending condition. Defaults to False. + n_sigs (int, optional): Number of signatures required. Defaults to 1. + + Returns: + P2PKSecret: P2PK secret with the given pubkeys, locktime, tags, and signature flag. + """ logger.debug(f"Provided tags: {tags}") if not tags: tags = Tags() @@ -57,12 +73,13 @@ class WalletP2PK(SupportsPrivateKey, SupportsDb): logger.debug(f"After tags: {tags}") return P2PKSecret( kind=SecretKind.P2PK.value, - data=pubkey, + data=data, tags=tags, ) - def sign_proofs(self, proofs: List[Proof]) -> List[str]: + def signatures_proofs_sig_inputs(self, proofs: List[Proof]) -> List[str]: """Signs proof secrets with the private key of the wallet. + This method is used to sign P2PK SIG_INPUTS proofs. Args: proofs (List[Proof]): Proofs to sign @@ -90,34 +107,74 @@ class WalletP2PK(SupportsPrivateKey, SupportsDb): logger.debug(f"Signatures: {signatures}") return signatures - def sign_outputs(self, outputs: List[BlindedMessage]) -> List[str]: + def schnorr_sign_message(self, message: str) -> str: + """Sign a message with the private key of the wallet.""" private_key = self.private_key assert private_key.pubkey - return [ - schnorr_sign( - message=bytes.fromhex(output.B_), - private_key=private_key, - ).hex() - for output in outputs - ] + return schnorr_sign( + message=message.encode("utf-8"), + private_key=private_key, + ).hex() - def add_signature_witnesses_to_outputs( - self, outputs: List[BlindedMessage] - ) -> List[BlindedMessage]: - """Takes a list of outputs and adds a P2PK signatures to each. - Args: - outputs (List[BlindedMessage]): Outputs to add P2PK signatures to - Returns: - List[BlindedMessage]: Outputs with P2PK signatures added + def _inputs_require_sigall(self, proofs: List[Proof]) -> bool: """ - p2pk_signatures = self.sign_outputs(outputs) - for o, s in zip(outputs, p2pk_signatures): - o.witness = P2PKWitness(signatures=[s]).json() - return outputs + Check if any input requires sigall spending condition. + """ + for proof in proofs: + try: + secret = Secret.deserialize(proof.secret) + try: + p2pk_secret = P2PKSecret.from_secret(secret) + if p2pk_secret.sigflag is SigFlags.SIG_ALL: + return True + except Exception: + pass + try: + htlc_secret = HTLCSecret.from_secret(secret) + if htlc_secret.sigflag is SigFlags.SIG_ALL: + return True + except Exception: + pass + except Exception: + # secret is not a spending condition so we treat is a normal secret + pass + return False - def add_witnesses_to_outputs( + def add_witness_swap_sig_all( + self, + proofs: List[Proof], + outputs: List[BlindedMessage], + message_to_sign: Optional[str] = None, + ) -> List[Proof]: + """Determine whether the first input's sig flag is SIG_ALL ()""" + if not self._inputs_require_sigall(proofs): + return proofs + try: + logger.debug("Input requires SIG_ALL") + proofs_to_sign = self.filter_proofs_locked_to_our_pubkey(proofs) + if len(proofs_to_sign) != len(proofs): + raise Exception("Proofs not locked to our pubkey") + secrets = set([Secret.deserialize(p.secret) for p in proofs]) + if not len(secrets) == 1: + raise Exception("Secrets not identical") + message_to_sign = message_to_sign or "".join( + [p.secret for p in proofs] + [o.B_ for o in outputs] + ) + signature = self.schnorr_sign_message(message_to_sign) + # add witness to only the first proof + signed_proofs = self.add_signatures_to_proofs([proofs[0]], [signature]) + proofs[0].witness = signed_proofs[0].witness + logger.debug( + f"SIGALL Adding witness to proof: {proofs[0].secret} with signature: {signature}" + ) + except Exception: + logger.error("not all secrets are the same, skipping SIG_ALL signature") + return proofs + return proofs + + def sign_proofs_inplace_swap( self, proofs: List[Proof], outputs: List[BlindedMessage] - ) -> List[BlindedMessage]: + ) -> List[Proof]: """Adds witnesses to outputs if the inputs (proofs) indicate an appropriate signature flag Args: @@ -126,69 +183,157 @@ class WalletP2PK(SupportsPrivateKey, SupportsDb): Returns: List[BlindedMessage]: Outputs with signatures added """ - # first we check whether all tokens have serialized secrets as their secret - try: - for p in proofs: - secret = Secret.deserialize(p.secret) - except Exception: - # if not, we do not add witnesses (treat as regular token secret) - return outputs + # sign proofs if they are P2PK SIG_INPUTS + proofs = self.add_witnesses_sig_inputs(proofs) + # sign first proof if swap is SIG_ALL + proofs = self.add_witness_swap_sig_all(proofs, outputs) - # if any of the proofs provided is P2PK and requires SIG_ALL, we must signatures to all outputs - if any( - [ - secret.kind == SecretKind.P2PK.value - and P2PKSecret.deserialize(p.secret).sigflag == SigFlags.SIG_ALL - for p in proofs - ] - ): - outputs = self.add_signature_witnesses_to_outputs(outputs) - return outputs - - def add_signature_witnesses_to_proofs(self, proofs: List[Proof]) -> List[Proof]: - p2pk_signatures = self.sign_proofs(proofs) - # attach unlock signatures to proofs - assert len(proofs) == len(p2pk_signatures), "wrong number of signatures" - for p, s in zip(proofs, p2pk_signatures): - # if there are already signatures, append - if p.witness and P2PKWitness.from_witness(p.witness).signatures: - signatures = P2PKWitness.from_witness(p.witness).signatures - p.witness = P2PKWitness(signatures=signatures + [s]).json() - else: - p.witness = P2PKWitness(signatures=[s]).json() return proofs - def add_witnesses_to_proofs(self, proofs: List[Proof]) -> List[Proof]: + def sign_proofs_inplace_melt( + self, proofs: List[Proof], outputs: List[BlindedMessage], quote_id: str + ) -> List[Proof]: + # sign proofs if they are P2PK SIG_INPUTS + proofs = self.add_witnesses_sig_inputs(proofs) + message_to_sign = ( + "".join([p.secret for p in proofs] + [o.B_ for o in outputs]) + quote_id + ) + # sign first proof if swap is SIG_ALL + return self.add_witness_swap_sig_all(proofs, outputs, message_to_sign) + + def add_signatures_to_proofs( + self, proofs: List[Proof], signatures: List[str] + ) -> List[Proof]: + """Add signatures to proofs. Signatures are added as witnesses to the proofs in place. + + Args: + proofs (List[Proof]): Proofs to add signatures to. + signatures (List[str]): Signatures to add to the proofs. + + Returns: + List[Proof]: Proofs with signatures added. + """ + + # attach unlock signatures to proofs + assert len(proofs) == len(signatures), "wrong number of signatures" + for p, s in zip(proofs, signatures): + if Secret.deserialize(p.secret).kind == SecretKind.P2PK.value: + # if there are already signatures, append + if p.witness and P2PKWitness.from_witness(p.witness).signatures: + proof_signatures = P2PKWitness.from_witness(p.witness).signatures + if proof_signatures and s not in proof_signatures: + p.witness = P2PKWitness( + signatures=proof_signatures + [s] + ).json() + else: + p.witness = P2PKWitness(signatures=[s]).json() + elif Secret.deserialize(p.secret).kind == SecretKind.HTLC.value: + # if there are already signatures, append + if p.witness and HTLCWitness.from_witness(p.witness).signatures: + witness = HTLCWitness.from_witness(p.witness) + proof_signatures = witness.signatures + if proof_signatures and s not in proof_signatures: + p.witness = HTLCWitness( + preimage=witness.preimage, signatures=proof_signatures + [s] + ).json() + else: + if p.witness: + witness = HTLCWitness.from_witness(p.witness) + p.witness = HTLCWitness( + preimage=witness.preimage, signatures=[s] + ).json() + else: + p.witness = HTLCWitness(signatures=[s]).json() + else: + raise Exception("Secret kind not supported") + + return proofs + + def filter_proofs_locked_to_our_pubkey(self, proofs: List[Proof]) -> List[Proof]: + """This method assumes that secrets are all P2PK!""" + # filter proofs that require our pubkey + assert self.private_key.pubkey + our_pubkey = self.private_key.pubkey.serialize().hex() + our_pubkey_proofs = [] + for p in proofs: + secret = P2PKSecret.deserialize(p.secret) + pubkeys = ( + [secret.data] + + secret.tags.get_tag_all("pubkeys") + + secret.tags.get_tag_all("refund") + ) + if our_pubkey in pubkeys: + # we are one of the signers + our_pubkey_proofs.append(p) + logger.debug( + f"Locked proofs containing our public key: {len(our_pubkey_proofs)}" + ) + return our_pubkey_proofs + + def sign_p2pk_sig_inputs(self, proofs: List[Proof]) -> List[Proof]: + """Signs P2PK SIG_INPUTS proofs with the private key of the wallet. Ignores proofs that + aren't locked to our public key (filters them out before returning). + Args: + proofs (List[Proof]): Proofs to sign + Returns: + List[Proof]: List of proofs with signatures added + """ + # filter proofs that are P2PK + p2pk_proofs = [] + for p in proofs: + try: + secret = Secret.deserialize(p.secret) + if secret.kind == SecretKind.P2PK.value: + p2pk_proofs.append(p) + if secret.kind == SecretKind.HTLC.value and ( + secret.tags.get_tag("pubkeys") or secret.tags.get_tag("refund") + ): + # HTLC secret with pubkeys tag is a P2PK secret + p2pk_proofs.append(p) + except Exception: + pass + + if not p2pk_proofs: + return [] + + # filter proofs that that are P2PK and SIG_INPUTS + sig_inputs_proofs = [ + p + for p in p2pk_proofs + if P2PKSecret.deserialize(p.secret).sigflag == SigFlags.SIG_INPUTS + ] + if not sig_inputs_proofs: + return [] + + our_pubkey_proofs = self.filter_proofs_locked_to_our_pubkey(sig_inputs_proofs) + p2pk_signatures = self.signatures_proofs_sig_inputs(our_pubkey_proofs) + signed_proofs = self.add_signatures_to_proofs( + our_pubkey_proofs, p2pk_signatures + ) + return signed_proofs + + def add_witnesses_sig_inputs(self, proofs: List[Proof]) -> List[Proof]: """Adds witnesses to proofs for P2PK redemption. This method parses the secret of each proof and determines the correct witness type and adds it to the proof if we have it available. - Note: In order for this method to work, all proofs must have the same secret type. - For P2PK and HTLC, we use an individual signature for each token in proofs. - Args: proofs (List[Proof]): List of proofs to add witnesses to Returns: List[Proof]: List of proofs with witnesses added """ - # first we check whether all tokens have serialized secrets as their secret - try: - for p in proofs: - secret = Secret.deserialize(p.secret) - except Exception: - # if not, we do not add witnesses (treat as regular token secret) - return proofs - logger.debug("Spending conditions detected.") - # check if all secrets are either P2PK or HTLC - if all([secret.kind == SecretKind.P2PK.value for p in proofs]): - proofs = self.add_signature_witnesses_to_proofs(proofs) + # sign P2PK SIG_INPUTS proofs + signed_proofs = self.sign_p2pk_sig_inputs(proofs) + # replace the original proofs with the signed ones + signed_proofs_secrets = [p.secret for p in signed_proofs] + for p in proofs: + if p.secret in signed_proofs_secrets: + proofs[proofs.index(p)] = signed_proofs[ + signed_proofs_secrets.index(p.secret) + ] - # if all([secret.kind == SecretKind.HTLC.value for p in proofs]): - # for p in proofs: - # htlc_secret = HTLCSecret.deserialize(p.secret) - # if htlc_secret.tags.get_tag("pubkeys"): - # p = self.add_signature_witnesses_to_proofs([p])[0] + # TODO: Sign HTLCs that require signatures as well return proofs diff --git a/cashu/wallet/wallet.py b/cashu/wallet/wallet.py index 0a2809f..df96156 100644 --- a/cashu/wallet/wallet.py +++ b/cashu/wallet/wallet.py @@ -519,8 +519,6 @@ class Wallet( await store_bolt11_mint_quote(db=self.db, quote=quote) return quote - - async def mint( self, amount: int, @@ -545,7 +543,9 @@ class Wallet( if split: logger.trace(f"Mint with split: {split}") assert sum(split) == amount, "split must sum to amount" - allowed_amounts = self.get_allowed_amounts() # Get allowed amounts from the mint + allowed_amounts = ( + self.get_allowed_amounts() + ) # Get allowed amounts from the mint for a in split: if a not in allowed_amounts: raise Exception( @@ -618,7 +618,7 @@ class Wallet( and the promises to send (send_outputs). If secret_lock is provided, the wallet will create blinded secrets with those to attach a predefined spending condition to the tokens they want to send. - Calls `add_witnesses_to_proofs` which parses all proofs and checks whether their + Calls `sign_proofs_inplace_swap` which parses all proofs and checks whether their secrets corresponds to any locks that we have the unlock conditions for. If so, it adds the unlock conditions to the proofs. @@ -639,9 +639,6 @@ class Wallet( # make sure we're operating on an independent copy of proofs proofs = copy.copy(proofs) - # potentially add witnesses to unlock provided proofs (if they indicate one) - proofs = self.add_witnesses_to_proofs(proofs) - input_fees = self.get_fees_for_proofs(proofs) logger.trace(f"Input fees: {input_fees}") # create a suitable amounts to keep and send. @@ -672,7 +669,7 @@ class Wallet( outputs, rs = self._construct_outputs(amounts, secrets, rs, self.keyset_id) # potentially add witnesses to outputs based on what requirement the proofs indicate - outputs = self.add_witnesses_to_outputs(proofs, outputs) + proofs = self.sign_proofs_inplace_swap(proofs, outputs) # sort outputs by amount, remember original order sorted_outputs_with_indices = sorted( @@ -681,7 +678,7 @@ class Wallet( original_indices, sorted_outputs = zip(*sorted_outputs_with_indices) # Call swap API - sorted_promises = await super().split(proofs, sorted_outputs) + sorted_promises = await super().split(proofs, list(sorted_outputs)) # sort promises back to original order promises = [ @@ -752,6 +749,8 @@ class Wallet( n_change_outputs * [1], change_secrets, change_rs ) + proofs = self.sign_proofs_inplace_melt(proofs, change_outputs, quote_id) + # store the melt_id in proofs db async with self.db.connect() as conn: for p in proofs: diff --git a/cashu/wallet/wallet_deprecated.py b/cashu/wallet/wallet_deprecated.py index 2c9b6dd..f8fb0a5 100644 --- a/cashu/wallet/wallet_deprecated.py +++ b/cashu/wallet/wallet_deprecated.py @@ -334,7 +334,7 @@ class LedgerAPIDeprecated(SupportsHttpxClient, SupportsMintURL): def _meltrequest_include_fields(proofs: List[Proof]): """strips away fields from the model that aren't necessary for the /melt""" - proofs_include = {"id", "amount", "secret", "C", "script"} + proofs_include = {"id", "amount", "secret", "C", "witness"} return { "proofs": {i: proofs_include for i in range(len(proofs))}, "pr": ..., diff --git a/requirements.txt b/requirements.txt index f02b88f..24012a0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,69 +1,65 @@ aiosqlite==0.20.0 ; python_version >= "3.10" and python_version < "4.0" -anyio==4.6.2.post1 ; python_version >= "3.10" and python_version < "4.0" +anyio==4.8.0 ; python_version >= "3.10" and python_version < "4.0" asn1crypto==1.5.1 ; python_version >= "3.10" and python_version < "4.0" -async-timeout==4.0.3 ; python_version >= "3.10" and python_version < "3.12.0" -asyncpg==0.29.0 ; python_version >= "3.10" and python_version < "4.0" +async-timeout==5.0.1 ; python_version >= "3.10" and python_full_version < "3.11.3" +asyncpg==0.30.0 ; python_version >= "3.10" and python_version < "4.0" base58==2.1.1 ; python_version >= "3.10" and python_version < "4.0" bech32==1.2.0 ; python_version >= "3.10" and python_version < "4.0" bip32==4.0 ; python_version >= "3.10" and python_version < "4.0" bitstring==3.1.9 ; python_version >= "3.10" and python_version < "4.0" bolt11==2.1.0 ; python_version >= "3.10" and python_version < "4.0" +brotli==1.1.0 ; python_version >= "3.10" and python_version < "4.0" cbor2==5.6.5 ; python_version >= "3.10" and python_version < "4.0" -certifi==2024.8.30 ; python_version >= "3.10" and python_version < "4.0" +certifi==2024.12.14 ; python_version >= "3.10" and python_version < "4.0" cffi==1.17.1 ; python_version >= "3.10" and python_version < "4.0" -cfgv==3.4.0 ; python_version >= "3.10" and python_version < "4.0" -click==8.1.7 ; python_version >= "3.10" and python_version < "4.0" -coincurve==21.0.0 ; python_version >= "3.10" and python_version < "4.0" +click==8.1.8 ; python_version >= "3.10" and python_version < "4.0" +coincurve==20.0.0 ; python_version >= "3.10" and python_version < "4.0" colorama==0.4.6 ; python_version >= "3.10" and python_version < "4.0" and (platform_system == "Windows" or sys_platform == "win32") cryptography==43.0.3 ; python_version >= "3.10" and python_version < "4.0" -deprecated==1.2.14 ; python_version >= "3.10" and python_version < "4.0" -distlib==0.3.9 ; python_version >= "3.10" and python_version < "4.0" +deprecated==1.2.15 ; python_version >= "3.10" and python_version < "4.0" ecdsa==0.19.0 ; python_version >= "3.10" and python_version < "4.0" environs==9.5.0 ; python_version >= "3.10" and python_version < "4.0" exceptiongroup==1.2.2 ; python_version >= "3.10" and python_version < "3.11" -fastapi==0.115.4 ; python_version >= "3.10" and python_version < "4.0" -filelock==3.16.1 ; python_version >= "3.10" and python_version < "4.0" -googleapis-common-protos==1.65.0 ; python_version >= "3.10" and python_version < "4.0" +fastapi==0.115.6 ; python_version >= "3.10" and python_version < "4.0" +googleapis-common-protos==1.66.0 ; python_version >= "3.10" and python_version < "4.0" greenlet==3.1.1 ; python_version >= "3.10" and python_version < "4.0" -grpcio-tools==1.67.1 ; python_version >= "3.10" and python_version < "4.0" -grpcio==1.67.1 ; python_version >= "3.10" and python_version < "4.0" +grpcio-tools==1.69.0 ; python_version >= "3.10" and python_version < "4.0" +grpcio==1.69.0 ; python_version >= "3.10" and python_version < "4.0" h11==0.14.0 ; python_version >= "3.10" and python_version < "4.0" -httpcore==1.0.6 ; python_version >= "3.10" and python_version < "4.0" +httpcore==1.0.7 ; python_version >= "3.10" and python_version < "4.0" httpx[socks]==0.25.2 ; python_version >= "3.10" and python_version < "4.0" -identify==2.6.1 ; python_version >= "3.10" and python_version < "4.0" idna==3.10 ; python_version >= "3.10" and python_version < "4.0" importlib-metadata==6.11.0 ; python_version >= "3.10" and python_version < "4.0" -importlib-resources==6.4.5 ; python_version >= "3.10" and python_version < "4.0" -limits==3.13.0 ; python_version >= "3.10" and python_version < "4.0" -loguru==0.7.2 ; python_version >= "3.10" and python_version < "4.0" -marshmallow==3.23.0 ; python_version >= "3.10" and python_version < "4.0" +jinja2==3.1.5 ; python_version >= "3.10" and python_version < "4.0" +limits==4.0.1 ; python_version >= "3.10" and python_version < "4.0" +loguru==0.7.3 ; python_version >= "3.10" and python_version < "4.0" +markupsafe==3.0.2 ; python_version >= "3.10" and python_version < "4.0" +marshmallow==3.25.1 ; python_version >= "3.10" and python_version < "4.0" mnemonic==0.20 ; python_version >= "3.10" and python_version < "4.0" mypy-protobuf==3.6.0 ; python_version >= "3.10" and python_version < "4.0" -nodeenv==1.9.1 ; python_version >= "3.10" and python_version < "4.0" -packaging==24.1 ; python_version >= "3.10" and python_version < "4.0" -platformdirs==4.3.6 ; python_version >= "3.10" and python_version < "4.0" -pre-commit==3.8.0 ; python_version >= "3.10" and python_version < "4.0" -protobuf==5.28.3 ; python_version >= "3.10" and python_version < "4.0" +packaging==24.2 ; python_version >= "3.10" and python_version < "4.0" +protobuf==5.29.3 ; python_version >= "3.10" and python_version < "4.0" pycparser==2.22 ; python_version >= "3.10" and python_version < "4.0" pycryptodomex==3.21.0 ; python_version >= "3.10" and python_version < "4.0" -pydantic==1.10.18 ; python_version >= "3.10" and python_version < "4.0" +pydantic==1.10.21 ; python_version >= "3.10" and python_version < "4.0" +pyjwt==2.10.1 ; python_version >= "3.10" and python_version < "4.0" python-dotenv==1.0.1 ; python_version >= "3.10" and python_version < "4.0" -pyyaml==6.0.2 ; python_version >= "3.10" and python_version < "4.0" +redis==5.2.1 ; python_version >= "3.10" and python_version < "4.0" secp256k1==0.14.0 ; python_version >= "3.10" and python_version < "4.0" -setuptools==75.3.0 ; python_version >= "3.10" and python_version < "4.0" -six==1.16.0 ; python_version >= "3.10" and python_version < "4.0" +setuptools==75.8.0 ; python_version >= "3.10" and python_version < "4.0" +six==1.17.0 ; python_version >= "3.10" and python_version < "4.0" slowapi==0.1.9 ; python_version >= "3.10" and python_version < "4.0" sniffio==1.3.1 ; python_version >= "3.10" and python_version < "4.0" socksio==1.0.0 ; python_version >= "3.10" and python_version < "4.0" -sqlalchemy[asyncio]==2.0.36 ; python_version >= "3.10" and python_version < "4.0" -starlette==0.41.2 ; python_version >= "3.10" and python_version < "4.0" -types-protobuf==5.28.3.20241030 ; python_version >= "3.10" and python_version < "4.0" +sqlalchemy[asyncio]==2.0.37 ; python_version >= "3.10" and python_version < "4.0" +starlette==0.41.3 ; python_version >= "3.10" and python_version < "4.0" +types-protobuf==5.29.1.20241207 ; python_version >= "3.10" and python_version < "4.0" typing-extensions==4.12.2 ; python_version >= "3.10" and python_version < "4.0" uvicorn==0.31.1 ; python_version >= "3.10" and python_version < "4.0" -virtualenv==20.27.1 ; python_version >= "3.10" and python_version < "4.0" websocket-client==1.8.0 ; python_version >= "3.10" and python_version < "4.0" websockets==12.0 ; python_version >= "3.10" and python_version < "4.0" wheel==0.41.3 ; python_version >= "3.10" and python_version < "4.0" -win32-setctime==1.1.0 ; python_version >= "3.10" and python_version < "4.0" and sys_platform == "win32" -wrapt==1.16.0 ; python_version >= "3.10" and python_version < "4.0" -zipp==3.20.2 ; python_version >= "3.10" and python_version < "4.0" +win32-setctime==1.2.0 ; python_version >= "3.10" and python_version < "4.0" and sys_platform == "win32" +wrapt==1.17.2 ; python_version >= "3.10" and python_version < "4.0" +zipp==3.21.0 ; python_version >= "3.10" and python_version < "4.0" +zstandard==0.23.0 ; python_version >= "3.10" and python_version < "4.0" diff --git a/tests/helpers.py b/tests/helpers.py index d769b19..812cb5a 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -215,7 +215,7 @@ def pay_onchain(address: str, sats: int) -> str: return run_cmd(cmd) -async def pay_if_regtest(bolt11: str): +async def pay_if_regtest(bolt11: str) -> None: if is_regtest: pay_real_invoice(bolt11) if is_fake: diff --git a/tests/test_core.py b/tests/test_core.py index bf03663..6637330 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -2,6 +2,7 @@ import pytest from cashu.core.base import TokenV3, TokenV4, Unit from cashu.core.helpers import calculate_number_of_blank_outputs +from cashu.core.secret import Secret, SecretKind, Tags from cashu.core.split import amount_split from cashu.wallet.helpers import deserialize_token_from_string @@ -262,3 +263,52 @@ def test_parse_token_v3_v4_base64_keyset_id(): # this token can not be serialized to V4 token = deserialize_token_from_string(token_v3_base64_keyset_serialized) assert isinstance(token, TokenV3) + + +def test_secret_equality(): + assert Secret( + kind=SecretKind.P2PK.value, data="asd", tags=Tags([["asd", "wasd"], ["mew"]]) + ) == Secret( + kind=SecretKind.P2PK.value, data="asd", tags=Tags([["asd", "wasd"], ["mew"]]) + ) + + +def test_secret_set_dict(): + d = dict() + s = Secret( + kind=SecretKind.P2PK.value, + data="asd", + tags=Tags([["asd", "wasd"], ["mew"]]), + nonce="abcd", + ) + s2 = Secret( + kind=SecretKind.P2PK.value, + data="asd", + tags=Tags([["asd", "wasd"], ["mew"]]), + nonce="efgh", + ) + # test set + assert len(set([s, s2])) == 1 + # test dict + d[s] = "test" + assert d[s] == "test" + assert ( + d[ + Secret( + kind=SecretKind.P2PK.value, + data="asd", + tags=Tags([["asd", "wasd"], ["mew"]]), + ) + ] + == "test" + ) + assert ( + d[ + Secret( + kind=SecretKind.P2PK.value, + data="asd", + tags=Tags([["asd", "wasd"], ["mew"]]), + ) + ] + == "test" + ) diff --git a/tests/test_mint_api.py b/tests/test_mint_api.py index 174f0b6..bbf7df4 100644 --- a/tests/test_mint_api.py +++ b/tests/test_mint_api.py @@ -159,7 +159,7 @@ async def test_api_keyset_keys_old_keyset_id(ledger: Ledger): settings.debug_mint_only_deprecated, reason="settings.debug_mint_only_deprecated is set", ) -async def test_split(ledger: Ledger, wallet: Wallet): +async def test_swap(ledger: Ledger, wallet: Wallet): mint_quote = await wallet.request_mint(64) await pay_if_regtest(mint_quote.request) await wallet.mint(64, quote_id=mint_quote.quote) diff --git a/tests/test_mint_p2pk.py b/tests/test_mint_p2pk.py new file mode 100644 index 0000000..1b28180 --- /dev/null +++ b/tests/test_mint_p2pk.py @@ -0,0 +1,323 @@ +import pytest +import pytest_asyncio + +from cashu.core.base import P2PKWitness +from cashu.mint.ledger import Ledger +from cashu.wallet.wallet import Wallet as Wallet1 +from tests.conftest import SERVER_ENDPOINT +from tests.helpers import pay_if_regtest + + +async def assert_err(f, msg): + """Compute f() and expect an error message 'msg'.""" + try: + await f + except Exception as exc: + if msg not in str(exc.args[0]): + raise Exception(f"Expected error: {msg}, got: {exc.args[0]}") + return + raise Exception(f"Expected error: {msg}, got no error") + + +@pytest_asyncio.fixture(scope="function") +async def wallet1(ledger: Ledger): + wallet1 = await Wallet1.with_db( + url=SERVER_ENDPOINT, + db="test_data/wallet1", + name="wallet1", + ) + await wallet1.load_mint() + yield wallet1 + + +@pytest.mark.asyncio +async def test_ledger_inputs_require_sigall_detection(wallet1: Wallet1, ledger: Ledger): + """Test the ledger function that detects if any inputs require SIG_ALL.""" + # Mint tokens to the wallet + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await ledger.get_mint_quote(mint_quote.quote) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create two proofs: one with SIG_INPUTS and one with SIG_ALL + pubkey = await wallet1.create_p2pk_pubkey() + + # Create a proof with SIG_INPUTS + secret_lock_inputs = await wallet1.create_p2pk_lock(pubkey, sig_all=False) + _, send_proofs_inputs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock_inputs + ) + + # Create a new mint quote for the second mint operation + mint_quote_2 = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote_2.request) + await ledger.get_mint_quote(mint_quote_2.quote) + await wallet1.mint(64, quote_id=mint_quote_2.quote) + + # Create a proof with SIG_ALL + secret_lock_all = await wallet1.create_p2pk_lock(pubkey, sig_all=True) + _, send_proofs_all = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock_all + ) + + # Test that _inputs_require_sigall correctly detects SIG_ALL flag + assert not ledger._inputs_require_sigall( + send_proofs_inputs + ), "Should not detect SIG_ALL" + assert ledger._inputs_require_sigall(send_proofs_all), "Should detect SIG_ALL" + + # Test with a mixed list of proofs (should detect SIG_ALL if any proof has it) + mixed_proofs = send_proofs_inputs + send_proofs_all + assert ledger._inputs_require_sigall( + mixed_proofs + ), "Should detect SIG_ALL in mixed list" + + +@pytest.mark.asyncio +async def test_ledger_verify_p2pk_signature_validation( + wallet1: Wallet1, ledger: Ledger +): + """Test the signature validation for P2PK inputs.""" + # Mint tokens to the wallet + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await ledger.get_mint_quote(mint_quote.quote) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create a p2pk lock + pubkey = await wallet1.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey) + + # Create locked tokens + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 32, secret_lock=secret_lock + ) + + # Sign the tokens + signed_proofs = wallet1.sign_p2pk_sig_inputs(send_proofs) + assert len(signed_proofs) > 0, "Should have signed proofs" + + # Verify that a valid witness was added to the proofs + for proof in signed_proofs: + assert proof.witness is not None, "Proof should have a witness" + witness = P2PKWitness.from_witness(proof.witness) + assert len(witness.signatures) > 0, "Witness should have a signature" + + # Generate outputs for the swap + output_amounts = [32] + secrets, rs, derivation_paths = await wallet1.generate_n_secrets( + len(output_amounts) + ) + outputs, rs = wallet1._construct_outputs(output_amounts, secrets, rs) + + # The swap should succeed because the signatures are valid + promises = await ledger.swap(proofs=signed_proofs, outputs=outputs) + assert len(promises) == len( + outputs + ), "Should have the same number of promises as outputs" + + # Test for a failure + # Create a fake witness with an incorrect signature + fake_signature = "0" * 128 # Just a fake 64-byte hex string + for proof in send_proofs: + proof.witness = P2PKWitness(signatures=[fake_signature]).json() + + # The swap should fail because the signatures are invalid + await assert_err( + ledger.swap(proofs=send_proofs, outputs=outputs), + "signature threshold not met", + ) + + +@pytest.mark.asyncio +async def test_ledger_verify_incorrect_signature(wallet1: Wallet1, ledger: Ledger): + """Test rejection of incorrect signatures for P2PK inputs.""" + # Mint tokens to the wallet + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await ledger.get_mint_quote(mint_quote.quote) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create a p2pk lock + pubkey = await wallet1.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey) + + # Create locked tokens + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 32, secret_lock=secret_lock + ) + + # Create a fake witness with an incorrect signature + fake_signature = "0" * 128 # Just a fake 64-byte hex string + for proof in send_proofs: + proof.witness = P2PKWitness(signatures=[fake_signature]).json() + + # Generate outputs for the swap + output_amounts = [32] + secrets, rs, derivation_paths = await wallet1.generate_n_secrets( + len(output_amounts) + ) + outputs, rs = wallet1._construct_outputs(output_amounts, secrets, rs) + + # The swap should fail because the signatures are invalid + await assert_err( + ledger.swap(proofs=send_proofs, outputs=outputs), + "signature threshold not met", + ) + + +@pytest.mark.asyncio +async def test_ledger_verify_sigall_validation(wallet1: Wallet1, ledger: Ledger): + """Test validation of SIG_ALL signature that covers both inputs and outputs.""" + # Mint tokens to the wallet + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await ledger.get_mint_quote(mint_quote.quote) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create a p2pk lock with SIG_ALL + pubkey = await wallet1.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey, sig_all=True) + + # Create locked tokens + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 32, secret_lock=secret_lock + ) + + # Generate outputs for the swap + output_amounts = [32] + secrets, rs, derivation_paths = await wallet1.generate_n_secrets( + len(output_amounts) + ) + outputs, rs = wallet1._construct_outputs(output_amounts, secrets, rs) + + # Create the message to sign (all inputs + all outputs) + message_to_sign = "".join([p.secret for p in send_proofs] + [o.B_ for o in outputs]) + + # Sign the message with the wallet's private key + signature = wallet1.schnorr_sign_message(message_to_sign) + + # Add the signature to the first proof only (as required for SIG_ALL) + send_proofs[0].witness = P2PKWitness(signatures=[signature]).json() + + # The swap should succeed because the SIG_ALL signature is valid + promises = await ledger.swap(proofs=send_proofs, outputs=outputs) + assert len(promises) == len( + outputs + ), "Should have the same number of promises as outputs" + + +@pytest.mark.asyncio +async def test_ledger_verify_incorrect_sigall_signature( + wallet1: Wallet1, ledger: Ledger +): + """Test rejection of incorrect SIG_ALL signatures.""" + # Mint tokens to the wallet + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await ledger.get_mint_quote(mint_quote.quote) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create a p2pk lock with SIG_ALL + pubkey = await wallet1.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey, sig_all=True) + + # Create locked tokens + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 32, secret_lock=secret_lock + ) + + # Generate outputs for the swap + output_amounts = [32] + secrets, rs, derivation_paths = await wallet1.generate_n_secrets( + len(output_amounts) + ) + outputs, rs = wallet1._construct_outputs(output_amounts, secrets, rs) + + # Create a fake witness with an incorrect signature + fake_signature = "0" * 128 # Just a fake 64-byte hex string + send_proofs[0].witness = P2PKWitness(signatures=[fake_signature]).json() + + # The swap should fail because the SIG_ALL signature is invalid + await assert_err( + ledger.swap(proofs=send_proofs, outputs=outputs), + "signature threshold not met", + ) + + +@pytest.mark.asyncio +async def test_ledger_swap_p2pk_without_signature(wallet1: Wallet1, ledger: Ledger): + """Test ledger swap with p2pk locked tokens without providing signatures.""" + # Mint tokens to the wallet + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await ledger.get_mint_quote(mint_quote.quote) + await wallet1.mint(64, quote_id=mint_quote.quote) + assert wallet1.balance == 64 + + # Create a p2pk lock with wallet's own public key + pubkey = await wallet1.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey) + + # Use swap_to_send to create p2pk locked proofs + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 32, secret_lock=secret_lock + ) + + # Generate outputs for the swap + output_amounts = [32] + secrets, rs, derivation_paths = await wallet1.generate_n_secrets( + len(output_amounts) + ) + outputs, rs = wallet1._construct_outputs(output_amounts, secrets, rs) + + # Attempt to swap WITHOUT adding signatures - this should fail + await assert_err( + ledger.swap(proofs=send_proofs, outputs=outputs), + "Witness is missing for p2pk signature", + ) + + +@pytest.mark.asyncio +async def test_ledger_swap_p2pk_with_signature(wallet1: Wallet1, ledger: Ledger): + """Test ledger swap with p2pk locked tokens with proper signatures.""" + # Mint tokens to the wallet + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await ledger.get_mint_quote(mint_quote.quote) + await wallet1.mint(64, quote_id=mint_quote.quote) + assert wallet1.balance == 64 + + # Create a p2pk lock with wallet's own public key + pubkey = await wallet1.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey) + + # Use swap_to_send to create p2pk locked proofs + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 32, secret_lock=secret_lock + ) + + # Generate outputs for the swap + output_amounts = [32] + secrets, rs, derivation_paths = await wallet1.generate_n_secrets( + len(output_amounts) + ) + outputs, rs = wallet1._construct_outputs(output_amounts, secrets, rs) + + # Sign the p2pk inputs before sending to the ledger + signed_proofs = wallet1.sign_p2pk_sig_inputs(send_proofs) + + # Extract signed proofs and put them back in the send_proofs list + signed_proofs_secrets = [p.secret for p in signed_proofs] + for p in send_proofs: + if p.secret in signed_proofs_secrets: + send_proofs[send_proofs.index(p)] = signed_proofs[ + signed_proofs_secrets.index(p.secret) + ] + + # Now swap with signatures - this should succeed + promises = await ledger.swap(proofs=send_proofs, outputs=outputs) + + # Verify the result + assert len(promises) == len(outputs) + assert [p.amount for p in promises] == [o.amount for o in outputs] diff --git a/tests/test_mint_p2pk_comprehensive.py b/tests/test_mint_p2pk_comprehensive.py new file mode 100644 index 0000000..fed5312 --- /dev/null +++ b/tests/test_mint_p2pk_comprehensive.py @@ -0,0 +1,635 @@ +import copy +import time +from typing import List + +import pytest +import pytest_asyncio + +from cashu.core.base import BlindedMessage, P2PKWitness +from cashu.core.migrations import migrate_databases +from cashu.core.p2pk import P2PKSecret, SigFlags +from cashu.core.secret import Secret, SecretKind, Tags +from cashu.mint.ledger import Ledger +from cashu.wallet import migrations +from cashu.wallet.wallet import Wallet +from tests.conftest import SERVER_ENDPOINT +from tests.helpers import pay_if_regtest + + +async def assert_err(f, msg): + """Compute f() and expect an error message 'msg'.""" + try: + await f + except Exception as exc: + if msg not in str(exc.args[0]): + raise Exception(f"Expected error: {msg}, got: {exc.args[0]}") + return + raise Exception(f"Expected error: {msg}, got no error") + + +@pytest_asyncio.fixture(scope="function") +async def wallet1(ledger: Ledger): + wallet1 = await Wallet.with_db( + url=SERVER_ENDPOINT, + db="test_data/wallet1_p2pk_comprehensive", + name="wallet1", + ) + await migrate_databases(wallet1.db, migrations) + await wallet1.load_mint() + yield wallet1 + + +@pytest_asyncio.fixture(scope="function") +async def wallet2(ledger: Ledger): + wallet2 = await Wallet.with_db( + url=SERVER_ENDPOINT, + db="test_data/wallet2_p2pk_comprehensive", + name="wallet2", + ) + await migrate_databases(wallet2.db, migrations) + await wallet2.load_mint() + yield wallet2 + + +@pytest_asyncio.fixture(scope="function") +async def wallet3(ledger: Ledger): + wallet3 = await Wallet.with_db( + url=SERVER_ENDPOINT, + db="test_data/wallet3_p2pk_comprehensive", + name="wallet3", + ) + await migrate_databases(wallet3.db, migrations) + await wallet3.load_mint() + yield wallet3 + + +@pytest.mark.asyncio +async def test_p2pk_sig_inputs_basic(wallet1: Wallet, wallet2: Wallet, ledger: Ledger): + """Test basic P2PK with SIG_INPUTS.""" + # Mint tokens to wallet1 + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Verify wallet1 has tokens + assert wallet1.balance == 64 + + # Create locked tokens from wallet1 to wallet2 + pubkey_wallet2 = await wallet2.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet2) + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock + ) + + # Verify that sent tokens have P2PK secrets with SIG_INPUTS flag + for proof in send_proofs: + p2pk_secret = Secret.deserialize(proof.secret) + assert p2pk_secret.kind == SecretKind.P2PK.value + assert P2PKSecret.from_secret(p2pk_secret).sigflag == SigFlags.SIG_INPUTS + + # Try to redeem without signatures (should fail) + unsigned_proofs = copy.deepcopy(send_proofs) + for proof in unsigned_proofs: + proof.witness = None + await assert_err( + ledger.swap( + proofs=unsigned_proofs, outputs=await create_test_outputs(wallet2, 16) + ), + "Witness is missing for p2pk signature", + ) + + # Redeem with proper signatures + signed_proofs = wallet2.sign_p2pk_sig_inputs(send_proofs) + assert all(p.witness is not None for p in signed_proofs) + + # Now swap should succeed + outputs = await create_test_outputs(wallet2, 16) + promises = await ledger.swap(proofs=signed_proofs, outputs=outputs) + assert len(promises) == len(outputs) + + +@pytest.mark.asyncio +async def test_p2pk_sig_all_valid(wallet1: Wallet, wallet2: Wallet, ledger: Ledger): + """Test P2PK with SIG_ALL where the signature covers both inputs and outputs.""" + # Mint tokens to wallet1 + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create locked tokens with SIG_ALL + pubkey_wallet2 = await wallet2.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet2, sig_all=True) + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock + ) + + # Verify that sent tokens have P2PK secrets with SIG_ALL flag + for proof in send_proofs: + p2pk_secret = Secret.deserialize(proof.secret) + assert p2pk_secret.kind == SecretKind.P2PK.value + assert P2PKSecret.from_secret(p2pk_secret).sigflag == SigFlags.SIG_ALL + + # Create outputs for redemption + outputs = await create_test_outputs(wallet2, 16) + + # Create a message from concatenated inputs and outputs + message_to_sign = "".join([p.secret for p in send_proofs] + [o.B_ for o in outputs]) + + # Sign with wallet2's private key + signature = wallet2.schnorr_sign_message(message_to_sign) + + # Add the signature to the first proof only (since it's SIG_ALL) + send_proofs[0].witness = P2PKWitness(signatures=[signature]).json() + + # Swap should succeed + promises = await ledger.swap(proofs=send_proofs, outputs=outputs) + assert len(promises) == len(outputs) + + +@pytest.mark.asyncio +async def test_p2pk_sig_all_invalid(wallet1: Wallet, wallet2: Wallet, ledger: Ledger): + """Test P2PK with SIG_ALL where the signature is invalid.""" + # Mint tokens to wallet1 + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create locked tokens with SIG_ALL + pubkey_wallet2 = await wallet2.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet2, sig_all=True) + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock + ) + + # Create outputs for redemption + outputs = await create_test_outputs(wallet2, 16) + + # Add an invalid signature + fake_signature = "0" * 128 # Just a fake 64-byte hex string + send_proofs[0].witness = P2PKWitness(signatures=[fake_signature]).json() + + # Swap should fail + await assert_err( + ledger.swap(proofs=send_proofs, outputs=outputs), "signature threshold not met" + ) + + +@pytest.mark.asyncio +async def test_p2pk_sig_all_mixed(wallet1: Wallet, wallet2: Wallet, ledger: Ledger): + """Test that attempting to use mixed SIG_ALL and SIG_INPUTS proofs fails.""" + # Mint tokens to wallet1 + mint_quote = await wallet1.request_mint(128) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(128, quote_id=mint_quote.quote) + + # Create outputs + outputs = await create_test_outputs(wallet2, 32) # 16 + 16 + + # Create a proof with SIG_ALL + pubkey_wallet2 = await wallet2.create_p2pk_pubkey() + secret_lock_all = await wallet1.create_p2pk_lock(pubkey_wallet2, sig_all=True) + _, proofs_sig_all = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock_all + ) + # sign proofs_sig_all + signed_proofs_sig_all = wallet2.add_witness_swap_sig_all(proofs_sig_all, outputs) + + # Mint more tokens to wallet1 for the SIG_INPUTS test + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create a proof with SIG_INPUTS + secret_lock_inputs = await wallet1.create_p2pk_lock(pubkey_wallet2, sig_all=False) + _, proofs_sig_inputs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock_inputs + ) + # sign proofs_sig_inputs + signed_proofs_sig_inputs = wallet2.sign_p2pk_sig_inputs(proofs_sig_inputs) + + # Combine the proofs + mixed_proofs = signed_proofs_sig_all + signed_proofs_sig_inputs + + # Add an invalid signature to the SIG_ALL proof + mixed_proofs[0].witness = P2PKWitness(signatures=["0" * 128]).json() + + # Try to use the mixed proofs (should fail) + await assert_err( + ledger.swap(proofs=mixed_proofs, outputs=outputs), + "not all secrets are equal.", + ) + + +@pytest.mark.asyncio +async def test_p2pk_multisig_2_of_3( + wallet1: Wallet, wallet2: Wallet, wallet3: Wallet, ledger: Ledger +): + """Test P2PK with 2-of-3 multisig.""" + # Mint tokens to wallet1 + mint_quote = await wallet1.request_mint(6400) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(6400, quote_id=mint_quote.quote) + + # Get pubkeys from all wallets + pubkey1 = await wallet1.create_p2pk_pubkey() + pubkey2 = await wallet2.create_p2pk_pubkey() + pubkey3 = await wallet3.create_p2pk_pubkey() + + # Create 2-of-3 multisig tokens locked to all three wallets + tags = Tags([["pubkeys", pubkey2, pubkey3]]) + secret_lock = await wallet1.create_p2pk_lock(pubkey1, tags=tags, n_sigs=2) + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock + ) + + # Create outputs for redemption + outputs = await create_test_outputs(wallet1, 16) + + # Sign with wallet1 (first signature) + signed_proofs = wallet1.sign_p2pk_sig_inputs(send_proofs) + + # Try to redeem with only 1 signature (should fail) + await assert_err( + ledger.swap(proofs=signed_proofs, outputs=outputs), + "not enough pubkeys (3) or signatures (1) present for n_sigs (2).", + ) + + # Mint new tokens for the second test + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create new locked tokens + _, send_proofs2 = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock + ) + + # Sign with wallet1 (first signature) + signed_proofs2 = wallet1.sign_p2pk_sig_inputs(send_proofs2) + + # Add signature from wallet2 (second signature) + signed_proofs2 = wallet2.sign_p2pk_sig_inputs(signed_proofs2) + + # Now redemption should succeed with 2 of 3 signatures + # Create outputs for redemption + outputs = await create_test_outputs(wallet1, 16) + promises = await ledger.swap(proofs=signed_proofs2, outputs=outputs) + assert len(promises) == len(outputs) + + # Mint new tokens for the third test + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create new locked tokens + _, send_proofs3 = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock + ) + + # Alternative: sign with wallet1 and wallet3 + signed_proofs3 = wallet1.sign_p2pk_sig_inputs(send_proofs3) + signed_proofs3 = wallet3.sign_p2pk_sig_inputs(signed_proofs3) + + # This should also succeed + # Create outputs for redemption + outputs = await create_test_outputs(wallet1, 16) + promises2 = await ledger.swap(proofs=signed_proofs3, outputs=outputs) + assert len(promises2) == len(outputs) + + +@pytest.mark.asyncio +async def test_p2pk_timelock(wallet1: Wallet, wallet2: Wallet, ledger: Ledger): + """Test P2PK with a timelock that expires.""" + # Mint tokens to wallet1 + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create tokens with a 2-second timelock + pubkey_wallet2 = await wallet2.create_p2pk_pubkey() + # Set a past timestamp to ensure test works consistently + past_time = int(time.time()) - 10 + tags = Tags([["locktime", str(past_time)]]) + secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet2, tags=tags) + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock + ) + + # Store current time to check if locktime passed + locktime = 0 + for proof in send_proofs: + secret = Secret.deserialize(proof.secret) + p2pk_secret = P2PKSecret.from_secret(secret) + locktime = p2pk_secret.locktime + + # Create outputs + outputs = await create_test_outputs(wallet1, 16) + + # Verify that current time is past the locktime + assert locktime is not None, "Locktime should not be None" + assert ( + int(time.time()) > locktime + ), f"Current time ({int(time.time())}) should be greater than locktime ({locktime})" + + # Try to redeem without signature after locktime (should succeed) + unsigned_proofs = copy.deepcopy(send_proofs) + for proof in unsigned_proofs: + proof.witness = None + + promises = await ledger.swap(proofs=unsigned_proofs, outputs=outputs) + assert len(promises) == len(outputs) + + +@pytest.mark.asyncio +async def test_p2pk_timelock_with_refund_before_locktime( + wallet1: Wallet, wallet2: Wallet, wallet3: Wallet, ledger: Ledger +): + """Test P2PK with a timelock and refund pubkeys before locktime.""" + # Mint tokens to wallet1 + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Get pubkeys + pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # Receiver + pubkey_wallet3 = await wallet3.create_p2pk_pubkey() # Refund key + + # Create tokens with a 2-second timelock and refund key + future_time = int(time.time()) + 60 # 60 seconds in the future + refund_tags = Tags([["refund", pubkey_wallet3], ["locktime", str(future_time)]]) + secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet2, tags=refund_tags) + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock + ) + + # Create outputs + outputs = await create_test_outputs(wallet1, 16) + + # Try to redeem without any signature before locktime (should fail) + unsigned_proofs = copy.deepcopy(send_proofs) + for proof in unsigned_proofs: + proof.witness = None + + await assert_err( + ledger.swap(proofs=unsigned_proofs, outputs=outputs), + "Witness is missing for p2pk signature", + ) + + # Try to redeem with refund key signature before locktime (should fail) + refund_signed_proofs = wallet3.sign_p2pk_sig_inputs(send_proofs) + + await assert_err( + ledger.swap(proofs=refund_signed_proofs, outputs=outputs), + "signature threshold not met", # Refund key can't be used before locktime + ) + + +@pytest.mark.asyncio +async def test_p2pk_timelock_with_receiver_signature( + wallet1: Wallet, wallet2: Wallet, wallet3: Wallet, ledger: Ledger +): + """Test P2PK with a timelock and refund pubkeys with receiver signature.""" + # Mint tokens to wallet1 + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Get pubkeys + pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # Receiver + pubkey_wallet3 = await wallet3.create_p2pk_pubkey() # Refund key + + # Create tokens with a 2-second timelock and refund key + future_time = int(time.time()) + 60 # 60 seconds in the future + refund_tags = Tags([["refund", pubkey_wallet3], ["locktime", str(future_time)]]) + secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet2, tags=refund_tags) + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock + ) + + # Create outputs + outputs = await create_test_outputs(wallet1, 16) + + # Try to redeem with the correct receiver signature (should succeed) + receiver_signed_proofs = wallet2.sign_p2pk_sig_inputs(send_proofs) + + promises = await ledger.swap(proofs=receiver_signed_proofs, outputs=outputs) + assert len(promises) == len(outputs) + + +@pytest.mark.asyncio +async def test_p2pk_timelock_with_refund_after_locktime( + wallet1: Wallet, wallet2: Wallet, wallet3: Wallet, ledger: Ledger +): + """Test P2PK with a timelock and refund pubkeys after locktime.""" + # Mint tokens to wallet1 + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Get pubkeys + pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # Receiver + pubkey_wallet3 = await wallet3.create_p2pk_pubkey() # Refund key + + # Create tokens with a past timestamp for locktime testing + past_time = int(time.time()) - 10 # 10 seconds in the past + refund_tags_past = Tags([["refund", pubkey_wallet3], ["locktime", str(past_time)]]) + secret_lock_past = await wallet1.create_p2pk_lock( + pubkey_wallet2, tags=refund_tags_past + ) + _, send_proofs3 = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock_past + ) + + # Try to redeem with refund key after locktime (should succeed) + refund_signed_proofs2 = wallet3.sign_p2pk_sig_inputs(send_proofs3) + + # This should work because locktime has passed and refund key is used + # Create outputs + outputs = await create_test_outputs(wallet1, 16) + promises2 = await ledger.swap(proofs=refund_signed_proofs2, outputs=outputs) + assert len(promises2) == len(outputs) + + +@pytest.mark.asyncio +async def test_p2pk_n_sigs_refund( + wallet1: Wallet, wallet2: Wallet, wallet3: Wallet, ledger: Ledger +): + """Test P2PK with a timelock and multiple refund pubkeys with n_sigs_refund.""" + # Mint tokens to wallet1 + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Get pubkeys + pubkey_wallet1 = await wallet1.create_p2pk_pubkey() # Receiver + pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # Refund key 1 + pubkey_wallet3 = await wallet3.create_p2pk_pubkey() # Refund key 2 + + # Create tokens with a future timelock and 2-of-2 refund requirement + future_time = int(time.time()) + 60 # 60 seconds in the future + refund_tags = Tags( + [ + ["refund", pubkey_wallet2, pubkey_wallet3], + ["n_sigs_refund", "2"], + ["locktime", str(future_time)], + ] + ) + secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet1, tags=refund_tags) + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock + ) + + # Create outputs + outputs = await create_test_outputs(wallet1, 16) + + # Mint new tokens for receiver test + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create new locked tokens + _, send_proofs2 = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock + ) + + # Try to redeem with receiver key (should succeed before locktime) + receiver_signed_proofs = wallet1.sign_p2pk_sig_inputs(send_proofs2) + promises = await ledger.swap(proofs=receiver_signed_proofs, outputs=outputs) + assert len(promises) == len(outputs) + + # Mint new tokens for the refund test + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create tokens with a past locktime for refund testing + past_time = int(time.time()) - 10 # 10 seconds in the past + refund_tags_past = Tags( + [ + ["refund", pubkey_wallet2, pubkey_wallet3], + ["n_sigs_refund", "2"], + ["locktime", str(past_time)], + ] + ) + secret_lock_past = await wallet1.create_p2pk_lock( + pubkey_wallet1, tags=refund_tags_past + ) + _, send_proofs3 = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock_past + ) + + # Try to redeem with only one refund key signature (should fail) + refund_signed_proofs = wallet2.sign_p2pk_sig_inputs(send_proofs3) + + await assert_err( + ledger.swap(proofs=refund_signed_proofs, outputs=outputs), + "not enough pubkeys (2) or signatures (1) present for n_sigs (2).", + ) + + # Mint new tokens for the final test + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create tokens with same past locktime + _, send_proofs4 = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock_past + ) + + # Add both refund signatures + refund_signed_proofs2 = wallet2.sign_p2pk_sig_inputs(send_proofs4) + refund_signed_proofs2 = wallet3.sign_p2pk_sig_inputs(refund_signed_proofs2) + + # Now it should succeed with 2-of-2 refund signatures + # Create outputs + outputs = await create_test_outputs(wallet1, 16) + promises2 = await ledger.swap(proofs=refund_signed_proofs2, outputs=outputs) + assert len(promises2) == len(outputs) + + +@pytest.mark.asyncio +async def test_p2pk_invalid_pubkey_check( + wallet1: Wallet, wallet2: Wallet, ledger: Ledger +): + """Test that an invalid public key is properly rejected.""" + # Mint tokens to wallet1 + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create an invalid pubkey string (too short) + invalid_pubkey = "03aaff" + + # Try to create a P2PK lock with invalid pubkey + # This should fail in create_p2pk_lock, but if it doesn't, let's handle it gracefully + try: + secret_lock = await wallet1.create_p2pk_lock(invalid_pubkey) + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock + ) + + # Create outputs + outputs = await create_test_outputs(wallet1, 16) + + # Verify it fails during validation + await assert_err( + ledger.swap(proofs=send_proofs, outputs=outputs), + "failed to deserialize pubkey", # Generic error for pubkey issues + ) + except Exception as e: + # If it fails during creation, that's fine too + assert ( + "pubkey" in str(e).lower() or "key" in str(e).lower() + ), f"Expected error about invalid public key, got: {str(e)}" + + +@pytest.mark.asyncio +async def test_p2pk_sig_all_with_multiple_pubkeys( + wallet1: Wallet, wallet2: Wallet, wallet3: Wallet, ledger: Ledger +): + """Test SIG_ALL combined with multiple pubkeys/n_sigs.""" + # Mint tokens to wallet1 + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Get pubkeys + pubkey1 = await wallet1.create_p2pk_pubkey() + pubkey2 = await wallet2.create_p2pk_pubkey() + pubkey3 = await wallet3.create_p2pk_pubkey() + + # Create tokens with SIG_ALL and 2-of-3 multisig + tags = Tags([["pubkeys", pubkey2, pubkey3]]) + secret_lock = await wallet1.create_p2pk_lock( + pubkey1, tags=tags, n_sigs=2, sig_all=True + ) + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock + ) + + # Create outputs + outputs = await create_test_outputs(wallet1, 16) + + # Create message to sign (all inputs + all outputs) + message_to_sign = "".join([p.secret for p in send_proofs] + [o.B_ for o in outputs]) + + # Sign with wallet1's key + signature1 = wallet1.schnorr_sign_message(message_to_sign) + + # Sign with wallet2's key + signature2 = wallet2.schnorr_sign_message(message_to_sign) + + # Add both signatures to the first proof only (SIG_ALL) + send_proofs[0].witness = P2PKWitness(signatures=[signature1, signature2]).json() + + # This should succeed with 2 valid signatures + promises = await ledger.swap(proofs=send_proofs, outputs=outputs) + assert len(promises) == len(outputs) + + +async def create_test_outputs(wallet: Wallet, amount: int) -> List[BlindedMessage]: + """Helper to create blinded outputs for testing.""" + output_amounts = [amount] + secrets, rs, _ = await wallet.generate_n_secrets(len(output_amounts)) + outputs, _ = wallet._construct_outputs(output_amounts, secrets, rs) + return outputs diff --git a/tests/test_wallet_htlc.py b/tests/test_wallet_htlc.py index 22da648..8a2ab36 100644 --- a/tests/test_wallet_htlc.py +++ b/tests/test_wallet_htlc.py @@ -1,4 +1,5 @@ import asyncio +import copy import hashlib import secrets from typing import List @@ -10,6 +11,8 @@ from cashu.core.base import HTLCWitness, Proof from cashu.core.crypto.secp import PrivateKey from cashu.core.htlc import HTLCSecret from cashu.core.migrations import migrate_databases +from cashu.core.p2pk import SigFlags +from cashu.core.secret import SecretKind from cashu.wallet import migrations from cashu.wallet.wallet import Wallet from cashu.wallet.wallet import Wallet as Wallet1 @@ -107,7 +110,7 @@ async def test_htlc_redeem_with_wrong_preimage(wallet1: Wallet, wallet2: Wallet) for p in send_proofs: p.witness = HTLCWitness(preimage=preimage).json() await assert_err( - wallet2.redeem(send_proofs), "Mint Error: HTLC preimage does not match" + wallet1.redeem(send_proofs), "Mint Error: HTLC preimage does not match" ) @@ -143,7 +146,7 @@ async def test_htlc_redeem_with_wrong_signature(wallet1: Wallet, wallet2: Wallet preimage=preimage, hashlock_pubkeys=[pubkey_wallet1] ) _, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret) - signatures = wallet1.sign_proofs(send_proofs) + signatures = wallet1.signatures_proofs_sig_inputs(send_proofs) for p, s in zip(send_proofs, signatures): p.witness = HTLCWitness( preimage=preimage, signatures=[f"{s[:-5]}11111"] @@ -151,7 +154,7 @@ async def test_htlc_redeem_with_wrong_signature(wallet1: Wallet, wallet2: Wallet await assert_err( wallet2.redeem(send_proofs), - "Mint Error: no valid signature provided for input.", + "Mint Error: signature threshold not met", ) @@ -168,11 +171,11 @@ async def test_htlc_redeem_with_correct_signature(wallet1: Wallet, wallet2: Wall ) _, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret) - signatures = wallet1.sign_proofs(send_proofs) + signatures = wallet1.signatures_proofs_sig_inputs(send_proofs) for p, s in zip(send_proofs, signatures): p.witness = HTLCWitness(preimage=preimage, signatures=[s]).json() - await wallet2.redeem(send_proofs) + await wallet1.redeem(send_proofs) @pytest.mark.asyncio @@ -191,8 +194,8 @@ async def test_htlc_redeem_with_2_of_1_signatures(wallet1: Wallet, wallet2: Wall ) _, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret) - signatures1 = wallet1.sign_proofs(send_proofs) - signatures2 = wallet2.sign_proofs(send_proofs) + signatures1 = wallet1.signatures_proofs_sig_inputs(send_proofs) + signatures2 = wallet2.signatures_proofs_sig_inputs(send_proofs) for p, s1, s2 in zip(send_proofs, signatures1, signatures2): p.witness = HTLCWitness(preimage=preimage, signatures=[s1, s2]).json() @@ -215,8 +218,8 @@ async def test_htlc_redeem_with_2_of_2_signatures(wallet1: Wallet, wallet2: Wall ) _, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret) - signatures1 = wallet1.sign_proofs(send_proofs) - signatures2 = wallet2.sign_proofs(send_proofs) + signatures1 = wallet1.signatures_proofs_sig_inputs(send_proofs) + signatures2 = wallet2.signatures_proofs_sig_inputs(send_proofs) for p, s1, s2 in zip(send_proofs, signatures1, signatures2): p.witness = HTLCWitness(preimage=preimage, signatures=[s1, s2]).json() @@ -241,8 +244,8 @@ async def test_htlc_redeem_with_2_of_2_signatures_with_duplicate_pubkeys( ) _, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret) - signatures1 = wallet1.sign_proofs(send_proofs) - signatures2 = wallet2.sign_proofs(send_proofs) + signatures1 = wallet1.signatures_proofs_sig_inputs(send_proofs) + signatures2 = wallet2.signatures_proofs_sig_inputs(send_proofs) for p, s1, s2 in zip(send_proofs, signatures1, signatures2): p.witness = HTLCWitness(preimage=preimage, signatures=[s1, s2]).json() @@ -270,14 +273,14 @@ async def test_htlc_redeem_with_3_of_3_signatures_but_only_2_provided( ) _, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret) - signatures1 = wallet1.sign_proofs(send_proofs) - signatures2 = wallet2.sign_proofs(send_proofs) + signatures1 = wallet1.signatures_proofs_sig_inputs(send_proofs) + signatures2 = wallet2.signatures_proofs_sig_inputs(send_proofs) for p, s1, s2 in zip(send_proofs, signatures1, signatures2): p.witness = HTLCWitness(preimage=preimage, signatures=[s1, s2]).json() await assert_err( wallet2.redeem(send_proofs), - "Mint Error: not enough signatures provided: 2 < 3.", + "Mint Error: not enough pubkeys (2) or signatures (2) present for n_sigs (3).", ) @@ -303,8 +306,8 @@ async def test_htlc_redeem_with_2_of_3_signatures_with_2_valid_and_1_invalid_pro ) _, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret) - signatures1 = wallet1.sign_proofs(send_proofs) - signatures2 = wallet2.sign_proofs(send_proofs) + signatures1 = wallet1.signatures_proofs_sig_inputs(send_proofs) + signatures2 = wallet2.signatures_proofs_sig_inputs(send_proofs) signatures3 = [f"{s[:-5]}11111" for s in signatures1] # wrong signature for p, s1, s2, s3 in zip(send_proofs, signatures1, signatures2, signatures3): p.witness = HTLCWitness(preimage=preimage, signatures=[s1, s2, s3]).json() @@ -312,39 +315,6 @@ async def test_htlc_redeem_with_2_of_3_signatures_with_2_valid_and_1_invalid_pro await wallet2.redeem(send_proofs) -@pytest.mark.asyncio -async def test_htlc_redeem_with_3_of_3_signatures_with_2_valid_and_1_invalid_provided( - wallet1: Wallet, wallet2: Wallet -): - mint_quote = await wallet1.request_mint(64) - await pay_if_regtest(mint_quote.request) - await wallet1.mint(64, quote_id=mint_quote.quote) - preimage = "00000000000000000000000000000000" - pubkey_wallet1 = await wallet1.create_p2pk_pubkey() - pubkey_wallet2 = await wallet2.create_p2pk_pubkey() - privatekey_wallet3 = PrivateKey(secrets.token_bytes(32), raw=True) - assert privatekey_wallet3.pubkey - pubkey_wallet3 = privatekey_wallet3.pubkey.serialize().hex() - - # preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest() - secret = await wallet1.create_htlc_lock( - preimage=preimage, - hashlock_pubkeys=[pubkey_wallet1, pubkey_wallet2, pubkey_wallet3], - hashlock_n_sigs=3, - ) - _, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret) - - signatures1 = wallet1.sign_proofs(send_proofs) - signatures2 = wallet2.sign_proofs(send_proofs) - signatures3 = [f"{s[:-5]}11111" for s in signatures1] # wrong signature - for p, s1, s2, s3 in zip(send_proofs, signatures1, signatures2, signatures3): - p.witness = HTLCWitness(preimage=preimage, signatures=[s1, s2, s3]).json() - - await assert_err( - wallet2.redeem(send_proofs), "Mint Error: signature threshold not met. 2 < 3." - ) - - @pytest.mark.asyncio async def test_htlc_redeem_hashlock_wrong_signature_timelock_correct_signature( wallet1: Wallet, wallet2: Wallet @@ -364,14 +334,14 @@ async def test_htlc_redeem_hashlock_wrong_signature_timelock_correct_signature( ) _, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret) - signatures = wallet1.sign_proofs(send_proofs) + signatures = wallet1.signatures_proofs_sig_inputs(send_proofs) for p, s in zip(send_proofs, signatures): p.witness = HTLCWitness(preimage=preimage, signatures=[s]).json() # should error because we used wallet2 signatures for the hash lock await assert_err( wallet1.redeem(send_proofs), - "Mint Error: no valid signature provided for input.", + "Mint Error: signature threshold not met", ) await asyncio.sleep(2) @@ -394,11 +364,12 @@ async def test_htlc_redeem_hashlock_wrong_signature_timelock_wrong_signature( preimage=preimage, hashlock_pubkeys=[pubkey_wallet2], locktime_seconds=2, - locktime_pubkeys=[pubkey_wallet1], + locktime_pubkeys=[pubkey_wallet1, pubkey_wallet2], + locktime_n_sigs=2, ) _, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret) - signatures = wallet1.sign_proofs(send_proofs) + signatures = wallet1.signatures_proofs_sig_inputs(send_proofs) for p, s in zip(send_proofs, signatures): p.witness = HTLCWitness( preimage=preimage, signatures=[f"{s[:-5]}11111"] @@ -407,12 +378,175 @@ async def test_htlc_redeem_hashlock_wrong_signature_timelock_wrong_signature( # should error because we used wallet2 signatures for the hash lock await assert_err( wallet1.redeem(send_proofs), - "Mint Error: no valid signature provided for input.", + "Mint Error: signature threshold not met. 0 < 1.", ) await asyncio.sleep(2) - # should fail since lock time has passed and we provided a wrong signature for timelock + # should fail since lock time has passed and we provided not enough signatures for the timelock locktime_n_sigs await assert_err( wallet1.redeem(send_proofs), - "Mint Error: no valid signature provided for input.", + "Mint Error: signature threshold not met. 1 < 2.", ) + + +@pytest.mark.asyncio +async def test_htlc_redeem_timelock_2_of_2_signatures(wallet1: Wallet, wallet2: Wallet): + """Testing the 2-of-2 timelock (refund) signature case.""" + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + preimage = "00000000000000000000000000000000" + pubkey_wallet1 = await wallet1.create_p2pk_pubkey() + pubkey_wallet2 = await wallet2.create_p2pk_pubkey() + # preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest() + secret = await wallet1.create_htlc_lock( + preimage=preimage, + hashlock_pubkeys=[pubkey_wallet2], + locktime_seconds=2, + locktime_pubkeys=[pubkey_wallet1, pubkey_wallet2], + locktime_n_sigs=2, + ) + _, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret) + send_proofs_copy = send_proofs.copy() + + signatures = wallet1.signatures_proofs_sig_inputs(send_proofs) + for p, s in zip(send_proofs, signatures): + p.witness = HTLCWitness(preimage=preimage, signatures=[s]).json() + + # should error because we used wallet2 signatures for the hash lock + await assert_err( + wallet1.redeem(send_proofs), + "Mint Error: signature threshold not met. 0 < 1.", + ) + + await asyncio.sleep(2) + # locktime has passed + + # should fail. lock time has passed but we provided only wallet1 signature for timelock, we need 2 though + await assert_err( + wallet1.redeem(send_proofs), + "Mint Error: not enough pubkeys (2) or signatures (1) present for n_sigs (2).", + ) + + # let's add the second signature + send_proofs_copy = wallet2.sign_p2pk_sig_inputs(send_proofs_copy) + + # now we can redeem it + await wallet1.redeem(send_proofs_copy) + + +@pytest.mark.asyncio +async def test_htlc_sigall_behavior(wallet1: Wallet, wallet2: Wallet): + """Test HTLC with SIG_ALL flag, requiring signatures on both inputs and outputs.""" + # Mint tokens for testing + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Setup HTLC parameters + preimage = "00000000000000000000000000000000" + # preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest() + pubkey_wallet2 = await wallet2.create_p2pk_pubkey() + + # Create HTLC lock with SIG_ALL flag + secret = await wallet1.create_htlc_lock( + preimage=preimage, + hashlock_pubkeys=[pubkey_wallet2], + hashlock_n_sigs=1, + ) + + # Modify the secret to use SIG_ALL + secret.tags["sigflag"] = SigFlags.SIG_ALL.value + + # Send tokens with this HTLC lock + _, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret) + + # verify sigflag is SIG_ALL + assert HTLCSecret.from_secret(secret).kind == SecretKind.HTLC.value + assert HTLCSecret.from_secret(secret).sigflag == SigFlags.SIG_ALL + + # first redeem fails because no preimage + await assert_err( + wallet2.redeem(send_proofs), "Mint Error: no HTLC preimage provided" + ) + + # we add the preimage to the proof + for p in send_proofs: + p.witness = HTLCWitness(preimage=preimage).json() + + # Should succeed, redeem adds signatures to the proof + await wallet2.redeem(send_proofs) + + +@pytest.mark.asyncio +async def test_htlc_n_sigs_refund_locktime(wallet1: Wallet, wallet2: Wallet): + """Test HTLC with n_sigs_refund parameter requiring multiple signatures for refund after locktime.""" + # Create a third wallet for the third signature + wallet3 = await Wallet.with_db( + SERVER_ENDPOINT, "test_data/wallet_htlc_3", "wallet3" + ) + await migrate_databases(wallet3.db, migrations) + wallet3.private_key = PrivateKey(secrets.token_bytes(32), raw=True) + await wallet3.load_mint() + + # Mint tokens for testing + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Setup parameters + preimage = "00000000000000000000000000000000" + pubkey_wallet1 = await wallet1.create_p2pk_pubkey() + pubkey_wallet2 = await wallet2.create_p2pk_pubkey() + pubkey_wallet3 = await wallet3.create_p2pk_pubkey() + + # Wrong preimage - making it so we can only spend via locktime + wrong_preimage = "11111111111111111111111111111111" + + # Create HTLC with: + # 1. Timelock in the past + # 2. Three refund pubkeys with 2-of-3 signature requirement + secret = await wallet1.create_htlc_lock( + preimage=wrong_preimage, # this ensures we can't redeem via preimage + hashlock_pubkeys=[pubkey_wallet2], + locktime_seconds=-200000, + locktime_pubkeys=[pubkey_wallet1, pubkey_wallet2, pubkey_wallet3], + locktime_n_sigs=2, # require 2 of 3 signatures for refund + ) + + # # Send tokens with this lock + _, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret) + send_proofs_copy = copy.deepcopy(send_proofs) + + # # First, try correct preimage but should fail as we're using wrong preimage hash + # for p in send_proofs: + # p.witness = HTLCWitness(preimage=preimage).json() + + # await assert_err( + # wallet2.redeem(send_proofs), "Mint Error: HTLC preimage does not match" + # ) + + # # Wait for locktime to pass + # await asyncio.sleep(2) + + # Try redeeming with only 1 signature after locktime + signatures1 = wallet1.signatures_proofs_sig_inputs(send_proofs_copy) + for p, sig in zip(send_proofs_copy, signatures1): + p.witness = HTLCWitness(preimage=preimage, signatures=[sig]).json() + + # Should fail because we need 2 signatures + await assert_err( + wallet1.redeem(send_proofs_copy), + "Mint Error: not enough pubkeys (3) or signatures (1) present for n_sigs (2)", + ) + + # Make a fresh copy and add 2 signatures + send_proofs_copy2 = copy.deepcopy(send_proofs) + signatures1 = wallet1.signatures_proofs_sig_inputs(send_proofs_copy2) + signatures2 = wallet2.signatures_proofs_sig_inputs(send_proofs_copy2) + + for p, sig1, sig2 in zip(send_proofs_copy2, signatures1, signatures2): + p.witness = HTLCWitness(preimage=preimage, signatures=[sig1, sig2]).json() + + # Should succeed with 2 of 3 signatures after locktime + await wallet1.redeem(send_proofs_copy2) diff --git a/tests/test_wallet_p2pk.py b/tests/test_wallet_p2pk.py index 94afbc5..794ffb8 100644 --- a/tests/test_wallet_p2pk.py +++ b/tests/test_wallet_p2pk.py @@ -1,17 +1,19 @@ import asyncio import copy +import hashlib import json import secrets from typing import List import pytest import pytest_asyncio +from coincurve import PrivateKey as CoincurvePrivateKey -from cashu.core.base import Proof +from cashu.core.base import P2PKWitness, Proof from cashu.core.crypto.secp import PrivateKey, PublicKey from cashu.core.migrations import migrate_databases -from cashu.core.p2pk import SigFlags -from cashu.core.secret import Tags +from cashu.core.p2pk import P2PKSecret, SigFlags +from cashu.core.secret import Secret, SecretKind, Tags from cashu.wallet import migrations from cashu.wallet.wallet import Wallet from cashu.wallet.wallet import Wallet as Wallet1 @@ -121,7 +123,7 @@ async def test_p2pk_receive_with_wrong_private_key(wallet1: Wallet, wallet2: Wal wallet2.private_key = PrivateKey() # wrong private key await assert_err( wallet2.redeem(send_proofs), - "Mint Error: no valid signature provided for input.", + "", ) @@ -145,7 +147,7 @@ async def test_p2pk_short_locktime_receive_with_wrong_private_key( send_proofs_copy = copy.deepcopy(send_proofs) await assert_err( wallet2.redeem(send_proofs), - "Mint Error: no valid signature provided for input.", + "", ) await asyncio.sleep(2) # should succeed because even with the wrong private key we @@ -160,7 +162,8 @@ async def test_p2pk_locktime_with_refund_pubkey(wallet1: Wallet, wallet2: Wallet await wallet1.mint(64, quote_id=mint_quote.quote) pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # receiver side # sender side - garbage_pubkey = PrivateKey().pubkey + garbage_priv = PrivateKey(secrets.token_bytes(32), raw=True) + garbage_pubkey = garbage_priv.pubkey assert garbage_pubkey secret_lock = await wallet1.create_p2pk_lock( garbage_pubkey.serialize().hex(), # create lock to unspendable pubkey @@ -175,7 +178,7 @@ async def test_p2pk_locktime_with_refund_pubkey(wallet1: Wallet, wallet2: Wallet # and locktime has not passed await assert_err( wallet2.redeem(send_proofs), - "Mint Error: no valid signature provided for input.", + "", ) await asyncio.sleep(2) # we can now redeem because of the refund locktime @@ -189,7 +192,8 @@ async def test_p2pk_locktime_with_wrong_refund_pubkey(wallet1: Wallet, wallet2: await wallet1.mint(64, quote_id=mint_quote.quote) await wallet2.create_p2pk_pubkey() # receiver side # sender side - garbage_pubkey = PrivateKey().pubkey + garbage_priv = PrivateKey(secrets.token_bytes(32), raw=True) + garbage_pubkey = garbage_priv.pubkey garbage_pubkey_2 = PrivateKey().pubkey assert garbage_pubkey assert garbage_pubkey_2 @@ -206,13 +210,13 @@ async def test_p2pk_locktime_with_wrong_refund_pubkey(wallet1: Wallet, wallet2: # and locktime has not passed await assert_err( wallet2.redeem(send_proofs), - "Mint Error: no valid signature provided for input.", + "", ) await asyncio.sleep(2) # we still can't redeem it because we used garbage_pubkey_2 as a refund pubkey await assert_err( wallet2.redeem(send_proofs_copy), - "Mint Error: no valid signature provided for input.", + "", ) @@ -226,7 +230,8 @@ async def test_p2pk_locktime_with_second_refund_pubkey( pubkey_wallet1 = await wallet1.create_p2pk_pubkey() # receiver side pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # receiver side # sender side - garbage_pubkey = PrivateKey().pubkey + garbage_priv = PrivateKey(secrets.token_bytes(32), raw=True) + garbage_pubkey = garbage_priv.pubkey assert garbage_pubkey secret_lock = await wallet1.create_p2pk_lock( garbage_pubkey.serialize().hex(), # create lock to unspendable pubkey @@ -241,15 +246,65 @@ async def test_p2pk_locktime_with_second_refund_pubkey( send_proofs_copy = copy.deepcopy(send_proofs) # receiver side: can't redeem since we used a garbage pubkey # and locktime has not passed + # WALLET WILL ADD A SIGNATURE BECAUSE IT SEES ITS REFUND PUBKEY (it adds a signature even though the locktime hasn't passed) await assert_err( wallet1.redeem(send_proofs), - "Mint Error: no valid signature provided for input.", + "Mint Error: signature threshold not met. 0 < 1.", ) await asyncio.sleep(2) # we can now redeem because of the refund locktime await wallet1.redeem(send_proofs_copy) +@pytest.mark.asyncio +async def test_p2pk_locktime_with_2_of_2_refund_pubkeys( + wallet1: Wallet, wallet2: Wallet +): + """Testing the case where we expect a 2-of-2 signature from the refund pubkeys""" + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + pubkey_wallet1 = await wallet1.create_p2pk_pubkey() # receiver side + pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # receiver side + # sender side + garbage_priv = PrivateKey(secrets.token_bytes(32), raw=True) + garbage_pubkey = garbage_priv.pubkey + assert garbage_pubkey + secret_lock = await wallet1.create_p2pk_lock( + garbage_pubkey.serialize().hex(), # create lock to unspendable pubkey + locktime_seconds=2, # locktime + tags=Tags( + [["refund", pubkey_wallet2, pubkey_wallet1], ["n_sigs_refund", "2"]], + ), # multiple refund pubkeys + ) # sender side + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 8, secret_lock=secret_lock + ) + # we need to copy the send_proofs because the redeem function + # modifies the send_proofs in place by adding the signatures + send_proofs_copy = copy.deepcopy(send_proofs) + send_proofs_copy2 = copy.deepcopy(send_proofs) + # receiver side: can't redeem since we used a garbage pubkey + # and locktime has not passed + await assert_err( + wallet1.redeem(send_proofs), + "Mint Error: signature threshold not met. 0 < 1.", + ) + await asyncio.sleep(2) + + # now is the refund time, but we can't redeem it because we need 2 signatures + await assert_err( + wallet1.redeem(send_proofs_copy), + "not enough pubkeys (2) or signatures (1) present for n_sigs (2)", + ) + + # let's add the second signature + send_proofs_copy2 = wallet2.sign_p2pk_sig_inputs(send_proofs_copy2) + + # now we can redeem it + await wallet1.redeem(send_proofs_copy2) + + @pytest.mark.asyncio async def test_p2pk_multisig_2_of_2(wallet1: Wallet, wallet2: Wallet): mint_quote = await wallet1.request_mint(64) @@ -267,7 +322,7 @@ async def test_p2pk_multisig_2_of_2(wallet1: Wallet, wallet2: Wallet): wallet1.proofs, 8, secret_lock=secret_lock ) # add signatures of wallet1 - send_proofs = wallet1.add_signature_witnesses_to_proofs(send_proofs) + send_proofs = wallet1.sign_p2pk_sig_inputs(send_proofs) # here we add the signatures of wallet2 await wallet2.redeem(send_proofs) @@ -289,10 +344,65 @@ async def test_p2pk_multisig_duplicate_signature(wallet1: Wallet, wallet2: Walle wallet1.proofs, 8, secret_lock=secret_lock ) # add signatures of wallet2 – this is a duplicate signature - send_proofs = wallet2.add_signature_witnesses_to_proofs(send_proofs) + send_proofs = wallet2.sign_p2pk_sig_inputs(send_proofs) + # wallet does not add a second signature if it finds its own signature already in the witness + await assert_err( + wallet2.redeem(send_proofs), + "Mint Error: not enough pubkeys (2) or signatures (1) present for n_sigs (2).", + ) + + +@pytest.mark.asyncio +async def test_p2pk_multisig_two_signatures_same_pubkey( + wallet1: Wallet, wallet2: Wallet +): + # we generate two different signatures from the same private key + mint_quote = await wallet2.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet2.mint(64, quote_id=mint_quote.quote) + pubkey_wallet1 = await wallet1.create_p2pk_pubkey() + pubkey_wallet2 = await wallet2.create_p2pk_pubkey() + assert pubkey_wallet1 != pubkey_wallet2 + # p2pk test + secret_lock = await wallet1.create_p2pk_lock( + pubkey_wallet2, tags=Tags([["pubkeys", pubkey_wallet1]]), n_sigs=2 + ) + + _, send_proofs = await wallet2.swap_to_send( + wallet2.proofs, 1, secret_lock=secret_lock + ) + assert len(send_proofs) == 1 + proof = send_proofs[0] + # create coincurve private key so we can sign the message + coincurve_privatekey2 = CoincurvePrivateKey( + bytes.fromhex(wallet2.private_key.serialize()) + ) + # check if private keys are the same + assert coincurve_privatekey2.to_hex() == wallet2.private_key.serialize() + + msg = hashlib.sha256(proof.secret.encode("utf-8")).digest() + coincurve_signature = coincurve_privatekey2.sign_schnorr(msg) + + # add signatures of wallet2 – this is a duplicate signature + send_proofs = wallet2.sign_p2pk_sig_inputs(send_proofs) + + # the signatures from coincurve are not the same as the ones from wallet2 + assert coincurve_signature.hex() != proof.p2pksigs[0] + + # verify both signatures: + assert PublicKey(bytes.fromhex(pubkey_wallet2), raw=True).schnorr_verify( + msg, bytes.fromhex(proof.p2pksigs[0]), None, raw=True + ) + assert PublicKey(bytes.fromhex(pubkey_wallet2), raw=True).schnorr_verify( + msg, coincurve_signature, None, raw=True + ) + + # add coincurve signature, and the wallet2 signature will be added during .redeem + send_proofs[0].witness = P2PKWitness(signatures=[coincurve_signature.hex()]).json() + # here we add the signatures of wallet2 await assert_err( - wallet2.redeem(send_proofs), "Mint Error: signatures must be unique." + wallet2.redeem(send_proofs), "Mint Error: signature threshold not met. 1 < 2." ) @@ -313,7 +423,7 @@ async def test_p2pk_multisig_quorum_not_met_1_of_2(wallet1: Wallet, wallet2: Wal ) await assert_err( wallet2.redeem(send_proofs), - "Mint Error: not enough signatures provided: 1 < 2.", + "Mint Error: not enough pubkeys (2) or signatures (1) present for n_sigs (2)", ) @@ -324,21 +434,26 @@ async def test_p2pk_multisig_quorum_not_met_2_of_3(wallet1: Wallet, wallet2: Wal await wallet1.mint(64, quote_id=mint_quote.quote) pubkey_wallet1 = await wallet1.create_p2pk_pubkey() pubkey_wallet2 = await wallet2.create_p2pk_pubkey() + garbage_priv = PrivateKey(secrets.token_bytes(32), raw=True) + garbage_pubkey = garbage_priv.pubkey + assert garbage_pubkey assert pubkey_wallet1 != pubkey_wallet2 # p2pk test secret_lock = await wallet1.create_p2pk_lock( - pubkey_wallet2, tags=Tags([["pubkeys", pubkey_wallet1]]), n_sigs=3 + pubkey_wallet2, + tags=Tags([["pubkeys", pubkey_wallet1, garbage_pubkey.serialize().hex()]]), + n_sigs=3, ) - + # create locked proofs _, send_proofs = await wallet1.swap_to_send( wallet1.proofs, 8, secret_lock=secret_lock ) # add signatures of wallet1 - send_proofs = wallet1.add_signature_witnesses_to_proofs(send_proofs) + send_proofs = wallet1.sign_p2pk_sig_inputs(send_proofs) # here we add the signatures of wallet2 await assert_err( wallet2.redeem(send_proofs), - "Mint Error: not enough signatures provided: 2 < 3.", + "Mint Error: not enough pubkeys (3) or signatures (2) present for n_sigs (3)", ) @@ -380,10 +495,9 @@ async def test_p2pk_multisig_with_wrong_first_private_key( _, send_proofs = await wallet1.swap_to_send( wallet1.proofs, 8, secret_lock=secret_lock ) - # add signatures of wallet1 - send_proofs = wallet1.add_signature_witnesses_to_proofs(send_proofs) await assert_err( - wallet2.redeem(send_proofs), "Mint Error: signature threshold not met. 1 < 2." + wallet2.redeem(send_proofs), + "Mint Error: not enough pubkeys (2) or signatures (1) present for n_sigs (2).", ) @@ -412,7 +526,7 @@ async def test_secret_initialized_with_tags(wallet1: Wallet): pubkey = PrivateKey().pubkey assert pubkey secret = await wallet1.create_p2pk_lock( - pubkey=pubkey.serialize().hex(), + data=pubkey.serialize().hex(), tags=tags, ) assert secret.locktime == 100 @@ -425,7 +539,7 @@ async def test_secret_initialized_with_arguments(wallet1: Wallet): pubkey = PrivateKey().pubkey assert pubkey secret = await wallet1.create_p2pk_lock( - pubkey=pubkey.serialize().hex(), + data=pubkey.serialize().hex(), locktime_seconds=100, n_sigs=3, sig_all=True, @@ -434,3 +548,157 @@ async def test_secret_initialized_with_arguments(wallet1: Wallet): assert secret.locktime > 1689000000 assert secret.n_sigs == 3 assert secret.sigflag == SigFlags.SIG_ALL + + +@pytest.mark.asyncio +async def test_wallet_verify_is_p2pk_input(wallet1: Wallet1): + """Test the wallet correctly identifies P2PK inputs.""" + # Mint tokens to the wallet + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create a p2pk lock with wallet's own public key + pubkey = await wallet1.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey) + + # Use swap_to_send to create p2pk locked proofs + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 32, secret_lock=secret_lock + ) + + # Now get a proof and check if it's detected as P2PK + proof = send_proofs[0] + + # This tests the internal method that recognizes a P2PK input + secret = Secret.deserialize(proof.secret) + assert secret.kind == SecretKind.P2PK.value, "Secret should be of kind P2PK" + + # We can verify that we can convert it to a P2PKSecret + p2pk_secret = P2PKSecret.from_secret(secret) + assert p2pk_secret.data == pubkey, "P2PK secret data should contain the pubkey" + + +@pytest.mark.asyncio +async def test_wallet_verify_p2pk_sigflag_is_sig_inputs(wallet1: Wallet1): + """Test the wallet correctly identifies the SIG_INPUTS flag.""" + # Mint tokens to the wallet + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create a p2pk lock with SIG_INPUTS (default) + pubkey = await wallet1.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey, sig_all=False) + + # Use swap_to_send to create p2pk locked proofs + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 32, secret_lock=secret_lock + ) + + # Check if sigflag is correctly identified as SIG_INPUTS + proof = send_proofs[0] + secret = Secret.deserialize(proof.secret) + p2pk_secret = P2PKSecret.from_secret(secret) + + assert p2pk_secret.sigflag == SigFlags.SIG_INPUTS, "Sigflag should be SIG_INPUTS" + + +@pytest.mark.asyncio +async def test_wallet_verify_p2pk_sigflag_is_sig_all(wallet1: Wallet1): + """Test the wallet correctly identifies the SIG_ALL flag.""" + # Mint tokens to the wallet + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create a p2pk lock with SIG_ALL + pubkey = await wallet1.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey, sig_all=True) + + # Use swap_to_send to create p2pk locked proofs + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 32, secret_lock=secret_lock + ) + + # Check if sigflag is correctly identified as SIG_ALL + proof = send_proofs[0] + secret = Secret.deserialize(proof.secret) + p2pk_secret = P2PKSecret.from_secret(secret) + + assert p2pk_secret.sigflag == SigFlags.SIG_ALL, "Sigflag should be SIG_ALL" + + +@pytest.mark.asyncio +async def test_p2pk_locktime_with_3_of_3_refund_pubkeys( + wallet1: Wallet, wallet2: Wallet +): + """Testing the case where we expect a 3-of-3 signature from the refund pubkeys""" + # Create a third wallet for this test + wallet3 = await Wallet.with_db( + SERVER_ENDPOINT, "test_data/wallet_p2pk_3", "wallet3" + ) + await migrate_databases(wallet3.db, migrations) + wallet3.private_key = PrivateKey(secrets.token_bytes(32), raw=True) + await wallet3.load_mint() + + # Get tokens and create public keys for all wallets + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + pubkey_wallet1 = await wallet1.create_p2pk_pubkey() + pubkey_wallet2 = await wallet2.create_p2pk_pubkey() + pubkey_wallet3 = await wallet3.create_p2pk_pubkey() + + # Create an unspendable lock with refund conditions requiring 3 signatures + garbage_pubkey = PrivateKey().pubkey + assert garbage_pubkey + secret_lock = await wallet1.create_p2pk_lock( + garbage_pubkey.serialize().hex(), # create lock to unspendable pubkey + locktime_seconds=2, # locktime + tags=Tags( + [ + ["refund", pubkey_wallet1, pubkey_wallet2, pubkey_wallet3], + ["n_sigs_refund", "3"], + ], + ), # multiple refund pubkeys with required 3 signatures + ) + + # Send tokens with this lock + _, send_proofs = await wallet1.swap_to_send( + wallet1.proofs, 8, secret_lock=secret_lock + ) + + # Create copies for different test scenarios + send_proofs_copy1 = copy.deepcopy(send_proofs) + send_proofs_copy2 = copy.deepcopy(send_proofs) + + # Verify tokens can't be redeemed before locktime + await assert_err( + wallet1.redeem(send_proofs), + "Mint Error: signature threshold not met. 0 < 1.", + ) + + # Wait for locktime to expire + await asyncio.sleep(2) + + # Try with only 1 signature (wallet1) - should fail + await assert_err( + wallet1.redeem(send_proofs_copy1), + "not enough pubkeys (3) or signatures (1) present for n_sigs (3)", + ) + + # Add second signature (wallet2) + send_proofs_copy2 = wallet2.sign_p2pk_sig_inputs(send_proofs_copy2) + + # Try with 2 signatures - should still fail + await assert_err( + wallet1.redeem(send_proofs_copy2), + "not enough pubkeys (3) or signatures (2) present for n_sigs (3)", + ) + + # Add the third signature (wallet3) + send_proofs_copy2 = wallet3.sign_p2pk_sig_inputs(send_proofs_copy2) + + # Now with 3 signatures it should succeed + await wallet1.redeem(send_proofs_copy2) diff --git a/tests/test_wallet_p2pk_methods.py b/tests/test_wallet_p2pk_methods.py new file mode 100644 index 0000000..a6c8f9a --- /dev/null +++ b/tests/test_wallet_p2pk_methods.py @@ -0,0 +1,468 @@ +import copy +import hashlib +import secrets + +import pytest +import pytest_asyncio + +from cashu.core.base import P2PKWitness +from cashu.core.crypto.secp import PrivateKey +from cashu.core.migrations import migrate_databases +from cashu.core.p2pk import P2PKSecret, SigFlags +from cashu.core.secret import SecretKind, Tags +from cashu.wallet import migrations +from cashu.wallet.wallet import Wallet +from tests.conftest import SERVER_ENDPOINT +from tests.helpers import pay_if_regtest + + +async def assert_err(f, msg): + """Compute f() and expect an error message 'msg'.""" + try: + await f + except Exception as exc: + if msg not in str(exc.args[0]): + raise Exception(f"Expected error: {msg}, got: {exc.args[0]}") + return + raise Exception(f"Expected error: {msg}, got no error") + + +@pytest_asyncio.fixture(scope="function") +async def wallet1(): + wallet1 = await Wallet.with_db( + SERVER_ENDPOINT, "test_data/wallet_p2pk_methods_1", "wallet1" + ) + await migrate_databases(wallet1.db, migrations) + await wallet1.load_mint() + yield wallet1 + + +@pytest_asyncio.fixture(scope="function") +async def wallet2(): + wallet2 = await Wallet.with_db( + SERVER_ENDPOINT, "test_data/wallet_p2pk_methods_2", "wallet2" + ) + await migrate_databases(wallet2.db, migrations) + wallet2.private_key = PrivateKey(secrets.token_bytes(32), raw=True) + await wallet2.load_mint() + yield wallet2 + + +@pytest.mark.asyncio +async def test_create_p2pk_lock_default(wallet1: Wallet): + """Test creating a P2PK lock with default parameters.""" + pubkey = await wallet1.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey) + + # Verify created lock properties + assert isinstance(secret_lock, P2PKSecret) + assert secret_lock.kind == SecretKind.P2PK.value + assert secret_lock.data == pubkey + assert secret_lock.locktime is None + assert secret_lock.sigflag == SigFlags.SIG_INPUTS + assert secret_lock.n_sigs == 1 + + +@pytest.mark.asyncio +async def test_create_p2pk_lock_with_options(wallet1: Wallet): + """Test creating a P2PK lock with all options specified.""" + pubkey = await wallet1.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock( + pubkey, + locktime_seconds=3600, + sig_all=True, + n_sigs=2, + tags=Tags([["custom_tag", "custom_value"]]), + ) + + # Verify created lock properties + assert isinstance(secret_lock, P2PKSecret) + assert secret_lock.kind == SecretKind.P2PK.value + assert secret_lock.data == pubkey + assert secret_lock.locktime is not None + assert secret_lock.sigflag == SigFlags.SIG_ALL + assert secret_lock.n_sigs == 2 + assert secret_lock.tags.get_tag("custom_tag") == "custom_value" + + +@pytest.mark.asyncio +async def test_signatures_proofs_sig_inputs(wallet1: Wallet): + """Test signing proofs with the private key.""" + # Mint tokens + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create locked proofs + pubkey = await wallet1.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey) + _, proofs = await wallet1.swap_to_send(wallet1.proofs, 32, secret_lock=secret_lock) + + # Test signatures_proofs_sig_inputs + signatures = wallet1.signatures_proofs_sig_inputs(proofs) + + # Verify signatures were created + assert len(signatures) == len(proofs) + assert all(isinstance(sig, str) for sig in signatures) + assert all(len(sig) == 128 for sig in signatures) # 64-byte hex signatures + + # Verify the signatures are valid + for proof, signature in zip(proofs, signatures): + message = proof.secret.encode("utf-8") + sig_bytes = bytes.fromhex(signature) + # Make sure wallet has a pubkey + assert wallet1.private_key.pubkey is not None + assert wallet1.private_key.pubkey.schnorr_verify( + hashlib.sha256(message).digest(), sig_bytes, None, raw=True + ) + + +@pytest.mark.asyncio +async def test_schnorr_sign_message(wallet1: Wallet): + """Test signing an arbitrary message.""" + # Define a test message + message = "test message to sign" + + # Sign the message + signature = wallet1.schnorr_sign_message(message) + + # Verify signature format + assert isinstance(signature, str) + assert len(signature) == 128 # 64-byte hex signature + + # Verify signature is valid + sig_bytes = bytes.fromhex(signature) + # Make sure wallet has a pubkey + assert wallet1.private_key.pubkey is not None + assert wallet1.private_key.pubkey.schnorr_verify( + hashlib.sha256(message.encode("utf-8")).digest(), sig_bytes, None, raw=True + ) + + +@pytest.mark.asyncio +async def test_inputs_require_sigall_detection(wallet1: Wallet): + """Test detection of SIG_ALL flag in proof inputs.""" + # Mint tokens + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create proofs with SIG_INPUTS + pubkey = await wallet1.create_p2pk_pubkey() + secret_lock_inputs = await wallet1.create_p2pk_lock(pubkey, sig_all=False) + _, proofs_sig_inputs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock_inputs + ) + + # Create proofs with SIG_ALL + mint_quote_2 = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote_2.request) + await wallet1.mint(64, quote_id=mint_quote_2.quote) + secret_lock_all = await wallet1.create_p2pk_lock(pubkey, sig_all=True) + _, proofs_sig_all = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock_all + ) + + # Test detection of SIG_ALL + assert not wallet1._inputs_require_sigall(proofs_sig_inputs) + assert wallet1._inputs_require_sigall(proofs_sig_all) + + # Test mixed list of proofs + mixed_proofs = proofs_sig_inputs + proofs_sig_all + assert wallet1._inputs_require_sigall(mixed_proofs) + + +@pytest.mark.asyncio +async def test_add_witness_swap_sig_all(wallet1: Wallet): + """Test adding a witness to the first proof for SIG_ALL.""" + # Mint tokens + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create proofs with SIG_ALL + pubkey = await wallet1.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey, sig_all=True) + _, proofs = await wallet1.swap_to_send(wallet1.proofs, 16, secret_lock=secret_lock) + + # Create some outputs + output_amounts = [16] + secrets, rs, _ = await wallet1.generate_n_secrets(len(output_amounts)) + outputs, _ = wallet1._construct_outputs(output_amounts, secrets, rs) + + # Add witness + signed_proofs = wallet1.add_witness_swap_sig_all(proofs, outputs) + + # Verify the first proof has a witness + assert signed_proofs[0].witness is not None + witness = P2PKWitness.from_witness(signed_proofs[0].witness) + assert len(witness.signatures) == 1 + + # Verify the signature includes both inputs and outputs + message_to_sign = "".join([p.secret for p in proofs] + [o.B_ for o in outputs]) + signature = wallet1.schnorr_sign_message(message_to_sign) + assert witness.signatures[0] == signature + + +@pytest.mark.asyncio +async def test_sign_proofs_inplace_swap(wallet1: Wallet): + """Test signing proofs in place for a swap operation.""" + # Mint tokens + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create SIG_ALL proofs + pubkey = await wallet1.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey, sig_all=True) + _, proofs = await wallet1.swap_to_send(wallet1.proofs, 16, secret_lock=secret_lock) + + # Create some outputs + output_amounts = [16] + secrets, rs, _ = await wallet1.generate_n_secrets(len(output_amounts)) + outputs, _ = wallet1._construct_outputs(output_amounts, secrets, rs) + + # Sign proofs + signed_proofs = wallet1.sign_proofs_inplace_swap(proofs, outputs) + + # Verify the first proof has a witness with a signature + assert signed_proofs[0].witness is not None + witness = P2PKWitness.from_witness(signed_proofs[0].witness) + assert len(witness.signatures) == 1 + + +@pytest.mark.asyncio +async def test_add_signatures_to_proofs(wallet1: Wallet): + """Test adding signatures to proofs.""" + # Mint tokens + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create P2PK proofs + pubkey = await wallet1.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey) + _, proofs = await wallet1.swap_to_send(wallet1.proofs, 16, secret_lock=secret_lock) + + # Generate signatures + signatures = wallet1.signatures_proofs_sig_inputs(proofs) + + # Add signatures to proofs + signed_proofs = wallet1.add_signatures_to_proofs(proofs, signatures) + + # Verify signatures were added to the proofs + for proof in signed_proofs: + assert proof.witness is not None + witness = P2PKWitness.from_witness(proof.witness) + assert len(witness.signatures) == 1 + + # Test adding same signatures to already signed proofs (should not duplicate) + signed_proofs = wallet1.add_signatures_to_proofs(signed_proofs, signatures) + + # Verify the signatures were not duplicated + for proof in signed_proofs: + assert proof.witness + witness = P2PKWitness.from_witness(proof.witness) + # Should still have 1 signature because duplicates aren't added + assert len(witness.signatures) == 1 + + +@pytest.mark.asyncio +async def test_filter_proofs_locked_to_our_pubkey(wallet1: Wallet, wallet2: Wallet): + """Test filtering proofs locked to our public key.""" + # Mint tokens to wallet1 + mint_quote = await wallet1.request_mint(640) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(640, quote_id=mint_quote.quote) + + # Get pubkeys for both wallets + pubkey1 = await wallet1.create_p2pk_pubkey() + pubkey2 = await wallet2.create_p2pk_pubkey() + + # Create proofs locked to wallet1's pubkey + secret_lock1 = await wallet1.create_p2pk_lock(pubkey1) + _, proofs1 = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock1 + ) + + # Create proofs locked to wallet2's pubkey + secret_lock2 = await wallet1.create_p2pk_lock(pubkey2) + _, proofs2 = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock2 + ) + + # Create proofs with multiple pubkeys + secret_lock3 = await wallet1.create_p2pk_lock( + pubkey1, tags=Tags([["pubkeys", pubkey2]]) + ) + _, proofs3 = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock3 + ) + + # Sign the proofs to avoid witness errors + signed_proofs1 = wallet1.sign_p2pk_sig_inputs(proofs1) + signed_proofs2 = wallet2.sign_p2pk_sig_inputs(proofs2) + signed_proofs3 = wallet1.sign_p2pk_sig_inputs(proofs3) + signed_proofs3 = wallet2.sign_p2pk_sig_inputs(signed_proofs3) + + # Ensure pubkeys are available + assert wallet1.private_key.pubkey is not None + assert wallet2.private_key.pubkey is not None + + # Filter using wallet1 + filtered1 = wallet1.filter_proofs_locked_to_our_pubkey( + signed_proofs1 + signed_proofs2 + signed_proofs3 + ) + # wallet1 should find proofs1 and proofs3 + assert len(filtered1) == len(signed_proofs1) + len(signed_proofs3) + + # Filter using wallet2 + filtered2 = wallet2.filter_proofs_locked_to_our_pubkey( + signed_proofs1 + signed_proofs2 + signed_proofs3 + ) + # wallet2 should find proofs2 and proofs3 + assert len(filtered2) == len(signed_proofs2) + len(signed_proofs3) + + +@pytest.mark.asyncio +async def test_sign_p2pk_sig_inputs(wallet1: Wallet): + """Test signing P2PK SIG_INPUTS proofs.""" + # Mint tokens + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create a mix of P2PK and non-P2PK proofs + pubkey = await wallet1.create_p2pk_pubkey() + + # Regular proofs (not P2PK) + _, regular_proofs = await wallet1.swap_to_send(wallet1.proofs, 16) + + # P2PK SIG_INPUTS proofs + secret_lock_inputs = await wallet1.create_p2pk_lock(pubkey, sig_all=False) + _, p2pk_input_proofs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock_inputs + ) + + # P2PK SIG_ALL proofs - these won't be signed by sign_p2pk_sig_inputs + secret_lock_all = await wallet1.create_p2pk_lock(pubkey, sig_all=True) + _, p2pk_all_proofs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock_all + ) + + # P2PK locked to a different pubkey - these won't be signed + garbage_pubkey_p = PrivateKey().pubkey + assert garbage_pubkey_p is not None + garbage_pubkey = garbage_pubkey_p.serialize().hex() + secret_lock_other = await wallet1.create_p2pk_lock(garbage_pubkey) + _, p2pk_other_proofs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock_other + ) + + # Mix all proofs + mixed_proofs = ( + regular_proofs + p2pk_input_proofs + p2pk_all_proofs + p2pk_other_proofs + ) + + # Sign the mixed proofs + signed_proofs = wallet1.sign_p2pk_sig_inputs(mixed_proofs) + + # Only P2PK SIG_INPUTS proofs locked to our pubkey should be signed + assert len(signed_proofs) == len(p2pk_input_proofs) + + # Verify the signatures were added + for proof in signed_proofs: + assert proof.witness is not None + witness = P2PKWitness.from_witness(proof.witness) + assert len(witness.signatures) == 1 + + +@pytest.mark.asyncio +async def test_add_witnesses_sig_inputs(wallet1: Wallet): + """Test adding witnesses to P2PK SIG_INPUTS proofs.""" + # Mint tokens + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Create a mix of P2PK and non-P2PK proofs + pubkey = await wallet1.create_p2pk_pubkey() + + # Regular proofs (not P2PK) + _, regular_proofs = await wallet1.swap_to_send(wallet1.proofs, 16) + + # P2PK SIG_INPUTS proofs + secret_lock_inputs = await wallet1.create_p2pk_lock(pubkey, sig_all=False) + _, p2pk_input_proofs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock_inputs + ) + + # Mix all proofs and make a copy for comparison + mixed_proofs = regular_proofs + p2pk_input_proofs + mixed_proofs_copy = copy.deepcopy(mixed_proofs) + + # Add witnesses to the proofs + signed_proofs = wallet1.add_witnesses_sig_inputs(mixed_proofs) + + # Verify that only P2PK proofs have witnesses added + for i, (orig_proof, signed_proof) in enumerate( + zip(mixed_proofs_copy, signed_proofs) + ): + if i < len(regular_proofs): + # Regular proofs should be unchanged + assert signed_proof.witness == orig_proof.witness + else: + # P2PK proofs should have witnesses added + assert signed_proof.witness is not None + witness = P2PKWitness.from_witness(signed_proof.witness) + assert len(witness.signatures) == 1 + + +@pytest.mark.asyncio +async def test_edge_cases(wallet1: Wallet, wallet2: Wallet): + """Test various edge cases for the WalletP2PK methods.""" + # Mint tokens + mint_quote = await wallet1.request_mint(64) + await pay_if_regtest(mint_quote.request) + await wallet1.mint(64, quote_id=mint_quote.quote) + + # Case 1: Empty list of proofs + assert wallet1.signatures_proofs_sig_inputs([]) == [] + assert wallet1.add_signatures_to_proofs([], []) == [] + assert wallet1.filter_proofs_locked_to_our_pubkey([]) == [] + assert wallet1.sign_p2pk_sig_inputs([]) == [] + assert wallet1.add_witnesses_sig_inputs([]) == [] + + # Case 2: Mismatched number of proofs and signatures + pubkey = await wallet1.create_p2pk_pubkey() + secret_lock = await wallet1.create_p2pk_lock(pubkey) + _, proofs = await wallet1.swap_to_send(wallet1.proofs, 16, secret_lock=secret_lock) + assert len(proofs) == 1 + # Create fake signatures but we have only one proof - this should fail + signatures = ["fake_signature1", "fake_signature2"] + assert len(signatures) != len(proofs) + + # This should raise an assertion error + with pytest.raises(AssertionError, match="wrong number of signatures"): + wallet1.add_signatures_to_proofs(proofs, signatures) + + # Case 3: SIG_ALL with proofs locked to different public keys + assert wallet1.private_key.pubkey is not None + garbage_pubkey = PrivateKey().pubkey + assert garbage_pubkey is not None + secret_lock_other = await wallet1.create_p2pk_lock( + garbage_pubkey.serialize().hex(), sig_all=True + ) + _, other_proofs = await wallet1.swap_to_send( + wallet1.proofs, 16, secret_lock=secret_lock_other + ) + + output_amounts = [16] + secrets, rs, _ = await wallet1.generate_n_secrets(len(output_amounts)) + outputs, _ = wallet1._construct_outputs(output_amounts, secrets, rs) + + # wallet1 shouldn't add signatures because proofs are locked to a different pubkey + signed_proofs = wallet1.add_witness_swap_sig_all(other_proofs, outputs) + # Check each proof for None witness + for proof in signed_proofs: + assert proof.witness is None