mirror of
https://github.com/aljazceru/nutshell.git
synced 2025-12-23 03:34:19 +01:00
add tests
This commit is contained in:
@@ -116,17 +116,7 @@ class Ledger:
|
|||||||
|
|
||||||
C = PublicKey(bytes.fromhex(proof.C), raw=True)
|
C = PublicKey(bytes.fromhex(proof.C), raw=True)
|
||||||
|
|
||||||
# backwards compatibility with old hash_to_curve
|
# backwards compatibility with old hash_to_curve < 0.3.3
|
||||||
logger.debug(f"Client version {context.get('client-version')}")
|
|
||||||
if self.keysets.keysets.get(proof.id):
|
|
||||||
logger.debug(
|
|
||||||
f"Token keyset: {self.keysets.keysets.get(proof.id)}, token version: {self.keysets.keysets[proof.id].version}"
|
|
||||||
)
|
|
||||||
# if not context.get("client-version") or (
|
|
||||||
# self.keysets.keysets.get(proof.id)
|
|
||||||
# and not self.keysets.keysets[proof.id].version
|
|
||||||
# ):
|
|
||||||
# return legacy.verify_pre_0_3_3(secret_key, C, proof.secret)
|
|
||||||
try:
|
try:
|
||||||
ret = legacy.verify_pre_0_3_3(secret_key, C, proof.secret)
|
ret = legacy.verify_pre_0_3_3(secret_key, C, proof.secret)
|
||||||
if ret:
|
if ret:
|
||||||
|
|||||||
@@ -50,28 +50,8 @@ class LedgerAPI:
|
|||||||
|
|
||||||
def __init__(self, url):
|
def __init__(self, url):
|
||||||
self.url = url
|
self.url = url
|
||||||
|
self.s = requests.Session()
|
||||||
async def _get_keys(self, url):
|
self.s.headers.update({"Client-version": VERSION})
|
||||||
resp = requests.get(
|
|
||||||
url + "/keys",
|
|
||||||
headers={"Client-version": VERSION},
|
|
||||||
).json()
|
|
||||||
keys = resp
|
|
||||||
assert len(keys), Exception("did not receive any keys")
|
|
||||||
keyset_keys = {
|
|
||||||
int(amt): PublicKey(bytes.fromhex(val), raw=True)
|
|
||||||
for amt, val in keys.items()
|
|
||||||
}
|
|
||||||
keyset = WalletKeyset(pubkeys=keyset_keys, mint_url=url)
|
|
||||||
return keyset
|
|
||||||
|
|
||||||
async def _get_keysets(self, url):
|
|
||||||
keysets = requests.get(
|
|
||||||
url + "/keysets",
|
|
||||||
headers={"Client-version": VERSION},
|
|
||||||
).json()
|
|
||||||
assert len(keysets), Exception("did not receive any keysets")
|
|
||||||
return keysets
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _get_output_split(amount):
|
def _get_output_split(amount):
|
||||||
@@ -100,6 +80,11 @@ class LedgerAPI:
|
|||||||
proofs.append(proof)
|
proofs.append(proof)
|
||||||
return proofs
|
return proofs
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def raise_on_error(resp_dict):
|
||||||
|
if "error" in resp_dict:
|
||||||
|
raise Exception("Mint Error: {}".format(resp_dict["error"]))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _generate_secret(randombits=128):
|
def _generate_secret(randombits=128):
|
||||||
"""Returns base64 encoded random string."""
|
"""Returns base64 encoded random string."""
|
||||||
@@ -138,12 +123,6 @@ class LedgerAPI:
|
|||||||
self.keys = keyset.public_keys
|
self.keys = keyset.public_keys
|
||||||
self.keyset_id = keyset.id
|
self.keyset_id = keyset.id
|
||||||
|
|
||||||
def request_mint(self, amount):
|
|
||||||
"""Requests a mint from the server and returns Lightning invoice."""
|
|
||||||
r = requests.get(self.url + "/mint", params={"amount": amount})
|
|
||||||
r.raise_for_status()
|
|
||||||
return r.json()
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _construct_outputs(amounts: List[int], secrets: List[str]):
|
def _construct_outputs(amounts: List[int], secrets: List[str]):
|
||||||
"""Takes a list of amounts and secrets and returns outputs.
|
"""Takes a list of amounts and secrets and returns outputs.
|
||||||
@@ -173,25 +152,55 @@ class LedgerAPI:
|
|||||||
return [f"{secret}:{self._generate_secret()}" for i in range(n)]
|
return [f"{secret}:{self._generate_secret()}" for i in range(n)]
|
||||||
return [f"{i}:{secret}" for i in range(n)]
|
return [f"{i}:{secret}" for i in range(n)]
|
||||||
|
|
||||||
|
"""
|
||||||
|
ENDPOINTS
|
||||||
|
"""
|
||||||
|
|
||||||
|
async def _get_keys(self, url):
|
||||||
|
resp = self.s.get(
|
||||||
|
url + "/keys",
|
||||||
|
)
|
||||||
|
resp.raise_for_status()
|
||||||
|
keys = resp.json()
|
||||||
|
assert len(keys), Exception("did not receive any keys")
|
||||||
|
keyset_keys = {
|
||||||
|
int(amt): PublicKey(bytes.fromhex(val), raw=True)
|
||||||
|
for amt, val in keys.items()
|
||||||
|
}
|
||||||
|
keyset = WalletKeyset(pubkeys=keyset_keys, mint_url=url)
|
||||||
|
return keyset
|
||||||
|
|
||||||
|
async def _get_keysets(self, url):
|
||||||
|
resp = self.s.get(
|
||||||
|
url + "/keysets",
|
||||||
|
).json()
|
||||||
|
resp.raise_for_status()
|
||||||
|
keysets = resp.json()
|
||||||
|
assert len(keysets), Exception("did not receive any keysets")
|
||||||
|
return keysets
|
||||||
|
|
||||||
|
def request_mint(self, amount):
|
||||||
|
"""Requests a mint from the server and returns Lightning invoice."""
|
||||||
|
resp = self.s.get(self.url + "/mint", params={"amount": amount})
|
||||||
|
resp.raise_for_status()
|
||||||
|
return_dict = resp.json()
|
||||||
|
self.raise_on_error(return_dict)
|
||||||
|
return return_dict
|
||||||
|
|
||||||
async def mint(self, amounts, payment_hash=None):
|
async def mint(self, amounts, payment_hash=None):
|
||||||
"""Mints new coins and returns a proof of promise."""
|
"""Mints new coins and returns a proof of promise."""
|
||||||
secrets = [self._generate_secret() for s in range(len(amounts))]
|
secrets = [self._generate_secret() for s in range(len(amounts))]
|
||||||
await self._check_used_secrets(secrets)
|
await self._check_used_secrets(secrets)
|
||||||
payloads, rs = self._construct_outputs(amounts, secrets)
|
payloads, rs = self._construct_outputs(amounts, secrets)
|
||||||
|
|
||||||
resp = requests.post(
|
resp = self.s.post(
|
||||||
self.url + "/mint",
|
self.url + "/mint",
|
||||||
json=payloads.dict(),
|
json=payloads.dict(),
|
||||||
params={"payment_hash": payment_hash},
|
params={"payment_hash": payment_hash},
|
||||||
headers={"Client-version": VERSION},
|
|
||||||
)
|
)
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
try:
|
promises_list = resp.json()
|
||||||
promises_list = resp.json()
|
self.raise_on_error(promises_list)
|
||||||
except:
|
|
||||||
raise Exception("Unkown mint error.")
|
|
||||||
if "error" in promises_list:
|
|
||||||
raise Exception("Error: {}".format(promises_list["error"]))
|
|
||||||
|
|
||||||
promises = [BlindedSignature.from_dict(p) for p in promises_list]
|
promises = [BlindedSignature.from_dict(p) for p in promises_list]
|
||||||
return self._construct_proofs(promises, secrets, rs)
|
return self._construct_proofs(promises, secrets, rs)
|
||||||
@@ -239,18 +248,14 @@ class LedgerAPI:
|
|||||||
"proofs": {i: proofs_include for i in range(len(proofs))},
|
"proofs": {i: proofs_include for i in range(len(proofs))},
|
||||||
}
|
}
|
||||||
|
|
||||||
resp = requests.post(
|
resp = self.s.post(
|
||||||
self.url + "/split",
|
self.url + "/split",
|
||||||
json=split_payload.dict(include=_splitrequest_include_fields(proofs)),
|
json=split_payload.dict(include=_splitrequest_include_fields(proofs)),
|
||||||
headers={"Client-version": VERSION},
|
|
||||||
)
|
)
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
try:
|
promises_dict = resp.json()
|
||||||
promises_dict = resp.json()
|
self.raise_on_error(promises_dict)
|
||||||
except:
|
|
||||||
raise Exception("Unkown mint error.")
|
|
||||||
if "error" in promises_dict:
|
|
||||||
raise Exception("Mint Error: {}".format(promises_dict["error"]))
|
|
||||||
promises_fst = [BlindedSignature.from_dict(p) for p in promises_dict["fst"]]
|
promises_fst = [BlindedSignature.from_dict(p) for p in promises_dict["fst"]]
|
||||||
promises_snd = [BlindedSignature.from_dict(p) for p in promises_dict["snd"]]
|
promises_snd = [BlindedSignature.from_dict(p) for p in promises_dict["snd"]]
|
||||||
# Construct proofs from promises (i.e., unblind signatures)
|
# Construct proofs from promises (i.e., unblind signatures)
|
||||||
@@ -264,31 +269,35 @@ class LedgerAPI:
|
|||||||
return frst_proofs, scnd_proofs
|
return frst_proofs, scnd_proofs
|
||||||
|
|
||||||
async def check_spendable(self, proofs: List[Proof]):
|
async def check_spendable(self, proofs: List[Proof]):
|
||||||
|
"""
|
||||||
|
Cheks whether the secrets in proofs are already spent or not and returns a list of booleans.
|
||||||
|
"""
|
||||||
payload = CheckRequest(proofs=proofs)
|
payload = CheckRequest(proofs=proofs)
|
||||||
resp = requests.post(
|
resp = self.s.post(
|
||||||
self.url + "/check",
|
self.url + "/check",
|
||||||
json=payload.dict(),
|
json=payload.dict(),
|
||||||
headers={"Client-version": VERSION},
|
|
||||||
)
|
)
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
return_dict = resp.json()
|
return_dict = resp.json()
|
||||||
|
self.raise_on_error(return_dict)
|
||||||
return return_dict
|
return return_dict
|
||||||
|
|
||||||
async def check_fees(self, payment_request: str):
|
async def check_fees(self, payment_request: str):
|
||||||
"""Checks whether the Lightning payment is internal."""
|
"""Checks whether the Lightning payment is internal."""
|
||||||
payload = CheckFeesRequest(pr=payment_request)
|
payload = CheckFeesRequest(pr=payment_request)
|
||||||
resp = requests.post(
|
resp = self.s.post(
|
||||||
self.url + "/checkfees",
|
self.url + "/checkfees",
|
||||||
json=payload.dict(),
|
json=payload.dict(),
|
||||||
headers={"Client-version": VERSION},
|
|
||||||
)
|
)
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
|
|
||||||
return_dict = resp.json()
|
return_dict = resp.json()
|
||||||
|
self.raise_on_error(return_dict)
|
||||||
return return_dict
|
return return_dict
|
||||||
|
|
||||||
async def pay_lightning(self, proofs: List[Proof], invoice: str):
|
async def pay_lightning(self, proofs: List[Proof], invoice: str):
|
||||||
|
"""
|
||||||
|
Accepts proofs and a lightning invoice to pay in exchange.
|
||||||
|
"""
|
||||||
payload = MeltRequest(proofs=proofs, invoice=invoice)
|
payload = MeltRequest(proofs=proofs, invoice=invoice)
|
||||||
|
|
||||||
def _meltequest_include_fields(proofs):
|
def _meltequest_include_fields(proofs):
|
||||||
@@ -300,14 +309,13 @@ class LedgerAPI:
|
|||||||
"proofs": {i: proofs_include for i in range(len(proofs))},
|
"proofs": {i: proofs_include for i in range(len(proofs))},
|
||||||
}
|
}
|
||||||
|
|
||||||
resp = requests.post(
|
resp = self.s.post(
|
||||||
self.url + "/melt",
|
self.url + "/melt",
|
||||||
json=payload.dict(include=_meltequest_include_fields(proofs)),
|
json=payload.dict(include=_meltequest_include_fields(proofs)),
|
||||||
headers={"Client-version": VERSION},
|
|
||||||
)
|
)
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
|
|
||||||
return_dict = resp.json()
|
return_dict = resp.json()
|
||||||
|
self.raise_on_error(return_dict)
|
||||||
return return_dict
|
return return_dict
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
37
tests/test_crypto.py
Normal file
37
tests/test_crypto.py
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
import pytest
|
||||||
|
|
||||||
|
from cashu.core.b_dhke import hash_to_curve
|
||||||
|
|
||||||
|
|
||||||
|
def test_hash_to_curve():
|
||||||
|
result = hash_to_curve(
|
||||||
|
bytes.fromhex(
|
||||||
|
"0000000000000000000000000000000000000000000000000000000000000000"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
result.serialize().hex()
|
||||||
|
== "0266687aadf862bd776c8fc18b8e9f8e20089714856ee233b3902a591d0d5f2925"
|
||||||
|
)
|
||||||
|
|
||||||
|
result = hash_to_curve(
|
||||||
|
bytes.fromhex(
|
||||||
|
"0000000000000000000000000000000000000000000000000000000000000001"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
result.serialize().hex()
|
||||||
|
== "02ec4916dd28fc4c10d78e287ca5d9cc51ee1ae73cbfde08c6b37324cbfaac8bc5"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_hash_to_curve_iteration():
|
||||||
|
result = hash_to_curve(
|
||||||
|
bytes.fromhex(
|
||||||
|
"0000000000000000000000000000000000000000000000000000000000000002"
|
||||||
|
)
|
||||||
|
)
|
||||||
|
assert (
|
||||||
|
result.serialize().hex()
|
||||||
|
== "02076c988b353fcbb748178ecb286bc9d0b4acf474d4ba31ba62334e46c97c416a"
|
||||||
|
)
|
||||||
Reference in New Issue
Block a user