mirror of
https://github.com/aljazceru/nutshell.git
synced 2026-02-03 15:54:20 +01:00
P2PK: Sighash flags and multisig ecash because why not (#284)
* refactor split * add sigflag to Secret base model * add better comments * add comments * add witnesses inside /split * more comments * sign outputs if proofs indicate it * add signature to outputs but mint does not check it yet * multiple scriptsigs * signatures on outputs * add multisig tests * rename test * timelock -> locktime * move parameters to tags * convert back to List[List[str]] * errors are back! * fix all the stuff * more testing * make format
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import base64
|
||||
import json
|
||||
import time
|
||||
from sqlite3 import Row
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
@@ -19,8 +20,29 @@ class SecretKind:
|
||||
P2PK = "P2PK"
|
||||
|
||||
|
||||
class SigFlags:
|
||||
SIG_INPUTS = (
|
||||
"SIG_INPUTS" # require signatures only on the inputs (default signature flag)
|
||||
)
|
||||
SIG_ALL = "SIG_ALL" # require signatures on inputs and outputs
|
||||
|
||||
|
||||
class Tags(BaseModel):
|
||||
__root__: List[List[str]]
|
||||
"""
|
||||
Tags are used to encode additional information in the Secret of a Proof.
|
||||
"""
|
||||
|
||||
__root__: List[List[str]] = []
|
||||
|
||||
def __init__(self, tags: Optional[List[List[str]]] = None, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.__root__ = tags or []
|
||||
|
||||
def __setitem__(self, key: str, value: str) -> None:
|
||||
self.__root__.append([key, value])
|
||||
|
||||
def __getitem__(self, key: str) -> Union[str, None]:
|
||||
return self.get_tag(key)
|
||||
|
||||
def get_tag(self, tag_name: str) -> Union[str, None]:
|
||||
for tag in self.__root__:
|
||||
@@ -28,6 +50,13 @@ class Tags(BaseModel):
|
||||
return tag[1]
|
||||
return None
|
||||
|
||||
def get_tag_all(self, tag_name: str) -> List[str]:
|
||||
all_tags = []
|
||||
for tag in self.__root__:
|
||||
if tag[0] == tag_name:
|
||||
all_tags.append(tag[1])
|
||||
return all_tags
|
||||
|
||||
|
||||
class Secret(BaseModel):
|
||||
"""Describes spending condition encoded in the secret field of a Proof."""
|
||||
@@ -35,7 +64,6 @@ class Secret(BaseModel):
|
||||
kind: str
|
||||
data: str
|
||||
nonce: Union[None, str] = None
|
||||
timelock: Union[None, int] = None
|
||||
tags: Union[None, Tags] = None
|
||||
|
||||
def serialize(self) -> str:
|
||||
@@ -43,23 +71,76 @@ class Secret(BaseModel):
|
||||
"data": self.data,
|
||||
"nonce": self.nonce or PrivateKey().serialize()[:32],
|
||||
}
|
||||
if self.timelock:
|
||||
data_dict["timelock"] = self.timelock
|
||||
if self.tags:
|
||||
if self.tags and self.tags.__root__:
|
||||
logger.debug(f"Serializing tags: {self.tags.__root__}")
|
||||
data_dict["tags"] = self.tags.__root__
|
||||
logger.debug(
|
||||
json.dumps(
|
||||
[self.kind, data_dict],
|
||||
)
|
||||
)
|
||||
return json.dumps(
|
||||
[self.kind, data_dict],
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls, data: str):
|
||||
kind, kwargs = json.loads(data)
|
||||
return cls(kind=kind, **kwargs)
|
||||
def deserialize(cls, from_proof: str):
|
||||
kind, kwargs = json.loads(from_proof)
|
||||
data = kwargs.pop("data")
|
||||
nonce = kwargs.pop("nonce")
|
||||
tags_list = kwargs.pop("tags", None)
|
||||
if tags_list:
|
||||
tags = Tags(tags=tags_list)
|
||||
else:
|
||||
tags = None
|
||||
logger.debug(f"Deserialized Secret: {kind}, {data}, {nonce}, {tags}")
|
||||
return cls(kind=kind, data=data, nonce=nonce, tags=tags)
|
||||
|
||||
@property
|
||||
def locktime(self) -> Union[None, int]:
|
||||
if self.tags:
|
||||
locktime = self.tags.get_tag("locktime")
|
||||
if locktime:
|
||||
return int(locktime)
|
||||
return None
|
||||
|
||||
@property
|
||||
def sigflag(self) -> Union[None, str]:
|
||||
if self.tags:
|
||||
sigflag = self.tags.get_tag("sigflag")
|
||||
if sigflag:
|
||||
return sigflag
|
||||
return None
|
||||
|
||||
@property
|
||||
def n_sigs(self) -> Union[None, int]:
|
||||
if self.tags:
|
||||
n_sigs = self.tags.get_tag("n_sigs")
|
||||
if n_sigs:
|
||||
return int(n_sigs)
|
||||
return None
|
||||
|
||||
def get_p2pk_pubkey_from_secret(self) -> List[str]:
|
||||
"""Gets the P2PK pubkey from a Secret depending on the locktime
|
||||
|
||||
Args:
|
||||
secret (Secret): P2PK Secret in ecash token
|
||||
|
||||
Returns:
|
||||
str: pubkey to use for P2PK, empty string if anyone can spend (locktime passed)
|
||||
"""
|
||||
pubkeys: List[str] = [self.data] # for now we only support one pubkey
|
||||
# get all additional pubkeys from tags for multisig
|
||||
if self.tags and self.tags.get_tag("pubkey"):
|
||||
pubkeys += self.tags.get_tag_all("pubkey")
|
||||
|
||||
now = time.time()
|
||||
if self.locktime and self.locktime < now:
|
||||
logger.trace(f"p2pk locktime ran out ({self.locktime}<{now}).")
|
||||
# check tags if a refund pubkey is present.
|
||||
# If yes, we demand the signature to be from the refund pubkey
|
||||
if self.tags:
|
||||
refund_pubkey = self.tags.get_tag("refund")
|
||||
if refund_pubkey:
|
||||
pubkeys = [refund_pubkey]
|
||||
return pubkeys
|
||||
return []
|
||||
return pubkeys
|
||||
|
||||
|
||||
class P2SHScript(BaseModel):
|
||||
@@ -83,7 +164,7 @@ class Proof(BaseModel):
|
||||
amount: int = 0
|
||||
secret: str = "" # secret or message to be blinded and signed
|
||||
C: str = "" # signature on secret, unblinded by wallet
|
||||
p2pksig: Optional[str] = None # P2PK signature
|
||||
p2pksigs: Union[List[str], None] = [] # P2PK signature
|
||||
p2shscript: Union[P2SHScript, None] = None # P2SH spending condition
|
||||
reserved: Union[
|
||||
None, bool
|
||||
@@ -121,6 +202,7 @@ class BlindedMessage(BaseModel):
|
||||
|
||||
amount: int
|
||||
B_: str # Hex-encoded blinded message
|
||||
p2pksigs: Union[List[str], None] = None # signature for p2pk with SIG_ALL
|
||||
|
||||
|
||||
class BlindedSignature(BaseModel):
|
||||
|
||||
@@ -98,7 +98,7 @@ class WalletSettings(CashuSettings):
|
||||
]
|
||||
)
|
||||
|
||||
timelock_delta_seconds: int = Field(default=86400) # 1 day
|
||||
locktime_delta_seconds: int = Field(default=86400) # 1 day
|
||||
|
||||
|
||||
class Settings(
|
||||
|
||||
@@ -16,6 +16,7 @@ from ..core.base import (
|
||||
Proof,
|
||||
Secret,
|
||||
SecretKind,
|
||||
SigFlags,
|
||||
)
|
||||
from ..core.crypto import b_dhke
|
||||
from ..core.crypto.keys import derive_pubkey, random_hash
|
||||
@@ -223,26 +224,28 @@ class Ledger:
|
||||
C = PublicKey(bytes.fromhex(proof.C), raw=True)
|
||||
return b_dhke.verify(private_key_amount, C, proof.secret)
|
||||
|
||||
def _verify_spending_conditions(self, proof: Proof) -> bool:
|
||||
def _verify_input_spending_conditions(self, proof: Proof) -> bool:
|
||||
"""
|
||||
Verify spending conditions:
|
||||
Condition: P2SH - Witnesses proof.p2shscript
|
||||
Condition: P2PK - Witness: proof.p2pksig
|
||||
Condition: P2PK - Witness: proof.p2pksigs
|
||||
|
||||
"""
|
||||
# P2SH
|
||||
try:
|
||||
secret = Secret.deserialize(proof.secret)
|
||||
logger.trace(f"proof.secret: {proof.secret}")
|
||||
logger.trace(f"secret: {secret}")
|
||||
except Exception as e:
|
||||
# secret is not a spending condition so we treat is a normal secret
|
||||
return True
|
||||
if secret.kind == SecretKind.P2SH:
|
||||
# check if timelock is in the past
|
||||
# check if locktime is in the past
|
||||
now = time.time()
|
||||
if secret.timelock and secret.timelock < now:
|
||||
logger.trace(f"p2sh timelock ran out ({secret.timelock}<{now}).")
|
||||
if secret.locktime and secret.locktime < now:
|
||||
logger.trace(f"p2sh locktime ran out ({secret.locktime}<{now}).")
|
||||
return True
|
||||
logger.trace(f"p2sh timelock still active ({secret.timelock}>{now}).")
|
||||
logger.trace(f"p2sh locktime still active ({secret.locktime}>{now}).")
|
||||
|
||||
if (
|
||||
proof.p2shscript is None
|
||||
@@ -266,58 +269,165 @@ class Ledger:
|
||||
|
||||
# P2PK
|
||||
if secret.kind == SecretKind.P2PK:
|
||||
# check if timelock is in the past
|
||||
now = time.time()
|
||||
if secret.timelock and secret.timelock < now:
|
||||
logger.trace(f"p2pk timelock ran out ({secret.timelock}<{now}).")
|
||||
# check tags if a refund pubkey is present.
|
||||
# If yes, we demand the signature to be from the refund pubkey
|
||||
if secret.tags and secret.tags.get_tag("refund"):
|
||||
signature_pubkey = secret.tags.get_tag("refund")
|
||||
else:
|
||||
# if no refund pubkey is present and the timelock has expired
|
||||
# the token can be spent by anyone
|
||||
return True
|
||||
else:
|
||||
# the timelock is still active, therefore we demand the signature
|
||||
# to be from the pubkey in the data field
|
||||
signature_pubkey = secret.data
|
||||
logger.trace(f"p2pk timelock still active ({secret.timelock}>{now}).")
|
||||
# check if locktime is in the past
|
||||
pubkeys = secret.get_p2pk_pubkey_from_secret()
|
||||
assert len(set(pubkeys)) == len(pubkeys), f"pubkeys must be unique."
|
||||
logger.trace(f"pubkeys: {pubkeys}")
|
||||
# we will get an empty list if the locktime has passed and no refund pubkey is present
|
||||
if not pubkeys:
|
||||
return True
|
||||
|
||||
# now we check the signature
|
||||
if not proof.p2pksig:
|
||||
if not proof.p2pksigs:
|
||||
# no signature present although secret indicates one
|
||||
raise Exception("no p2pk signature in proof.")
|
||||
logger.error(f"no p2pk signatures in proof: {proof.p2pksigs}")
|
||||
raise Exception("no p2pk signatures in proof.")
|
||||
|
||||
# we make sure that there are no duplicate signatures
|
||||
if len(set(proof.p2pksigs)) != len(proof.p2pksigs):
|
||||
raise Exception("p2pk signatures must be unique.")
|
||||
|
||||
# we parse the secret as a P2PK commitment
|
||||
# assert len(proof.secret.split(":")) == 5, "p2pk secret format invalid."
|
||||
|
||||
# check signature proof.p2pksig against pubkey
|
||||
# INPUTS: check signatures proof.p2pksigs against pubkey
|
||||
# we expect the signature to be on the pubkey (=message) itself
|
||||
assert signature_pubkey, "no signature pubkey present."
|
||||
assert verify_p2pk_signature(
|
||||
message=secret.serialize().encode("utf-8"),
|
||||
pubkey=PublicKey(bytes.fromhex(signature_pubkey), raw=True),
|
||||
signature=bytes.fromhex(proof.p2pksig),
|
||||
), "p2pk signature invalid."
|
||||
logger.trace(proof.p2pksig)
|
||||
logger.trace("p2pk signature valid.")
|
||||
n_sigs_required = secret.n_sigs or 1
|
||||
assert n_sigs_required > 0, "n_sigs must be positive."
|
||||
|
||||
# check if enough signatures are present
|
||||
assert (
|
||||
len(proof.p2pksigs) >= n_sigs_required
|
||||
), f"not enough signatures provided: {len(proof.p2pksigs)} < {n_sigs_required}."
|
||||
|
||||
n_valid_sigs_per_output = 0
|
||||
# loop over all signatures in output
|
||||
for input_sig in proof.p2pksigs:
|
||||
for pubkey in pubkeys:
|
||||
logger.trace(f"verifying signature {input_sig} by pubkey {pubkey}.")
|
||||
logger.trace(f"Message: {secret.serialize().encode('utf-8')}")
|
||||
if verify_p2pk_signature(
|
||||
message=secret.serialize().encode("utf-8"),
|
||||
pubkey=PublicKey(bytes.fromhex(pubkey), raw=True),
|
||||
signature=bytes.fromhex(input_sig),
|
||||
):
|
||||
n_valid_sigs_per_output += 1
|
||||
logger.trace(
|
||||
f"p2pk signature on input is valid: {input_sig} on {pubkey}."
|
||||
)
|
||||
continue
|
||||
else:
|
||||
logger.trace(
|
||||
f"p2pk signature on input is invalid: {input_sig} on {pubkey}."
|
||||
)
|
||||
# check if we have enough valid signatures
|
||||
assert n_valid_sigs_per_output, "no valid signature provided for input."
|
||||
assert (
|
||||
n_valid_sigs_per_output >= n_sigs_required
|
||||
), f"signature threshold not met. {n_valid_sigs_per_output} < {n_sigs_required}."
|
||||
logger.trace(
|
||||
f"{n_valid_sigs_per_output} of {n_sigs_required} valid signatures found."
|
||||
)
|
||||
|
||||
logger.trace(proof.p2pksigs)
|
||||
logger.trace("p2pk signature on inputs is valid.")
|
||||
|
||||
return True
|
||||
|
||||
# no spending contition
|
||||
return True
|
||||
|
||||
def _verify_outputs(
|
||||
self, total: int, amount: int, outputs: List[BlindedMessage]
|
||||
def _verify_output_spending_conditions(
|
||||
self, proofs: List[Proof], outputs: List[BlindedMessage]
|
||||
) -> bool:
|
||||
"""Verifies the expected split was correctly computed"""
|
||||
frst_amt, scnd_amt = total - amount, amount # we have two amounts to split to
|
||||
frst_outputs = amount_split(frst_amt)
|
||||
scnd_outputs = amount_split(scnd_amt)
|
||||
expected = frst_outputs + scnd_outputs
|
||||
given = [o.amount for o in outputs]
|
||||
return given == expected
|
||||
"""
|
||||
Verify spending conditions:
|
||||
Condition: P2PK - Witness: output.p2pksigs
|
||||
|
||||
"""
|
||||
# P2SH
|
||||
pubkeys_per_proof = []
|
||||
n_sigs = []
|
||||
for proof in proofs:
|
||||
try:
|
||||
secret = Secret.deserialize(proof.secret)
|
||||
# get all p2pk pubkeys from secrets
|
||||
pubkeys_per_proof.append(secret.get_p2pk_pubkey_from_secret())
|
||||
# get signature threshold from secrets
|
||||
n_sigs.append(secret.n_sigs)
|
||||
except Exception as e:
|
||||
# secret is not a spending condition so we treat is a normal secret
|
||||
return True
|
||||
# for all proofs all pubkeys must be the same
|
||||
assert (
|
||||
len(set([tuple(pubs_output) for pubs_output in pubkeys_per_proof])) == 1
|
||||
), "pubkeys in all proofs must match."
|
||||
pubkeys = pubkeys_per_proof[0]
|
||||
if not pubkeys:
|
||||
# no pubkeys present
|
||||
return True
|
||||
|
||||
logger.trace(f"pubkeys: {pubkeys}")
|
||||
# TODO: add limit for maximum number of pubkeys
|
||||
|
||||
# for all proofs all n_sigs must be the same
|
||||
assert len(set(n_sigs)) == 1, "n_sigs in all proofs must match."
|
||||
n_sigs_required = n_sigs[0] or 1
|
||||
|
||||
# first we check if all secrets are P2PK
|
||||
if not all(
|
||||
[Secret.deserialize(p.secret).kind == SecretKind.P2PK for p in proofs]
|
||||
):
|
||||
# not all secrets are P2PK
|
||||
return True
|
||||
|
||||
# now we check if any of the secrets has sigflag==SIG_ALL
|
||||
if not any(
|
||||
[Secret.deserialize(p.secret).sigflag == SigFlags.SIG_ALL for p in proofs]
|
||||
):
|
||||
# no secret has sigflag==SIG_ALL
|
||||
return True
|
||||
|
||||
# loop over all outputs and check if the signatures are valid for pubkeys with a threshold of n_sig
|
||||
for output in outputs:
|
||||
# we expect the signature to be on the pubkey (=message) itself
|
||||
assert output.p2pksigs, "no signatures in output."
|
||||
# TODO: add limit for maximum number of signatures
|
||||
|
||||
# we check whether any signature is duplicate
|
||||
assert len(set(output.p2pksigs)) == len(
|
||||
output.p2pksigs
|
||||
), "duplicate signatures in output."
|
||||
|
||||
n_valid_sigs_per_output = 0
|
||||
# loop over all signatures in output
|
||||
for output_sig in output.p2pksigs:
|
||||
for pubkey in pubkeys:
|
||||
if verify_p2pk_signature(
|
||||
message=output.B_.encode("utf-8"),
|
||||
pubkey=PublicKey(bytes.fromhex(pubkey), raw=True),
|
||||
signature=bytes.fromhex(output_sig),
|
||||
):
|
||||
n_valid_sigs_per_output += 1
|
||||
assert n_valid_sigs_per_output, "no valid signature provided for output."
|
||||
assert (
|
||||
n_valid_sigs_per_output >= n_sigs_required
|
||||
), f"signature threshold not met. {n_valid_sigs_per_output} < {n_sigs_required}."
|
||||
logger.trace(
|
||||
f"{n_valid_sigs_per_output} of {n_sigs_required} valid signatures found."
|
||||
)
|
||||
logger.trace(output.p2pksigs)
|
||||
logger.trace("p2pk signatures on output is valid.")
|
||||
|
||||
return True
|
||||
|
||||
def _verify_input_output_amounts(
|
||||
self, inputs: List[Proof], outputs: List[BlindedMessage]
|
||||
) -> bool:
|
||||
"""Verifies that inputs have at least the same amount as outputs"""
|
||||
input_amount = sum([p.amount for p in inputs])
|
||||
output_amount = sum([o.amount for o in outputs])
|
||||
return input_amount >= output_amount
|
||||
|
||||
def _verify_no_duplicate_proofs(self, proofs: List[Proof]) -> bool:
|
||||
secrets = [p.secret for p in proofs]
|
||||
@@ -565,11 +675,14 @@ class Ledger:
|
||||
if p.secret == pp.secret:
|
||||
raise Exception("proofs are pending.")
|
||||
|
||||
async def _verify_proofs(self, proofs: List[Proof]):
|
||||
"""Checks a series of criteria for the verification of proofs.
|
||||
async def _verify_proofs_and_outputs(
|
||||
self, proofs: List[Proof], outputs: Optional[List[BlindedMessage]] = None
|
||||
):
|
||||
"""Checks all proofs and outputs for validity.
|
||||
|
||||
Args:
|
||||
proofs (List[Proof]): List of proofs to check.
|
||||
outputs (Optional[List[BlindedMessage]], optional): List of outputs to check. Must be provided for /split but not for /melt. Defaults to None.
|
||||
|
||||
Raises:
|
||||
Exception: Scripts did not validate.
|
||||
@@ -577,19 +690,35 @@ class Ledger:
|
||||
Exception: Duplicate proofs provided.
|
||||
Exception: BDHKE verification failed.
|
||||
"""
|
||||
# Verify scripts
|
||||
if not all([self._verify_spending_conditions(p) for p in proofs]):
|
||||
raise Exception("script validation failed.")
|
||||
# Verify inputs
|
||||
|
||||
# Verify secret criteria
|
||||
if not all([self._verify_secret_criteria(p) for p in proofs]):
|
||||
raise Exception("secrets do not match criteria.")
|
||||
# verify that only unique proofs were used
|
||||
if not self._verify_no_duplicate_proofs(proofs):
|
||||
raise Exception("duplicate proofs.")
|
||||
# Verify proofs
|
||||
# Verify input spending conditions
|
||||
if not all([self._verify_input_spending_conditions(p) for p in proofs]):
|
||||
raise Exception("validation of input spending conditions failed.")
|
||||
# Verify ecash signatures
|
||||
if not all([self._verify_proof_bdhke(p) for p in proofs]):
|
||||
raise Exception("could not verify proofs.")
|
||||
|
||||
if not outputs:
|
||||
return
|
||||
|
||||
# Verify outputs
|
||||
|
||||
# verify that only unique outputs were used
|
||||
if not self._verify_no_duplicate_outputs(outputs):
|
||||
raise Exception("duplicate promises.")
|
||||
if not self._verify_input_output_amounts(proofs, outputs):
|
||||
raise Exception("input amounts less than output.")
|
||||
# Verify output spending conditions
|
||||
if outputs and not self._verify_output_spending_conditions(proofs, outputs):
|
||||
raise Exception("validation of output spending conditions failed.")
|
||||
|
||||
async def _generate_change_promises(
|
||||
self,
|
||||
total_provided: int,
|
||||
@@ -767,7 +896,7 @@ class Ledger:
|
||||
await self._set_proofs_pending(proofs)
|
||||
|
||||
try:
|
||||
await self._verify_proofs(proofs)
|
||||
await self._verify_proofs_and_outputs(proofs)
|
||||
logger.trace("verified proofs")
|
||||
|
||||
total_provided = sum_proofs(proofs)
|
||||
@@ -909,16 +1038,9 @@ class Ledger:
|
||||
if amount > total:
|
||||
raise Exception("split amount is higher than the total sum.")
|
||||
|
||||
logger.trace("verifying proofs: _verify_proofs")
|
||||
await self._verify_proofs(proofs)
|
||||
logger.trace(f"verified proofs")
|
||||
# verify that only unique outputs were used
|
||||
if not self._verify_no_duplicate_outputs(outputs):
|
||||
raise Exception("duplicate promises.")
|
||||
# verify that outputs have the correct amount
|
||||
if not self._verify_outputs(total, amount, outputs):
|
||||
raise Exception("split of promises is not as expected.")
|
||||
logger.trace(f"verified outputs")
|
||||
logger.trace("verifying proofs: _verify_proofs_and_outputs")
|
||||
await self._verify_proofs_and_outputs(proofs, outputs)
|
||||
logger.trace(f"verified proofs and outputs")
|
||||
# Mark proofs as used and prepare new promises
|
||||
logger.trace(f"invalidating proofs")
|
||||
await self._invalidate_proofs(proofs)
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime, timedelta
|
||||
from typing import List, Optional
|
||||
|
||||
import click
|
||||
from loguru import logger
|
||||
|
||||
from ..core.base import Secret, SecretKind, TokenV1, TokenV2, TokenV3, TokenV3Token
|
||||
from ..core.base import TokenV1, TokenV2, TokenV3, TokenV3Token
|
||||
from ..core.helpers import sum_proofs
|
||||
from ..core.migrations import migrate_databases
|
||||
from ..core.settings import settings
|
||||
@@ -159,19 +158,21 @@ async def send(
|
||||
if not lock.startswith("P2SH:") and not lock.startswith("P2PK:"):
|
||||
raise Exception("Error: lock has to start with P2SH: or P2PK:")
|
||||
# we add a time lock to the P2PK lock by appending the current unix time + 14 days
|
||||
# we use datetime because it's easier to read
|
||||
if lock.startswith("P2PK:") or lock.startswith("P2SH:"):
|
||||
logger.debug(f"Locking token to: {lock}")
|
||||
logger.debug(
|
||||
f"Adding a time lock of {settings.timelock_delta_seconds} seconds."
|
||||
f"Adding a time lock of {settings.locktime_delta_seconds} seconds."
|
||||
)
|
||||
if lock.startswith("P2SH:"):
|
||||
secret_lock = await wallet.create_p2sh_lock(
|
||||
lock.split(":")[1], timelock=settings.timelock_delta_seconds
|
||||
lock.split(":")[1], locktime=settings.locktime_delta_seconds
|
||||
)
|
||||
elif lock.startswith("P2PK:"):
|
||||
secret_lock = await wallet.create_p2pk_lock(
|
||||
lock.split(":")[1], timelock=settings.timelock_delta_seconds
|
||||
lock.split(":")[1],
|
||||
locktime_seconds=settings.locktime_delta_seconds,
|
||||
sig_all=True,
|
||||
n_sigs=1,
|
||||
)
|
||||
|
||||
await wallet.load_proofs()
|
||||
|
||||
@@ -33,6 +33,7 @@ from ..core.base import (
|
||||
Proof,
|
||||
Secret,
|
||||
SecretKind,
|
||||
SigFlags,
|
||||
Tags,
|
||||
TokenV2,
|
||||
TokenV2Mint,
|
||||
@@ -456,7 +457,7 @@ class LedgerAPI:
|
||||
# construct payload
|
||||
def _splitrequest_include_fields(proofs):
|
||||
"""strips away fields from the model that aren't necessary for the /split"""
|
||||
proofs_include = {"id", "amount", "secret", "C", "p2shscript", "p2pksig"}
|
||||
proofs_include = {"id", "amount", "secret", "C", "p2shscript", "p2pksigs"}
|
||||
return {
|
||||
"amount": ...,
|
||||
"outputs": ...,
|
||||
@@ -637,7 +638,71 @@ class Wallet(LedgerAPI):
|
||||
self.proofs += proofs
|
||||
return proofs
|
||||
|
||||
async def add_witnesses_to_proofs(self, proofs: List[Proof]):
|
||||
async def add_p2pk_witnesses_to_outputs(
|
||||
self, outputs: List[BlindedMessage]
|
||||
) -> List[BlindedMessage]:
|
||||
p2pk_signatures = await self.sign_p2pk_outputs(outputs)
|
||||
for o, s in zip(outputs, p2pk_signatures):
|
||||
o.p2pksigs = [s]
|
||||
return outputs
|
||||
|
||||
async def add_witnesses_to_outputs(
|
||||
self, proofs: List[Proof], outputs: List[BlindedMessage]
|
||||
) -> List[BlindedMessage]:
|
||||
"""Adds witnesses to outputs if the inputs (proofs) indicate an appropriate signature flag
|
||||
|
||||
Args:
|
||||
proofs (List[Proof]): _description_
|
||||
outputs (List[BlindedMessage]): _description_
|
||||
"""
|
||||
# first we check whether all tokens have serialized secrets as their secret
|
||||
try:
|
||||
for p in proofs:
|
||||
Secret.deserialize(p.secret)
|
||||
except:
|
||||
# if not, we do not add witnesses (treat as regular token secret)
|
||||
return outputs
|
||||
|
||||
# if any of the proofs provided require SIG_ALL, we must provide it
|
||||
if any(
|
||||
[Secret.deserialize(p.secret).sigflag == SigFlags.SIG_ALL for p in proofs]
|
||||
):
|
||||
# p2pk_signatures = await self.sign_p2pk_outputs(outputs)
|
||||
# for o, s in zip(outputs, p2pk_signatures):
|
||||
# o.p2pksigs = [s]
|
||||
outputs = await self.add_p2pk_witnesses_to_outputs(outputs)
|
||||
return outputs
|
||||
|
||||
async def add_p2sh_witnesses_to_proofs(self, proofs: List[Proof]) -> List[Proof]:
|
||||
# Quirk: we use a single P2SH script and signature pair for all tokens in proofs
|
||||
address = Secret.deserialize(proofs[0].secret).data
|
||||
p2shscripts = await get_unused_locks(address, db=self.db)
|
||||
assert len(p2shscripts) == 1, Exception("lock not found.")
|
||||
p2sh_script, p2sh_signature = (
|
||||
p2shscripts[0].script,
|
||||
p2shscripts[0].signature,
|
||||
)
|
||||
logger.debug(f"Unlock script: {p2sh_script} signature: {p2sh_signature}")
|
||||
|
||||
# attach unlock scripts to proofs
|
||||
for p in proofs:
|
||||
p.p2shscript = P2SHScript(script=p2sh_script, signature=p2sh_signature)
|
||||
return proofs
|
||||
|
||||
async def add_p2pk_witnesses_to_proofs(self, proofs: List[Proof]) -> List[Proof]:
|
||||
p2pk_signatures = await self.sign_p2pk_proofs(proofs)
|
||||
logger.debug(f"Unlock signatures for {len(proofs)} proofs: {p2pk_signatures}")
|
||||
logger.debug(f"Proofs: {proofs}")
|
||||
# attach unlock signatures to proofs
|
||||
assert len(proofs) == len(p2pk_signatures), "wrong number of signatures"
|
||||
for p, s in zip(proofs, p2pk_signatures):
|
||||
if p.p2pksigs:
|
||||
p.p2pksigs.append(s)
|
||||
else:
|
||||
p.p2pksigs = [s]
|
||||
return proofs
|
||||
|
||||
async def add_witnesses_to_proofs(self, proofs: List[Proof]) -> List[Proof]:
|
||||
"""Adds witnesses to proofs for P2SH or P2PK redemption."""
|
||||
|
||||
p2sh_script, p2sh_signature = None, None
|
||||
@@ -652,33 +717,18 @@ class Wallet(LedgerAPI):
|
||||
except:
|
||||
# if not, we do not add witnesses (treat as regular token secret)
|
||||
return proofs
|
||||
logger.debug(f"Spending conditions detected.")
|
||||
# P2SH scripts
|
||||
if all([Secret.deserialize(p.secret).kind == SecretKind.P2SH for p in proofs]):
|
||||
# Quirk: we use a single P2SH script and signature pair for all tokens in proofs
|
||||
address = Secret.deserialize(proofs[0].secret).data
|
||||
p2shscripts = await get_unused_locks(address, db=self.db)
|
||||
assert len(p2shscripts) == 1, Exception("lock not found.")
|
||||
p2sh_script, p2sh_signature = (
|
||||
p2shscripts[0].script,
|
||||
p2shscripts[0].signature,
|
||||
)
|
||||
logger.debug(f"Unlock script: {p2sh_script} signature: {p2sh_signature}")
|
||||
|
||||
# attach unlock scripts to proofs
|
||||
for p in proofs:
|
||||
p.p2shscript = P2SHScript(script=p2sh_script, signature=p2sh_signature)
|
||||
logger.debug(f"P2SH redemption detected.")
|
||||
proofs = await self.add_p2sh_witnesses_to_proofs(proofs)
|
||||
|
||||
# P2PK signatures
|
||||
elif all(
|
||||
[Secret.deserialize(p.secret).kind == SecretKind.P2PK for p in proofs]
|
||||
):
|
||||
p2pk_signatures = await self.sign_p2pk_proofs(proofs)
|
||||
logger.debug(f"Unlock signature: {p2pk_signatures}")
|
||||
|
||||
# attach unlock signatures to proofs
|
||||
assert len(proofs) == len(p2pk_signatures), "wrong number of signatures"
|
||||
for p, s in zip(proofs, p2pk_signatures):
|
||||
p.p2pksig = s
|
||||
logger.debug(f"P2PK redemption detected.")
|
||||
proofs = await self.add_p2pk_witnesses_to_proofs(proofs)
|
||||
|
||||
return proofs
|
||||
|
||||
@@ -731,6 +781,9 @@ class Wallet(LedgerAPI):
|
||||
# construct outputs
|
||||
outputs, rs = self._construct_outputs(amounts, secrets)
|
||||
|
||||
# potentially add witnesses to outputs based on what requirement the proofs indicate
|
||||
outputs = await self.add_witnesses_to_outputs(proofs, outputs)
|
||||
|
||||
# Call /split API
|
||||
promises_fst, promises_snd = await super().split(
|
||||
proofs, outputs, secrets, rs, amount, secret_lock
|
||||
@@ -808,7 +861,7 @@ class Wallet(LedgerAPI):
|
||||
|
||||
@staticmethod
|
||||
def _get_proofs_per_keyset(proofs: List[Proof]):
|
||||
return {key: list(group) for key, group in groupby(proofs, lambda p: p.id)}
|
||||
return {key: list(group) for key, group in groupby(proofs, lambda p: p.id)} # type: ignore
|
||||
|
||||
async def _get_proofs_per_minturl(self, proofs: List[Proof]):
|
||||
ret = {}
|
||||
@@ -1087,30 +1140,43 @@ class Wallet(LedgerAPI):
|
||||
async def create_p2pk_lock(
|
||||
self,
|
||||
pubkey: str,
|
||||
timelock: Optional[int] = None,
|
||||
locktime_seconds: Optional[int] = None,
|
||||
tags: Optional[Tags] = None,
|
||||
):
|
||||
sig_all: bool = False,
|
||||
n_sigs: int = 1,
|
||||
) -> Secret:
|
||||
logger.debug(f"Provided tags: {tags}")
|
||||
if not tags:
|
||||
tags = Tags()
|
||||
logger.debug(f"Before tags: {tags}")
|
||||
if locktime_seconds:
|
||||
tags["locktime"] = str(
|
||||
int((datetime.now() + timedelta(seconds=locktime_seconds)).timestamp())
|
||||
)
|
||||
tags["sigflag"] = SigFlags.SIG_ALL if sig_all else SigFlags.SIG_INPUTS
|
||||
if n_sigs > 1:
|
||||
tags["n_sigs"] = str(n_sigs)
|
||||
logger.debug(f"After tags: {tags}")
|
||||
return Secret(
|
||||
kind=SecretKind.P2PK,
|
||||
data=pubkey,
|
||||
timelock=int((datetime.now() + timedelta(seconds=timelock)).timestamp())
|
||||
if timelock
|
||||
else None,
|
||||
tags=tags,
|
||||
)
|
||||
|
||||
async def create_p2sh_lock(
|
||||
self,
|
||||
address: str,
|
||||
timelock: Optional[int] = None,
|
||||
tags: Optional[Tags] = None,
|
||||
):
|
||||
locktime: Optional[int] = None,
|
||||
tags: Tags = Tags(),
|
||||
) -> Secret:
|
||||
if locktime:
|
||||
tags["locktime"] = str(
|
||||
(datetime.now() + timedelta(seconds=locktime)).timestamp()
|
||||
)
|
||||
|
||||
return Secret(
|
||||
kind=SecretKind.P2SH,
|
||||
data=address,
|
||||
timelock=int((datetime.now() + timedelta(seconds=timelock)).timestamp())
|
||||
if timelock
|
||||
else None,
|
||||
tags=tags,
|
||||
)
|
||||
|
||||
@@ -1120,13 +1186,36 @@ class Wallet(LedgerAPI):
|
||||
), "No private key set in settings. Set NOSTR_PRIVATE_KEY in .env"
|
||||
private_key = self.private_key
|
||||
assert private_key.pubkey
|
||||
return [
|
||||
logger.trace(
|
||||
f"Signing with private key: {private_key.serialize()} public key: {private_key.pubkey.serialize().hex()}"
|
||||
)
|
||||
for proof in proofs:
|
||||
logger.trace(f"Signing proof: {proof}")
|
||||
logger.trace(f"Signing message: {proof.secret}")
|
||||
|
||||
signatures = [
|
||||
sign_p2pk_sign(
|
||||
message=proof.secret.encode("utf-8"),
|
||||
private_key=private_key,
|
||||
)
|
||||
for proof in proofs
|
||||
]
|
||||
logger.debug(f"Signatures: {signatures}")
|
||||
return signatures
|
||||
|
||||
async def sign_p2pk_outputs(self, outputs: List[BlindedMessage]) -> List[str]:
|
||||
assert (
|
||||
self.private_key
|
||||
), "No private key set in settings. Set NOSTR_PRIVATE_KEY in .env"
|
||||
private_key = self.private_key
|
||||
assert private_key.pubkey
|
||||
return [
|
||||
sign_p2pk_sign(
|
||||
message=output.B_.encode("utf-8"),
|
||||
private_key=private_key,
|
||||
)
|
||||
for output in outputs
|
||||
]
|
||||
|
||||
# ---------- BALANCE CHECKS ----------
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import asyncio
|
||||
import copy
|
||||
import secrets
|
||||
from typing import List
|
||||
|
||||
@@ -46,6 +47,7 @@ async def wallet1(mint):
|
||||
async def wallet2(mint):
|
||||
wallet2 = Wallet2(SERVER_ENDPOINT, "data/wallet2", "wallet2")
|
||||
await migrate_databases(wallet2.db, migrations)
|
||||
wallet2.private_key = PrivateKey(secrets.token_bytes(32), raw=True)
|
||||
await wallet2.load_mint()
|
||||
wallet2.status()
|
||||
yield wallet2
|
||||
@@ -251,100 +253,6 @@ async def test_create_p2pk_pubkey(wallet1: Wallet):
|
||||
PublicKey(bytes.fromhex(pubkey), raw=True)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2pk(wallet1: Wallet, wallet2: Wallet):
|
||||
await wallet1.mint(64)
|
||||
pubkey_wallet2 = await wallet2.create_p2pk_pubkey()
|
||||
# p2pk test
|
||||
secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet2) # sender side
|
||||
_, send_proofs = await wallet1.split_to_send(
|
||||
wallet1.proofs, 8, secret_lock=secret_lock
|
||||
)
|
||||
await wallet2.redeem(send_proofs)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2pk_receive_with_wrong_private_key(wallet1: Wallet, wallet2: Wallet):
|
||||
await wallet1.mint(64)
|
||||
pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # receiver side
|
||||
# sender side
|
||||
secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet2) # sender side
|
||||
_, send_proofs = await wallet1.split_to_send(
|
||||
wallet1.proofs, 8, secret_lock=secret_lock
|
||||
)
|
||||
# receiver side: wrong private key
|
||||
wallet1.private_key = PrivateKey() # wrong private key
|
||||
await assert_err(wallet1.redeem(send_proofs), "Mint Error: p2pk signature invalid.")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2pk_short_timelock_receive_with_wrong_private_key(
|
||||
wallet1: Wallet, wallet2: Wallet
|
||||
):
|
||||
await wallet1.mint(64)
|
||||
pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # receiver side
|
||||
# sender side
|
||||
secret_lock = await wallet1.create_p2pk_lock(
|
||||
pubkey_wallet2, timelock=4
|
||||
) # sender side
|
||||
_, send_proofs = await wallet1.split_to_send(
|
||||
wallet1.proofs, 8, secret_lock=secret_lock
|
||||
)
|
||||
# receiver side: wrong private key
|
||||
wallet1.private_key = PrivateKey() # wrong private key
|
||||
await assert_err(wallet1.redeem(send_proofs), "Mint Error: p2pk signature invalid.")
|
||||
await asyncio.sleep(6)
|
||||
await wallet1.redeem(send_proofs)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2pk_timelock_with_refund_pubkey(wallet1: Wallet, wallet2: Wallet):
|
||||
await wallet1.mint(64)
|
||||
pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # receiver side
|
||||
# sender side
|
||||
garbage_pubkey = PrivateKey().pubkey
|
||||
assert garbage_pubkey
|
||||
secret_lock = await wallet1.create_p2pk_lock(
|
||||
garbage_pubkey.serialize().hex(), # create lock to unspendable pubkey
|
||||
timelock=4, # timelock
|
||||
tags=Tags(__root__=[["refund", pubkey_wallet2]]), # refund pubkey
|
||||
) # sender side
|
||||
_, send_proofs = await wallet1.split_to_send(
|
||||
wallet1.proofs, 8, secret_lock=secret_lock
|
||||
)
|
||||
# receiver side: can't redeem since we used a garbage pubkey
|
||||
await assert_err(wallet2.redeem(send_proofs), "Mint Error: p2pk signature invalid.")
|
||||
await asyncio.sleep(6)
|
||||
# we can now redeem because of the refund timelock
|
||||
await wallet2.redeem(send_proofs)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2pk_timelock_with_wrong_refund_pubkey(wallet1: Wallet, wallet2: Wallet):
|
||||
await wallet1.mint(64)
|
||||
pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # receiver side
|
||||
# sender side
|
||||
garbage_pubkey = PrivateKey().pubkey
|
||||
garbage_pubkey_2 = PrivateKey().pubkey
|
||||
assert garbage_pubkey
|
||||
assert garbage_pubkey_2
|
||||
secret_lock = await wallet1.create_p2pk_lock(
|
||||
garbage_pubkey.serialize().hex(), # create lock to unspendable pubkey
|
||||
timelock=4, # timelock
|
||||
tags=Tags(
|
||||
__root__=[["refund", garbage_pubkey_2.serialize().hex()]]
|
||||
), # refund pubkey
|
||||
) # sender side
|
||||
_, send_proofs = await wallet1.split_to_send(
|
||||
wallet1.proofs, 8, secret_lock=secret_lock
|
||||
)
|
||||
# receiver side: can't redeem since we used a garbage pubkey
|
||||
await assert_err(wallet2.redeem(send_proofs), "Mint Error: p2pk signature invalid.")
|
||||
await asyncio.sleep(6)
|
||||
# we still can't redeem it because we used garbage_pubkey_2 as a refund pubkey
|
||||
await assert_err(wallet2.redeem(send_proofs), "Mint Error: p2pk signature invalid.")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2sh(wallet1: Wallet, wallet2: Wallet):
|
||||
await wallet1.mint(64)
|
||||
|
||||
343
tests/test_wallet_p2pk.py
Normal file
343
tests/test_wallet_p2pk.py
Normal file
@@ -0,0 +1,343 @@
|
||||
import asyncio
|
||||
import copy
|
||||
import secrets
|
||||
from typing import List
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
|
||||
from cashu.core.base import Proof, Secret, SecretKind, SigFlags, Tags
|
||||
from cashu.core.crypto.secp import PrivateKey, PublicKey
|
||||
from cashu.core.helpers import async_unwrap, sum_proofs
|
||||
from cashu.core.migrations import migrate_databases
|
||||
from cashu.core.settings import settings
|
||||
from cashu.wallet import migrations
|
||||
from cashu.wallet.wallet import Wallet
|
||||
from cashu.wallet.wallet import Wallet as Wallet1
|
||||
from cashu.wallet.wallet import Wallet as Wallet2
|
||||
from tests.conftest import SERVER_ENDPOINT, mint
|
||||
|
||||
|
||||
async def assert_err(f, msg):
|
||||
"""Compute f() and expect an error message 'msg'."""
|
||||
try:
|
||||
await f
|
||||
except Exception as exc:
|
||||
if str(exc.args[0]) != msg:
|
||||
raise Exception(f"Expected error: {msg}, got: {exc.args[0]}")
|
||||
return
|
||||
raise Exception(f"Expected error: {msg}, got no error")
|
||||
|
||||
|
||||
def assert_amt(proofs: List[Proof], expected: int):
|
||||
"""Assert amounts the proofs contain."""
|
||||
assert [p.amount for p in proofs] == expected
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="function")
|
||||
async def wallet1(mint):
|
||||
wallet1 = Wallet1(SERVER_ENDPOINT, "data/wallet_p2pk_1", "wallet1")
|
||||
await migrate_databases(wallet1.db, migrations)
|
||||
await wallet1.load_mint()
|
||||
wallet1.status()
|
||||
yield wallet1
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="function")
|
||||
async def wallet2(mint):
|
||||
wallet2 = Wallet2(SERVER_ENDPOINT, "data/wallet_p2pk_2", "wallet2")
|
||||
await migrate_databases(wallet2.db, migrations)
|
||||
wallet2.private_key = PrivateKey(secrets.token_bytes(32), raw=True)
|
||||
await wallet2.load_mint()
|
||||
wallet2.status()
|
||||
yield wallet2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_p2pk_pubkey(wallet1: Wallet):
|
||||
await wallet1.mint(64)
|
||||
pubkey = await wallet1.create_p2pk_pubkey()
|
||||
PublicKey(bytes.fromhex(pubkey), raw=True)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2pk(wallet1: Wallet, wallet2: Wallet):
|
||||
await wallet1.mint(64)
|
||||
pubkey_wallet2 = await wallet2.create_p2pk_pubkey()
|
||||
# p2pk test
|
||||
secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet2) # sender side
|
||||
_, send_proofs = await wallet1.split_to_send(
|
||||
wallet1.proofs, 8, secret_lock=secret_lock
|
||||
)
|
||||
await wallet2.redeem(send_proofs)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2pk_receive_with_wrong_private_key(wallet1: Wallet, wallet2: Wallet):
|
||||
await wallet1.mint(64)
|
||||
pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # receiver side
|
||||
# sender side
|
||||
secret_lock = await wallet1.create_p2pk_lock(pubkey_wallet2) # sender side
|
||||
_, send_proofs = await wallet1.split_to_send(
|
||||
wallet1.proofs, 8, secret_lock=secret_lock
|
||||
)
|
||||
# receiver side: wrong private key
|
||||
wallet2.private_key = PrivateKey() # wrong private key
|
||||
await assert_err(
|
||||
wallet2.redeem(send_proofs),
|
||||
"Mint Error: no valid signature provided for input.",
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2pk_short_locktime_receive_with_wrong_private_key(
|
||||
wallet1: Wallet, wallet2: Wallet
|
||||
):
|
||||
await wallet1.mint(64)
|
||||
pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # receiver side
|
||||
# sender side
|
||||
secret_lock = await wallet1.create_p2pk_lock(
|
||||
pubkey_wallet2, locktime_seconds=4
|
||||
) # sender side
|
||||
_, send_proofs = await wallet1.split_to_send(
|
||||
wallet1.proofs, 8, secret_lock=secret_lock
|
||||
)
|
||||
# receiver side: wrong private key
|
||||
wallet2.private_key = PrivateKey() # wrong private key
|
||||
send_proofs_copy = copy.deepcopy(send_proofs)
|
||||
await assert_err(
|
||||
wallet2.redeem(send_proofs),
|
||||
"Mint Error: no valid signature provided for input.",
|
||||
)
|
||||
await asyncio.sleep(6)
|
||||
# should succeed because even with the wrong private key we
|
||||
# can redeem the tokens after the locktime
|
||||
await wallet2.redeem(send_proofs_copy)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2pk_locktime_with_refund_pubkey(wallet1: Wallet, wallet2: Wallet):
|
||||
await wallet1.mint(64)
|
||||
pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # receiver side
|
||||
# sender side
|
||||
garbage_pubkey = PrivateKey().pubkey
|
||||
assert garbage_pubkey
|
||||
secret_lock = await wallet1.create_p2pk_lock(
|
||||
garbage_pubkey.serialize().hex(), # create lock to unspendable pubkey
|
||||
locktime_seconds=4, # locktime
|
||||
tags=Tags([["refund", pubkey_wallet2]]), # refund pubkey
|
||||
) # sender side
|
||||
_, send_proofs = await wallet1.split_to_send(
|
||||
wallet1.proofs, 8, secret_lock=secret_lock
|
||||
)
|
||||
send_proofs_copy = copy.deepcopy(send_proofs)
|
||||
# receiver side: can't redeem since we used a garbage pubkey
|
||||
await assert_err(
|
||||
wallet2.redeem(send_proofs),
|
||||
"Mint Error: no valid signature provided for input.",
|
||||
)
|
||||
await asyncio.sleep(6)
|
||||
# we can now redeem because of the refund locktime
|
||||
await wallet2.redeem(send_proofs_copy)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2pk_locktime_with_wrong_refund_pubkey(wallet1: Wallet, wallet2: Wallet):
|
||||
await wallet1.mint(64)
|
||||
pubkey_wallet2 = await wallet2.create_p2pk_pubkey() # receiver side
|
||||
# sender side
|
||||
garbage_pubkey = PrivateKey().pubkey
|
||||
garbage_pubkey_2 = PrivateKey().pubkey
|
||||
assert garbage_pubkey
|
||||
assert garbage_pubkey_2
|
||||
secret_lock = await wallet1.create_p2pk_lock(
|
||||
garbage_pubkey.serialize().hex(), # create lock to unspendable pubkey
|
||||
locktime_seconds=4, # locktime
|
||||
tags=Tags([["refund", garbage_pubkey_2.serialize().hex()]]), # refund pubkey
|
||||
) # sender side
|
||||
_, send_proofs = await wallet1.split_to_send(
|
||||
wallet1.proofs, 8, secret_lock=secret_lock
|
||||
)
|
||||
send_proofs_copy = copy.deepcopy(send_proofs)
|
||||
# receiver side: can't redeem since we used a garbage pubkey
|
||||
await assert_err(
|
||||
wallet2.redeem(send_proofs),
|
||||
"Mint Error: no valid signature provided for input.",
|
||||
)
|
||||
await asyncio.sleep(6)
|
||||
# we still can't redeem it because we used garbage_pubkey_2 as a refund pubkey
|
||||
await assert_err(
|
||||
wallet2.redeem(send_proofs_copy),
|
||||
"Mint Error: no valid signature provided for input.",
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2pk_multisig_2_of_2(wallet1: Wallet, wallet2: Wallet):
|
||||
await wallet1.mint(64)
|
||||
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
|
||||
pubkey_wallet2 = await wallet2.create_p2pk_pubkey()
|
||||
assert pubkey_wallet1 != pubkey_wallet2
|
||||
# p2pk test
|
||||
secret_lock = await wallet1.create_p2pk_lock(
|
||||
pubkey_wallet2, tags=Tags([["pubkey", pubkey_wallet1]]), n_sigs=2
|
||||
)
|
||||
|
||||
_, send_proofs = await wallet1.split_to_send(
|
||||
wallet1.proofs, 8, secret_lock=secret_lock
|
||||
)
|
||||
# add signatures of wallet2
|
||||
send_proofs = await wallet1.add_p2pk_witnesses_to_proofs(send_proofs)
|
||||
# here we add the signatures of wallet1
|
||||
await wallet2.redeem(send_proofs)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2pk_multisig_duplicate_signature(wallet1: Wallet, wallet2: Wallet):
|
||||
await wallet1.mint(64)
|
||||
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
|
||||
pubkey_wallet2 = await wallet2.create_p2pk_pubkey()
|
||||
assert pubkey_wallet1 != pubkey_wallet2
|
||||
# p2pk test
|
||||
secret_lock = await wallet1.create_p2pk_lock(
|
||||
pubkey_wallet2, tags=Tags([["pubkey", pubkey_wallet1]]), n_sigs=2
|
||||
)
|
||||
|
||||
_, send_proofs = await wallet1.split_to_send(
|
||||
wallet1.proofs, 8, secret_lock=secret_lock
|
||||
)
|
||||
# add signatures of wallet2
|
||||
send_proofs = await wallet2.add_p2pk_witnesses_to_proofs(send_proofs)
|
||||
# here we add the signatures of wallet1
|
||||
await assert_err(
|
||||
wallet2.redeem(send_proofs), "Mint Error: p2pk signatures must be unique."
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2pk_multisig_quorum_not_met_1_of_2(wallet1: Wallet, wallet2: Wallet):
|
||||
await wallet1.mint(64)
|
||||
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
|
||||
pubkey_wallet2 = await wallet2.create_p2pk_pubkey()
|
||||
assert pubkey_wallet1 != pubkey_wallet2
|
||||
# p2pk test
|
||||
secret_lock = await wallet1.create_p2pk_lock(
|
||||
pubkey_wallet2, tags=Tags([["pubkey", pubkey_wallet1]]), n_sigs=2
|
||||
)
|
||||
_, send_proofs = await wallet1.split_to_send(
|
||||
wallet1.proofs, 8, secret_lock=secret_lock
|
||||
)
|
||||
await assert_err(
|
||||
wallet2.redeem(send_proofs),
|
||||
"Mint Error: not enough signatures provided: 1 < 2.",
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2pk_multisig_quorum_not_met_2_of_3(wallet1: Wallet, wallet2: Wallet):
|
||||
await wallet1.mint(64)
|
||||
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
|
||||
pubkey_wallet2 = await wallet2.create_p2pk_pubkey()
|
||||
assert pubkey_wallet1 != pubkey_wallet2
|
||||
# p2pk test
|
||||
secret_lock = await wallet1.create_p2pk_lock(
|
||||
pubkey_wallet2, tags=Tags([["pubkey", pubkey_wallet1]]), n_sigs=3
|
||||
)
|
||||
|
||||
_, send_proofs = await wallet1.split_to_send(
|
||||
wallet1.proofs, 8, secret_lock=secret_lock
|
||||
)
|
||||
# add signatures of wallet1
|
||||
send_proofs = await wallet1.add_p2pk_witnesses_to_proofs(send_proofs)
|
||||
# here we add the signatures of wallet2
|
||||
await assert_err(
|
||||
wallet2.redeem(send_proofs),
|
||||
"Mint Error: not enough signatures provided: 2 < 3.",
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2pk_multisig_with_duplicate_publickey(wallet1: Wallet, wallet2: Wallet):
|
||||
await wallet1.mint(64)
|
||||
pubkey_wallet2 = await wallet2.create_p2pk_pubkey()
|
||||
# p2pk test
|
||||
secret_lock = await wallet1.create_p2pk_lock(
|
||||
pubkey_wallet2, tags=Tags([["pubkey", pubkey_wallet2]]), n_sigs=2
|
||||
)
|
||||
_, send_proofs = await wallet1.split_to_send(
|
||||
wallet1.proofs, 8, secret_lock=secret_lock
|
||||
)
|
||||
await assert_err(wallet2.redeem(send_proofs), "Mint Error: pubkeys must be unique.")
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2pk_multisig_with_wrong_first_private_key(
|
||||
wallet1: Wallet, wallet2: Wallet
|
||||
):
|
||||
await wallet1.mint(64)
|
||||
pubkey_wallet1 = await wallet1.create_p2pk_pubkey()
|
||||
pubkey_wallet2 = await wallet2.create_p2pk_pubkey()
|
||||
wrong_pubklic_key = PrivateKey().pubkey
|
||||
assert wrong_pubklic_key
|
||||
wrong_public_key_hex = wrong_pubklic_key.serialize().hex()
|
||||
|
||||
assert wrong_public_key_hex != pubkey_wallet2
|
||||
|
||||
# p2pk test
|
||||
secret_lock = await wallet1.create_p2pk_lock(
|
||||
pubkey_wallet2, tags=Tags([["pubkey", wrong_public_key_hex]]), n_sigs=2
|
||||
)
|
||||
_, send_proofs = await wallet1.split_to_send(
|
||||
wallet1.proofs, 8, secret_lock=secret_lock
|
||||
)
|
||||
# add signatures of wallet1
|
||||
send_proofs = await wallet1.add_p2pk_witnesses_to_proofs(send_proofs)
|
||||
await assert_err(
|
||||
wallet2.redeem(send_proofs), "Mint Error: signature threshold not met. 1 < 2."
|
||||
)
|
||||
|
||||
|
||||
def test_tags():
|
||||
tags = Tags([["key1", "value1"], ["key2", "value2"], ["key2", "value3"]])
|
||||
assert tags.get_tag("key1") == "value1"
|
||||
assert tags["key1"] == "value1"
|
||||
assert tags.get_tag("key2") == "value2"
|
||||
assert tags["key2"] == "value2"
|
||||
assert tags.get_tag("key3") is None
|
||||
assert tags["key3"] is None
|
||||
assert tags.get_tag_all("key2") == ["value2", "value3"]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_secret_initialized_with_tags(wallet1: Wallet):
|
||||
tags = Tags([["locktime", "100"], ["n_sigs", "3"], ["sigflag", "SIG_ALL"]])
|
||||
pubkey = PrivateKey().pubkey
|
||||
assert pubkey
|
||||
secret = await wallet1.create_p2pk_lock(
|
||||
pubkey=pubkey.serialize().hex(),
|
||||
tags=tags,
|
||||
)
|
||||
assert secret.locktime
|
||||
assert secret.locktime == 100
|
||||
assert secret.n_sigs
|
||||
assert secret.n_sigs == 3
|
||||
assert secret.sigflag
|
||||
assert secret.sigflag == SigFlags.SIG_ALL
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_secret_initialized_with_arguments(wallet1: Wallet):
|
||||
pubkey = PrivateKey().pubkey
|
||||
assert pubkey
|
||||
secret = await wallet1.create_p2pk_lock(
|
||||
pubkey=pubkey.serialize().hex(),
|
||||
locktime_seconds=100,
|
||||
n_sigs=3,
|
||||
sig_all=True,
|
||||
)
|
||||
assert secret.locktime
|
||||
assert secret.locktime > 1689000000
|
||||
assert secret.n_sigs
|
||||
assert secret.n_sigs == 3
|
||||
assert secret.sigflag
|
||||
assert secret.sigflag == SigFlags.SIG_ALL
|
||||
84
tests/test_wallet_p2sh.py
Normal file
84
tests/test_wallet_p2sh.py
Normal file
@@ -0,0 +1,84 @@
|
||||
import asyncio
|
||||
import copy
|
||||
import secrets
|
||||
from typing import List
|
||||
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
|
||||
from cashu.core.base import Proof, Secret, SecretKind, Tags
|
||||
from cashu.core.crypto.secp import PrivateKey, PublicKey
|
||||
from cashu.core.helpers import async_unwrap, sum_proofs
|
||||
from cashu.core.migrations import migrate_databases
|
||||
from cashu.core.settings import settings
|
||||
from cashu.wallet import migrations
|
||||
from cashu.wallet.wallet import Wallet
|
||||
from cashu.wallet.wallet import Wallet as Wallet1
|
||||
from cashu.wallet.wallet import Wallet as Wallet2
|
||||
from tests.conftest import SERVER_ENDPOINT, mint
|
||||
|
||||
|
||||
async def assert_err(f, msg):
|
||||
"""Compute f() and expect an error message 'msg'."""
|
||||
try:
|
||||
await f
|
||||
except Exception as exc:
|
||||
if str(exc.args[0]) != msg:
|
||||
raise Exception(f"Expected error: {msg}, got: {exc.args[0]}")
|
||||
return
|
||||
raise Exception(f"Expected error: {msg}, got no error")
|
||||
|
||||
|
||||
def assert_amt(proofs: List[Proof], expected: int):
|
||||
"""Assert amounts the proofs contain."""
|
||||
assert [p.amount for p in proofs] == expected
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="function")
|
||||
async def wallet1(mint):
|
||||
wallet1 = Wallet1(SERVER_ENDPOINT, "data/wallet_p2sh_1", "wallet1")
|
||||
await migrate_databases(wallet1.db, migrations)
|
||||
await wallet1.load_mint()
|
||||
wallet1.status()
|
||||
yield wallet1
|
||||
|
||||
|
||||
@pytest_asyncio.fixture(scope="function")
|
||||
async def wallet2(mint):
|
||||
wallet2 = Wallet2(SERVER_ENDPOINT, "data/wallet_p2sh_2", "wallet2")
|
||||
await migrate_databases(wallet2.db, migrations)
|
||||
wallet2.private_key = PrivateKey(secrets.token_bytes(32), raw=True)
|
||||
await wallet2.load_mint()
|
||||
wallet2.status()
|
||||
yield wallet2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_create_p2pk_pubkey(wallet1: Wallet):
|
||||
await wallet1.mint(64)
|
||||
pubkey = await wallet1.create_p2pk_pubkey()
|
||||
PublicKey(bytes.fromhex(pubkey), raw=True)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2sh(wallet1: Wallet, wallet2: Wallet):
|
||||
await wallet1.mint(64)
|
||||
_ = await wallet1.create_p2sh_address_and_store() # receiver side
|
||||
_, send_proofs = await wallet1.split_to_send(wallet1.proofs, 8) # sender side
|
||||
|
||||
frst_proofs, scnd_proofs = await wallet2.redeem(send_proofs) # receiver side
|
||||
assert len(frst_proofs) == 0
|
||||
assert len(scnd_proofs) == 1
|
||||
assert sum_proofs(scnd_proofs) == 8
|
||||
assert wallet2.balance == 8
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_p2sh_receive_with_wrong_wallet(wallet1: Wallet, wallet2: Wallet):
|
||||
await wallet1.mint(64)
|
||||
wallet1_address = await wallet1.create_p2sh_address_and_store() # receiver side
|
||||
secret_lock = await wallet1.create_p2sh_lock(wallet1_address) # sender side
|
||||
_, send_proofs = await wallet1.split_to_send(
|
||||
wallet1.proofs, 8, secret_lock
|
||||
) # sender side
|
||||
await assert_err(wallet2.redeem(send_proofs), "lock not found.") # wrong receiver
|
||||
Reference in New Issue
Block a user