Merge pull request #70 from cashubtc/multimint

Early multimint support
This commit is contained in:
calle
2022-12-27 15:44:33 +01:00
committed by GitHub
10 changed files with 441 additions and 81 deletions

View File

@@ -1,5 +1,5 @@
from sqlite3 import Row
from typing import Any, Dict, List, Union
from typing import Any, Dict, List, Optional, TypedDict, Union
from pydantic import BaseModel
@@ -212,3 +212,13 @@ class MintKeysets:
def get_ids(self):
return [k for k, _ in self.keysets.items()]
class TokenMintJson(BaseModel):
url: str
ks: List[str]
class TokenJson(BaseModel):
tokens: List[Proof]
mints: Optional[Dict[str, TokenMintJson]] = None

View File

@@ -7,9 +7,10 @@ from environs import Env # type: ignore
env = Env()
ENV_FILE = os.path.join(str(Path.home()), ".cashu", ".env")
# env file: default to current dir, else home dir
ENV_FILE = os.path.join(os.getcwd(), ".env")
if not os.path.isfile(ENV_FILE):
ENV_FILE = os.path.join(os.getcwd(), ".env")
ENV_FILE = os.path.join(str(Path.home()), ".cashu", ".env")
if os.path.isfile(ENV_FILE):
env.read_env(ENV_FILE)
else:
@@ -53,6 +54,7 @@ LNBITS_ENDPOINT = env.str("LNBITS_ENDPOINT", default=None)
LNBITS_KEY = env.str("LNBITS_KEY", default=None)
NOSTR_PRIVATE_KEY = env.str("NOSTR_PRIVATE_KEY", default=None)
NOSTR_RELAYS = env.list("NOSTR_RELAYS", default=["wss://nostr-pub.wellorder.net"])
MAX_ORDER = 64
VERSION = "0.7.0"

View File

@@ -7,17 +7,19 @@ import os
import sys
import threading
import time
import urllib.parse
from datetime import datetime
from functools import wraps
from itertools import groupby
from operator import itemgetter
from os import listdir
from os.path import isdir, join
from typing import Dict, List
import click
from loguru import logger
from cashu.core.base import Proof
from cashu.core.base import Proof, TokenJson, TokenMintJson
from cashu.core.helpers import sum_proofs
from cashu.core.migrations import migrate_databases
from cashu.core.settings import (
@@ -27,6 +29,7 @@ from cashu.core.settings import (
LIGHTNING,
MINT_URL,
NOSTR_PRIVATE_KEY,
NOSTR_RELAYS,
SOCKS_HOST,
SOCKS_PORT,
TOR,
@@ -38,12 +41,20 @@ from cashu.nostr.nostr.key import PublicKey
from cashu.tor.tor import TorProxy
from cashu.wallet import migrations
from cashu.wallet.crud import (
get_keyset,
get_lightning_invoices,
get_reserved_proofs,
get_unused_locks,
)
from cashu.wallet.wallet import Wallet as Wallet
from .cli_helpers import (
get_mint_wallet,
print_mint_balances,
redeem_multimint,
verify_mints,
)
async def init_wallet(wallet: Wallet):
"""Performs migrations and loads proofs from db."""
@@ -69,16 +80,8 @@ class NaturalOrderGroup(click.Group):
)
@click.pass_context
def cli(ctx, host: str, walletname: str):
# configure logger
logger.remove()
logger.add(sys.stderr, level="DEBUG" if DEBUG else "INFO")
ctx.ensure_object(dict)
ctx.obj["HOST"] = host
ctx.obj["WALLET_NAME"] = walletname
wallet = Wallet(ctx.obj["HOST"], os.path.join(CASHU_DIR, walletname))
if TOR and not TorProxy().check_platform():
error_str = "Your settings say TOR=true but the built-in Tor bundle is not supported on your system. Please install Tor manually and set TOR=false and SOCKS_HOST=localhost and SOCKS_PORT=9050 in your Cashu config (recommended) or turn off Tor by setting TOR=false (not recommended). Cashu will not work until you edit your config file accordingly."
error_str = "Your settings say TOR=true but the built-in Tor bundle is not supported on your system. You have two options: Either install Tor manually and set TOR=FALSE and SOCKS_HOST=localhost and SOCKS_PORT=9050 in your Cashu config (recommended). Or turn off Tor by setting TOR=false (not recommended). Cashu will not work until you edit your config file accordingly."
error_str += "\n\n"
if ENV_FILE:
error_str += f"Edit your Cashu config file here: {ENV_FILE}"
@@ -88,9 +91,17 @@ def cli(ctx, host: str, walletname: str):
f"Ceate a new Cashu config file here: {os.path.join(CASHU_DIR, '.env')}"
)
env_path = os.path.join(CASHU_DIR, ".env")
error_str += f'\n\nYou can turn off Tor with this command: echo "TOR=false" >> {env_path}'
error_str += f'\n\nYou can turn off Tor with this command: echo "TOR=FALSE" >> {env_path}'
raise Exception(error_str)
# configure logger
logger.remove()
logger.add(sys.stderr, level="DEBUG" if DEBUG else "INFO")
ctx.ensure_object(dict)
ctx.obj["HOST"] = host
ctx.obj["WALLET_NAME"] = walletname
wallet = Wallet(ctx.obj["HOST"], os.path.join(CASHU_DIR, walletname))
ctx.obj["WALLET"] = wallet
asyncio.run(init_wallet(wallet))
pass
@@ -193,18 +204,23 @@ async def invoice(ctx, amount: int, hash: str):
@coro
async def balance(ctx, verbose):
wallet: Wallet = ctx.obj["WALLET"]
keyset_balances = wallet.balance_per_keyset()
if len(keyset_balances) > 1:
print(f"You have balances in {len(keyset_balances)} keysets:")
print("")
for k, v in keyset_balances.items():
print(
f"Keyset: {k or 'undefined'} Balance: {v['balance']} sat (available: {v['available']} sat)"
)
print("")
if verbose:
# show balances per keyset
keyset_balances = wallet.balance_per_keyset()
if len(keyset_balances) > 1:
print(f"You have balances in {len(keyset_balances)} keysets:")
print("")
for k, v in keyset_balances.items():
print(
f"Keyset: {k} - Balance: {v['available']} sat (pending: {v['balance']-v['available']} sat)"
)
print("")
await print_mint_balances(ctx, wallet)
if verbose:
print(
f"Balance: {wallet.balance} sat (available: {wallet.available_balance} sat in {len([p for p in wallet.proofs if not p.reserved])} tokens)"
f"Balance: {wallet.available_balance} sat (pending: {wallet.balance-wallet.available_balance} sat) in {len([p for p in wallet.proofs if not p.reserved])} tokens"
)
else:
print(f"Balance: {wallet.available_balance} sat")
@@ -213,46 +229,140 @@ async def balance(ctx, verbose):
@cli.command("send", help="Send tokens.")
@click.argument("amount", type=int)
@click.option("--lock", "-l", default=None, help="Lock tokens (P2SH).", type=str)
@click.option(
"--legacy",
default=False,
is_flag=True,
help="Print legacy token without mint information.",
type=bool,
)
@click.pass_context
@coro
async def send(ctx, amount: int, lock: str):
async def send(ctx, amount: int, lock: str, legacy: bool):
if lock and len(lock) < 22:
print("Error: lock has to be at least 22 characters long.")
return
p2sh = False
if lock and len(lock.split("P2SH:")) == 2:
p2sh = True
wallet: Wallet = ctx.obj["WALLET"]
await wallet.load_mint()
wallet.status()
wallet = await get_mint_wallet(ctx)
await wallet.load_proofs()
_, send_proofs = await wallet.split_to_send(
wallet.proofs, amount, lock, set_reserved=True
)
token = await wallet.serialize_proofs(
send_proofs, hide_secrets=True if lock and not p2sh else False
send_proofs,
include_mints=True,
)
print(token)
if legacy:
print("")
print(
"Legacy token without mint information for older clients. This token can only be be received by wallets who use the mint the token is issued from:"
)
print("")
token = await wallet.serialize_proofs(
send_proofs,
legacy=True,
)
print(token)
wallet.status()
async def receive(ctx, token: str, lock: str):
wallet: Wallet = ctx.obj["WALLET"]
await wallet.load_mint()
# check for P2SH locks
if lock:
# load the script and signature of this address from the database
assert len(lock.split("P2SH:")) == 2, Exception(
"lock has wrong format. Expected P2SH:<address>."
)
address_split = lock.split("P2SH:")[1]
p2shscripts = await get_unused_locks(address_split, db=wallet.db)
assert len(p2shscripts) == 1, Exception("lock not found.")
script = p2shscripts[0].script
signature = p2shscripts[0].signature
script, signature = p2shscripts[0].script, p2shscripts[0].signature
else:
script, signature = None, None
proofs = [Proof(**p) for p in json.loads(base64.urlsafe_b64decode(token))]
_, _ = await wallet.redeem(proofs, scnd_script=script, scnd_siganture=signature)
# deserialize token
# ----- backwards compatibility -----
# we support old tokens (< 0.7) without mint information and (W3siaWQ...)
# new tokens (>= 0.7) with multiple mint support (eyJ0b2...)
try:
# backwards compatibility: tokens without mint information
# supports tokens of the form W3siaWQiOiJH
# LNbits token link parsing
# can extract minut URL from LNbits token links like:
# https://lnbits.server/cashu/wallet?mint_id=aMintId&recv_token=W3siaWQiOiJHY2...
url = None
if len(token.split("&recv_token=")) == 2:
# extract URL params
params = urllib.parse.parse_qs(token.split("?")[1])
# extract URL
if "mint_id" in params:
url = (
token.split("?")[0].split("/wallet")[0]
+ "/api/v1/"
+ params["mint_id"][0]
)
# extract token
token = params["recv_token"][0]
# assume W3siaWQiOiJH.. token
# trows an error if the desirialization with the old format doesn't work
proofs = [Proof(**p) for p in json.loads(base64.urlsafe_b64decode(token))]
token = await wallet.serialize_proofs(
proofs,
include_mints=False,
)
# if it was an LNbits link
# and add url and keyset id to token from link extraction above
if url:
token_object: TokenJson = await wallet._make_token(
proofs, include_mints=False
)
token_object.mints = {}
keysets = list(set([p.id for p in proofs]))
assert keysets is not None, "no keysets"
token_object.mints[url] = TokenMintJson(url=url, ks=keysets) # type: ignore
token = await wallet._serialize_token_base64(token_object)
except:
pass
# ----- receive token -----
# deserialize token
dtoken = json.loads(base64.urlsafe_b64decode(token))
assert "tokens" in dtoken, Exception("no proofs in token")
includes_mint_info: bool = "mints" in dtoken and dtoken.get("mints") is not None
# if there is a `mints` field in the token
# we check whether the token has mints that we don't know yet
# and ask the user if they want to trust the new mitns
if includes_mint_info:
# we ask the user to confirm any new mints the tokens may include
await verify_mints(ctx, dtoken)
# redeem tokens with new wallet instances
await redeem_multimint(ctx, dtoken, script, signature)
# reload main wallet so the balance updates
await wallet.load_proofs()
else:
# no mint information present, we extract the proofs and use wallet's default mint
proofs = [Proof(**p) for p in dtoken["tokens"]]
_, _ = await wallet.redeem(proofs, script, signature)
wallet.status()
@@ -312,17 +422,16 @@ async def pending(ctx):
):
grouped_proofs = list(value)
token = await wallet.serialize_proofs(grouped_proofs)
token_hidden_secret = await wallet.serialize_proofs(
grouped_proofs, hide_secrets=True
)
token_hidden_secret = await wallet.serialize_proofs(grouped_proofs)
reserved_date = datetime.utcfromtimestamp(
int(grouped_proofs[0].time_reserved)
).strftime("%Y-%m-%d %H:%M:%S")
print(
f"#{i} Amount: {sum_proofs(grouped_proofs)} sat Time: {reserved_date} ID: {key}\n"
)
print(f"With secret: {token}\n\nSecretless: {token_hidden_secret}\n")
print(f"{token}\n")
print(f"--------------------------\n")
print("To remove all spent tokens use: cashu burn -a")
wallet.status()
@@ -424,9 +533,8 @@ async def invoices(ctx):
@click.pass_context
@coro
async def nsend(ctx, amount: int, pubkey: str, verbose: bool, yes: bool):
wallet: Wallet = ctx.obj["WALLET"]
await wallet.load_mint()
wallet.status()
wallet = await get_mint_wallet(ctx)
await wallet.load_proofs()
_, send_proofs = await wallet.split_to_send(
wallet.proofs, amount, set_reserved=True
)
@@ -444,7 +552,7 @@ async def nsend(ctx, amount: int, pubkey: str, verbose: bool, yes: bool):
)
# we only use ephemeral private keys for sending
client = NostrClient()
client = NostrClient(relays=NOSTR_RELAYS)
if verbose:
print(f"Your ephemeral nostr private key: {client.private_key.hex()}")
await asyncio.sleep(1)
@@ -470,7 +578,7 @@ async def nreceive(ctx, verbose: bool):
"Warning: No nostr private key set! You don't have NOSTR_PRIVATE_KEY set in your .env file. I will create a random private key for this session but I will not remember it."
)
print("")
client = NostrClient(privatekey_hex=NOSTR_PRIVATE_KEY)
client = NostrClient(privatekey_hex=NOSTR_PRIVATE_KEY, relays=NOSTR_RELAYS)
print(f"Your nostr public key: {client.public_key.hex()}")
if verbose:
print(f"Your nostr private key (do not share!): {client.private_key.hex()}")
@@ -539,7 +647,8 @@ async def info(ctx):
if NOSTR_PRIVATE_KEY:
client = NostrClient(privatekey_hex=NOSTR_PRIVATE_KEY, connect=False)
print(f"Nostr public key: {client.public_key.hex()}")
print(f"Nostr relays: {NOSTR_RELAYS}")
if SOCKS_HOST:
print(f"Socks proxy: {SOCKS_HOST}:{SOCKS_PORT}")
print(f"Mint URL: {MINT_URL}")
print(f"Mint URL: {ctx.obj['HOST']}")
return

123
cashu/wallet/cli_helpers.py Normal file
View File

@@ -0,0 +1,123 @@
import os
import click
from cashu.core.base import Proof, WalletKeyset
from cashu.core.settings import CASHU_DIR
from cashu.wallet.crud import get_keyset
from cashu.wallet.wallet import Wallet as Wallet
async def verify_mints(ctx, dtoken):
trust_token_mints = True
for mint_id in dtoken.get("mints"):
for keyset in set(dtoken["mints"][mint_id]["ks"]):
mint_url = dtoken["mints"][mint_id]["url"]
# init a temporary wallet object
keyset_wallet = Wallet(
mint_url, os.path.join(CASHU_DIR, ctx.obj["WALLET_NAME"])
)
# make sure that this mint supports this keyset
mint_keysets = await keyset_wallet._get_keysets(mint_url)
assert keyset in mint_keysets["keysets"], "mint does not have this keyset."
# we validate the keyset id by fetching the keys from the mint
mint_keyset = await keyset_wallet._get_keyset(mint_url, keyset)
assert keyset == mint_keyset.id, Exception("keyset not valid.")
# we check the db whether we know this mint already and ask the user if not
mint_keysets = await get_keyset(mint_url=mint_url, db=keyset_wallet.db)
if mint_keysets is None:
# we encountered a new mint and ask for a user confirmation
trust_token_mints = False
print("")
print("Warning: Tokens are from a mint you don't know yet.")
print("\n")
print(f"Mint URL: {mint_url}")
print(f"Mint keyset: {keyset}")
print("\n")
click.confirm(
f"Do you trust this mint and want to receive the tokens?",
abort=True,
default=True,
)
trust_token_mints = True
assert trust_token_mints, Exception("Aborted!")
async def redeem_multimint(ctx, dtoken, script, signature):
# we get the mint information in the token and load the keys of each mint
# we then redeem the tokens for each keyset individually
for mint_id in dtoken.get("mints"):
for keyset in set(dtoken["mints"][mint_id]["ks"]):
mint_url = dtoken["mints"][mint_id]["url"]
# init a temporary wallet object
keyset_wallet = Wallet(
mint_url, os.path.join(CASHU_DIR, ctx.obj["WALLET_NAME"])
)
# load the keys
await keyset_wallet.load_mint(keyset_id=keyset)
# redeem proofs of this keyset
redeem_proofs = [
Proof(**p) for p in dtoken["tokens"] if Proof(**p).id == keyset
]
_, _ = await keyset_wallet.redeem(
redeem_proofs, scnd_script=script, scnd_siganture=signature
)
async def print_mint_balances(ctx, wallet, show_mints=False):
# get balances per mint
mint_balances = await wallet.balance_per_minturl()
# if we have a balance on a non-default mint, we show its URL
keysets = [k for k, v in wallet.balance_per_keyset().items()]
for k in keysets:
ks = await get_keyset(id=str(k), db=wallet.db)
if ks and ks.mint_url != ctx.obj["HOST"]:
show_mints = True
# or we have a balance on more than one mint
# show balances per mint
if len(mint_balances) > 1 or show_mints:
print(f"You have balances in {len(mint_balances)} mints:")
print("")
for i, (k, v) in enumerate(mint_balances.items()):
print(
f"Mint {i+1}: {k} - Balance: {v['available']} sat (pending: {v['balance']-v['available']} sat)"
)
print("")
async def get_mint_wallet(ctx):
wallet: Wallet = ctx.obj["WALLET"]
await wallet.load_mint()
mint_balances = await wallet.balance_per_minturl()
# if there is only one mint, use it
if len(mint_balances) == 1:
return wallet
await print_mint_balances(ctx, wallet, show_mints=True)
mint_nr = input(
f"Which mint do you want to use? [1-{len(mint_balances)}, default: 1] "
)
mint_nr = "1" if mint_nr == "" else mint_nr
if not mint_nr.isdigit():
raise Exception("invalid input.")
mint_nr = int(mint_nr)
mint_url = list(mint_balances.keys())[mint_nr - 1]
# load this mint_url into a wallet
mint_wallet = Wallet(mint_url, os.path.join(CASHU_DIR, ctx.obj["WALLET_NAME"]))
mint_keysets: WalletKeyset = await get_keyset(mint_url=mint_url, db=mint_wallet.db) # type: ignore
# load the keys
await mint_wallet.load_mint(keyset_id=mint_keysets.id)
return mint_wallet

View File

@@ -207,8 +207,8 @@ async def store_keyset(
async def get_keyset(
id: str = None,
mint_url: str = None,
id: str = "",
mint_url: str = "",
db: Database = None,
conn: Optional[Connection] = None,
):

View File

@@ -5,7 +5,7 @@ import secrets as scrts
import time
import uuid
from itertools import groupby
from typing import Dict, List
from typing import Dict, List, Optional
import requests
from loguru import logger
@@ -23,6 +23,8 @@ from cashu.core.base import (
P2SHScript,
Proof,
SplitRequest,
TokenJson,
TokenMintJson,
WalletKeyset,
)
from cashu.core.bolt11 import Invoice as InvoiceBolt11
@@ -53,9 +55,10 @@ from cashu.wallet.crud import (
class LedgerAPI:
keys: Dict[int, str]
keys: Dict[int, PublicKey]
keyset: str
tor: TorProxy
db: Database
def __init__(self, url):
self.url = url
@@ -63,6 +66,8 @@ class LedgerAPI:
def _set_requests(self):
s = requests.Session()
s.headers.update({"Client-version": VERSION})
if DEBUG:
s.verify = False
socks_host, socks_port = None, None
if TOR and TorProxy().check_platform():
self.tor = TorProxy(timeout=True)
@@ -107,16 +112,32 @@ class LedgerAPI:
"""Returns base64 encoded random string."""
return scrts.token_urlsafe(randombits // 8)
async def _load_mint(self):
async def _load_mint(self, keyset_id: str = ""):
"""
Loads the current keys and the active keyset of the map.
Loads the public keys of the mint. Either gets the keys for the specified
`keyset_id` or loads the most recent one from the mint.
Gets and the active keyset ids of the mint and stores in `self.keysets`.
"""
assert len(
self.url
), "Ledger not initialized correctly: mint URL not specified yet. "
# get current keyset
keyset = await self._get_keys(self.url)
# get all active keysets
if keyset_id:
# get requested keyset
keyset = await self._get_keyset(self.url, keyset_id)
else:
# get current keyset
keyset = await self._get_keys(self.url)
# store current keyset
assert len(keyset.public_keys) > 0, "did not receive keys from mint."
# check if current keyset is in db
keyset_local: Optional[WalletKeyset] = await get_keyset(keyset.id, db=self.db)
if keyset_local is None:
await store_keyset(keyset=keyset, db=self.db)
# get all active keysets of this mint
mint_keysets = []
try:
keysets_resp = await self._get_keysets(self.url)
@@ -126,14 +147,6 @@ class LedgerAPI:
pass
self.keysets = mint_keysets if len(mint_keysets) else [keyset.id]
# store current keyset
assert len(keyset.public_keys) > 0, "did not receive keys from mint."
# check if current keyset is in db
keyset_local: WalletKeyset = await get_keyset(keyset.id, db=self.db)
if keyset_local is None:
await store_keyset(keyset=keyset, db=self.db)
logger.debug(f"Mint keysets: {self.keysets}")
logger.debug(f"Current mint keyset: {keyset.id}")
@@ -173,7 +186,7 @@ class LedgerAPI:
ENDPOINTS
"""
async def _get_keys(self, url):
async def _get_keys(self, url: str):
self.s = self._set_requests()
resp = self.s.get(
url + "/keys",
@@ -188,7 +201,26 @@ class LedgerAPI:
keyset = WalletKeyset(pubkeys=keyset_keys, mint_url=url)
return keyset
async def _get_keysets(self, url):
async def _get_keyset(self, url: str, keyset_id: str):
"""
keyset_id is base64, needs to be urlsafe-encoded.
"""
self.s = self._set_requests()
keyset_id_urlsafe = keyset_id.replace("+", "-").replace("/", "_")
resp = self.s.get(
url + f"/keys/{keyset_id_urlsafe}",
)
resp.raise_for_status()
keys = resp.json()
assert len(keys), Exception("did not receive any keys")
keyset_keys = {
int(amt): PublicKey(bytes.fromhex(val), raw=True)
for amt, val in keys.items()
}
keyset = WalletKeyset(pubkeys=keyset_keys, mint_url=url)
return keyset
async def _get_keysets(self, url: str):
self.s = self._set_requests()
resp = self.s.get(
url + "/keysets",
@@ -225,7 +257,7 @@ class LedgerAPI:
promises = [BlindedSignature(**p) for p in promises_list]
return self._construct_proofs(promises, secrets, rs)
async def split(self, proofs, amount, scnd_secret: str = None):
async def split(self, proofs, amount, scnd_secret: Optional[str] = None):
"""Consume proofs and create new promises based on amount split.
If scnd_secret is None, random secrets will be generated for the tokens to keep (frst_outputs)
and the promises to send (scnd_outputs).
@@ -259,6 +291,7 @@ class LedgerAPI:
payloads, rs = self._construct_outputs(amounts, secrets)
split_payload = SplitRequest(proofs=proofs, amount=amount, outputs=payloads)
# construct payload
def _splitrequest_include_fields(proofs):
"""strips away fields from the model that aren't necessary for the /split"""
proofs_include = {"id", "amount", "secret", "C", "script"}
@@ -359,8 +392,8 @@ class Wallet(LedgerAPI):
self.proofs: List[Proof] = []
self.name = name
async def load_mint(self):
await super()._load_mint()
async def load_mint(self, keyset_id: str = ""):
await super()._load_mint(keyset_id)
async def load_proofs(self):
self.proofs = await get_proofs(db=self.db)
@@ -373,13 +406,25 @@ class Wallet(LedgerAPI):
def _get_proofs_per_keyset(proofs: List[Proof]):
return {key: list(group) for key, group in groupby(proofs, lambda p: p.id)}
async def _get_proofs_per_minturl(self, proofs: List[Proof]):
ret = {}
for id in set([p.id for p in proofs]):
if id is None:
continue
keyset: WalletKeyset = await get_keyset(id=id, db=self.db)
if keyset.mint_url not in ret:
ret[keyset.mint_url] = [p for p in proofs if p.id == id]
else:
ret[keyset.mint_url].extend([p for p in proofs if p.id == id])
return ret
async def request_mint(self, amount):
invoice = super().request_mint(amount)
invoice.time_created = int(time.time())
await store_lightning_invoice(db=self.db, invoice=invoice)
return invoice
async def mint(self, amount: int, payment_hash: str = None):
async def mint(self, amount: int, payment_hash: Optional[str] = None):
split = amount_split(amount)
proofs = await super().mint(split, payment_hash)
if proofs == []:
@@ -395,8 +440,8 @@ class Wallet(LedgerAPI):
async def redeem(
self,
proofs: List[Proof],
scnd_script: str = None,
scnd_siganture: str = None,
scnd_script: Optional[str] = None,
scnd_siganture: Optional[str] = None,
):
if scnd_script and scnd_siganture:
logger.debug(f"Unlock script: {scnd_script}")
@@ -409,7 +454,7 @@ class Wallet(LedgerAPI):
self,
proofs: List[Proof],
amount: int,
scnd_secret: str = None,
scnd_secret: Optional[str] = None,
):
assert len(proofs) > 0, ValueError("no proofs provided.")
frst_proofs, scnd_proofs = await super().split(proofs, amount, scnd_secret)
@@ -442,17 +487,67 @@ class Wallet(LedgerAPI):
raise Exception("could not pay invoice.")
return status["paid"]
@staticmethod
async def serialize_proofs(proofs: List[Proof], hide_secrets=False):
if hide_secrets:
proofs_serialized = [p.to_dict_no_secret() for p in proofs]
else:
proofs_serialized = [p.to_dict() for p in proofs]
token = base64.urlsafe_b64encode(
json.dumps(proofs_serialized).encode()
).decode()
async def _make_token(self, proofs: List[Proof], include_mints=True):
"""
Takes list of proofs and produces a TokenJson by looking up
the keyset id and mint URLs from the database.
"""
# build token
token = TokenJson(tokens=proofs)
# add mint information to the token, if requested
if include_mints:
# hold information about the mint
mints: Dict[str, TokenMintJson] = dict()
# iterate through all proofs and add their keyset to `mints`
for proof in proofs:
if proof.id:
# load the keyset from the db
keyset = await get_keyset(id=proof.id, db=self.db)
if keyset and keyset.mint_url:
# TODO: replace this with a mint pubkey
placeholder_mint_id = keyset.mint_url
if placeholder_mint_id not in mints:
# mint information
id = TokenMintJson(
url=keyset.mint_url,
ks=[keyset.id],
)
mints[placeholder_mint_id] = id
else:
# if a mint has multiple keysets, append to the existing list
if keyset.id not in mints[placeholder_mint_id].ks:
mints[placeholder_mint_id].ks.append(keyset.id)
if len(mints) > 0:
token.mints = mints
return token
async def _serialize_token_base64(self, token: TokenJson):
"""
Takes a TokenJson and serializes it in urlsafe_base64.
"""
# encode the token as a base64 string
token_base64 = base64.urlsafe_b64encode(
json.dumps(token.dict()).encode()
).decode()
return token_base64
async def serialize_proofs(
self, proofs: List[Proof], include_mints=True, legacy=False
):
"""
Produces sharable token with proofs and mint information.
"""
if legacy:
proofs_serialized = [p.to_dict() for p in proofs]
return base64.urlsafe_b64encode(
json.dumps(proofs_serialized).encode()
).decode()
token = await self._make_token(proofs, include_mints)
return await self._serialize_token_base64(token)
async def _get_spendable_proofs(self, proofs: List[Proof]):
"""
Selects proofs that can be used with the current mint.
@@ -489,7 +584,7 @@ class Wallet(LedgerAPI):
self,
proofs: List[Proof],
amount,
scnd_secret: str = None,
scnd_secret: Optional[str] = None,
set_reserved: bool = False,
):
"""Like self.split but only considers non-reserved tokens."""
@@ -568,5 +663,16 @@ class Wallet(LedgerAPI):
for key, proofs in self._get_proofs_per_keyset(self.proofs).items()
}
async def balance_per_minturl(self):
balances = await self._get_proofs_per_minturl(self.proofs)
balances_return = {
key: {
"balance": sum_proofs(proofs),
"available": sum_proofs([p for p in proofs if not p.reserved]),
}
for key, proofs in balances.items()
}
return dict(sorted(balances_return.items(), key=lambda item: item[1]["available"], reverse=True)) # type: ignore
def proof_amounts(self):
return [p["amount"] for p in sorted(self.proofs, key=lambda p: p["amount"])]

Binary file not shown.

View File

@@ -57,13 +57,23 @@ async def test_get_keys(wallet1: Wallet):
assert len(keyset.id) > 0
@pytest.mark.asyncio
async def test_get_keyset(wallet1: Wallet):
assert len(wallet1.keys) == MAX_ORDER
# ket's get the keys first so we can get a keyset ID that we use later
keys1 = await wallet1._get_keys(wallet1.url)
# gets the keys of a specific keyset
keys2 = await wallet1._get_keyset(wallet1.url, keys1.id)
assert len(keys1.public_keys) == len(keys2.public_keys)
@pytest.mark.asyncio
async def test_get_keysets(wallet1: Wallet):
keyset = await wallet1._get_keysets(wallet1.url)
assert type(keyset) == dict
assert type(keyset["keysets"]) == list
assert len(keyset["keysets"]) > 0
assert keyset["keysets"][0] == wallet1.keyset_id
assert keyset["keysets"][-1] == wallet1.keyset_id
@pytest.mark.asyncio