import copy import time from typing import List import pytest import pytest_asyncio from cashu.core.base import BlindedMessage, P2PKWitness from cashu.core.migrations import migrate_databases from cashu.core.p2pk import P2PKSecret, SigFlags from cashu.core.secret import Secret, SecretKind, Tags from cashu.mint.ledger import Ledger from cashu.wallet import migrations from cashu.wallet.wallet import Wallet from tests.conftest import SERVER_ENDPOINT from tests.helpers import pay_if_regtest async def assert_err(f, msg): """Compute f() and expect an error message 'msg'.""" try: await f except Exception as exc: if msg not in str(exc.args[0]): raise Exception(f"Expected error: {msg}, got: {exc.args[0]}") return raise Exception(f"Expected error: {msg}, got no error") @pytest_asyncio.fixture(scope="function") async def wallet1(ledger: Ledger): wallet1 = await Wallet.with_db( url=SERVER_ENDPOINT, db="test_data/wallet1_p2pk_comprehensive", name="wallet1", ) await migrate_databases(wallet1.db, migrations) await wallet1.load_mint() yield wallet1 @pytest_asyncio.fixture(scope="function") async def wallet2(ledger: Ledger): wallet2 = await Wallet.with_db( url=SERVER_ENDPOINT, db="test_data/wallet2_p2pk_comprehensive", name="wallet2", ) await migrate_databases(wallet2.db, migrations) await wallet2.load_mint() yield wallet2 @pytest_asyncio.fixture(scope="function") async def wallet3(ledger: Ledger): wallet3 = await Wallet.with_db( url=SERVER_ENDPOINT, db="test_data/wallet3_p2pk_comprehensive", name="wallet3", ) await migrate_databases(wallet3.db, migrations) await wallet3.load_mint() yield wallet3 @pytest.mark.asyncio async def test_p2pk_sig_inputs_basic(wallet1: Wallet, wallet2: Wallet, ledger: Ledger): """Test basic P2PK with SIG_INPUTS.""" # Mint tokens to wallet1 mint_quote = await wallet1.request_mint(64) await pay_if_regtest(mint_quote.request) await wallet1.mint(64, quote_id=mint_quote.quote) # Verify wallet1 has tokens assert wallet1.balance == 64 # Create locked tokens from wallet1 to wallet2 pubkey_wallet2 = await wallet2.create_p2pk_pubkey() secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet2) _, send_proofs = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock ) # Verify that sent tokens have P2PK secrets with SIG_INPUTS flag for proof in send_proofs: p2pk_secret = Secret.deserialize(proof.secret) assert p2pk_secret.kind == SecretKind.P2PK.value assert P2PKSecret.from_secret(p2pk_secret).sigflag == SigFlags.SIG_INPUTS # Try to redeem without signatures (should fail) unsigned_proofs = copy.deepcopy(send_proofs) for proof in unsigned_proofs: proof.witness = None await assert_err( ledger.swap( proofs=unsigned_proofs, outputs=await create_test_outputs(wallet2, 16) ), "Witness is missing for p2pk signature", ) # Redeem with proper signatures signed_proofs = wallet2.sign_p2pk_sig_inputs(send_proofs) assert all(p.witness is not None for p in signed_proofs) # Now swap should succeed outputs = await create_test_outputs(wallet2, 16) promises = await ledger.swap(proofs=signed_proofs, outputs=outputs) assert len(promises) == len(outputs) @pytest.mark.asyncio async def test_p2pk_sig_all_valid(wallet1: Wallet, wallet2: Wallet, ledger: Ledger): """Test P2PK with SIG_ALL where the signature covers both inputs and outputs.""" # Mint tokens to wallet1 mint_quote = await wallet1.request_mint(64) await pay_if_regtest(mint_quote.request) await wallet1.mint(64, quote_id=mint_quote.quote) # Create locked tokens with SIG_ALL pubkey_wallet2 = await wallet2.create_p2pk_pubkey() secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet2, sig_all=True) _, send_proofs = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock ) # Verify that sent tokens have P2PK secrets with SIG_ALL flag for proof in send_proofs: p2pk_secret = Secret.deserialize(proof.secret) assert p2pk_secret.kind == SecretKind.P2PK.value assert P2PKSecret.from_secret(p2pk_secret).sigflag == SigFlags.SIG_ALL # Create outputs for redemption outputs = await create_test_outputs(wallet2, 16) # Create a message from concatenated inputs and outputs message_to_sign = "".join([p.secret for p in send_proofs] + [o.B_ for o in outputs]) # Sign with wallet2's private key signature = wallet2.schnorr_sign_message(message_to_sign) # Add the signature to the first proof only (since it's SIG_ALL) send_proofs[0].witness = P2PKWitness(signatures=[signature]).json() # Swap should succeed promises = await ledger.swap(proofs=send_proofs, outputs=outputs) assert len(promises) == len(outputs) @pytest.mark.asyncio async def test_p2pk_sig_all_invalid(wallet1: Wallet, wallet2: Wallet, ledger: Ledger): """Test P2PK with SIG_ALL where the signature is invalid.""" # Mint tokens to wallet1 mint_quote = await wallet1.request_mint(64) await pay_if_regtest(mint_quote.request) await wallet1.mint(64, quote_id=mint_quote.quote) # Create locked tokens with SIG_ALL pubkey_wallet2 = await wallet2.create_p2pk_pubkey() secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet2, sig_all=True) _, send_proofs = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock ) # Create outputs for redemption outputs = await create_test_outputs(wallet2, 16) # Add an invalid signature fake_signature = "0" * 128 # Just a fake 64-byte hex string send_proofs[0].witness = P2PKWitness(signatures=[fake_signature]).json() # Swap should fail await assert_err( ledger.swap(proofs=send_proofs, outputs=outputs), "signature threshold not met" ) @pytest.mark.asyncio async def test_p2pk_sig_all_mixed(wallet1: Wallet, wallet2: Wallet, ledger: Ledger): """Test that attempting to use mixed SIG_ALL and SIG_INPUTS proofs fails.""" # Mint tokens to wallet1 mint_quote = await wallet1.request_mint(128) await pay_if_regtest(mint_quote.request) await wallet1.mint(128, quote_id=mint_quote.quote) # Create outputs outputs = await create_test_outputs(wallet2, 32) # 16 + 16 # Create a proof with SIG_ALL pubkey_wallet2 = await wallet2.create_p2pk_pubkey() secret_lock_all = await wallet1.create_p2pk_lock(pubkey_wallet2, sig_all=True) _, proofs_sig_all = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock_all ) # sign proofs_sig_all signed_proofs_sig_all = wallet2.add_witness_swap_sig_all(proofs_sig_all, outputs) # Mint more tokens to wallet1 for the SIG_INPUTS test mint_quote = await wallet1.request_mint(64) await pay_if_regtest(mint_quote.request) await wallet1.mint(64, quote_id=mint_quote.quote) # Create a proof with SIG_INPUTS secret_lock_inputs = await wallet1.create_p2pk_lock(pubkey_wallet2, sig_all=False) _, proofs_sig_inputs = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock_inputs ) # sign proofs_sig_inputs signed_proofs_sig_inputs = wallet2.sign_p2pk_sig_inputs(proofs_sig_inputs) # Combine the proofs mixed_proofs = signed_proofs_sig_all + signed_proofs_sig_inputs # Add an invalid signature to the SIG_ALL proof mixed_proofs[0].witness = P2PKWitness(signatures=["0" * 128]).json() # Try to use the mixed proofs (should fail) await assert_err( ledger.swap(proofs=mixed_proofs, outputs=outputs), "not all secrets are equal.", ) @pytest.mark.asyncio async def test_p2pk_multisig_2_of_3( wallet1: Wallet, wallet2: Wallet, wallet3: Wallet, ledger: Ledger ): """Test P2PK with 2-of-3 multisig.""" # Mint tokens to wallet1 mint_quote = await wallet1.request_mint(6400) await pay_if_regtest(mint_quote.request) await wallet1.mint(6400, quote_id=mint_quote.quote) # Get pubkeys from all wallets pubkey1 = await wallet1.create_p2pk_pubkey() pubkey2 = await wallet2.create_p2pk_pubkey() pubkey3 = await wallet3.create_p2pk_pubkey() # Create 2-of-3 multisig tokens locked to all three wallets tags = Tags([["pubkeys", pubkey2, pubkey3]]) secret_lock = await wallet1.create_p2pk_lock(pubkey1, tags=tags, n_sigs=2) _, send_proofs = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock ) # Create outputs for redemption outputs = await create_test_outputs(wallet1, 16) # Sign with wallet1 (first signature) signed_proofs = wallet1.sign_p2pk_sig_inputs(send_proofs) # Try to redeem with only 1 signature (should fail) await assert_err( ledger.swap(proofs=signed_proofs, outputs=outputs), "not enough pubkeys (3) or signatures (1) present for n_sigs (2).", ) # Mint new tokens for the second test mint_quote = await wallet1.request_mint(64) await pay_if_regtest(mint_quote.request) await wallet1.mint(64, quote_id=mint_quote.quote) # Create new locked tokens _, send_proofs2 = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock ) # Sign with wallet1 (first signature) signed_proofs2 = wallet1.sign_p2pk_sig_inputs(send_proofs2) # Add signature from wallet2 (second signature) signed_proofs2 = wallet2.sign_p2pk_sig_inputs(signed_proofs2) # Now redemption should succeed with 2 of 3 signatures # Create outputs for redemption outputs = await create_test_outputs(wallet1, 16) promises = await ledger.swap(proofs=signed_proofs2, outputs=outputs) assert len(promises) == len(outputs) # Mint new tokens for the third test mint_quote = await wallet1.request_mint(64) await pay_if_regtest(mint_quote.request) await wallet1.mint(64, quote_id=mint_quote.quote) # Create new locked tokens _, send_proofs3 = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock ) # Alternative: sign with wallet1 and wallet3 signed_proofs3 = wallet1.sign_p2pk_sig_inputs(send_proofs3) signed_proofs3 = wallet3.sign_p2pk_sig_inputs(signed_proofs3) # This should also succeed # Create outputs for redemption outputs = await create_test_outputs(wallet1, 16) promises2 = await ledger.swap(proofs=signed_proofs3, outputs=outputs) assert len(promises2) == len(outputs) @pytest.mark.asyncio async def test_p2pk_timelock(wallet1: Wallet, wallet2: Wallet, ledger: Ledger): """Test P2PK with a timelock that expires.""" # Mint tokens to wallet1 mint_quote = await wallet1.request_mint(64) await pay_if_regtest(mint_quote.request) await wallet1.mint(64, quote_id=mint_quote.quote) # Create tokens with a 2-second timelock pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # Set a past timestamp to ensure test works consistently past_time = int(time.time()) - 10 tags = Tags([["locktime", str(past_time)]]) secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet2, tags=tags) _, send_proofs = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock ) # Store current time to check if locktime passed locktime = 0 for proof in send_proofs: secret = Secret.deserialize(proof.secret) p2pk_secret = P2PKSecret.from_secret(secret) locktime = p2pk_secret.locktime # Create outputs outputs = await create_test_outputs(wallet1, 16) # Verify that current time is past the locktime assert locktime is not None, "Locktime should not be None" assert ( int(time.time()) > locktime ), f"Current time ({int(time.time())}) should be greater than locktime ({locktime})" # Try to redeem without signature after locktime (should succeed) unsigned_proofs = copy.deepcopy(send_proofs) for proof in unsigned_proofs: proof.witness = None promises = await ledger.swap(proofs=unsigned_proofs, outputs=outputs) assert len(promises) == len(outputs) @pytest.mark.asyncio async def test_p2pk_timelock_with_refund_before_locktime( wallet1: Wallet, wallet2: Wallet, wallet3: Wallet, ledger: Ledger ): """Test P2PK with a timelock and refund pubkeys before locktime.""" # Mint tokens to wallet1 mint_quote = await wallet1.request_mint(64) await pay_if_regtest(mint_quote.request) await wallet1.mint(64, quote_id=mint_quote.quote) # Get pubkeys pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # Receiver pubkey_wallet3 = await wallet3.create_p2pk_pubkey() # Refund key # Create tokens with a 2-second timelock and refund key future_time = int(time.time()) + 60 # 60 seconds in the future refund_tags = Tags([["refund", pubkey_wallet3], ["locktime", str(future_time)]]) secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet2, tags=refund_tags) _, send_proofs = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock ) # Create outputs outputs = await create_test_outputs(wallet1, 16) # Try to redeem without any signature before locktime (should fail) unsigned_proofs = copy.deepcopy(send_proofs) for proof in unsigned_proofs: proof.witness = None await assert_err( ledger.swap(proofs=unsigned_proofs, outputs=outputs), "Witness is missing for p2pk signature", ) # Try to redeem with refund key signature before locktime (should fail) refund_signed_proofs = wallet3.sign_p2pk_sig_inputs(send_proofs) await assert_err( ledger.swap(proofs=refund_signed_proofs, outputs=outputs), "signature threshold not met", # Refund key can't be used before locktime ) @pytest.mark.asyncio async def test_p2pk_timelock_with_receiver_signature( wallet1: Wallet, wallet2: Wallet, wallet3: Wallet, ledger: Ledger ): """Test P2PK with a timelock and refund pubkeys with receiver signature.""" # Mint tokens to wallet1 mint_quote = await wallet1.request_mint(64) await pay_if_regtest(mint_quote.request) await wallet1.mint(64, quote_id=mint_quote.quote) # Get pubkeys pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # Receiver pubkey_wallet3 = await wallet3.create_p2pk_pubkey() # Refund key # Create tokens with a 2-second timelock and refund key future_time = int(time.time()) + 60 # 60 seconds in the future refund_tags = Tags([["refund", pubkey_wallet3], ["locktime", str(future_time)]]) secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet2, tags=refund_tags) _, send_proofs = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock ) # Create outputs outputs = await create_test_outputs(wallet1, 16) # Try to redeem with the correct receiver signature (should succeed) receiver_signed_proofs = wallet2.sign_p2pk_sig_inputs(send_proofs) promises = await ledger.swap(proofs=receiver_signed_proofs, outputs=outputs) assert len(promises) == len(outputs) @pytest.mark.asyncio async def test_p2pk_timelock_with_refund_after_locktime( wallet1: Wallet, wallet2: Wallet, wallet3: Wallet, ledger: Ledger ): """Test P2PK with a timelock and refund pubkeys after locktime.""" # Mint tokens to wallet1 mint_quote = await wallet1.request_mint(64) await pay_if_regtest(mint_quote.request) await wallet1.mint(64, quote_id=mint_quote.quote) # Get pubkeys pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # Receiver pubkey_wallet3 = await wallet3.create_p2pk_pubkey() # Refund key # Create tokens with a past timestamp for locktime testing past_time = int(time.time()) - 10 # 10 seconds in the past refund_tags_past = Tags([["refund", pubkey_wallet3], ["locktime", str(past_time)]]) secret_lock_past = await wallet1.create_p2pk_lock( pubkey_wallet2, tags=refund_tags_past ) _, send_proofs3 = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock_past ) # Try to redeem with refund key after locktime (should succeed) refund_signed_proofs2 = wallet3.sign_p2pk_sig_inputs(send_proofs3) # This should work because locktime has passed and refund key is used # Create outputs outputs = await create_test_outputs(wallet1, 16) promises2 = await ledger.swap(proofs=refund_signed_proofs2, outputs=outputs) assert len(promises2) == len(outputs) @pytest.mark.asyncio async def test_p2pk_n_sigs_refund( wallet1: Wallet, wallet2: Wallet, wallet3: Wallet, ledger: Ledger ): """Test P2PK with a timelock and multiple refund pubkeys with n_sigs_refund.""" # Mint tokens to wallet1 mint_quote = await wallet1.request_mint(64) await pay_if_regtest(mint_quote.request) await wallet1.mint(64, quote_id=mint_quote.quote) # Get pubkeys pubkey_wallet1 = await wallet1.create_p2pk_pubkey() # Receiver pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # Refund key 1 pubkey_wallet3 = await wallet3.create_p2pk_pubkey() # Refund key 2 # Create tokens with a future timelock and 2-of-2 refund requirement future_time = int(time.time()) + 60 # 60 seconds in the future refund_tags = Tags( [ ["refund", pubkey_wallet2, pubkey_wallet3], ["n_sigs_refund", "2"], ["locktime", str(future_time)], ] ) secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet1, tags=refund_tags) _, send_proofs = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock ) # Create outputs outputs = await create_test_outputs(wallet1, 16) # Mint new tokens for receiver test mint_quote = await wallet1.request_mint(64) await pay_if_regtest(mint_quote.request) await wallet1.mint(64, quote_id=mint_quote.quote) # Create new locked tokens _, send_proofs2 = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock ) # Try to redeem with receiver key (should succeed before locktime) receiver_signed_proofs = wallet1.sign_p2pk_sig_inputs(send_proofs2) promises = await ledger.swap(proofs=receiver_signed_proofs, outputs=outputs) assert len(promises) == len(outputs) # Mint new tokens for the refund test mint_quote = await wallet1.request_mint(64) await pay_if_regtest(mint_quote.request) await wallet1.mint(64, quote_id=mint_quote.quote) # Create tokens with a past locktime for refund testing past_time = int(time.time()) - 10 # 10 seconds in the past refund_tags_past = Tags( [ ["refund", pubkey_wallet2, pubkey_wallet3], ["n_sigs_refund", "2"], ["locktime", str(past_time)], ] ) secret_lock_past = await wallet1.create_p2pk_lock( pubkey_wallet1, tags=refund_tags_past ) _, send_proofs3 = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock_past ) # Try to redeem with only one refund key signature (should fail) refund_signed_proofs = wallet2.sign_p2pk_sig_inputs(send_proofs3) await assert_err( ledger.swap(proofs=refund_signed_proofs, outputs=outputs), "not enough pubkeys (2) or signatures (1) present for n_sigs (2).", ) # Mint new tokens for the final test mint_quote = await wallet1.request_mint(64) await pay_if_regtest(mint_quote.request) await wallet1.mint(64, quote_id=mint_quote.quote) # Create tokens with same past locktime _, send_proofs4 = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock_past ) # Add both refund signatures refund_signed_proofs2 = wallet2.sign_p2pk_sig_inputs(send_proofs4) refund_signed_proofs2 = wallet3.sign_p2pk_sig_inputs(refund_signed_proofs2) # Now it should succeed with 2-of-2 refund signatures # Create outputs outputs = await create_test_outputs(wallet1, 16) promises2 = await ledger.swap(proofs=refund_signed_proofs2, outputs=outputs) assert len(promises2) == len(outputs) @pytest.mark.asyncio async def test_p2pk_invalid_pubkey_check( wallet1: Wallet, wallet2: Wallet, ledger: Ledger ): """Test that an invalid public key is properly rejected.""" # Mint tokens to wallet1 mint_quote = await wallet1.request_mint(64) await pay_if_regtest(mint_quote.request) await wallet1.mint(64, quote_id=mint_quote.quote) # Create an invalid pubkey string (too short) invalid_pubkey = "03aaff" # Try to create a P2PK lock with invalid pubkey # This should fail in create_p2pk_lock, but if it doesn't, let's handle it gracefully try: secret_lock = await wallet1.create_p2pk_lock(invalid_pubkey) _, send_proofs = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock ) # Create outputs outputs = await create_test_outputs(wallet1, 16) # Verify it fails during validation await assert_err( ledger.swap(proofs=send_proofs, outputs=outputs), "failed to deserialize pubkey", # Generic error for pubkey issues ) except Exception as e: # If it fails during creation, that's fine too assert ( "pubkey" in str(e).lower() or "key" in str(e).lower() ), f"Expected error about invalid public key, got: {str(e)}" @pytest.mark.asyncio async def test_p2pk_sig_all_with_multiple_pubkeys( wallet1: Wallet, wallet2: Wallet, wallet3: Wallet, ledger: Ledger ): """Test SIG_ALL combined with multiple pubkeys/n_sigs.""" # Mint tokens to wallet1 mint_quote = await wallet1.request_mint(64) await pay_if_regtest(mint_quote.request) await wallet1.mint(64, quote_id=mint_quote.quote) # Get pubkeys pubkey1 = await wallet1.create_p2pk_pubkey() pubkey2 = await wallet2.create_p2pk_pubkey() pubkey3 = await wallet3.create_p2pk_pubkey() # Create tokens with SIG_ALL and 2-of-3 multisig tags = Tags([["pubkeys", pubkey2, pubkey3]]) secret_lock = await wallet1.create_p2pk_lock( pubkey1, tags=tags, n_sigs=2, sig_all=True ) _, send_proofs = await wallet1.swap_to_send( wallet1.proofs, 16, secret_lock=secret_lock ) # Create outputs outputs = await create_test_outputs(wallet1, 16) # Create message to sign (all inputs + all outputs) message_to_sign = "".join([p.secret for p in send_proofs] + [o.B_ for o in outputs]) # Sign with wallet1's key signature1 = wallet1.schnorr_sign_message(message_to_sign) # Sign with wallet2's key signature2 = wallet2.schnorr_sign_message(message_to_sign) # Add both signatures to the first proof only (SIG_ALL) send_proofs[0].witness = P2PKWitness(signatures=[signature1, signature2]).json() # This should succeed with 2 valid signatures promises = await ledger.swap(proofs=send_proofs, outputs=outputs) assert len(promises) == len(outputs) async def create_test_outputs(wallet: Wallet, amount: int) -> List[BlindedMessage]: """Helper to create blinded outputs for testing.""" output_amounts = [amount] secrets, rs, _ = await wallet.generate_n_secrets(len(output_amounts)) outputs, _ = wallet._construct_outputs(output_amounts, secrets, rs) return outputs