Files
nutshell/cashu/wallet/cli.py
2022-12-27 15:39:06 +01:00

655 lines
21 KiB
Python

#!/usr/bin/env python
import asyncio
import base64
import json
import os
import sys
import threading
import time
import urllib.parse
from datetime import datetime
from functools import wraps
from itertools import groupby
from operator import itemgetter
from os import listdir
from os.path import isdir, join
from typing import Dict, List
import click
from loguru import logger
from cashu.core.base import Proof, TokenJson, TokenMintJson
from cashu.core.helpers import sum_proofs
from cashu.core.migrations import migrate_databases
from cashu.core.settings import (
CASHU_DIR,
DEBUG,
ENV_FILE,
LIGHTNING,
MINT_URL,
NOSTR_PRIVATE_KEY,
NOSTR_RELAYS,
SOCKS_HOST,
SOCKS_PORT,
TOR,
VERSION,
)
from cashu.nostr.nostr.client.client import NostrClient
from cashu.nostr.nostr.event import Event
from cashu.nostr.nostr.key import PublicKey
from cashu.tor.tor import TorProxy
from cashu.wallet import migrations
from cashu.wallet.crud import (
get_keyset,
get_lightning_invoices,
get_reserved_proofs,
get_unused_locks,
)
from cashu.wallet.wallet import Wallet as Wallet
from .cli_helpers import (
get_mint_wallet,
print_mint_balances,
redeem_multimint,
verify_mints,
)
async def init_wallet(wallet: Wallet):
    """Bring the wallet database up to date and load its proofs.

    Runs ``migrate_databases`` on ``wallet.db`` with the wallet ``migrations``
    module and then awaits ``wallet.load_proofs()``.
    """
    database = wallet.db
    await migrate_databases(database, migrations)
    await wallet.load_proofs()
class NaturalOrderGroup(click.Group):
    """Click group that lists its commands in help in order of definition."""

    def list_commands(self, ctx):
        """Return the registered command names in registration order.

        Iterating ``self.commands`` yields the names in insertion order. A real
        list is returned instead of a live ``dict_keys`` view so callers get a
        stable, indexable snapshot.
        """
        return list(self.commands)
@click.group(cls=NaturalOrderGroup)
@click.option("--host", "-h", default=MINT_URL, help=f"Mint URL (default: {MINT_URL}).")
@click.option(
    "--wallet",
    "-w",
    "walletname",
    default="wallet",
    help="Wallet name (default: wallet).",
)
@click.pass_context
def cli(ctx, host: str, walletname: str):
    # NOTE: this group is documented with comments rather than a docstring on
    # purpose: no `help=` is passed to `click.group`, and click would otherwise
    # show the docstring as the group's --help text.
    #
    # Root command group. Refuses to run when TOR is enabled but the built-in
    # Tor bundle is unsupported, configures logging, and stores the mint host,
    # the wallet name and an initialized Wallet in `ctx.obj` for the subcommands.
    if TOR and not TorProxy().check_platform():
        error_str = "Your settings say TOR=true but the built-in Tor bundle is not supported on your system. You have two options: Either install Tor manually and set TOR=FALSE and SOCKS_HOST=localhost and SOCKS_PORT=9050 in your Cashu config (recommended). Or turn off Tor by setting TOR=false (not recommended). Cashu will not work until you edit your config file accordingly."
        error_str += "\n\n"
        if ENV_FILE:
            env_path = ENV_FILE
            error_str += f"Edit your Cashu config file here: {env_path}"
        else:
            # no config file known: point the user at the default location
            env_path = os.path.join(CASHU_DIR, ".env")
            error_str += f"Create a new Cashu config file here: {env_path}"
        error_str += f'\n\nYou can turn off Tor with this command: echo "TOR=FALSE" >> {env_path}'
        raise Exception(error_str)

    # configure logger
    logger.remove()
    logger.add(sys.stderr, level="DEBUG" if DEBUG else "INFO")

    ctx.ensure_object(dict)
    ctx.obj["HOST"] = host
    ctx.obj["WALLET_NAME"] = walletname
    wallet = Wallet(ctx.obj["HOST"], os.path.join(CASHU_DIR, walletname))
    ctx.obj["WALLET"] = wallet
    # the group callback is synchronous, so the async initialization runs in
    # its own event loop here
    asyncio.run(init_wallet(wallet))
# https://github.com/pallets/click/issues/85#issuecomment-503464628
def coro(f):
    """Decorator that lets an ``async def`` function be called synchronously.

    The returned callable builds the coroutine from the given arguments, runs
    it to completion with ``asyncio.run`` and returns its result. Metadata of
    ``f`` is copied onto the returned callable via ``functools.wraps``.
    """

    def run_to_completion(*args, **kwargs):
        pending = f(*args, **kwargs)
        return asyncio.run(pending)

    return wraps(f)(run_to_completion)
@cli.command("pay", help="Pay Lightning invoice.")
@click.argument("invoice", type=str)
@click.option(
    "--yes", "-y", default=False, is_flag=True, help="Skip confirmation.", type=bool
)
@click.pass_context
@coro
async def pay(ctx, invoice: str, yes: bool):
    """Pay a Lightning invoice with tokens from the wallet.

    Loads the mint, asks the wallet for the amount including fees, optionally
    asks the user for confirmation (skipped with ``--yes``), and then splits
    off proofs for the amount and hands them to ``wallet.pay_lightning``.
    Prints an error and returns if the available balance is below the amount.
    """
    wallet: Wallet = ctx.obj["WALLET"]
    await wallet.load_mint()
    wallet.status()

    total, fee = await wallet.get_pay_amount_with_fees(invoice)
    needs_confirmation = not yes
    if needs_confirmation:
        # abort=True makes click abort the command when the user declines
        click.confirm(
            f"Pay {total - fee} sat ({total} sat incl. fees)?",
            abort=True,
            default=True,
        )

    print("Paying Lightning invoice ...")
    assert total > 0, "amount is not positive"
    if wallet.available_balance < total:
        print("Error: Balance too low.")
        return

    _, proofs_to_spend = await wallet.split_to_send(wallet.proofs, total)
    await wallet.pay_lightning(proofs_to_spend, invoice)
    wallet.status()
@cli.command("invoice", help="Create Lightning invoice.")
@click.argument("amount", type=int)
@click.option("--hash", default="", help="Hash of the paid invoice.", type=str)
@click.pass_context
@coro
async def invoice(ctx, amount: int, hash: str):
    """Mint tokens, optionally by requesting and paying a Lightning invoice.

    Behavior depends on the settings and arguments:

    * ``LIGHTNING`` disabled: mint ``amount`` directly.
    * ``amount`` without ``--hash``: request a mint invoice, print it, and poll
      the mint every three seconds for up to five minutes until it is paid.
    * ``amount`` with ``--hash``: mint against an invoice identified by the
      given hash.
    """
    wallet: Wallet = ctx.obj["WALLET"]
    await wallet.load_mint()
    wallet.status()
    if not LIGHTNING:
        await wallet.mint(amount)
    elif amount and not hash:
        # named `mint_request` so the local does not shadow this function
        mint_request = await wallet.request_mint(amount)
        if mint_request.pr:
            print(f"Pay invoice to mint {amount} sat:")
            print("")
            print(f"Invoice: {mint_request.pr}")
            print("")
            print(
                f"Execute this command if you abort the check:\ncashu invoice {amount} --hash {mint_request.hash}"
            )
            check_until = time.time() + 5 * 60  # check for five minutes
            print("")
            print(
                "Checking invoice ...",
                end="",
                flush=True,
            )
            paid = False
            while time.time() < check_until and not paid:
                # non-blocking wait: do not stall the event loop between polls
                await asyncio.sleep(3)
                try:
                    await wallet.mint(amount, mint_request.hash)
                    paid = True
                    print(" Invoice paid.")
                except Exception as e:
                    # TODO: user error codes!
                    if str(e) == "Error: Lightning invoice not paid yet.":
                        print(".", end="", flush=True)
                    else:
                        # keep polling (best effort) but leave a trace of
                        # errors other than "not paid yet"
                        logger.debug("Invoice check failed: {}", e)
    elif amount and hash:
        await wallet.mint(amount, hash)
    wallet.status()
    return
@cli.command("balance", help="Balance.")
@click.option(
    "--verbose",
    "-v",
    default=False,
    is_flag=True,
    help="Show pending tokens as well.",
    type=bool,
)
@click.pass_context
@coro
async def balance(ctx, verbose):
    """Print the wallet balance.

    With ``--verbose`` the balance per keyset is listed first (only when there
    is more than one keyset) and the final line also shows the pending amount
    and the number of unreserved tokens. Per-mint balances are printed through
    ``print_mint_balances`` in both modes.
    """
    wallet: Wallet = ctx.obj["WALLET"]

    if verbose:
        # show balances per keyset
        per_keyset = wallet.balance_per_keyset()
        keyset_count = len(per_keyset)
        if keyset_count > 1:
            print(f"You have balances in {keyset_count} keysets:")
            print("")
            for keyset_id, amounts in per_keyset.items():
                available = amounts["available"]
                pending_amount = amounts["balance"] - available
                print(
                    f"Keyset: {keyset_id} - Balance: {available} sat (pending: {pending_amount} sat)"
                )
            print("")

    await print_mint_balances(ctx, wallet)

    if not verbose:
        print(f"Balance: {wallet.available_balance} sat")
        return

    available_total = wallet.available_balance
    pending_total = wallet.balance - wallet.available_balance
    unreserved_count = sum(1 for proof in wallet.proofs if not proof.reserved)
    print(
        f"Balance: {available_total} sat (pending: {pending_total} sat) in {unreserved_count} tokens"
    )
@cli.command("send", help="Send tokens.")
@click.argument("amount", type=int)
@click.option("--lock", "-l", default=None, help="Lock tokens (P2SH).", type=str)
@click.option(
    "--legacy",
    default=False,
    is_flag=True,
    help="Print legacy token without mint information.",
    type=bool,
)
@click.pass_context
@coro
async def send(ctx, amount: int, lock: str, legacy: bool):
    """Split off ``amount`` sat from the wallet and print them as a token.

    The proofs to send are marked as reserved (``set_reserved=True``). If a
    ``--lock`` is given it must be at least 22 characters long and is passed on
    to ``wallet.split_to_send``. With ``--legacy`` a second serialization of the
    same proofs without mint information is printed as well.
    """
    if lock and len(lock) < 22:
        print("Error: lock has to be at least 22 characters long.")
        return

    wallet = await get_mint_wallet(ctx)
    await wallet.load_proofs()
    _, send_proofs = await wallet.split_to_send(
        wallet.proofs, amount, lock, set_reserved=True
    )
    token = await wallet.serialize_proofs(
        send_proofs,
        include_mints=True,
    )
    print(token)

    if legacy:
        print("")
        print(
            "Legacy token without mint information for older clients. This token can only be received by wallets who use the mint the token is issued from:"
        )
        print("")
        token = await wallet.serialize_proofs(
            send_proofs,
            legacy=True,
        )
        print(token)

    wallet.status()
async def receive(ctx, token: str, lock: str):
    """Redeem a serialized token into the wallet.

    Args:
        ctx: click context; ``ctx.obj["WALLET"]`` holds the active wallet.
        token: base64 token. Accepted are tokens with mint information, legacy
            tokens that are a bare list of proofs, and LNbits wallet links that
            carry a legacy token in their ``recv_token`` query parameter.
        lock: optional lock of the form ``P2SH:<address>``; the matching script
            and signature are looked up among the unused locks in the database.

    Raises:
        AssertionError: if the lock is malformed or unknown, or the decoded
            token has no ``tokens`` field.
    """
    wallet: Wallet = ctx.obj["WALLET"]
    await wallet.load_mint()

    # check for P2SH locks
    if lock:
        # load the script and signature of this address from the database
        assert len(lock.split("P2SH:")) == 2, Exception(
            "lock has wrong format. Expected P2SH:<address>."
        )
        address_split = lock.split("P2SH:")[1]
        p2shscripts = await get_unused_locks(address_split, db=wallet.db)
        assert len(p2shscripts) == 1, Exception("lock not found.")
        script, signature = p2shscripts[0].script, p2shscripts[0].signature
    else:
        script, signature = None, None

    # deserialize token

    # ----- backwards compatibility -----

    # we support old tokens (< 0.7) without mint information (W3siaWQ...)
    # and new tokens (>= 0.7) with multiple mint support (eyJ0b2...)
    try:
        # backwards compatibility: tokens without mint information
        # supports tokens of the form W3siaWQiOiJH

        # LNbits token link parsing
        # can extract the mint URL from LNbits token links like:
        # https://lnbits.server/cashu/wallet?mint_id=aMintId&recv_token=W3siaWQiOiJHY2...
        url = None
        if len(token.split("&recv_token=")) == 2:
            # extract URL params
            params = urllib.parse.parse_qs(token.split("?")[1])
            # extract URL
            if "mint_id" in params:
                url = (
                    token.split("?")[0].split("/wallet")[0]
                    + "/api/v1/"
                    + params["mint_id"][0]
                )
            # extract token
            token = params["recv_token"][0]

        # assume W3siaWQiOiJH.. token
        # throws an error if the deserialization with the old format doesn't work
        proofs = [Proof(**p) for p in json.loads(base64.urlsafe_b64decode(token))]
        token = await wallet.serialize_proofs(
            proofs,
            include_mints=False,
        )

        # if it was an LNbits link,
        # add url and keyset ids to the token from the link extraction above
        if url:
            token_object: TokenJson = await wallet._make_token(
                proofs, include_mints=False
            )
            token_object.mints = {}
            keysets = list({p.id for p in proofs})
            token_object.mints[url] = TokenMintJson(url=url, ks=keysets)  # type: ignore
            token = await wallet._serialize_token_base64(token_object)
    except Exception:
        # Expected for tokens in the new format: the legacy conversion is a
        # best-effort step and `token` is then used unchanged below. Only
        # `Exception` is caught so that KeyboardInterrupt, SystemExit and task
        # cancellation are not swallowed.
        pass

    # ----- receive token -----

    # deserialize token
    dtoken = json.loads(base64.urlsafe_b64decode(token))

    assert "tokens" in dtoken, Exception("no proofs in token")
    includes_mint_info: bool = "mints" in dtoken and dtoken.get("mints") is not None

    # if there is a `mints` field in the token
    # we check whether the token has mints that we don't know yet
    # and ask the user if they want to trust the new mints
    if includes_mint_info:
        # we ask the user to confirm any new mints the tokens may include
        await verify_mints(ctx, dtoken)
        # redeem tokens with new wallet instances
        await redeem_multimint(ctx, dtoken, script, signature)
        # reload main wallet so the balance updates
        await wallet.load_proofs()
    else:
        # no mint information present, we extract the proofs and use wallet's default mint
        proofs = [Proof(**p) for p in dtoken["tokens"]]
        _, _ = await wallet.redeem(proofs, script, signature)

    wallet.status()
@cli.command("receive", help="Receive tokens.")
@click.argument("token", type=str)
@click.option("--lock", "-l", default=None, help="Unlock tokens.", type=str)
@click.pass_context
@coro
async def receive_cli(ctx, token: str, lock: str):
    """CLI wrapper around ``receive``: print the wallet status, then redeem."""
    active_wallet: Wallet = ctx.obj["WALLET"]
    active_wallet.status()
    await receive(ctx, token, lock)
@cli.command("burn", help="Burn spent tokens.")
@click.argument("token", required=False, type=str)
@click.option("--all", "-a", default=False, is_flag=True, help="Burn all spent tokens.")
@click.option(
    "--force", "-f", default=False, is_flag=True, help="Force check on all tokens."
)
@click.pass_context
@coro
async def burn(ctx, token: str, all: bool, force: bool):
    """Invalidate proofs in the wallet via ``wallet.invalidate``.

    Exactly one source of proofs is used:

    * ``--all``: the proofs flagged as reserved in the database,
    * ``--force``: every proof currently loaded in the wallet,
    * ``token``: the proofs contained in the given serialized token.

    Prints an error and returns if none of the three is given, or if a token
    is combined with ``--all``.
    """
    wallet: Wallet = ctx.obj["WALLET"]
    await wallet.load_mint()
    if not (all or token or force) or (token and all):
        print(
            "Error: enter a token or use --all to burn all pending tokens or --force to check all tokens."
        )
        return
    if all:
        # check only those who are flagged as reserved
        proofs = await get_reserved_proofs(wallet.db)
    elif force:
        # check all proofs in db
        proofs = wallet.proofs
    else:
        # check only the specified ones
        decoded = json.loads(base64.urlsafe_b64decode(token))
        # tokens with mint information are objects carrying the proofs in
        # their "tokens" field; legacy tokens are a bare list of proofs
        raw_proofs = decoded["tokens"] if isinstance(decoded, dict) else decoded
        proofs = [Proof(**p) for p in raw_proofs]

    wallet.status()
    await wallet.invalidate(proofs)
    wallet.status()
@cli.command("pending", help="Show pending tokens.")
@click.pass_context
@coro
async def pending(ctx):
    """List the reserved (pending) proofs, grouped by their ``send_id``.

    For every group the summed amount, the reservation time (UTC) of the first
    proof, the send id and the serialized token are printed.
    """
    wallet: Wallet = ctx.obj["WALLET"]
    await wallet.load_mint()
    reserved_proofs = await get_reserved_proofs(wallet.db)
    if len(reserved_proofs):
        print("--------------------------\n")
        # groupby only merges adjacent items, so sort by the same key first
        sorted_proofs = sorted(reserved_proofs, key=itemgetter("send_id"))
        for i, (key, value) in enumerate(
            groupby(sorted_proofs, key=itemgetter("send_id"))
        ):
            grouped_proofs = list(value)
            # serialize once; the original serialized the same proofs a second
            # time into a variable that was never used
            token = await wallet.serialize_proofs(grouped_proofs)
            reserved_date = datetime.utcfromtimestamp(
                int(grouped_proofs[0].time_reserved)
            ).strftime("%Y-%m-%d %H:%M:%S")
            print(
                f"#{i} Amount: {sum_proofs(grouped_proofs)} sat Time: {reserved_date} ID: {key}\n"
            )
            print(f"{token}\n")
            print("--------------------------\n")
        print("To remove all spent tokens use: cashu burn -a")
    wallet.status()
@cli.command("lock", help="Generate receiving lock.")
@click.pass_context
@coro
async def lock(ctx):
    """Create a new P2SH lock in the wallet and print usage instructions."""
    wallet: Wallet = ctx.obj["WALLET"]
    new_lock = await wallet.create_p2sh_lock()
    address = new_lock.address

    print("---- Pay to script hash (P2SH) ----\n")
    print("Use a lock to receive tokens that only you can unlock.")
    print("")
    print(f"Public receiving lock: P2SH:{address}")
    print("")
    send_hint = f"Anyone can send tokens to this lock:\n\ncashu send <amount> --lock P2SH:{address}"
    print(send_hint)
    print("")
    receive_hint = f"Only you can receive tokens from this lock:\n\ncashu receive <token> --lock P2SH:{address}\n"
    print(receive_hint)
@cli.command("locks", help="Show unused receiving locks.")
@click.pass_context
@coro
async def locks(ctx):
    """Print address, script and signature of every unused lock.

    Returns ``True`` in all cases, also when no lock exists.
    """
    wallet: Wallet = ctx.obj["WALLET"]
    unused_locks = await get_unused_locks(db=wallet.db)

    if not len(unused_locks):
        print("No locks found. Create one using: cashu lock")
        return True

    separator = "--------------------------\n"
    print("")
    print(separator)
    for entry in unused_locks:
        print(f"Address: {entry.address}")
        print(f"Script: {entry.script}")
        print(f"Signature: {entry.signature}")
        print("")
        print(f"Receive: cashu receive <token> --lock P2SH:{entry.address}")
        print("")
        print(separator)
    return True
@cli.command("invoices", help="List of all pending invoices.")
@click.pass_context
@coro
async def invoices(ctx):
    """Print every Lightning invoice returned by ``get_lightning_invoices``.

    Hash, preimage and the creation/payment times (formatted as UTC) are only
    shown when the respective field is set.
    """

    def as_utc_text(timestamp):
        # timestamps may arrive as numeric strings, hence int(float(...))
        moment = datetime.utcfromtimestamp(int(float(timestamp)))
        return moment.strftime("%Y-%m-%d %H:%M:%S")

    wallet: Wallet = ctx.obj["WALLET"]
    records = await get_lightning_invoices(db=wallet.db)

    if not len(records):
        print("No invoices found.")
        return

    separator = "--------------------------\n"
    print("")
    print(separator)
    for record in records:
        print(f"Paid: {record.paid}")
        print(f"Incoming: {record.amount > 0}")
        print(f"Amount: {abs(record.amount)}")
        if record.hash:
            print(f"Hash: {record.hash}")
        if record.preimage:
            print(f"Preimage: {record.preimage}")
        if record.time_created:
            print(f"Created: {as_utc_text(record.time_created)}")
        if record.time_paid:
            print(f"Paid: {as_utc_text(record.time_paid)}")
        print("")
        print(f"Payment request: {record.pr}")
        print("")
        print(separator)
@cli.command("nsend", help="Send tokens via nostr.")
@click.argument("amount", type=int)
@click.argument(
    "pubkey",
    type=str,
)
@click.option(
    "--verbose",
    "-v",
    default=False,
    is_flag=True,
    help="Show more information.",
    type=bool,
)
@click.option(
    "--yes", "-y", default=False, is_flag=True, help="Skip confirmation.", type=bool
)
@click.pass_context
@coro
async def nsend(ctx, amount: int, pubkey: str, verbose: bool, yes: bool):
    """Send ``amount`` sat as a token in a nostr direct message to ``pubkey``.

    The proofs are split off and marked as reserved, the token is printed, and
    after confirmation (skipped with ``--yes``) it is sent with a nostr client
    created without a configured private key.

    Raises:
        ValueError: if ``pubkey`` is not valid hex. This is checked before any
            tokens are reserved.
    """
    # decode the recipient key first so a malformed key fails before proofs
    # are reserved, instead of after
    recipient_key_bytes = bytes.fromhex(pubkey)

    wallet = await get_mint_wallet(ctx)
    await wallet.load_proofs()
    _, send_proofs = await wallet.split_to_send(
        wallet.proofs, amount, set_reserved=True
    )
    token = await wallet.serialize_proofs(send_proofs)

    print("")
    print(token)

    if not yes:
        print("")
        click.confirm(
            f"Send {amount} sat to nostr pubkey {pubkey}?",
            abort=True,
            default=True,
        )

    # we only use ephemeral private keys for sending
    client = NostrClient(relays=NOSTR_RELAYS)
    try:
        if verbose:
            print(f"Your ephemeral nostr private key: {client.private_key.hex()}")
        # presumably gives the client time to connect to the relays - TODO confirm
        await asyncio.sleep(1)
        client.dm(token, PublicKey(recipient_key_bytes))
        print(f"Token sent to {pubkey}")
    finally:
        # always release the client, also when sending fails
        client.close()
@cli.command("nreceive", help="Receive tokens via nostr.")
@click.option(
    "--verbose",
    "-v",
    help="Display more information.",
    is_flag=True,
    default=False,
    type=bool,
)
@click.pass_context
@coro
async def nreceive(ctx, verbose: bool):
    """Listen for nostr direct messages and try to redeem each one as a token.

    A nostr client is created from ``NOSTR_PRIVATE_KEY`` (a warning is printed
    if it is not set) and ``client.get_dm`` is started in a separate thread.
    Every decrypted message is passed to ``receive``.
    """
    if NOSTR_PRIVATE_KEY is None:
        print(
            "Warning: No nostr private key set! You don't have NOSTR_PRIVATE_KEY set in your .env file. I will create a random private key for this session but I will not remember it."
        )
        print("")
    client = NostrClient(privatekey_hex=NOSTR_PRIVATE_KEY, relays=NOSTR_RELAYS)
    print(f"Your nostr public key: {client.public_key.hex()}")
    if verbose:
        print(f"Your nostr private key (do not share!): {client.private_key.hex()}")
    # presumably gives the client time to connect to the relays - TODO confirm
    await asyncio.sleep(2)

    def get_token_callback(event: Event, decrypted_content):
        """Try to redeem the content of one incoming direct message."""
        if verbose:
            print(
                f"From {event.public_key[:3]}..{event.public_key[-3:]}: {decrypted_content}"
            )
        try:
            # call the receive method; the callback is invoked synchronously,
            # so it is run in its own event loop
            asyncio.run(receive(ctx, decrypted_content, ""))
        except Exception as e:
            # best effort: a message that cannot be redeemed must not stop
            # the listener, but the reason is kept in the debug log
            logger.debug("Could not receive token from nostr message: {}", e)

    t = threading.Thread(
        target=client.get_dm,
        args=(
            client.public_key,
            get_token_callback,
        ),
        name="Nostr DM",
    )
    t.start()
@cli.command("wallets", help="List of all available wallets.")
@click.pass_context
@coro
async def wallets(ctx):
    """Print name and balance of every wallet directory that holds proofs.

    Every directory in ``CASHU_DIR`` except ``mint`` is treated as a wallet.
    The wallet selected for this invocation is marked with ``*``. Directories
    that cannot be loaded as a wallet are skipped.
    """
    # list all directories
    wallet_names = [d for d in listdir(CASHU_DIR) if isdir(join(CASHU_DIR, d))]
    try:
        wallet_names.remove("mint")
    except ValueError:
        pass
    for w in wallet_names:
        wallet = Wallet(ctx.obj["HOST"], os.path.join(CASHU_DIR, w))
        try:
            await init_wallet(wallet)
            if wallet.proofs:
                active_wallet = w == ctx.obj["WALLET_NAME"]
                print(
                    f"Wallet: {w}\tBalance: {sum_proofs(wallet.proofs)} sat (available: {sum_proofs([p for p in wallet.proofs if not p.reserved])} sat){' *' if active_wallet else ''}"
                )
        except Exception as e:
            # best effort: skip directories that are not loadable wallets, but
            # do not swallow KeyboardInterrupt/SystemExit/cancellation
            logger.debug("Could not load wallet {}: {}", w, e)
@cli.command("info", help="Information about Cashu wallet.")
@click.pass_context
@coro
async def info(ctx):
    """Print version, wallet name, directories and the active settings.

    Optional settings (debug, settings file, Tor, nostr, socks proxy) are only
    listed when they are set.
    """

    def show(label, value):
        print(f"{label}: {value}")

    show("Version", VERSION)
    show("Wallet", ctx.obj["WALLET_NAME"])
    if DEBUG:
        show("Debug", DEBUG)
    show("Cashu dir", CASHU_DIR)
    if ENV_FILE:
        show("Settings", ENV_FILE)
    if TOR:
        show("Tor enabled", TOR)
    if NOSTR_PRIVATE_KEY:
        # connect=False: the client is only created to read the public key
        nostr_client = NostrClient(privatekey_hex=NOSTR_PRIVATE_KEY, connect=False)
        show("Nostr public key", nostr_client.public_key.hex())
        show("Nostr relays", NOSTR_RELAYS)
    if SOCKS_HOST:
        show("Socks proxy", f"{SOCKS_HOST}:{SOCKS_PORT}")
    show("Mint URL", ctx.obj["HOST"])