#!/usr/bin/env python import asyncio import base64 import json import os import sys import time from datetime import datetime from functools import wraps from itertools import groupby from operator import itemgetter from os import listdir from os.path import isdir, join import click from loguru import logger from cashu.core.base import Proof from cashu.core.helpers import sum_proofs from cashu.core.migrations import migrate_databases from cashu.core.settings import ( CASHU_DIR, DEBUG, ENV_FILE, LIGHTNING, MINT_URL, SOCKS_HOST, SOCKS_PORT, TOR, VERSION, ) from cashu.tor.tor import TorProxy from cashu.wallet import migrations from cashu.wallet.crud import ( get_keyset, get_lightning_invoices, get_reserved_proofs, get_unused_locks, ) from cashu.wallet.wallet import Wallet as Wallet async def init_wallet(wallet: Wallet): """Performs migrations and loads proofs from db.""" await migrate_databases(wallet.db, migrations) await wallet.load_proofs() class NaturalOrderGroup(click.Group): """For listing commands in help in order of definition""" def list_commands(self, ctx): return self.commands.keys() @click.group(cls=NaturalOrderGroup) @click.option("--host", "-h", default=MINT_URL, help=f"Mint URL (default: {MINT_URL}).") @click.option( "--wallet", "-w", "walletname", default="wallet", help="Wallet name (default: wallet).", ) @click.pass_context def cli(ctx, host: str, walletname: str): if TOR and not TorProxy().check_platform(): error_str = "Your settings say TOR=true but the built-in Tor bundle is not supported on your system. Please install Tor manually and set TOR=false and SOCKS_HOST=localhost and SOCKS_PORT=9050 in your Cashu config (recommended) or turn off Tor by setting TOR=false (not recommended). Cashu will not work until you edit your config file accordingly." error_str += "\n\n" if ENV_FILE: error_str += f"Edit your Cashu config file here: {ENV_FILE}" else: error_str += ( f"Ceate a new Cashu config file here: {os.path.join(CASHU_DIR, '.env')}" ) raise Exception(error_str) # configure logger logger.remove() logger.add(sys.stderr, level="DEBUG" if DEBUG else "INFO") ctx.ensure_object(dict) ctx.obj["HOST"] = host ctx.obj["WALLET_NAME"] = walletname wallet = Wallet(ctx.obj["HOST"], os.path.join(CASHU_DIR, walletname)) ctx.obj["WALLET"] = wallet asyncio.run(init_wallet(wallet)) pass # https://github.com/pallets/click/issues/85#issuecomment-503464628 def coro(f): @wraps(f) def wrapper(*args, **kwargs): return asyncio.run(f(*args, **kwargs)) return wrapper @cli.command("pay", help="Pay Lightning invoice.") @click.argument("invoice", type=str) @click.option( "--yes", "-y", default=False, is_flag=True, help="Skip confirmation.", type=bool ) @click.pass_context @coro async def pay(ctx, invoice: str, yes: bool): wallet: Wallet = ctx.obj["WALLET"] await wallet.load_mint() wallet.status() amount, fees = await wallet.get_pay_amount_with_fees(invoice) if not yes: click.confirm( f"Pay {amount - fees} sat ({amount} sat incl. fees)?", abort=True, default=True, ) print(f"Paying Lightning invoice ...") assert amount > 0, "amount is not positive" if wallet.available_balance < amount: print("Error: Balance too low.") return _, send_proofs = await wallet.split_to_send(wallet.proofs, amount) await wallet.pay_lightning(send_proofs, invoice) wallet.status() @cli.command("invoice", help="Create Lighting invoice.") @click.argument("amount", type=int) @click.option("--hash", default="", help="Hash of the paid invoice.", type=str) @click.pass_context @coro async def invoice(ctx, amount: int, hash: str): wallet: Wallet = ctx.obj["WALLET"] await wallet.load_mint() wallet.status() if not LIGHTNING: r = await wallet.mint(amount) elif amount and not hash: invoice = await wallet.request_mint(amount) if invoice.pr: print(f"Pay invoice to mint {amount} sat:") print("") print(f"Invoice: {invoice.pr}") print("") print( f"Execute this command if you abort the check:\ncashu invoice {amount} --hash {invoice.hash}" ) check_until = time.time() + 5 * 60 # check for five minutes print("") print( f"Checking invoice ...", end="", flush=True, ) paid = False while time.time() < check_until and not paid: time.sleep(3) try: await wallet.mint(amount, invoice.hash) paid = True print(" Invoice paid.") except Exception as e: # TODO: user error codes! if str(e) == "Error: Lightning invoice not paid yet.": print(".", end="", flush=True) continue elif amount and hash: await wallet.mint(amount, hash) wallet.status() return @cli.command("balance", help="Balance.") @click.option( "--verbose", "-v", default=False, is_flag=True, help="Show pending tokens as well.", type=bool, ) @click.pass_context @coro async def balance(ctx, verbose): wallet: Wallet = ctx.obj["WALLET"] if verbose: # show balances per keyset keyset_balances = wallet.balance_per_keyset() if len(keyset_balances) > 1: print(f"You have balances in {len(keyset_balances)} keysets:") print("") for k, v in keyset_balances.items(): print( f"Keyset: {k or 'undefined'} Balance: {v['balance']} sat (available: {v['available']} sat)" ) print("") mint_balances = await wallet.balance_per_minturl() if len(mint_balances) > 1: # show balances per mint print(f"You have balances in {len(mint_balances)} mints:") print("") for k, v in mint_balances.items(): print( f"Mint: {k or 'undefined'} Balance: {v['balance']} sat (available: {v['available']} sat)" ) print("") if verbose: print( f"Balance: {wallet.balance} sat (available: {wallet.available_balance} sat in {len([p for p in wallet.proofs if not p.reserved])} tokens)" ) else: print(f"Balance: {wallet.available_balance} sat") @cli.command("send", help="Send tokens.") @click.argument("amount", type=int) @click.option("--lock", "-l", default=None, help="Lock tokens (P2SH).", type=str) @click.option( "--legacy", default=False, is_flag=True, help="Print legacy token without mint information.", type=bool, ) @click.pass_context @coro async def send(ctx, amount: int, lock: str, legacy: bool): if lock and len(lock) < 22: print("Error: lock has to be at least 22 characters long.") return p2sh = False if lock and len(lock.split("P2SH:")) == 2: p2sh = True wallet: Wallet = ctx.obj["WALLET"] await wallet.load_mint() wallet.status() _, send_proofs = await wallet.split_to_send( wallet.proofs, amount, lock, set_reserved=True ) token = await wallet.serialize_proofs( send_proofs, include_mints=True, ) print(token) if legacy: print("") print( "Legacy token without mint information for older clients. This token can only be be received by wallets who use the mint the token is issued from:" ) print("") token = await wallet.serialize_proofs( send_proofs, legacy=True, ) print(token) wallet.status() @cli.command("receive", help="Receive tokens.") @click.argument("token", type=str) @click.option("--lock", "-l", default=None, help="Unlock tokens.", type=str) @click.pass_context @coro async def receive(ctx, token: str, lock: str): wallet: Wallet = ctx.obj["WALLET"] await wallet.load_mint() wallet.status() if lock: # load the script and signature of this address from the database assert len(lock.split("P2SH:")) == 2, Exception( "lock has wrong format. Expected P2SH:
." ) address_split = lock.split("P2SH:")[1] p2shscripts = await get_unused_locks(address_split, db=wallet.db) assert len(p2shscripts) == 1, Exception("lock not found.") script = p2shscripts[0].script signature = p2shscripts[0].signature else: script, signature = None, None try: # backwards compatibility < 0.7.0: tokens without mint information proofs = [Proof(**p) for p in json.loads(base64.urlsafe_b64decode(token))] _, _ = await wallet.redeem(proofs, scnd_script=script, scnd_siganture=signature) except: dtoken = json.loads(base64.urlsafe_b64decode(token)) assert "tokens" in dtoken, Exception("no proofs in token") # if there is a `mints` field in the token # we get the mint information in the token and load the keys of each mint # we then redeem the tokens for each keyset individually if "mints" in dtoken and dtoken.get("mints") is not None: for mint_id in dtoken.get("mints"): for keyset in set(dtoken["mints"][mint_id]["ks"]): mint_url = dtoken["mints"][mint_id]["url"] # init a temporary wallet object keyset_wallet = Wallet( mint_url, os.path.join(CASHU_DIR, ctx.obj["WALLET_NAME"]) ) # first we check whether we know this mint already and ask the user mint_keysets = await get_keyset(id=keyset, db=keyset_wallet.db) if mint_keysets is None: click.confirm( f"Do you want to receive tokens from mint {mint_url}?", abort=True, default=True, ) # make sure that this mint indeed supports this keyset mint_keysets = await keyset_wallet._get_keysets(mint_url) if keyset in mint_keysets["keysets"]: # load the keys await keyset_wallet.load_mint(keyset_id=keyset) # redeem proofs of this keyset redeem_proofs = [ Proof(**p) for p in dtoken["tokens"] if Proof(**p).id == keyset ] _, _ = await keyset_wallet.redeem( redeem_proofs, scnd_script=script, scnd_siganture=signature ) keyset_wallet.db.connect() # reload proofs to update the main wallet's balance await wallet.load_proofs() else: # no mint information present, assume default mint proofs = [Proof(**p) for p in dtoken["tokens"]] _, _ = await wallet.redeem( proofs, scnd_script=script, scnd_siganture=signature ) wallet.status() @cli.command("burn", help="Burn spent tokens.") @click.argument("token", required=False, type=str) @click.option("--all", "-a", default=False, is_flag=True, help="Burn all spent tokens.") @click.option( "--force", "-f", default=False, is_flag=True, help="Force check on all tokens." ) @click.pass_context @coro async def burn(ctx, token: str, all: bool, force: bool): wallet: Wallet = ctx.obj["WALLET"] await wallet.load_mint() if not (all or token or force) or (token and all): print( "Error: enter a token or use --all to burn all pending tokens or --force to check all tokens." ) return if all: # check only those who are flagged as reserved proofs = await get_reserved_proofs(wallet.db) elif force: # check all proofs in db proofs = wallet.proofs else: # check only the specified ones proofs = [Proof(**p) for p in json.loads(base64.urlsafe_b64decode(token))] wallet.status() await wallet.invalidate(proofs) wallet.status() @cli.command("pending", help="Show pending tokens.") @click.pass_context @coro async def pending(ctx): wallet: Wallet = ctx.obj["WALLET"] await wallet.load_mint() reserved_proofs = await get_reserved_proofs(wallet.db) if len(reserved_proofs): print(f"--------------------------\n") sorted_proofs = sorted(reserved_proofs, key=itemgetter("send_id")) for i, (key, value) in enumerate( groupby(sorted_proofs, key=itemgetter("send_id")) ): grouped_proofs = list(value) token = await wallet.serialize_proofs(grouped_proofs) token_hidden_secret = await wallet.serialize_proofs(grouped_proofs) reserved_date = datetime.utcfromtimestamp( int(grouped_proofs[0].time_reserved) ).strftime("%Y-%m-%d %H:%M:%S") print( f"#{i} Amount: {sum_proofs(grouped_proofs)} sat Time: {reserved_date} ID: {key}\n" ) print(f"{token}\n") print(f"--------------------------\n") wallet.status() @cli.command("lock", help="Generate receiving lock.") @click.pass_context @coro async def lock(ctx): wallet: Wallet = ctx.obj["WALLET"] p2shscript = await wallet.create_p2sh_lock() txin_p2sh_address = p2shscript.address print("---- Pay to script hash (P2SH) ----\n") print("Use a lock to receive tokens that only you can unlock.") print("") print(f"Public receiving lock: P2SH:{txin_p2sh_address}") print("") print( f"Anyone can send tokens to this lock:\n\ncashu send