* add htlc files

* refactor mint into several components

* add hash lock signatures

* add refund signature checks

* simplify hash lock signature check

* clean up
This commit is contained in:
callebtc
2023-09-23 19:08:38 +02:00
committed by GitHub
parent 6282e0a22a
commit f1b621fa90
14 changed files with 865 additions and 430 deletions

View File

@@ -48,6 +48,8 @@ class Proof(BaseModel):
p2pksigs: Union[List[str], None] = [] # P2PK signature
p2shscript: Union[P2SHScript, None] = None # P2SH spending condition
htlcpreimage: Union[str, None] = None # HTLC unlocking preimage
htlcsignature: Union[str, None] = None # HTLC unlocking signature
# whether this proof is reserved for sending, used for coin management in the wallet
reserved: Union[None, bool] = False
# unique ID of send attempt, used for grouping pending tokens in the wallet

17
cashu/core/htlc.py Normal file
View File

@@ -0,0 +1,17 @@
from typing import Union
from .secret import Secret, SecretKind
class HTLCSecret(Secret):
@classmethod
def from_secret(cls, secret: Secret):
assert secret.kind == SecretKind.HTLC, "Secret is not a HTLC secret"
# NOTE: exclude tags in .dict() because it doesn't deserialize it properly
# need to add it back in manually with tags=secret.tags
return cls(**secret.dict(exclude={"tags"}), tags=secret.tags)
@property
def locktime(self) -> Union[None, int]:
locktime = self.tags.get_tag("locktime")
return int(locktime) if locktime else None

View File

@@ -1,17 +1,12 @@
import hashlib
import json
import time
from typing import Any, Dict, List, Optional, Union
from typing import List, Union
from loguru import logger
from pydantic import BaseModel
from .crypto.secp import PrivateKey, PublicKey
class SecretKind:
P2SH = "P2SH"
P2PK = "P2PK"
from .secret import Secret, SecretKind
class SigFlags:
@@ -21,69 +16,6 @@ class SigFlags:
SIG_ALL = "SIG_ALL" # require signatures on inputs and outputs
class Tags(BaseModel):
"""
Tags are used to encode additional information in the Secret of a Proof.
"""
__root__: List[List[str]] = []
def __init__(self, tags: Optional[List[List[str]]] = None, **kwargs):
super().__init__(**kwargs)
self.__root__ = tags or []
def __setitem__(self, key: str, value: str) -> None:
self.__root__.append([key, value])
def __getitem__(self, key: str) -> Union[str, None]:
return self.get_tag(key)
def get_tag(self, tag_name: str) -> Union[str, None]:
for tag in self.__root__:
if tag[0] == tag_name:
return tag[1]
return None
def get_tag_all(self, tag_name: str) -> List[str]:
all_tags = []
for tag in self.__root__:
if tag[0] == tag_name:
for t in tag[1:]:
all_tags.append(t)
return all_tags
class Secret(BaseModel):
"""Describes spending condition encoded in the secret field of a Proof."""
kind: str
data: str
tags: Tags
nonce: Union[None, str] = None
def serialize(self) -> str:
data_dict: Dict[str, Any] = {
"data": self.data,
"nonce": self.nonce or PrivateKey().serialize()[:32],
}
if self.tags.__root__:
logger.debug(f"Serializing tags: {self.tags.__root__}")
data_dict["tags"] = self.tags.__root__
return json.dumps(
[self.kind, data_dict],
)
@classmethod
def deserialize(cls, from_proof: str):
kind, kwargs = json.loads(from_proof)
data = kwargs.pop("data")
nonce = kwargs.pop("nonce")
tags_list: List = kwargs.pop("tags", None)
tags = Tags(tags=tags_list)
logger.debug(f"Deserialized Secret: {kind}, {data}, {nonce}, {tags}")
return cls(kind=kind, data=data, nonce=nonce, tags=tags)
class P2PKSecret(Secret):
@classmethod
def from_secret(cls, secret: Secret):

View File

@@ -27,7 +27,7 @@ def step0_carol_privkey():
def step0_carol_checksig_redeemscript(carol_pubkey):
"""Create script"""
txin_redeemScript = CScript([carol_pubkey, OP_CHECKSIG])
txin_redeemScript = CScript([carol_pubkey, OP_CHECKSIG]) # type: ignore
# txin_redeemScript = CScript([-123, OP_CHECKLOCKTIMEVERIFY])
# txin_redeemScript = CScript([3, 3, OP_LESSTHAN, OP_VERIFY])
return txin_redeemScript
@@ -58,7 +58,7 @@ def step2_carol_sign_tx(txin_redeemScript, privatekey):
tx, txin = step1_bob_carol_create_tx(txin_p2sh_address)
sighash = SignatureHash(txin_redeemScript, tx, 0, SIGHASH_ALL)
sig = privatekey.sign(sighash) + bytes([SIGHASH_ALL])
txin.scriptSig = CScript([sig, txin_redeemScript])
txin.scriptSig = CScript([sig, txin_redeemScript]) # type: ignore
return txin

76
cashu/core/secret.py Normal file
View File

@@ -0,0 +1,76 @@
import json
from typing import Any, Dict, List, Optional, Union
from loguru import logger
from pydantic import BaseModel
from .crypto.secp import PrivateKey
class SecretKind:
P2SH = "P2SH"
P2PK = "P2PK"
HTLC = "HTLC"
class Tags(BaseModel):
"""
Tags are used to encode additional information in the Secret of a Proof.
"""
__root__: List[List[str]] = []
def __init__(self, tags: Optional[List[List[str]]] = None, **kwargs):
super().__init__(**kwargs)
self.__root__ = tags or []
def __setitem__(self, key: str, value: str) -> None:
self.__root__.append([key, value])
def __getitem__(self, key: str) -> Union[str, None]:
return self.get_tag(key)
def get_tag(self, tag_name: str) -> Union[str, None]:
for tag in self.__root__:
if tag[0] == tag_name:
return tag[1]
return None
def get_tag_all(self, tag_name: str) -> List[str]:
all_tags = []
for tag in self.__root__:
if tag[0] == tag_name:
for t in tag[1:]:
all_tags.append(t)
return all_tags
class Secret(BaseModel):
"""Describes spending condition encoded in the secret field of a Proof."""
kind: str
data: str
tags: Tags
nonce: Union[None, str] = None
def serialize(self) -> str:
data_dict: Dict[str, Any] = {
"data": self.data,
"nonce": self.nonce or PrivateKey().serialize()[:32],
}
if self.tags.__root__:
logger.debug(f"Serializing tags: {self.tags.__root__}")
data_dict["tags"] = self.tags.__root__
return json.dumps(
[self.kind, data_dict],
)
@classmethod
def deserialize(cls, from_proof: str):
kind, kwargs = json.loads(from_proof)
data = kwargs.pop("data")
nonce = kwargs.pop("nonce")
tags_list: List = kwargs.pop("tags", None)
tags = Tags(tags=tags_list)
logger.debug(f"Deserialized Secret: {kind}, {data}, {nonce}, {tags}")
return cls(kind=kind, data=data, nonce=nonce, tags=tags)

285
cashu/mint/conditions.py Normal file
View File

@@ -0,0 +1,285 @@
import hashlib
import time
from typing import List
from loguru import logger
from ..core.base import (
BlindedMessage,
Proof,
)
from ..core.crypto.secp import PublicKey
from ..core.errors import (
TransactionError,
)
from ..core.htlc import HTLCSecret
from ..core.p2pk import (
P2PKSecret,
SigFlags,
verify_p2pk_signature,
)
from ..core.script import verify_bitcoin_script
from ..core.secret import Secret, SecretKind
class LedgerSpendingConditions:
def _verify_input_spending_conditions(self, proof: Proof) -> bool:
"""
Verify spending conditions:
Condition: P2SH - Witnesses proof.p2shscript
Condition: P2PK - Witness: proof.p2pksigs
Condition: HTLC - Witness: proof.htlcpreimage, proof.htlcsignature
"""
# P2SH
try:
secret = Secret.deserialize(proof.secret)
logger.trace(f"proof.secret: {proof.secret}")
logger.trace(f"secret: {secret}")
except Exception:
# secret is not a spending condition so we treat is a normal secret
return True
if secret.kind == SecretKind.P2SH:
p2pk_secret = P2PKSecret.from_secret(secret)
# check if locktime is in the past
now = time.time()
if p2pk_secret.locktime and p2pk_secret.locktime < now:
logger.trace(f"p2sh locktime ran out ({p2pk_secret.locktime}<{now}).")
return True
logger.trace(f"p2sh locktime still active ({p2pk_secret.locktime}>{now}).")
if (
proof.p2shscript is None
or proof.p2shscript.script is None
or proof.p2shscript.signature is None
):
# no script present although secret indicates one
raise TransactionError("no script in proof.")
# execute and verify P2SH
txin_p2sh_address, valid = verify_bitcoin_script(
proof.p2shscript.script, proof.p2shscript.signature
)
if not valid:
raise TransactionError("script invalid.")
# check if secret commits to script address
assert secret.data == str(txin_p2sh_address), (
f"secret does not contain correct P2SH address: {secret.data} is not"
f" {txin_p2sh_address}."
)
return True
# P2PK
if secret.kind == SecretKind.P2PK:
p2pk_secret = P2PKSecret.from_secret(secret)
# check if locktime is in the past
pubkeys = p2pk_secret.get_p2pk_pubkey_from_secret()
assert len(set(pubkeys)) == len(pubkeys), "pubkeys must be unique."
logger.trace(f"pubkeys: {pubkeys}")
# we will get an empty list if the locktime has passed and no refund pubkey is present
if not pubkeys:
return True
# now we check the signature
if not proof.p2pksigs:
# no signature present although secret indicates one
logger.error(f"no p2pk signatures in proof: {proof.p2pksigs}")
raise TransactionError("no p2pk signatures in proof.")
# we make sure that there are no duplicate signatures
if len(set(proof.p2pksigs)) != len(proof.p2pksigs):
raise TransactionError("p2pk signatures must be unique.")
# we parse the secret as a P2PK commitment
# assert len(proof.secret.split(":")) == 5, "p2pk secret format invalid."
# INPUTS: check signatures proof.p2pksigs against pubkey
# we expect the signature to be on the pubkey (=message) itself
n_sigs_required = p2pk_secret.n_sigs or 1
assert n_sigs_required > 0, "n_sigs must be positive."
# check if enough signatures are present
assert len(proof.p2pksigs) >= n_sigs_required, (
f"not enough signatures provided: {len(proof.p2pksigs)} <"
f" {n_sigs_required}."
)
n_valid_sigs_per_output = 0
# loop over all signatures in output
for input_sig in proof.p2pksigs:
for pubkey in pubkeys:
logger.trace(f"verifying signature {input_sig} by pubkey {pubkey}.")
logger.trace(f"Message: {p2pk_secret.serialize().encode('utf-8')}")
if verify_p2pk_signature(
message=p2pk_secret.serialize().encode("utf-8"),
pubkey=PublicKey(bytes.fromhex(pubkey), raw=True),
signature=bytes.fromhex(input_sig),
):
n_valid_sigs_per_output += 1
logger.trace(
f"p2pk signature on input is valid: {input_sig} on"
f" {pubkey}."
)
continue
else:
logger.trace(
f"p2pk signature on input is invalid: {input_sig} on"
f" {pubkey}."
)
# check if we have enough valid signatures
assert n_valid_sigs_per_output, "no valid signature provided for input."
assert n_valid_sigs_per_output >= n_sigs_required, (
f"signature threshold not met. {n_valid_sigs_per_output} <"
f" {n_sigs_required}."
)
logger.trace(
f"{n_valid_sigs_per_output} of {n_sigs_required} valid signatures"
" found."
)
logger.trace(proof.p2pksigs)
logger.trace("p2pk signature on inputs is valid.")
return True
# HTLC
if secret.kind == SecretKind.HTLC:
htlc_secret = HTLCSecret.from_secret(secret)
# time lock
# check if locktime is in the past
if htlc_secret.locktime and htlc_secret.locktime < time.time():
refund_pubkeys = htlc_secret.tags.get_tag_all("refund")
if refund_pubkeys:
assert proof.htlcsignature, TransactionError(
"no HTLC refund signature provided"
)
for pubkey in refund_pubkeys:
if verify_p2pk_signature(
message=htlc_secret.serialize().encode("utf-8"),
pubkey=PublicKey(bytes.fromhex(pubkey), raw=True),
signature=bytes.fromhex(proof.htlcsignature),
):
# a signature matches
return True
raise TransactionError("HTLC refund signatures did not match.")
# no pubkeys given in secret, anyone can spend
return True
# hash lock
assert proof.htlcpreimage, TransactionError("no HTLC preimage provided")
# first we check whether a correct preimage was included
if not hashlib.sha256(
bytes.fromhex(proof.htlcpreimage)
).digest() == bytes.fromhex(htlc_secret.data):
raise TransactionError("HTLC preimage does not match.")
# then we check whether a signature is required
hashlock_pubkeys = htlc_secret.tags.get_tag_all("pubkeys")
if hashlock_pubkeys:
assert proof.htlcsignature, TransactionError(
"HTLC no hash lock signatures provided."
)
for pubkey in hashlock_pubkeys:
if verify_p2pk_signature(
message=htlc_secret.serialize().encode("utf-8"),
pubkey=PublicKey(bytes.fromhex(pubkey), raw=True),
signature=bytes.fromhex(proof.htlcsignature),
):
# a signature matches
return True
# none of the pubkeys had a match
raise TransactionError("HTLC hash lock signatures did not match.")
# no pubkeys were included, anyone can spend
return True
# no spending condition present
return True
def _verify_output_spending_conditions(
self, proofs: List[Proof], outputs: List[BlindedMessage]
) -> bool:
"""
Verify spending conditions:
Condition: P2PK - Witness: output.p2pksigs
"""
# P2PK
pubkeys_per_proof = []
n_sigs = []
for proof in proofs:
try:
secret = P2PKSecret.deserialize(proof.secret)
# get all p2pk pubkeys from secrets
pubkeys_per_proof.append(secret.get_p2pk_pubkey_from_secret())
# get signature threshold from secrets
n_sigs.append(secret.n_sigs)
except Exception:
# secret is not a spending condition so we treat is a normal secret
return True
# for all proofs all pubkeys must be the same
assert (
len(set([tuple(pubs_output) for pubs_output in pubkeys_per_proof])) == 1
), "pubkeys in all proofs must match."
pubkeys = pubkeys_per_proof[0]
if not pubkeys:
# no pubkeys present
return True
logger.trace(f"pubkeys: {pubkeys}")
# TODO: add limit for maximum number of pubkeys
# for all proofs all n_sigs must be the same
assert len(set(n_sigs)) == 1, "n_sigs in all proofs must match."
n_sigs_required = n_sigs[0] or 1
# first we check if all secrets are P2PK
if not all(
[Secret.deserialize(p.secret).kind == SecretKind.P2PK for p in proofs]
):
# not all secrets are P2PK
return True
# now we check if any of the secrets has sigflag==SIG_ALL
if not any(
[
P2PKSecret.deserialize(p.secret).sigflag == SigFlags.SIG_ALL
for p in proofs
]
):
# no secret has sigflag==SIG_ALL
return True
# loop over all outputs and check if the signatures are valid for pubkeys with a threshold of n_sig
for output in outputs:
# we expect the signature to be on the pubkey (=message) itself
assert output.p2pksigs, "no signatures in output."
# TODO: add limit for maximum number of signatures
# we check whether any signature is duplicate
assert len(set(output.p2pksigs)) == len(
output.p2pksigs
), "duplicate signatures in output."
n_valid_sigs_per_output = 0
# loop over all signatures in output
for output_sig in output.p2pksigs:
for pubkey in pubkeys:
if verify_p2pk_signature(
message=output.B_.encode("utf-8"),
pubkey=PublicKey(bytes.fromhex(pubkey), raw=True),
signature=bytes.fromhex(output_sig),
):
n_valid_sigs_per_output += 1
assert n_valid_sigs_per_output, "no valid signature provided for output."
assert n_valid_sigs_per_output >= n_sigs_required, (
f"signature threshold not met. {n_valid_sigs_per_output} <"
f" {n_sigs_required}."
)
logger.trace(
f"{n_valid_sigs_per_output} of {n_sigs_required} valid signatures"
" found."
)
logger.trace(output.p2pksigs)
logger.trace("p2pk signatures on output is valid.")
return True

View File

@@ -1,6 +1,5 @@
import asyncio
import math
import time
from typing import Dict, List, Literal, Optional, Set, Tuple, Union
from loguru import logger
@@ -24,28 +23,20 @@ from ..core.errors import (
KeysetError,
KeysetNotFoundError,
LightningError,
NoSecretInProofsError,
NotAllowedError,
SecretTooLongError,
TokenAlreadySpentError,
TransactionError,
)
from ..core.helpers import fee_reserve, sum_proofs
from ..core.p2pk import (
P2PKSecret,
Secret,
SecretKind,
SigFlags,
verify_p2pk_signature,
)
from ..core.script import verify_bitcoin_script
from ..core.settings import settings
from ..core.split import amount_split
from ..lightning.base import Wallet
from ..mint.crud import LedgerCrud
from .conditions import LedgerSpendingConditions
from .verification import LedgerVerification
class Ledger:
class Ledger(LedgerVerification, LedgerSpendingConditions):
locks: Dict[str, asyncio.Lock] = {} # holds multiprocessing locks
proofs_pending_lock: asyncio.Lock = (
asyncio.Lock()
@@ -76,7 +67,7 @@ class Ledger:
logger.trace(f"crud: loaded {len(proofs_used)} used proofs")
self.proofs_used = set(proofs_used)
async def load_keyset(self, derivation_path, autosave=True):
async def load_keyset(self, derivation_path, autosave=True) -> MintKeyset:
"""Load the keyset for a derivation path if it already exists. If not generate new one and store in the db.
Args:
@@ -207,6 +198,11 @@ class Ledger:
dleq=DLEQ(e=e.serialize(), s=s.serialize()),
)
def _check_proofs_spendable(self, proofs: List[Proof]):
"""Checks whether the proofs were already spent."""
if not all([p.secret not in self.proofs_used for p in proofs]):
raise TokenAlreadySpentError()
def _check_spendable(self, proof: Proof):
"""Checks whether the proof was already spent."""
return proof.secret not in self.proofs_used
@@ -220,288 +216,6 @@ class Ledger:
]
return pending_states
def _verify_secret_criteria(self, proof: Proof) -> Literal[True]:
"""Verifies that a secret is present and is not too long (DOS prevention)."""
if proof.secret is None or proof.secret == "":
raise NoSecretInProofsError()
if len(proof.secret) > 512:
raise SecretTooLongError()
return True
def _verify_proof_bdhke(self, proof: Proof):
"""Verifies that the proof of promise was issued by this ledger."""
if not self._check_spendable(proof):
raise TokenAlreadySpentError()
# if no keyset id is given in proof, assume the current one
if not proof.id:
private_key_amount = self.keyset.private_keys[proof.amount]
else:
assert proof.id in self.keysets.keysets, f"keyset {proof.id} unknown"
logger.trace(
f"Validating proof with keyset {self.keysets.keysets[proof.id].id}."
)
# use the appropriate active keyset for this proof.id
private_key_amount = self.keysets.keysets[proof.id].private_keys[
proof.amount
]
C = PublicKey(bytes.fromhex(proof.C), raw=True)
return b_dhke.verify(private_key_amount, C, proof.secret)
def _verify_input_spending_conditions(self, proof: Proof) -> bool:
"""
Verify spending conditions:
Condition: P2SH - Witnesses proof.p2shscript
Condition: P2PK - Witness: proof.p2pksigs
"""
# P2SH
try:
secret = Secret.deserialize(proof.secret)
logger.trace(f"proof.secret: {proof.secret}")
logger.trace(f"secret: {secret}")
except Exception:
# secret is not a spending condition so we treat is a normal secret
return True
if secret.kind == SecretKind.P2SH:
p2pk_secret = P2PKSecret.from_secret(secret)
# check if locktime is in the past
now = time.time()
if p2pk_secret.locktime and p2pk_secret.locktime < now:
logger.trace(f"p2sh locktime ran out ({p2pk_secret.locktime}<{now}).")
return True
logger.trace(f"p2sh locktime still active ({p2pk_secret.locktime}>{now}).")
if (
proof.p2shscript is None
or proof.p2shscript.script is None
or proof.p2shscript.signature is None
):
# no script present although secret indicates one
raise TransactionError("no script in proof.")
# execute and verify P2SH
txin_p2sh_address, valid = verify_bitcoin_script(
proof.p2shscript.script, proof.p2shscript.signature
)
if not valid:
raise TransactionError("script invalid.")
# check if secret commits to script address
assert secret.data == str(txin_p2sh_address), (
f"secret does not contain correct P2SH address: {secret.data} is not"
f" {txin_p2sh_address}."
)
return True
# P2PK
if secret.kind == SecretKind.P2PK:
p2pk_secret = P2PKSecret.from_secret(secret)
# check if locktime is in the past
pubkeys = p2pk_secret.get_p2pk_pubkey_from_secret()
assert len(set(pubkeys)) == len(pubkeys), "pubkeys must be unique."
logger.trace(f"pubkeys: {pubkeys}")
# we will get an empty list if the locktime has passed and no refund pubkey is present
if not pubkeys:
return True
# now we check the signature
if not proof.p2pksigs:
# no signature present although secret indicates one
logger.error(f"no p2pk signatures in proof: {proof.p2pksigs}")
raise TransactionError("no p2pk signatures in proof.")
# we make sure that there are no duplicate signatures
if len(set(proof.p2pksigs)) != len(proof.p2pksigs):
raise TransactionError("p2pk signatures must be unique.")
# we parse the secret as a P2PK commitment
# assert len(proof.secret.split(":")) == 5, "p2pk secret format invalid."
# INPUTS: check signatures proof.p2pksigs against pubkey
# we expect the signature to be on the pubkey (=message) itself
n_sigs_required = p2pk_secret.n_sigs or 1
assert n_sigs_required > 0, "n_sigs must be positive."
# check if enough signatures are present
assert len(proof.p2pksigs) >= n_sigs_required, (
f"not enough signatures provided: {len(proof.p2pksigs)} <"
f" {n_sigs_required}."
)
n_valid_sigs_per_output = 0
# loop over all signatures in output
for input_sig in proof.p2pksigs:
for pubkey in pubkeys:
logger.trace(f"verifying signature {input_sig} by pubkey {pubkey}.")
logger.trace(f"Message: {p2pk_secret.serialize().encode('utf-8')}")
if verify_p2pk_signature(
message=p2pk_secret.serialize().encode("utf-8"),
pubkey=PublicKey(bytes.fromhex(pubkey), raw=True),
signature=bytes.fromhex(input_sig),
):
n_valid_sigs_per_output += 1
logger.trace(
f"p2pk signature on input is valid: {input_sig} on"
f" {pubkey}."
)
continue
else:
logger.trace(
f"p2pk signature on input is invalid: {input_sig} on"
f" {pubkey}."
)
# check if we have enough valid signatures
assert n_valid_sigs_per_output, "no valid signature provided for input."
assert n_valid_sigs_per_output >= n_sigs_required, (
f"signature threshold not met. {n_valid_sigs_per_output} <"
f" {n_sigs_required}."
)
logger.trace(
f"{n_valid_sigs_per_output} of {n_sigs_required} valid signatures"
" found."
)
logger.trace(proof.p2pksigs)
logger.trace("p2pk signature on inputs is valid.")
return True
# no spending contition
return True
def _verify_output_spending_conditions(
self, proofs: List[Proof], outputs: List[BlindedMessage]
) -> bool:
"""
Verify spending conditions:
Condition: P2PK - Witness: output.p2pksigs
"""
# P2SH
pubkeys_per_proof = []
n_sigs = []
for proof in proofs:
try:
secret = P2PKSecret.deserialize(proof.secret)
# get all p2pk pubkeys from secrets
pubkeys_per_proof.append(secret.get_p2pk_pubkey_from_secret())
# get signature threshold from secrets
n_sigs.append(secret.n_sigs)
except Exception:
# secret is not a spending condition so we treat is a normal secret
return True
# for all proofs all pubkeys must be the same
assert (
len(set([tuple(pubs_output) for pubs_output in pubkeys_per_proof])) == 1
), "pubkeys in all proofs must match."
pubkeys = pubkeys_per_proof[0]
if not pubkeys:
# no pubkeys present
return True
logger.trace(f"pubkeys: {pubkeys}")
# TODO: add limit for maximum number of pubkeys
# for all proofs all n_sigs must be the same
assert len(set(n_sigs)) == 1, "n_sigs in all proofs must match."
n_sigs_required = n_sigs[0] or 1
# first we check if all secrets are P2PK
if not all(
[Secret.deserialize(p.secret).kind == SecretKind.P2PK for p in proofs]
):
# not all secrets are P2PK
return True
# now we check if any of the secrets has sigflag==SIG_ALL
if not any(
[
P2PKSecret.deserialize(p.secret).sigflag == SigFlags.SIG_ALL
for p in proofs
]
):
# no secret has sigflag==SIG_ALL
return True
# loop over all outputs and check if the signatures are valid for pubkeys with a threshold of n_sig
for output in outputs:
# we expect the signature to be on the pubkey (=message) itself
assert output.p2pksigs, "no signatures in output."
# TODO: add limit for maximum number of signatures
# we check whether any signature is duplicate
assert len(set(output.p2pksigs)) == len(
output.p2pksigs
), "duplicate signatures in output."
n_valid_sigs_per_output = 0
# loop over all signatures in output
for output_sig in output.p2pksigs:
for pubkey in pubkeys:
if verify_p2pk_signature(
message=output.B_.encode("utf-8"),
pubkey=PublicKey(bytes.fromhex(pubkey), raw=True),
signature=bytes.fromhex(output_sig),
):
n_valid_sigs_per_output += 1
assert n_valid_sigs_per_output, "no valid signature provided for output."
assert n_valid_sigs_per_output >= n_sigs_required, (
f"signature threshold not met. {n_valid_sigs_per_output} <"
f" {n_sigs_required}."
)
logger.trace(
f"{n_valid_sigs_per_output} of {n_sigs_required} valid signatures"
" found."
)
logger.trace(output.p2pksigs)
logger.trace("p2pk signatures on output is valid.")
return True
def _verify_input_output_amounts(
self, inputs: List[Proof], outputs: List[BlindedMessage]
) -> bool:
"""Verifies that inputs have at least the same amount as outputs"""
input_amount = sum([p.amount for p in inputs])
output_amount = sum([o.amount for o in outputs])
return input_amount >= output_amount
def _verify_no_duplicate_proofs(self, proofs: List[Proof]) -> bool:
secrets = [p.secret for p in proofs]
if len(secrets) != len(list(set(secrets))):
return False
return True
def _verify_no_duplicate_outputs(self, outputs: List[BlindedMessage]) -> bool:
B_s = [od.B_ for od in outputs]
if len(B_s) != len(list(set(B_s))):
return False
return True
def _verify_amount(self, amount: int) -> int:
"""Any amount used should be a positive integer not larger than 2^MAX_ORDER."""
valid = (
isinstance(amount, int) and amount > 0 and amount < 2**settings.max_order
)
logger.trace(f"Verifying amount {amount} is valid: {valid}")
if not valid:
raise NotAllowedError("invalid amount: " + str(amount))
return amount
def _verify_equation_balanced(
self,
proofs: List[Proof],
outs: Union[List[BlindedSignature], List[BlindedMessage]],
) -> None:
"""Verify that Σinputs - Σoutputs = 0.
Outputs can be BlindedSignature or BlindedMessage.
"""
sum_inputs = sum(self._verify_amount(p.amount) for p in proofs)
sum_outputs = sum(self._verify_amount(p.amount) for p in outs)
assert (
sum_outputs - sum_inputs == 0
), "inputs do not have same amount as outputs"
async def _request_lightning_invoice(self, amount: int):
"""Generate a Lightning invoice using the funding source backend.
@@ -714,51 +428,6 @@ class Ledger:
if p.secret == pp.secret:
raise TransactionError("proofs are pending.")
async def _verify_proofs_and_outputs(
self, proofs: List[Proof], outputs: Optional[List[BlindedMessage]] = None
):
"""Checks all proofs and outputs for validity.
Args:
proofs (List[Proof]): List of proofs to check.
outputs (Optional[List[BlindedMessage]], optional): List of outputs to check.
Must be provided for /split but not for /melt. Defaults to None.
Raises:
Exception: Scripts did not validate.
Exception: Criteria for provided secrets not met.
Exception: Duplicate proofs provided.
Exception: BDHKE verification failed.
"""
# Verify inputs
# Verify secret criteria
if not all([self._verify_secret_criteria(p) for p in proofs]):
raise TransactionError("secrets do not match criteria.")
# verify that only unique proofs were used
if not self._verify_no_duplicate_proofs(proofs):
raise TransactionError("duplicate proofs.")
# Verify input spending conditions
if not all([self._verify_input_spending_conditions(p) for p in proofs]):
raise TransactionError("validation of input spending conditions failed.")
# Verify ecash signatures
if not all([self._verify_proof_bdhke(p) for p in proofs]):
raise TransactionError("could not verify proofs.")
if not outputs:
return
# Verify outputs
# verify that only unique outputs were used
if not self._verify_no_duplicate_outputs(outputs):
raise TransactionError("duplicate promises.")
if not self._verify_input_output_amounts(proofs, outputs):
raise TransactionError("input amounts less than output.")
# Verify output spending conditions
if outputs and not self._verify_output_spending_conditions(proofs, outputs):
raise TransactionError("validation of output spending conditions failed.")
async def _generate_change_promises(
self,
total_provided: int,
@@ -938,9 +607,7 @@ class Ledger:
await self._set_proofs_pending(proofs)
try:
await self._verify_proofs_and_outputs(proofs)
logger.trace("verified proofs")
# verify amounts
total_provided = sum_proofs(proofs)
invoice_obj = bolt11.decode(invoice)
invoice_amount = math.ceil(invoice_obj.amount_msat / 1000)
@@ -949,13 +616,18 @@ class Ledger:
f"Maximum melt amount is {settings.mint_max_peg_out} sat."
)
fees_msat = await self.check_fees(invoice)
# verify overspending attempt
assert (
total_provided >= invoice_amount + fees_msat / 1000
), TransactionError("provided proofs not enough for Lightning payment.")
# verify that proofs have not been spent yet
self._check_proofs_spendable(proofs)
# verify spending inputs, outputs, and spending conditions
await self._verify_proofs_and_outputs(proofs, outputs)
# promises to return for overpaid fees
return_promises: List[BlindedSignature] = []
if settings.lightning:
logger.trace("paying lightning invoice")
status, preimage, fee_msat = await self._pay_lightning_invoice(
@@ -1075,20 +747,17 @@ class Ledger:
total_amount = sum_proofs(proofs)
try:
logger.trace("verifying _verify_split_amount")
# verify that amount is kosher
self._verify_amount(total_amount)
# verify overspending attempt
self._verify_equation_balanced(proofs, outputs)
logger.trace("verifying proofs: _verify_proofs_and_outputs")
# verify that proofs have not been spent yet
self._check_proofs_spendable(proofs)
# verify spending inputs, outputs, and spending conditions
await self._verify_proofs_and_outputs(proofs, outputs)
logger.trace("verified proofs and outputs")
# Mark proofs as used and prepare new promises
logger.trace("invalidating proofs")
await self._invalidate_proofs(proofs)
logger.trace("invalidated proofs")
except Exception as e:
logger.trace(f"split failed: {e}")
raise e
@@ -1123,7 +792,6 @@ class Ledger:
logger.trace("split successful")
return promises
return prom_fst, prom_snd
async def restore(
self, outputs: List[BlindedMessage]

8
cashu/mint/protocols.py Normal file
View File

@@ -0,0 +1,8 @@
from typing import Protocol
from ..core.base import MintKeyset, MintKeysets
class SupportsKeysets(Protocol):
keyset: MintKeyset
keysets: MintKeysets

144
cashu/mint/verification.py Normal file
View File

@@ -0,0 +1,144 @@
from typing import List, Literal, Optional, Union
from loguru import logger
from ..core.base import (
BlindedMessage,
BlindedSignature,
MintKeyset,
MintKeysets,
Proof,
)
from ..core.crypto import b_dhke
from ..core.crypto.secp import PublicKey
from ..core.errors import (
NoSecretInProofsError,
NotAllowedError,
SecretTooLongError,
TransactionError,
)
from ..core.settings import settings
from .conditions import LedgerSpendingConditions
from .protocols import SupportsKeysets
class LedgerVerification(LedgerSpendingConditions, SupportsKeysets):
"""Verification functions for the ledger."""
keyset: MintKeyset
keysets: MintKeysets
async def _verify_proofs_and_outputs(
self, proofs: List[Proof], outputs: Optional[List[BlindedMessage]] = None
):
"""Checks all proofs and outputs for validity.
Args:
proofs (List[Proof]): List of proofs to check.
outputs (Optional[List[BlindedMessage]], optional): List of outputs to check.
Must be provided for /split but not for /melt. Defaults to None.
Raises:
Exception: Scripts did not validate.
Exception: Criteria for provided secrets not met.
Exception: Duplicate proofs provided.
Exception: BDHKE verification failed.
"""
# Verify inputs
# Verify secret criteria
if not all([self._verify_secret_criteria(p) for p in proofs]):
raise TransactionError("secrets do not match criteria.")
# verify that only unique proofs were used
if not self._verify_no_duplicate_proofs(proofs):
raise TransactionError("duplicate proofs.")
# Verify input spending conditions
if not all([self._verify_input_spending_conditions(p) for p in proofs]):
raise TransactionError("validation of input spending conditions failed.")
# Verify ecash signatures
if not all([self._verify_proof_bdhke(p) for p in proofs]):
raise TransactionError("could not verify proofs.")
if not outputs:
return
# Verify outputs
# verify that only unique outputs were used
if not self._verify_no_duplicate_outputs(outputs):
raise TransactionError("duplicate promises.")
if not self._verify_input_output_amounts(proofs, outputs):
raise TransactionError("input amounts less than output.")
# Verify output spending conditions
if outputs and not self._verify_output_spending_conditions(proofs, outputs):
raise TransactionError("validation of output spending conditions failed.")
def _verify_secret_criteria(self, proof: Proof) -> Literal[True]:
"""Verifies that a secret is present and is not too long (DOS prevention)."""
if proof.secret is None or proof.secret == "":
raise NoSecretInProofsError()
if len(proof.secret) > 512:
raise SecretTooLongError()
return True
def _verify_proof_bdhke(self, proof: Proof):
"""Verifies that the proof of promise was issued by this ledger."""
# if no keyset id is given in proof, assume the current one
if not proof.id:
private_key_amount = self.keyset.private_keys[proof.amount]
else:
assert proof.id in self.keysets.keysets, f"keyset {proof.id} unknown"
logger.trace(
f"Validating proof with keyset {self.keysets.keysets[proof.id].id}."
)
# use the appropriate active keyset for this proof.id
private_key_amount = self.keysets.keysets[proof.id].private_keys[
proof.amount
]
C = PublicKey(bytes.fromhex(proof.C), raw=True)
return b_dhke.verify(private_key_amount, C, proof.secret)
def _verify_input_output_amounts(
self, inputs: List[Proof], outputs: List[BlindedMessage]
) -> bool:
"""Verifies that inputs have at least the same amount as outputs"""
input_amount = sum([p.amount for p in inputs])
output_amount = sum([o.amount for o in outputs])
return input_amount >= output_amount
def _verify_no_duplicate_proofs(self, proofs: List[Proof]) -> bool:
secrets = [p.secret for p in proofs]
if len(secrets) != len(list(set(secrets))):
return False
return True
def _verify_no_duplicate_outputs(self, outputs: List[BlindedMessage]) -> bool:
B_s = [od.B_ for od in outputs]
if len(B_s) != len(list(set(B_s))):
return False
return True
def _verify_amount(self, amount: int) -> int:
"""Any amount used should be a positive integer not larger than 2^MAX_ORDER."""
valid = (
isinstance(amount, int) and amount > 0 and amount < 2**settings.max_order
)
logger.trace(f"Verifying amount {amount} is valid: {valid}")
if not valid:
raise NotAllowedError("invalid amount: " + str(amount))
return amount
def _verify_equation_balanced(
self,
proofs: List[Proof],
outs: Union[List[BlindedSignature], List[BlindedMessage]],
) -> None:
"""Verify that Σinputs - Σoutputs = 0.
Outputs can be BlindedSignature or BlindedMessage.
"""
sum_inputs = sum(self._verify_amount(p.amount) for p in proofs)
sum_outputs = sum(self._verify_amount(p.amount) for p in outs)
assert (
sum_outputs - sum_inputs == 0
), "inputs do not have same amount as outputs"

56
cashu/wallet/htlc.py Normal file
View File

@@ -0,0 +1,56 @@
import hashlib
from datetime import datetime, timedelta
from typing import List, Optional
from ..core import bolt11 as bolt11
from ..core.base import (
Proof,
)
from ..core.db import Database
from ..core.htlc import (
HTLCSecret,
)
from ..core.secret import SecretKind, Tags
from .protocols import SupportsDb
class WalletHTLC(SupportsDb):
db: Database
async def create_htlc_lock(
self,
*,
preimage: Optional[str] = None,
preimage_hash: Optional[str] = None,
hacklock_pubkey: Optional[str] = None,
locktime_seconds: Optional[int] = None,
locktime_pubkey: Optional[str] = None,
) -> HTLCSecret:
tags = Tags()
if locktime_seconds:
tags["locktime"] = str(
int((datetime.now() + timedelta(seconds=locktime_seconds)).timestamp())
)
if locktime_pubkey:
tags["refund"] = locktime_pubkey
if not preimage_hash and preimage:
preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
assert preimage_hash, "preimage_hash or preimage must be provided"
if hacklock_pubkey:
tags["pubkeys"] = hacklock_pubkey
return HTLCSecret(
kind=SecretKind.HTLC,
data=preimage_hash,
tags=tags,
)
async def add_htlc_preimage_to_proofs(
self, proofs: List[Proof], preimage: str
) -> List[Proof]:
for p, s in zip(proofs, preimage):
p.htlcpreimage = s
return proofs

View File

@@ -14,10 +14,7 @@ from ..core.db import Database
from ..core.p2pk import (
P2PKSecret,
P2SHScript,
Secret,
SecretKind,
SigFlags,
Tags,
sign_p2pk_sign,
)
from ..core.script import (
@@ -26,6 +23,7 @@ from ..core.script import (
step1_carol_create_p2sh_address,
step2_carol_sign_tx,
)
from ..core.secret import Secret, SecretKind, Tags
from ..wallet.crud import (
get_unused_locks,
store_p2sh,

View File

@@ -61,6 +61,7 @@ from ..wallet.crud import (
update_proof_reserved,
)
from . import migrations
from .htlc import WalletHTLC
from .p2pk import WalletP2PK
from .secrets import WalletSecrets
@@ -394,7 +395,16 @@ class LedgerAPI(object):
# construct payload
def _splitrequest_include_fields(proofs: List[Proof]):
"""strips away fields from the model that aren't necessary for the /split"""
proofs_include = {"id", "amount", "secret", "C", "p2shscript", "p2pksigs"}
proofs_include = {
"id",
"amount",
"secret",
"C",
"p2shscript",
"p2pksigs",
"htlcpreimage",
"htlcsignature",
}
return {
"outputs": ...,
"proofs": {i: proofs_include for i in range(len(proofs))},
@@ -493,7 +503,7 @@ class LedgerAPI(object):
return returnObj.outputs, returnObj.promises
class Wallet(LedgerAPI, WalletP2PK, WalletSecrets):
class Wallet(LedgerAPI, WalletP2PK, WalletHTLC, WalletSecrets):
"""Minimal wallet wrapper."""
mnemonic: str # holds mnemonic of the wallet

238
tests/test_wallet_htlc.py Normal file
View File

@@ -0,0 +1,238 @@
import asyncio
import hashlib
import secrets
from typing import List
import pytest
import pytest_asyncio
from cashu.core.base import Proof
from cashu.core.crypto.secp import PrivateKey
from cashu.core.htlc import HTLCSecret
from cashu.core.migrations import migrate_databases
from cashu.wallet import migrations
from cashu.wallet.wallet import Wallet
from cashu.wallet.wallet import Wallet as Wallet1
from cashu.wallet.wallet import Wallet as Wallet2
from tests.conftest import SERVER_ENDPOINT
async def assert_err(f, msg):
"""Compute f() and expect an error message 'msg'."""
try:
await f
except Exception as exc:
if msg not in str(exc.args[0]):
raise Exception(f"Expected error: {msg}, got: {exc.args[0]}")
return
raise Exception(f"Expected error: {msg}, got no error")
def assert_amt(proofs: List[Proof], expected: int):
"""Assert amounts the proofs contain."""
assert [p.amount for p in proofs] == expected
@pytest_asyncio.fixture(scope="function")
async def wallet1(mint):
wallet1 = await Wallet1.with_db(
SERVER_ENDPOINT, "test_data/wallet_p2pk_1", "wallet1"
)
await migrate_databases(wallet1.db, migrations)
await wallet1.load_mint()
wallet1.status()
yield wallet1
@pytest_asyncio.fixture(scope="function")
async def wallet2(mint):
wallet2 = await Wallet2.with_db(
SERVER_ENDPOINT, "test_data/wallet_p2pk_2", "wallet2"
)
await migrate_databases(wallet2.db, migrations)
wallet2.private_key = PrivateKey(secrets.token_bytes(32), raw=True)
await wallet2.load_mint()
wallet2.status()
yield wallet2
@pytest.mark.asyncio
async def test_create_htlc_secret(wallet1: Wallet):
await wallet1.mint(64)
preimage = "00000000000000000000000000000000"
preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
secret = await wallet1.create_htlc_lock(preimage=preimage)
assert secret.data == preimage_hash
@pytest.mark.asyncio
async def test_htlc_split(wallet1: Wallet, wallet2: Wallet):
await wallet1.mint(64)
preimage = "00000000000000000000000000000000"
preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
secret = await wallet1.create_htlc_lock(preimage=preimage)
# p2pk test
_, send_proofs = await wallet1.split_to_send(wallet1.proofs, 8, secret_lock=secret)
for p in send_proofs:
assert HTLCSecret.deserialize(p.secret).data == preimage_hash
@pytest.mark.asyncio
async def test_htlc_redeem_with_preimage(wallet1: Wallet, wallet2: Wallet):
await wallet1.mint(64)
preimage = "00000000000000000000000000000000"
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
secret = await wallet1.create_htlc_lock(preimage=preimage)
# p2pk test
_, send_proofs = await wallet1.split_to_send(wallet1.proofs, 8, secret_lock=secret)
for p in send_proofs:
p.htlcpreimage = preimage
await wallet2.redeem(send_proofs)
@pytest.mark.asyncio
async def test_htlc_redeem_with_wrong_preimage(wallet1: Wallet, wallet2: Wallet):
await wallet1.mint(64)
preimage = "00000000000000000000000000000000"
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
secret = await wallet1.create_htlc_lock(preimage=preimage[:-1] + "1")
# p2pk test
_, send_proofs = await wallet1.split_to_send(wallet1.proofs, 8, secret_lock=secret)
for p in send_proofs:
p.htlcpreimage = preimage
await assert_err(
wallet2.redeem(send_proofs), "Mint Error: HTLC preimage does not match"
)
@pytest.mark.asyncio
async def test_htlc_redeem_with_no_signature(wallet1: Wallet, wallet2: Wallet):
await wallet1.mint(64)
preimage = "00000000000000000000000000000000"
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
secret = await wallet1.create_htlc_lock(
preimage=preimage, hacklock_pubkey=pubkey_wallet1
)
# p2pk test
_, send_proofs = await wallet1.split_to_send(wallet1.proofs, 8, secret_lock=secret)
for p in send_proofs:
p.htlcpreimage = preimage
await assert_err(
wallet2.redeem(send_proofs),
"Mint Error: HTLC no hash lock signatures provided.",
)
@pytest.mark.asyncio
async def test_htlc_redeem_with_wrong_signature(wallet1: Wallet, wallet2: Wallet):
await wallet1.mint(64)
preimage = "00000000000000000000000000000000"
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
secret = await wallet1.create_htlc_lock(
preimage=preimage, hacklock_pubkey=pubkey_wallet1
)
# p2pk test
_, send_proofs = await wallet1.split_to_send(wallet1.proofs, 8, secret_lock=secret)
signatures = await wallet1.sign_p2pk_proofs(send_proofs)
for p, s in zip(send_proofs, signatures):
p.htlcpreimage = preimage
p.htlcsignature = s[:-1] + "1" # wrong signature
await assert_err(
wallet2.redeem(send_proofs),
"Mint Error: HTLC hash lock signatures did not match.",
)
@pytest.mark.asyncio
async def test_htlc_redeem_with_correct_signature(wallet1: Wallet, wallet2: Wallet):
await wallet1.mint(64)
preimage = "00000000000000000000000000000000"
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
secret = await wallet1.create_htlc_lock(
preimage=preimage, hacklock_pubkey=pubkey_wallet1
)
# p2pk test
_, send_proofs = await wallet1.split_to_send(wallet1.proofs, 8, secret_lock=secret)
signatures = await wallet1.sign_p2pk_proofs(send_proofs)
for p, s in zip(send_proofs, signatures):
p.htlcpreimage = preimage
p.htlcsignature = s
await wallet2.redeem(send_proofs)
@pytest.mark.asyncio
async def test_htlc_redeem_hashlock_wrong_signature_timelock_correct_signature(
wallet1: Wallet, wallet2: Wallet
):
await wallet1.mint(64)
preimage = "00000000000000000000000000000000"
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
pubkey_wallet2 = await wallet2.create_p2pk_pubkey()
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
secret = await wallet1.create_htlc_lock(
preimage=preimage,
hacklock_pubkey=pubkey_wallet2,
locktime_seconds=5,
locktime_pubkey=pubkey_wallet1,
)
# p2pk test
_, send_proofs = await wallet1.split_to_send(wallet1.proofs, 8, secret_lock=secret)
signatures = await wallet1.sign_p2pk_proofs(send_proofs)
for p, s in zip(send_proofs, signatures):
p.htlcpreimage = preimage
p.htlcsignature = s
# should error because we used wallet2 signatures for the hash lock
await assert_err(
wallet1.redeem(send_proofs),
"Mint Error: HTLC hash lock signatures did not match.",
)
await asyncio.sleep(5)
# should succeed since lock time has passed and we provided wallet1 signature for timelock
await wallet1.redeem(send_proofs)
@pytest.mark.asyncio
async def test_htlc_redeem_hashlock_wrong_signature_timelock_wrong_signature(
wallet1: Wallet, wallet2: Wallet
):
await wallet1.mint(64)
preimage = "00000000000000000000000000000000"
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
pubkey_wallet2 = await wallet2.create_p2pk_pubkey()
# preimage_hash = hashlib.sha256(bytes.fromhex(preimage)).hexdigest()
secret = await wallet1.create_htlc_lock(
preimage=preimage,
hacklock_pubkey=pubkey_wallet2,
locktime_seconds=5,
locktime_pubkey=pubkey_wallet1,
)
# p2pk test
_, send_proofs = await wallet1.split_to_send(wallet1.proofs, 8, secret_lock=secret)
signatures = await wallet1.sign_p2pk_proofs(send_proofs)
for p, s in zip(send_proofs, signatures):
p.htlcpreimage = preimage
p.htlcsignature = s[:-1] + "1" # wrong signature
# should error because we used wallet2 signatures for the hash lock
await assert_err(
wallet1.redeem(send_proofs),
"Mint Error: HTLC hash lock signatures did not match.",
)
await asyncio.sleep(5)
# should fail since lock time has passed and we provided a wrong signature for timelock
await assert_err(
wallet1.redeem(send_proofs),
"Mint Error: HTLC refund signatures did not match.",
)

View File

@@ -9,7 +9,8 @@ import pytest_asyncio
from cashu.core.base import Proof
from cashu.core.crypto.secp import PrivateKey, PublicKey
from cashu.core.migrations import migrate_databases
from cashu.core.p2pk import SigFlags, Tags
from cashu.core.p2pk import SigFlags
from cashu.core.secret import Tags
from cashu.wallet import migrations
from cashu.wallet.wallet import Wallet
from cashu.wallet.wallet import Wallet as Wallet1