mirror of
https://github.com/aljazceru/nutshell.git
synced 2025-12-20 18:44:20 +01:00
* wallet_quotes_wip * fix quote in db * fix subscription test * clean up api * fix api tests * fix balance check
419 lines
16 KiB
Python
419 lines
16 KiB
Python
import asyncio
|
|
import hashlib
|
|
import secrets
|
|
from typing import List
|
|
|
|
import pytest
|
|
import pytest_asyncio
|
|
|
|
from cashu.core.base import HTLCWitness, Proof
|
|
from cashu.core.crypto.secp import PrivateKey
|
|
from cashu.core.htlc import HTLCSecret
|
|
from cashu.core.migrations import migrate_databases
|
|
from cashu.wallet import migrations
|
|
from cashu.wallet.wallet import Wallet
|
|
from cashu.wallet.wallet import Wallet as Wallet1
|
|
from cashu.wallet.wallet import Wallet as Wallet2
|
|
from tests.conftest import SERVER_ENDPOINT
|
|
from tests.helpers import pay_if_regtest
|
|
|
|
|
|
async def assert_err(f, msg):
|
|
"""Compute f() and expect an error message 'msg'."""
|
|
try:
|
|
await f
|
|
except Exception as exc:
|
|
if msg not in str(exc.args[0]):
|
|
raise Exception(f"Expected error: {msg}, got: {exc.args[0]}")
|
|
return
|
|
raise Exception(f"Expected error: {msg}, got no error")
|
|
|
|
|
|
def assert_amt(proofs: List[Proof], expected: int):
|
|
"""Assert amounts the proofs contain."""
|
|
assert [p.amount for p in proofs] == expected
|
|
|
|
|
|
@pytest_asyncio.fixture(scope="function")
|
|
async def wallet1():
|
|
wallet1 = await Wallet1.with_db(
|
|
SERVER_ENDPOINT, "test_data/wallet_p2pk_1", "wallet1"
|
|
)
|
|
await migrate_databases(wallet1.db, migrations)
|
|
await wallet1.load_mint()
|
|
yield wallet1
|
|
|
|
|
|
@pytest_asyncio.fixture(scope="function")
|
|
async def wallet2():
|
|
wallet2 = await Wallet2.with_db(
|
|
SERVER_ENDPOINT, "test_data/wallet_p2pk_2", "wallet2"
|
|
)
|
|
await migrate_databases(wallet2.db, migrations)
|
|
wallet2.private_key = PrivateKey(secrets.token_bytes(32), raw=True)
|
|
await wallet2.load_mint()
|
|
yield wallet2
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_create_htlc_secret(wallet1: Wallet):
|
|
mint_quote = await wallet1.request_mint(64)
|
|
await pay_if_regtest(mint_quote.request)
|
|
await wallet1.mint(64, quote_id=mint_quote.quote)
|
|
preimage = "00000000000000000000000000000000"
|
|
preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
|
|
secret = await wallet1.create_htlc_lock(preimage=preimage)
|
|
assert secret.data == preimage_hash
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_htlc_split(wallet1: Wallet, wallet2: Wallet):
|
|
mint_quote = await wallet1.request_mint(64)
|
|
await pay_if_regtest(mint_quote.request)
|
|
await wallet1.mint(64, quote_id=mint_quote.quote)
|
|
preimage = "00000000000000000000000000000000"
|
|
preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
|
|
secret = await wallet1.create_htlc_lock(preimage=preimage)
|
|
_, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret)
|
|
for p in send_proofs:
|
|
assert HTLCSecret.deserialize(p.secret).data == preimage_hash
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_htlc_redeem_with_preimage(wallet1: Wallet, wallet2: Wallet):
|
|
mint_quote = await wallet1.request_mint(64)
|
|
await pay_if_regtest(mint_quote.request)
|
|
await wallet1.mint(64, quote_id=mint_quote.quote)
|
|
preimage = "00000000000000000000000000000000"
|
|
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
|
|
secret = await wallet1.create_htlc_lock(preimage=preimage)
|
|
_, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret)
|
|
for p in send_proofs:
|
|
p.witness = HTLCWitness(preimage=preimage).json()
|
|
await wallet2.redeem(send_proofs)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_htlc_redeem_with_wrong_preimage(wallet1: Wallet, wallet2: Wallet):
|
|
mint_quote = await wallet1.request_mint(64)
|
|
await pay_if_regtest(mint_quote.request)
|
|
await wallet1.mint(64, quote_id=mint_quote.quote)
|
|
preimage = "00000000000000000000000000000000"
|
|
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
|
|
secret = await wallet1.create_htlc_lock(
|
|
preimage=f"{preimage[:-5]}11111"
|
|
) # wrong preimage
|
|
_, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret)
|
|
for p in send_proofs:
|
|
p.witness = HTLCWitness(preimage=preimage).json()
|
|
await assert_err(
|
|
wallet2.redeem(send_proofs), "Mint Error: HTLC preimage does not match"
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_htlc_redeem_with_no_signature(wallet1: Wallet, wallet2: Wallet):
|
|
mint_quote = await wallet1.request_mint(64)
|
|
await pay_if_regtest(mint_quote.request)
|
|
await wallet1.mint(64, quote_id=mint_quote.quote)
|
|
preimage = "00000000000000000000000000000000"
|
|
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
|
|
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
|
|
secret = await wallet1.create_htlc_lock(
|
|
preimage=preimage, hashlock_pubkeys=[pubkey_wallet1]
|
|
)
|
|
_, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret)
|
|
for p in send_proofs:
|
|
p.witness = HTLCWitness(preimage=preimage).json()
|
|
await assert_err(
|
|
wallet2.redeem(send_proofs),
|
|
"Mint Error: no signatures in proof.",
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_htlc_redeem_with_wrong_signature(wallet1: Wallet, wallet2: Wallet):
|
|
mint_quote = await wallet1.request_mint(64)
|
|
await pay_if_regtest(mint_quote.request)
|
|
await wallet1.mint(64, quote_id=mint_quote.quote)
|
|
preimage = "00000000000000000000000000000000"
|
|
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
|
|
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
|
|
secret = await wallet1.create_htlc_lock(
|
|
preimage=preimage, hashlock_pubkeys=[pubkey_wallet1]
|
|
)
|
|
_, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret)
|
|
signatures = wallet1.sign_proofs(send_proofs)
|
|
for p, s in zip(send_proofs, signatures):
|
|
p.witness = HTLCWitness(
|
|
preimage=preimage, signatures=[f"{s[:-5]}11111"]
|
|
).json() # wrong signature
|
|
|
|
await assert_err(
|
|
wallet2.redeem(send_proofs),
|
|
"Mint Error: no valid signature provided for input.",
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_htlc_redeem_with_correct_signature(wallet1: Wallet, wallet2: Wallet):
|
|
mint_quote = await wallet1.request_mint(64)
|
|
await pay_if_regtest(mint_quote.request)
|
|
await wallet1.mint(64, quote_id=mint_quote.quote)
|
|
preimage = "00000000000000000000000000000000"
|
|
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
|
|
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
|
|
secret = await wallet1.create_htlc_lock(
|
|
preimage=preimage, hashlock_pubkeys=[pubkey_wallet1]
|
|
)
|
|
_, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret)
|
|
|
|
signatures = wallet1.sign_proofs(send_proofs)
|
|
for p, s in zip(send_proofs, signatures):
|
|
p.witness = HTLCWitness(preimage=preimage, signatures=[s]).json()
|
|
|
|
await wallet2.redeem(send_proofs)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_htlc_redeem_with_2_of_1_signatures(wallet1: Wallet, wallet2: Wallet):
|
|
mint_quote = await wallet1.request_mint(64)
|
|
await pay_if_regtest(mint_quote.request)
|
|
await wallet1.mint(64, quote_id=mint_quote.quote)
|
|
preimage = "00000000000000000000000000000000"
|
|
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
|
|
pubkey_wallet2 = await wallet2.create_p2pk_pubkey()
|
|
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
|
|
secret = await wallet1.create_htlc_lock(
|
|
preimage=preimage,
|
|
hashlock_pubkeys=[pubkey_wallet1, pubkey_wallet2],
|
|
hashlock_n_sigs=1,
|
|
)
|
|
_, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret)
|
|
|
|
signatures1 = wallet1.sign_proofs(send_proofs)
|
|
signatures2 = wallet2.sign_proofs(send_proofs)
|
|
for p, s1, s2 in zip(send_proofs, signatures1, signatures2):
|
|
p.witness = HTLCWitness(preimage=preimage, signatures=[s1, s2]).json()
|
|
|
|
await wallet2.redeem(send_proofs)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_htlc_redeem_with_2_of_2_signatures(wallet1: Wallet, wallet2: Wallet):
|
|
mint_quote = await wallet1.request_mint(64)
|
|
await pay_if_regtest(mint_quote.request)
|
|
await wallet1.mint(64, quote_id=mint_quote.quote)
|
|
preimage = "00000000000000000000000000000000"
|
|
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
|
|
pubkey_wallet2 = await wallet2.create_p2pk_pubkey()
|
|
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
|
|
secret = await wallet1.create_htlc_lock(
|
|
preimage=preimage,
|
|
hashlock_pubkeys=[pubkey_wallet1, pubkey_wallet2],
|
|
hashlock_n_sigs=2,
|
|
)
|
|
_, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret)
|
|
|
|
signatures1 = wallet1.sign_proofs(send_proofs)
|
|
signatures2 = wallet2.sign_proofs(send_proofs)
|
|
for p, s1, s2 in zip(send_proofs, signatures1, signatures2):
|
|
p.witness = HTLCWitness(preimage=preimage, signatures=[s1, s2]).json()
|
|
|
|
await wallet2.redeem(send_proofs)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_htlc_redeem_with_2_of_2_signatures_with_duplicate_pubkeys(
|
|
wallet1: Wallet, wallet2: Wallet
|
|
):
|
|
mint_quote = await wallet1.request_mint(64)
|
|
await pay_if_regtest(mint_quote.request)
|
|
await wallet1.mint(64, quote_id=mint_quote.quote)
|
|
preimage = "00000000000000000000000000000000"
|
|
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
|
|
pubkey_wallet2 = pubkey_wallet1
|
|
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
|
|
secret = await wallet1.create_htlc_lock(
|
|
preimage=preimage,
|
|
hashlock_pubkeys=[pubkey_wallet1, pubkey_wallet2],
|
|
hashlock_n_sigs=2,
|
|
)
|
|
_, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret)
|
|
|
|
signatures1 = wallet1.sign_proofs(send_proofs)
|
|
signatures2 = wallet2.sign_proofs(send_proofs)
|
|
for p, s1, s2 in zip(send_proofs, signatures1, signatures2):
|
|
p.witness = HTLCWitness(preimage=preimage, signatures=[s1, s2]).json()
|
|
|
|
await assert_err(
|
|
wallet2.redeem(send_proofs),
|
|
"Mint Error: pubkeys must be unique.",
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_htlc_redeem_with_3_of_3_signatures_but_only_2_provided(
|
|
wallet1: Wallet, wallet2: Wallet
|
|
):
|
|
mint_quote = await wallet1.request_mint(64)
|
|
await pay_if_regtest(mint_quote.request)
|
|
await wallet1.mint(64, quote_id=mint_quote.quote)
|
|
preimage = "00000000000000000000000000000000"
|
|
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
|
|
pubkey_wallet2 = await wallet2.create_p2pk_pubkey()
|
|
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
|
|
secret = await wallet1.create_htlc_lock(
|
|
preimage=preimage,
|
|
hashlock_pubkeys=[pubkey_wallet1, pubkey_wallet2],
|
|
hashlock_n_sigs=3,
|
|
)
|
|
_, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret)
|
|
|
|
signatures1 = wallet1.sign_proofs(send_proofs)
|
|
signatures2 = wallet2.sign_proofs(send_proofs)
|
|
for p, s1, s2 in zip(send_proofs, signatures1, signatures2):
|
|
p.witness = HTLCWitness(preimage=preimage, signatures=[s1, s2]).json()
|
|
|
|
await assert_err(
|
|
wallet2.redeem(send_proofs),
|
|
"Mint Error: not enough signatures provided: 2 < 3.",
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_htlc_redeem_with_2_of_3_signatures_with_2_valid_and_1_invalid_provided(
|
|
wallet1: Wallet, wallet2: Wallet
|
|
):
|
|
mint_quote = await wallet1.request_mint(64)
|
|
await pay_if_regtest(mint_quote.request)
|
|
await wallet1.mint(64, quote_id=mint_quote.quote)
|
|
preimage = "00000000000000000000000000000000"
|
|
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
|
|
pubkey_wallet2 = await wallet2.create_p2pk_pubkey()
|
|
privatekey_wallet3 = PrivateKey(secrets.token_bytes(32), raw=True)
|
|
assert privatekey_wallet3.pubkey
|
|
pubkey_wallet3 = privatekey_wallet3.pubkey.serialize().hex()
|
|
|
|
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
|
|
secret = await wallet1.create_htlc_lock(
|
|
preimage=preimage,
|
|
hashlock_pubkeys=[pubkey_wallet1, pubkey_wallet2, pubkey_wallet3],
|
|
hashlock_n_sigs=2,
|
|
)
|
|
_, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret)
|
|
|
|
signatures1 = wallet1.sign_proofs(send_proofs)
|
|
signatures2 = wallet2.sign_proofs(send_proofs)
|
|
signatures3 = [f"{s[:-5]}11111" for s in signatures1] # wrong signature
|
|
for p, s1, s2, s3 in zip(send_proofs, signatures1, signatures2, signatures3):
|
|
p.witness = HTLCWitness(preimage=preimage, signatures=[s1, s2, s3]).json()
|
|
|
|
await wallet2.redeem(send_proofs)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_htlc_redeem_with_3_of_3_signatures_with_2_valid_and_1_invalid_provided(
|
|
wallet1: Wallet, wallet2: Wallet
|
|
):
|
|
mint_quote = await wallet1.request_mint(64)
|
|
await pay_if_regtest(mint_quote.request)
|
|
await wallet1.mint(64, quote_id=mint_quote.quote)
|
|
preimage = "00000000000000000000000000000000"
|
|
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
|
|
pubkey_wallet2 = await wallet2.create_p2pk_pubkey()
|
|
privatekey_wallet3 = PrivateKey(secrets.token_bytes(32), raw=True)
|
|
assert privatekey_wallet3.pubkey
|
|
pubkey_wallet3 = privatekey_wallet3.pubkey.serialize().hex()
|
|
|
|
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
|
|
secret = await wallet1.create_htlc_lock(
|
|
preimage=preimage,
|
|
hashlock_pubkeys=[pubkey_wallet1, pubkey_wallet2, pubkey_wallet3],
|
|
hashlock_n_sigs=3,
|
|
)
|
|
_, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret)
|
|
|
|
signatures1 = wallet1.sign_proofs(send_proofs)
|
|
signatures2 = wallet2.sign_proofs(send_proofs)
|
|
signatures3 = [f"{s[:-5]}11111" for s in signatures1] # wrong signature
|
|
for p, s1, s2, s3 in zip(send_proofs, signatures1, signatures2, signatures3):
|
|
p.witness = HTLCWitness(preimage=preimage, signatures=[s1, s2, s3]).json()
|
|
|
|
await assert_err(
|
|
wallet2.redeem(send_proofs), "Mint Error: signature threshold not met. 2 < 3."
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_htlc_redeem_hashlock_wrong_signature_timelock_correct_signature(
|
|
wallet1: Wallet, wallet2: Wallet
|
|
):
|
|
mint_quote = await wallet1.request_mint(64)
|
|
await pay_if_regtest(mint_quote.request)
|
|
await wallet1.mint(64, quote_id=mint_quote.quote)
|
|
preimage = "00000000000000000000000000000000"
|
|
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
|
|
pubkey_wallet2 = await wallet2.create_p2pk_pubkey()
|
|
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
|
|
secret = await wallet1.create_htlc_lock(
|
|
preimage=preimage,
|
|
hashlock_pubkeys=[pubkey_wallet2],
|
|
locktime_seconds=2,
|
|
locktime_pubkeys=[pubkey_wallet1],
|
|
)
|
|
_, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret)
|
|
|
|
signatures = wallet1.sign_proofs(send_proofs)
|
|
for p, s in zip(send_proofs, signatures):
|
|
p.witness = HTLCWitness(preimage=preimage, signatures=[s]).json()
|
|
|
|
# should error because we used wallet2 signatures for the hash lock
|
|
await assert_err(
|
|
wallet1.redeem(send_proofs),
|
|
"Mint Error: no valid signature provided for input.",
|
|
)
|
|
|
|
await asyncio.sleep(2)
|
|
# should succeed since lock time has passed and we provided wallet1 signature for timelock
|
|
await wallet1.redeem(send_proofs)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_htlc_redeem_hashlock_wrong_signature_timelock_wrong_signature(
|
|
wallet1: Wallet, wallet2: Wallet
|
|
):
|
|
mint_quote = await wallet1.request_mint(64)
|
|
await pay_if_regtest(mint_quote.request)
|
|
await wallet1.mint(64, quote_id=mint_quote.quote)
|
|
preimage = "00000000000000000000000000000000"
|
|
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
|
|
pubkey_wallet2 = await wallet2.create_p2pk_pubkey()
|
|
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
|
|
secret = await wallet1.create_htlc_lock(
|
|
preimage=preimage,
|
|
hashlock_pubkeys=[pubkey_wallet2],
|
|
locktime_seconds=2,
|
|
locktime_pubkeys=[pubkey_wallet1],
|
|
)
|
|
_, send_proofs = await wallet1.swap_to_send(wallet1.proofs, 8, secret_lock=secret)
|
|
|
|
signatures = wallet1.sign_proofs(send_proofs)
|
|
for p, s in zip(send_proofs, signatures):
|
|
p.witness = HTLCWitness(
|
|
preimage=preimage, signatures=[f"{s[:-5]}11111"]
|
|
).json() # wrong signature
|
|
|
|
# should error because we used wallet2 signatures for the hash lock
|
|
await assert_err(
|
|
wallet1.redeem(send_proofs),
|
|
"Mint Error: no valid signature provided for input.",
|
|
)
|
|
|
|
await asyncio.sleep(2)
|
|
# should fail since lock time has passed and we provided a wrong signature for timelock
|
|
await assert_err(
|
|
wallet1.redeem(send_proofs),
|
|
"Mint Error: no valid signature provided for input.",
|
|
)
|