Wallet REST API (#199)

* Add REST API for Cashu wallet

* Add simple way to start REST API server

* Add simple tests for wallet REST API

* Add TokenV3 to REST API

* Improve tests for wallet REST API

* Make format

* Remove unused import

* Rename nostr module for API

* Unify helper functions for CLI and API

* Make format

* Move error handling from helper functions to API

* Remove 'is_api' flag where possible

* Make format, cleanup, add comments

* Fix typo for burn also in API

* Improve error handling for API

* Add flag 'trust_new_mint' to receive command

To enable trusting or mistrusting unknown mints via API

* Allow selecting mint for sending from API

* Fix: set specific mint via API

* Fix: select mint with maximum balance via CLI

* Use different variables for mint_nr

* Allow selecting mint when sending via nostr via API

* Remove unnessecary 'is_api' flags from 'send_nostr'

* Remove HTTPException from nostr.py

* Allow selecting mint for sending with parameter also via CLI

* Allow trusting unknown mint for receiving also via CLI

* Make format

* Enable trusting unknown mint also when receiving via nostr

* Fix: wrong indentation of in receive function

* Use relative imports for wallet API

* Unify get_mint_wallet for CLI and API

* Unify send command for CLI and API

* Unify receive for CLI and API

* Catch errors in nostr via API

* Remove flag 'is_api' from verify_mints_tokenv2

* Remove cli_helpers left from last merge

* refactor cli selection

* load mint in nostr_send

* cleanup

* add cli_helpers.py

* legacy deserialization in cli

* make format

* clean up api response

* fix tests

* try pk

* verify mints in api

* try github tests

* Fix: verify_mints for API

* Uncomment verify_mints in receive of API

* update README

* Show mint url in pending

* clean up balance response

* fix test

* mint selection in api

* clean up API

* CLI: verify mint for receive -a

* clean up

* Rest -> REST

* Remove unused imports

---------

Co-authored-by: sihamon <sihamon@proton.me>
Co-authored-by: sihamon <126967444+sihamon@users.noreply.github.com>
This commit is contained in:
callebtc
2023-05-11 23:27:13 +02:00
committed by GitHub
parent 7f524927e2
commit 4088ab2876
18 changed files with 976 additions and 380 deletions

View File

@@ -168,6 +168,13 @@ Balance: 0 sat
Balance: 69 sat
```
# Starting the wallet API daemon
Nutshell wallet can be used in daemon mode that can be controlled through a REST API:
```bash
poetry run api
```
You can find the API docs at [http://localhost:4448/docs](http://localhost:4448/docs).
# Running a mint
This command runs the mint on your local computer. Skip this step if you want to use the [public test mint](#test-instance) instead.

View File

@@ -1,7 +1,6 @@
import base64
import json
from sqlite3 import Row
from typing import Any, Dict, List, Optional, TypedDict, Union
from typing import Any, Dict, List, Optional, Union
from loguru import logger
from pydantic import BaseModel

View File

@@ -34,7 +34,7 @@ async def migrate_databases(db: Database, migrations_module):
async with db.connect() as conn:
await set_migration_version(conn, db_name, version)
async with db.connect() as conn:
async with db.connect() as conn: # type: ignore
exists = None
if conn.type == SQLITE:
exists = await conn.fetchone(

View File

@@ -78,6 +78,8 @@ class WalletSettings(CashuSettings):
mint_host: str = Field(default="8333.space")
mint_port: int = Field(default=3338)
api_port: int = Field(default=4448)
nostr_private_key: str = Field(default=None)
nostr_relays: List[str] = Field(
default=[

View File

View File

@@ -0,0 +1,13 @@
from ...core.base import TokenV3
from ...wallet.crud import get_keyset
async def verify_mints(wallet, tokenObj: TokenV3):
# verify mints
mints = set([t.mint for t in tokenObj.token])
if None in mints:
raise Exception("Token has missing mint information.")
for mint in mints:
assert mint
mint_keysets = await get_keyset(mint_url=mint, db=wallet.db)
assert mint_keysets, "We don't know this mint."

22
cashu/wallet/api/app.py Normal file
View File

@@ -0,0 +1,22 @@
from fastapi import FastAPI
from ...core.settings import settings
from .router import router
def create_app() -> FastAPI:
app = FastAPI(
title="Cashu Wallet REST API",
description="REST API for Cashu Nutshell",
version=settings.version,
license_info={
"name": "MIT License",
"url": "https://raw.githubusercontent.com/cashubtc/cashu/main/LICENSE",
},
)
return app
app = create_app()
app.include_router(router=router)

9
cashu/wallet/api/main.py Normal file
View File

@@ -0,0 +1,9 @@
import uvicorn
from ...core.settings import settings
def main(port=settings.api_port):
config = uvicorn.Config("cashu.wallet.api.app:app", port=port, host="127.0.0.1")
server = uvicorn.Server(config)
server.run()

408
cashu/wallet/api/router.py Normal file
View File

@@ -0,0 +1,408 @@
import os
from datetime import datetime
from itertools import groupby, islice
from operator import itemgetter
from os import listdir
from os.path import isdir, join
from typing import Optional
from fastapi import APIRouter, HTTPException, Query, status
from ...core.base import TokenV3
from ...core.helpers import sum_proofs
from ...core.settings import settings
from ...nostr.nostr.client.client import NostrClient
from ...tor.tor import TorProxy
from ...wallet.crud import get_lightning_invoices, get_reserved_proofs, get_unused_locks
from ...wallet.helpers import deserialize_token_from_string, init_wallet, receive, send
from ...wallet.nostr import receive_nostr, send_nostr
from ...wallet.wallet import Wallet as Wallet
from .api_helpers import verify_mints
router: APIRouter = APIRouter()
def create_wallet(url=settings.mint_url, dir=settings.cashu_dir, name="wallet"):
return Wallet(url, os.path.join(dir, name), name=name)
async def load_mint(wallet: Wallet, mint: Optional[str] = None):
if mint:
wallet = create_wallet(mint)
try:
await wallet.load_mint()
except Exception as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
return wallet
wallet = create_wallet()
@router.on_event("startup")
async def start_wallet():
if settings.tor and not TorProxy().check_platform():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="tor not working"
)
await init_wallet(wallet)
@router.post("/pay", name="Pay lightning invoice")
async def pay(
invoice: str = Query(default=..., description="Lightning invoice to pay"),
mint: str = Query(
default=None,
description="Mint URL to pay from (None for default mint)",
),
):
if not settings.lightning:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="lightning not enabled."
)
global wallet
wallet = await load_mint(wallet, mint)
await wallet.load_proofs()
initial_balance = wallet.available_balance
total_amount, fee_reserve_sat = await wallet.get_pay_amount_with_fees(invoice)
assert total_amount > 0, HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="amount has to be larger than zero.",
)
if wallet.available_balance < total_amount:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail="balance is too low."
)
_, send_proofs = await wallet.split_to_send(wallet.proofs, total_amount)
await wallet.pay_lightning(send_proofs, invoice)
await wallet.load_proofs()
return {
"amount": total_amount - fee_reserve_sat,
"fee": fee_reserve_sat,
"amount_with_fee": total_amount,
"initial_balance": initial_balance,
"balance": wallet.available_balance,
}
@router.post("/invoice", name="Request lightning invoice")
async def invoice(
amount: int = Query(default=..., description="Amount to request in invoice"),
hash: str = Query(default=None, description="Hash of paid invoice"),
mint: str = Query(
default=None,
description="Mint URL to create an invoice at (None for default mint)",
),
):
global wallet
wallet = await load_mint(wallet, mint)
initial_balance = wallet.available_balance
if not settings.lightning:
r = await wallet.mint(amount)
return {
"amount": amount,
"balance": wallet.available_balance,
"initial_balance": initial_balance,
}
elif amount and not hash:
invoice = await wallet.request_mint(amount)
return {
"invoice": invoice,
"balance": wallet.available_balance,
"initial_balance": initial_balance,
}
elif amount and hash:
await wallet.mint(amount, hash)
return {
"amount": amount,
"hash": hash,
"balance": wallet.available_balance,
"initial_balance": initial_balance,
}
return
@router.get("/balance", name="Balance", summary="Display balance.")
async def balance():
await wallet.load_proofs()
result: dict = {"balance": wallet.available_balance}
keyset_balances = wallet.balance_per_keyset()
if len(keyset_balances) > 0:
result.update({"keysets": keyset_balances})
mint_balances = await wallet.balance_per_minturl()
if len(mint_balances) > 0:
result.update({"mints": mint_balances})
return result
@router.post("/send", name="Send tokens")
async def send_command(
amount: int = Query(default=..., description="Amount to send"),
nostr: str = Query(default=None, description="Send to nostr pubkey"),
lock: str = Query(default=None, description="Lock tokens (P2SH)"),
mint: str = Query(
default=None,
description="Mint URL to send from (None for default mint)",
),
):
global wallet
wallet = await load_mint(wallet, mint)
await wallet.load_proofs()
if not nostr:
try:
balance, token = await send(wallet, amount, lock, legacy=False)
except Exception as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
return {"balance": balance, "token": token}
else:
try:
token, pubkey = await send_nostr(wallet, amount, nostr)
except Exception as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
return {
"balance": wallet.available_balance,
"token": token,
"npub": pubkey,
}
@router.post("/receive", name="Receive tokens")
async def receive_command(
token: str = Query(default=None, description="Token to receive"),
lock: str = Query(default=None, description="Unlock tokens"),
nostr: bool = Query(default=False, description="Receive tokens via nostr"),
all: bool = Query(default=False, description="Receive all pending tokens"),
):
result = {"initial_balance": wallet.available_balance}
if token:
try:
tokenObj: TokenV3 = await deserialize_token_from_string(token)
try:
await verify_mints(wallet, tokenObj)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)
)
balance = await receive(wallet, tokenObj, lock)
except Exception as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
elif nostr:
try:
await receive_nostr(wallet)
balance = wallet.available_balance
except Exception as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
elif all:
reserved_proofs = await get_reserved_proofs(wallet.db)
balance = None
if len(reserved_proofs):
for _, value in groupby(reserved_proofs, key=itemgetter("send_id")): # type: ignore
proofs = list(value)
token = await wallet.serialize_proofs(proofs)
tokenObj = await deserialize_token_from_string(token)
try:
await verify_mints(wallet, tokenObj)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)
)
try:
balance = await receive(wallet, tokenObj, lock)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST, detail=str(e)
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="enter token or use either flag --nostr or --all.",
)
assert balance
result.update({"balance": balance})
return result
@router.post("/burn", name="Burn spent tokens")
async def burn(
token: str = Query(default=None, description="Token to burn"),
all: bool = Query(default=False, description="Burn all spent tokens"),
force: bool = Query(default=False, description="Force check on all tokens."),
delete: str = Query(
default=None,
description="Forcefully delete pending token by send ID if mint is unavailable",
),
mint: str = Query(
default=None,
description="Mint URL to burn from (None for default mint)",
),
):
global wallet
if not delete:
wallet = await load_mint(wallet, mint)
if not (all or token or force or delete) or (token and all):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="enter a token or use --all to burn all pending tokens, --force to check all tokens"
"or --delete with send ID to force-delete pending token from list if mint is unavailable.",
)
if all:
# check only those who are flagged as reserved
proofs = await get_reserved_proofs(wallet.db)
elif force:
# check all proofs in db
proofs = wallet.proofs
elif delete:
reserved_proofs = await get_reserved_proofs(wallet.db)
proofs = [proof for proof in reserved_proofs if proof["send_id"] == delete]
else:
# check only the specified ones
tokenObj = TokenV3.deserialize(token)
proofs = tokenObj.get_proofs()
if delete:
await wallet.invalidate(proofs, check_spendable=False)
else:
await wallet.invalidate(proofs)
return {"balance": wallet.available_balance}
@router.get("/pending", name="Show pending tokens")
async def pending(
number: int = Query(default=None, description="Show only n pending tokens"),
offset: int = Query(
default=0, description="Show pending tokens only starting from offset"
),
):
reserved_proofs = await get_reserved_proofs(wallet.db)
result: dict = {}
if len(reserved_proofs):
sorted_proofs = sorted(reserved_proofs, key=itemgetter("send_id")) # type: ignore
if number:
number += offset
for i, (key, value) in islice(
enumerate(
groupby(
sorted_proofs,
key=itemgetter("send_id"),
)
),
offset,
number,
):
grouped_proofs = list(value)
token = await wallet.serialize_proofs(grouped_proofs)
tokenObj = await deserialize_token_from_string(token)
mint = [t.mint for t in tokenObj.token][0]
reserved_date = datetime.utcfromtimestamp(
int(grouped_proofs[0].time_reserved)
).strftime("%Y-%m-%d %H:%M:%S")
result.update(
{
f"{i}": {
"amount": sum_proofs(grouped_proofs),
"time": reserved_date,
"ID": key,
"token": token,
"mint": mint,
}
}
)
return result
@router.get("/lock", name="Generate receiving lock")
async def lock():
p2shscript = await wallet.create_p2sh_lock()
txin_p2sh_address = p2shscript.address
return {"P2SH": txin_p2sh_address}
@router.get("/locks", name="Show unused receiving locks")
async def locks():
locks = await get_unused_locks(db=wallet.db)
if len(locks):
return {"locks": locks}
else:
return {"locks": []}
@router.get("/invoices", name="List all pending invoices")
async def invoices():
invoices = await get_lightning_invoices(db=wallet.db)
if len(invoices):
return {"invoices": invoices}
else:
return {"invoices": []}
@router.get("/wallets", name="List all available wallets")
async def wallets():
wallets = [
d for d in listdir(settings.cashu_dir) if isdir(join(settings.cashu_dir, d))
]
try:
wallets.remove("mint")
except ValueError:
pass
result = {}
for w in wallets:
wallet = Wallet(settings.mint_url, os.path.join(settings.cashu_dir, w), name=w)
try:
await init_wallet(wallet)
if wallet.proofs and len(wallet.proofs):
active_wallet = False
if w == wallet.name:
active_wallet = True
if active_wallet:
result.update(
{
f"{w}": {
"balance": sum_proofs(wallet.proofs),
"available": sum_proofs(
[p for p in wallet.proofs if not p.reserved]
),
}
}
)
except:
pass
return result
@router.get("/info", name="Information about Cashu wallet")
async def info():
general = {
"version": settings.version,
"wallet": wallet.name,
"debug": settings.debug,
"cashu_dir": settings.cashu_dir,
"mint_url": settings.mint_url,
}
if settings.env_file:
general.update({"settings": settings.env_file})
if settings.tor:
general.update({"tor": settings.tor})
if settings.nostr_private_key:
try:
client = NostrClient(private_key=settings.nostr_private_key, connect=False)
general.update(
{
"nostr": {
"public_key": client.private_key.bech32(),
"relays": settings.nostr_relays,
},
}
)
except:
general.update({"nostr": "Invalid key"})
if settings.socks_host:
general.update(
{"socks proxy": settings.socks_host + ":" + str(settings.socks_host)}
)
return general

View File

@@ -1,10 +1,7 @@
#!/usr/bin/env python
import asyncio
import base64
import json
import os
import sys
import time
from datetime import datetime
from functools import wraps
@@ -12,40 +9,20 @@ from itertools import groupby, islice
from operator import itemgetter
from os import listdir
from os.path import isdir, join
from typing import Dict, List
import click
from click import Context
from loguru import logger
from ...core.base import Proof, TokenV1, TokenV2, TokenV3
from ...core.base import TokenV3
from ...core.helpers import sum_proofs
from ...core.migrations import migrate_databases
from ...core.settings import settings
from ...nostr.nostr.client.client import NostrClient
from ...tor.tor import TorProxy
from ...wallet import migrations
from ...wallet.crud import (
get_keyset,
get_lightning_invoices,
get_reserved_proofs,
get_unused_locks,
)
from ...wallet.crud import get_lightning_invoices, get_reserved_proofs, get_unused_locks
from ...wallet.wallet import Wallet as Wallet
from .cli_helpers import (
get_mint_wallet,
print_mint_balances,
redeem_TokenV3_multimint,
serialize_TokenV1_to_TokenV3,
serialize_TokenV2_to_TokenV3,
)
from .nostr import receive_nostr, send_nostr
async def init_wallet(wallet: Wallet):
"""Performs migrations and loads proofs from db."""
await migrate_databases(wallet.db, migrations)
await wallet.load_proofs()
from ..cli.cli_helpers import get_mint_wallet, print_mint_balances, verify_mint
from ..helpers import deserialize_token_from_string, init_wallet, receive, send
from ..nostr import receive_nostr, send_nostr
class NaturalOrderGroup(click.Group):
@@ -59,7 +36,7 @@ class NaturalOrderGroup(click.Group):
@click.option(
"--host",
"-h",
default=settings.mint_url,
default=None,
help=f"Mint URL (default: {settings.mint_url}).",
)
@click.option(
@@ -84,12 +61,25 @@ def cli(ctx: Context, host: str, walletname: str):
raise Exception(error_str)
ctx.ensure_object(dict)
ctx.obj["HOST"] = host
ctx.obj["HOST"] = host or settings.mint_url
ctx.obj["WALLET_NAME"] = walletname
wallet = Wallet(ctx.obj["HOST"], os.path.join(settings.cashu_dir, walletname))
wallet = Wallet(
ctx.obj["HOST"], os.path.join(settings.cashu_dir, walletname), name=walletname
)
ctx.obj["WALLET"] = wallet
asyncio.run(init_wallet(wallet))
pass
# MUTLIMINT: Select a wallet
# only if a command is one of a subset that needs to specify a mint host
# if a mint host is already specified as an argument `host`, use it
if ctx.invoked_subcommand not in ["send", "invoice", "pay"] or host:
return
# else: we ask the user to select one
ctx.obj["WALLET"] = wallet # set a wallet for get_mint_wallet in the next step
ctx.obj["WALLET"] = asyncio.run(
get_mint_wallet(ctx)
) # select a specific wallet by CLI input
asyncio.run(init_wallet(ctx.obj["WALLET"]))
# https://github.com/pallets/click/issues/85#issuecomment-503464628
@@ -207,7 +197,7 @@ async def balance(ctx: Context, verbose):
)
print("")
await print_mint_balances(ctx, wallet)
await print_mint_balances(wallet)
if verbose:
print(
@@ -217,42 +207,6 @@ async def balance(ctx: Context, verbose):
print(f"Balance: {wallet.available_balance} sat")
async def send(ctx: Context, amount: int, lock: str, legacy: bool):
"""
Prints token to send to stdout.
"""
if lock and len(lock) < 22:
print("Error: lock has to be at least 22 characters long.")
return
p2sh = False
if lock and len(lock.split("P2SH:")) == 2:
p2sh = True
wallet = await get_mint_wallet(ctx)
await wallet.load_proofs()
_, send_proofs = await wallet.split_to_send(
wallet.proofs, amount, lock, set_reserved=True
)
token = await wallet.serialize_proofs(
send_proofs,
include_mints=True,
)
print(token)
if legacy:
print("")
print("Old token format:")
print("")
token = await wallet.serialize_proofs(
send_proofs,
legacy=True,
)
print(token)
wallet.status()
@cli.command("send", help="Send tokens.")
@click.argument("amount", type=int)
@click.argument("nostr", type=str, required=False)
@@ -295,90 +249,11 @@ async def send_command(
verbose: bool,
yes: bool,
):
if not nostr and not nopt:
await send(ctx, amount, lock, legacy)
else:
await send_nostr(ctx, amount, nostr or nopt, verbose, yes)
async def receive(ctx: Context, token: str, lock: str):
wallet: Wallet = ctx.obj["WALLET"]
# await wallet.load_mint()
# check for P2SH locks
if lock:
# load the script and signature of this address from the database
assert len(lock.split("P2SH:")) == 2, Exception(
"lock has wrong format. Expected P2SH:<address>."
)
address_split = lock.split("P2SH:")[1]
p2shscripts = await get_unused_locks(address_split, db=wallet.db)
assert len(p2shscripts) == 1, Exception("lock not found.")
script, signature = p2shscripts[0].script, p2shscripts[0].signature
if not nostr and not nopt:
await send(wallet, amount, lock, legacy)
else:
script, signature = None, None
# deserialize token
# ----- backwards compatibility -----
# V2Tokens (0.7-0.11.0) (eyJwcm9...)
if token.startswith("eyJwcm9"):
try:
tokenv2 = TokenV2.parse_obj(json.loads(base64.urlsafe_b64decode(token)))
token = await serialize_TokenV2_to_TokenV3(wallet, tokenv2)
except:
pass
# V1Tokens (<0.7) (W3siaWQ...)
if token.startswith("W3siaWQ"):
try:
tokenv1 = TokenV1.parse_obj(json.loads(base64.urlsafe_b64decode(token)))
token = await serialize_TokenV1_to_TokenV3(wallet, tokenv1)
print(token)
except:
pass
# ----- receive token -----
# deserialize token
tokenObj = TokenV3.deserialize(token)
# tokenObj = TokenV2.parse_obj(dtoken)
assert len(tokenObj.token), Exception("no proofs in token")
assert len(tokenObj.token[0].proofs), Exception("no proofs in token")
includes_mint_info: bool = any([t.mint for t in tokenObj.token])
# if there is a `mints` field in the token
# we check whether the token has mints that we don't know yet
# and ask the user if they want to trust the new mitns
if includes_mint_info:
# we ask the user to confirm any new mints the tokens may include
# await verify_mints(ctx, tokenObj)
# redeem tokens with new wallet instances
await redeem_TokenV3_multimint(ctx, tokenObj, script, signature)
else:
# no mint information present, we extract the proofs and use wallet's default mint
proofs = [p for t in tokenObj.token for p in t.proofs]
# first we load the mint URL from the DB
keyset_in_token = proofs[0].id
assert keyset_in_token
# we get the keyset from the db
mint_keysets = await get_keyset(id=keyset_in_token, db=wallet.db)
assert mint_keysets, Exception("we don't know this keyset")
assert mint_keysets.mint_url, Exception("we don't know this mint's URL")
# now we have the URL
mint_wallet = Wallet(
mint_keysets.mint_url,
os.path.join(settings.cashu_dir, ctx.obj["WALLET_NAME"]),
)
await mint_wallet.load_mint(keyset_in_token)
_, _ = await mint_wallet.redeem(proofs, script, signature)
print(f"Received {sum_proofs(proofs)} sats")
# reload main wallet so the balance updates
await wallet.load_proofs()
wallet.status()
await send_nostr(wallet, amount, nostr or nopt, verbose, yes)
@cli.command("receive", help="Receive tokens.")
@@ -401,21 +276,44 @@ async def receive(ctx: Context, token: str, lock: str):
@click.pass_context
@coro
async def receive_cli(
ctx: Context, token: str, lock: str, nostr: bool, all: bool, verbose: bool
ctx: Context,
token: str,
lock: str,
nostr: bool,
all: bool,
verbose: bool,
):
wallet: Wallet = ctx.obj["WALLET"]
wallet.status()
if token:
await receive(ctx, token, lock)
tokenObj = await deserialize_token_from_string(token)
# verify that we trust all mints in these tokens
# ask the user if they want to trust the new mints
for mint_url in set([t.mint for t in tokenObj.token if t.mint]):
mint_wallet = Wallet(
mint_url, os.path.join(settings.cashu_dir, wallet.name)
)
await verify_mint(mint_wallet, mint_url)
await receive(wallet, tokenObj, lock)
elif nostr:
await receive_nostr(ctx, verbose)
await receive_nostr(wallet, verbose)
elif all:
reserved_proofs = await get_reserved_proofs(wallet.db)
if len(reserved_proofs):
for key, value in groupby(reserved_proofs, key=itemgetter("send_id")): # type: ignore
proofs = list(value)
token = await wallet.serialize_proofs(proofs)
await receive(ctx, token, lock)
tokenObj = TokenV3.deserialize(token)
# verify that we trust all mints in these tokens
# ask the user if they want to trust the new mints
for mint_url in set([t.mint for t in tokenObj.token if t.mint]):
mint_wallet = Wallet(
mint_url, os.path.join(settings.cashu_dir, wallet.name)
)
await verify_mint(mint_wallet, mint_url)
await receive(wallet, tokenObj, lock)
else:
print("Error: enter token or use either flag --nostr or --all.")
@@ -505,12 +403,14 @@ async def pending(ctx: Context, legacy, number: int, offset: int):
):
grouped_proofs = list(value)
token = await wallet.serialize_proofs(grouped_proofs)
tokenObj = await deserialize_token_from_string(token)
mint = [t.mint for t in tokenObj.token][0]
# token_hidden_secret = await wallet.serialize_proofs(grouped_proofs)
reserved_date = datetime.utcfromtimestamp(
int(grouped_proofs[0].time_reserved)
).strftime("%Y-%m-%d %H:%M:%S")
print(
f"#{i} Amount: {sum_proofs(grouped_proofs)} sat Time: {reserved_date} ID: {key}\n"
f"#{i} Amount: {sum_proofs(grouped_proofs)} sat Time: {reserved_date} ID: {key} Mint: {mint}\n"
)
print(f"{token}\n")

View File

@@ -21,6 +21,77 @@ from ...wallet.crud import get_keyset
from ...wallet.wallet import Wallet as Wallet
async def get_mint_wallet(ctx: Context):
"""
Helper function that asks the user for an input to select which mint they want to load.
Useful for selecting the mint that the user wants to send tokens from.
"""
# we load a dummy wallet so we can check the balance per mint
wallet: Wallet = ctx.obj["WALLET"]
mint_balances = await wallet.balance_per_minturl()
# if we have balances on more than one mint, we ask the user to select one
if len(mint_balances) > 1:
await print_mint_balances(wallet, show_mints=True)
url_max = max(mint_balances, key=lambda v: mint_balances[v]["available"])
nr_max = list(mint_balances).index(url_max) + 1
mint_nr_str = input(
f"Select mint [1-{len(mint_balances)}] or "
f"press enter for mint with largest balance (Mint {nr_max}): "
)
if not mint_nr_str: # largest balance
mint_url = url_max
elif mint_nr_str.isdigit() and int(mint_nr_str) <= len(
mint_balances
): # specific mint
mint_url = list(mint_balances.keys())[int(mint_nr_str) - 1]
else:
raise Exception("invalid input.")
elif len(mint_balances) == 1:
mint_url = list(mint_balances.keys())[0]
else:
mint_url = wallet.url
# load this mint_url into a wallet
mint_wallet = Wallet(
mint_url,
os.path.join(settings.cashu_dir, ctx.obj["WALLET_NAME"]),
name=wallet.name,
)
# await mint_wallet.load_mint()
await mint_wallet.load_proofs()
return mint_wallet
async def print_mint_balances(wallet, show_mints=False):
"""
Helper function that prints the balances for each mint URL that we have tokens from.
"""
# get balances per mint
mint_balances = await wallet.balance_per_minturl()
# if we have a balance on a non-default mint, we show its URL
keysets = [k for k, v in wallet.balance_per_keyset().items()]
for k in keysets:
ks = await get_keyset(id=str(k), db=wallet.db)
if ks and ks.mint_url != wallet.url:
show_mints = True
# or we have a balance on more than one mint
# show balances per mint
if len(mint_balances) > 1 or show_mints:
print(f"You have balances in {len(mint_balances)} mints:")
print("")
for i, (k, v) in enumerate(mint_balances.items()):
print(
f"Mint {i+1}: Balance: {v['available']} sat (pending: {v['balance']-v['available']} sat) URL: {k}"
)
print("")
async def verify_mint(mint_wallet: Wallet, url: str):
"""A helper function that asks the user if they trust the mint if the user
has not encountered the mint before (there is no entry in the database).
@@ -46,202 +117,3 @@ async def verify_mint(mint_wallet: Wallet, url: str):
)
else:
logger.debug(f"We know keyset {mint_keysets.id} already")
async def verify_mints_tokenv2(ctx: Context, token: TokenV2):
"""
A helper function that iterates through all mints in the token and if it has
not been encountered before, asks the user to confirm.
It will instantiate a Wallet with each keyset and check whether the mint supports it.
It will then get the keys for that keyset from the mint and check whether the keyset id is correct.
"""
if token.mints is None:
return
proofs_keysets = set([p.id for p in token.proofs])
logger.debug(f"Verifying mints")
trust_token_mints = True
for mint in token.mints:
for keyset in set([id for id in mint.ids if id in proofs_keysets]):
# init a temporary wallet object
keyset_wallet = Wallet(
mint.url, os.path.join(settings.cashu_dir, ctx.obj["WALLET_NAME"])
)
# make sure that this mint supports this keyset
mint_keysets = await keyset_wallet._get_keyset_ids(mint.url)
assert keyset in mint_keysets, "mint does not have this keyset."
# we validate the keyset id by fetching the keys from the mint and computing the id locally
mint_keyset = await keyset_wallet._get_keys_of_keyset(mint.url, keyset)
assert keyset == mint_keyset.id, Exception("keyset not valid.")
# we check the db whether we know this mint already and ask the user if not
mint_keysets = await get_keyset(mint_url=mint.url, db=keyset_wallet.db)
if mint_keysets is None:
# we encountered a new mint and ask for a user confirmation
trust_token_mints = False
print("")
print("Warning: Tokens are from a mint you don't know yet.")
print("\n")
print(f"Mint URL: {mint.url}")
print(f"Mint keyset: {keyset}")
print("\n")
click.confirm(
f"Do you trust this mint and want to receive the tokens?",
abort=True,
default=True,
)
trust_token_mints = True
else:
logger.debug(f"We know keyset {mint_keysets.id} already")
assert trust_token_mints, Exception("Aborted!")
async def redeem_TokenV2_multimint(ctx: Context, token: TokenV2, script, signature):
"""
Helper function to iterate thruogh a token with multiple mints and redeem them from
these mints one keyset at a time.
"""
# we get the mint information in the token and load the keys of each mint
# we then redeem the tokens for each keyset individually
if token.mints is None:
return
proofs_keysets = set([p.id for p in token.proofs])
for mint in token.mints:
for keyset in set([id for id in mint.ids if id in proofs_keysets]):
logger.debug(f"Redeeming tokens from keyset {keyset}")
# init a temporary wallet object
keyset_wallet = Wallet(
mint.url, os.path.join(settings.cashu_dir, ctx.obj["WALLET_NAME"])
)
await keyset_wallet.load_mint()
# redeem proofs of this keyset
redeem_proofs = [p for p in token.proofs if p.id == keyset]
_, _ = await keyset_wallet.redeem(
redeem_proofs, scnd_script=script, scnd_siganture=signature
)
print(f"Received {sum_proofs(redeem_proofs)} sats")
async def redeem_TokenV3_multimint(ctx: Context, token: TokenV3, script, signature):
"""
Helper function to iterate thruogh a token with multiple mints and redeem them from
these mints one keyset at a time.
"""
for t in token.token:
assert t.mint, Exception("Multimint redeem without URL")
mint_wallet = Wallet(
t.mint, os.path.join(settings.cashu_dir, ctx.obj["WALLET_NAME"])
)
await verify_mint(mint_wallet, t.mint)
keysets = mint_wallet._get_proofs_keysets(t.proofs)
logger.debug(f"Keysets in tokens: {keysets}")
# loop over all keysets
for keyset in set(keysets):
await mint_wallet.load_mint()
# redeem proofs of this keyset
redeem_proofs = [p for p in t.proofs if p.id == keyset]
_, _ = await mint_wallet.redeem(
redeem_proofs, scnd_script=script, scnd_siganture=signature
)
print(f"Received {sum_proofs(redeem_proofs)} sats")
async def print_mint_balances(ctx: Context, wallet, show_mints=False):
"""
Helper function that prints the balances for each mint URL that we have tokens from.
"""
# get balances per mint
mint_balances = await wallet.balance_per_minturl()
# if we have a balance on a non-default mint, we show its URL
keysets = [k for k, v in wallet.balance_per_keyset().items()]
for k in keysets:
ks = await get_keyset(id=str(k), db=wallet.db)
if ks and ks.mint_url != ctx.obj["HOST"]:
show_mints = True
# or we have a balance on more than one mint
# show balances per mint
if len(mint_balances) > 1 or show_mints:
print(f"You have balances in {len(mint_balances)} mints:")
print("")
for i, (k, v) in enumerate(mint_balances.items()):
print(
f"Mint {i+1}: Balance: {v['available']} sat (pending: {v['balance']-v['available']} sat) URL: {k}"
)
print("")
async def get_mint_wallet(ctx: Context):
"""
Helper function that asks the user for an input to select which mint they want to load.
Useful for selecting the mint that the user wants to send tokens from.
"""
# we load a dummy wallet so we can check the balance per mint
wallet: Wallet = ctx.obj["WALLET"]
mint_balances = await wallet.balance_per_minturl()
# if we have balances on more than one mint, we ask the user to select one
if len(mint_balances) > 1:
await print_mint_balances(ctx, wallet, show_mints=True)
url_max = max(mint_balances, key=lambda v: mint_balances[v]["available"])
nr_max = list(mint_balances).index(url_max) + 1
mint_nr_str = input(
f"Select mint [1-{len(mint_balances)}] or "
f"press enter for mint with largest balance (Mint {nr_max}): "
)
if not mint_nr_str: # largest balance
mint_url = url_max
elif mint_nr_str.isdigit() and int(mint_nr_str) <= len(
mint_balances
): # specific mint
mint_url = list(mint_balances.keys())[int(mint_nr_str) - 1]
else:
raise Exception("invalid input.")
else:
mint_url = list(mint_balances.keys())[0]
# load this mint_url into a wallet
mint_wallet = Wallet(
mint_url, os.path.join(settings.cashu_dir, ctx.obj["WALLET_NAME"])
)
await mint_wallet.load_mint()
return mint_wallet
async def serialize_TokenV2_to_TokenV3(wallet: Wallet, tokenv2: TokenV2):
"""Helper function for the CLI to receive legacy TokenV2 tokens.
Takes a list of proofs and constructs a *serialized* TokenV3 to be received through
the ordinary path.
Returns:
TokenV3: TokenV3
"""
tokenv3 = TokenV3(token=[TokenV3Token(proofs=tokenv2.proofs)])
if tokenv2.mints:
tokenv3.token[0].mint = tokenv2.mints[0].url
token_serialized = tokenv3.serialize()
return token_serialized
async def serialize_TokenV1_to_TokenV3(wallet: Wallet, tokenv1: TokenV1):
"""Helper function for the CLI to receive legacy TokenV1 tokens.
Takes a list of proofs and constructs a *serialized* TokenV3 to be received through
the ordinary path.
Returns:
TokenV3: TokenV3
"""
tokenv3 = TokenV3(token=[TokenV3Token(proofs=tokenv1.__root__)])
token_serialized = tokenv3.serialize()
return token_serialized

207
cashu/wallet/helpers.py Normal file
View File

@@ -0,0 +1,207 @@
import base64
import json
import os
import click
from loguru import logger
from ..core.base import TokenV1, TokenV2, TokenV3, TokenV3Token
from ..core.helpers import sum_proofs
from ..core.migrations import migrate_databases
from ..core.settings import settings
from ..wallet import migrations
from ..wallet.crud import get_keyset, get_unused_locks
from ..wallet.wallet import Wallet as Wallet
async def init_wallet(wallet: Wallet):
"""Performs migrations and loads proofs from db."""
await migrate_databases(wallet.db, migrations)
await wallet.load_proofs()
async def redeem_TokenV3_multimint(
wallet: Wallet,
token: TokenV3,
script,
signature,
):
"""
Helper function to iterate thruogh a token with multiple mints and redeem them from
these mints one keyset at a time.
"""
for t in token.token:
assert t.mint, Exception(
"redeem_TokenV3_multimint: multimint redeem without URL"
)
mint_wallet = Wallet(t.mint, os.path.join(settings.cashu_dir, wallet.name))
keysets = mint_wallet._get_proofs_keysets(t.proofs)
logger.debug(f"Keysets in tokens: {keysets}")
# loop over all keysets
for keyset in set(keysets):
await mint_wallet.load_mint()
# redeem proofs of this keyset
redeem_proofs = [p for p in t.proofs if p.id == keyset]
_, _ = await mint_wallet.redeem(
redeem_proofs, scnd_script=script, scnd_siganture=signature
)
print(f"Received {sum_proofs(redeem_proofs)} sats")
async def serialize_TokenV2_to_TokenV3(tokenv2: TokenV2):
"""Helper function to receive legacy TokenV2 tokens.
Takes a list of proofs and constructs a *serialized* TokenV3 to be received through
the ordinary path.
Returns:
TokenV3: TokenV3
"""
tokenv3 = TokenV3(token=[TokenV3Token(proofs=tokenv2.proofs)])
if tokenv2.mints:
tokenv3.token[0].mint = tokenv2.mints[0].url
token_serialized = tokenv3.serialize()
return token_serialized
async def serialize_TokenV1_to_TokenV3(tokenv1: TokenV1):
"""Helper function to receive legacy TokenV1 tokens.
Takes a list of proofs and constructs a *serialized* TokenV3 to be received through
the ordinary path.
Returns:
TokenV3: TokenV3
"""
tokenv3 = TokenV3(token=[TokenV3Token(proofs=tokenv1.__root__)])
token_serialized = tokenv3.serialize()
return token_serialized
async def deserialize_token_from_string(token: str) -> TokenV3:
# deserialize token
# ----- backwards compatibility -----
# V2Tokens (0.7-0.11.0) (eyJwcm9...)
if token.startswith("eyJwcm9"):
try:
tokenv2 = TokenV2.parse_obj(json.loads(base64.urlsafe_b64decode(token)))
token = await serialize_TokenV2_to_TokenV3(tokenv2)
except:
pass
# V1Tokens (<0.7) (W3siaWQ...)
if token.startswith("W3siaWQ"):
try:
tokenv1 = TokenV1.parse_obj(json.loads(base64.urlsafe_b64decode(token)))
token = await serialize_TokenV1_to_TokenV3(tokenv1)
except:
pass
# ----- receive token -----
# deserialize token
# dtoken = json.loads(base64.urlsafe_b64decode(token))
tokenObj = TokenV3.deserialize(token)
# tokenObj = TokenV2.parse_obj(dtoken)
assert len(tokenObj.token), Exception("no proofs in token")
assert len(tokenObj.token[0].proofs), Exception("no proofs in token")
return tokenObj
async def receive(
wallet: Wallet,
tokenObj: TokenV3,
lock: str,
):
# await wallet.load_mint()
# check for P2SH locks
if lock:
# load the script and signature of this address from the database
assert len(lock.split("P2SH:")) == 2, Exception(
"lock has wrong format. Expected P2SH:<address>."
)
address_split = lock.split("P2SH:")[1]
p2shscripts = await get_unused_locks(address_split, db=wallet.db)
assert len(p2shscripts) == 1, Exception("lock not found.")
script, signature = p2shscripts[0].script, p2shscripts[0].signature
else:
script, signature = None, None
includes_mint_info: bool = any([t.mint for t in tokenObj.token])
if includes_mint_info:
# redeem tokens with new wallet instances
await redeem_TokenV3_multimint(
wallet,
tokenObj,
script,
signature,
)
else:
# no mint information present, we extract the proofs and use wallet's default mint
proofs = [p for t in tokenObj.token for p in t.proofs]
# first we load the mint URL from the DB
keyset_in_token = proofs[0].id
assert keyset_in_token
# we get the keyset from the db
mint_keysets = await get_keyset(id=keyset_in_token, db=wallet.db)
assert mint_keysets, Exception("we don't know this keyset")
assert mint_keysets.mint_url, Exception("we don't know this mint's URL")
# now we have the URL
mint_wallet = Wallet(
mint_keysets.mint_url,
os.path.join(settings.cashu_dir, wallet.name),
)
await mint_wallet.load_mint(keyset_in_token)
_, _ = await mint_wallet.redeem(proofs, script, signature)
print(f"Received {sum_proofs(proofs)} sats")
# reload main wallet so the balance updates
await wallet.load_proofs()
wallet.status()
return wallet.available_balance
async def send(
wallet: Wallet,
amount: int,
lock: str,
legacy: bool,
):
"""
Prints token to send to stdout.
"""
if lock:
assert len(lock) > 21, Exception(
"Error: lock has to be at least 22 characters long."
)
p2sh = False
if lock and len(lock.split("P2SH:")) == 2:
p2sh = True
await wallet.load_mint()
await wallet.load_proofs()
_, send_proofs = await wallet.split_to_send(
wallet.proofs, amount, lock, set_reserved=True
)
token = await wallet.serialize_proofs(
send_proofs,
include_mints=True,
)
print(token)
if legacy:
print("")
print("Old token format:")
print("")
token = await wallet.serialize_proofs(
send_proofs,
legacy=True,
)
print(token)
wallet.status()
return wallet.available_balance, token

View File

@@ -1,7 +1,7 @@
from ..core.db import Database
async def m000_create_migrations_table(db):
async def m000_create_migrations_table(db: Database):
await db.execute(
"""
CREATE TABLE IF NOT EXISTS dbversions (
@@ -62,7 +62,7 @@ async def m001_initial(db: Database):
)
async def m002_add_proofs_reserved(db):
async def m002_add_proofs_reserved(db: Database):
"""
Column for marking proofs as reserved when they are being sent.
"""
@@ -70,7 +70,7 @@ async def m002_add_proofs_reserved(db):
await db.execute("ALTER TABLE proofs ADD COLUMN reserved BOOL")
async def m003_add_proofs_sendid_and_timestamps(db):
async def m003_add_proofs_sendid_and_timestamps(db: Database):
"""
Column with unique ID for each initiated send attempt
so proofs can be later grouped together for each send attempt.

View File

@@ -3,16 +3,16 @@ import threading
import time
import click
from click import Context
from requests.exceptions import ConnectionError
from ...core.settings import settings
from ...nostr.nostr.client.client import NostrClient
from ...nostr.nostr.event import Event
from ...nostr.nostr.key import PublicKey
from ..crud import get_nostr_last_check_timestamp, set_nostr_last_check_timestamp
from ..wallet import Wallet
from .cli_helpers import get_mint_wallet
from ..core.settings import settings
from ..nostr.nostr.client.client import NostrClient
from ..nostr.nostr.event import Event
from ..nostr.nostr.key import PublicKey
from .cli.cli_helpers import get_mint_wallet
from .crud import get_nostr_last_check_timestamp, set_nostr_last_check_timestamp
from .helpers import receive
from .wallet import Wallet
async def nip5_to_pubkey(wallet: Wallet, address: str):
@@ -43,17 +43,21 @@ async def nip5_to_pubkey(wallet: Wallet, address: str):
return pubkey
async def send_nostr(ctx: Context, amount: int, pubkey: str, verbose: bool, yes: bool):
async def send_nostr(
wallet: Wallet,
amount: int,
pubkey: str,
verbose: bool = False,
yes: bool = True,
):
"""
Sends tokens via nostr.
"""
# load a wallet for the chosen mint
wallet = await get_mint_wallet(ctx)
if "@" in pubkey or "." in pubkey:
# matches user@domain.com and domain.com (which is _@domain.com)
pubkey = await nip5_to_pubkey(wallet, pubkey)
await wallet.load_mint()
await wallet.load_proofs()
_, send_proofs = await wallet.split_to_send(
wallet.proofs, amount, set_reserved=True
@@ -87,12 +91,17 @@ async def send_nostr(ctx: Context, amount: int, pubkey: str, verbose: bool, yes:
print(f"Token sent to {pubkey_to.bech32()}")
await asyncio.sleep(5)
client.close()
return token, pubkey_to.bech32()
async def receive_nostr(ctx: Context, verbose: bool):
async def receive_nostr(
wallet: Wallet,
verbose: bool = False,
):
if settings.nostr_private_key is None:
print(
"Warning: No nostr private key set! You don't have NOSTR_PRIVATE_KEY set in your .env file. I will create a random private key for this session but I will not remember it."
"Warning: No nostr private key set! You don't have NOSTR_PRIVATE_KEY set in your .env file. "
"I will create a random private key for this session but I will not remember it."
)
print("")
client = NostrClient(
@@ -110,14 +119,18 @@ async def receive_nostr(ctx: Context, verbose: bool):
)
try:
# call the receive method
from ...wallet.cli.cli import receive
asyncio.run(receive(ctx, decrypted_content, ""))
asyncio.run(
receive(
wallet,
decrypted_content,
"",
)
)
except Exception as e:
pass
# determine timestamp of last check so we don't scan all historical DMs
wallet: Wallet = ctx.obj["WALLET"]
last_check = await get_nostr_last_check_timestamp(db=wallet.db)
if last_check:
last_check -= 60 * 60 # 1 hour tolerance

View File

@@ -607,6 +607,7 @@ class Wallet(LedgerAPI):
preimage=status.preimage,
paid=True,
time_paid=time.time(),
hash="",
)
await store_lightning_invoice(db=self.db, invoice=invoice_obj)

View File

@@ -52,4 +52,5 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
mint = "cashu.mint.main:main"
cashu = "cashu.wallet.cli.cli:cli"
api = "cashu.wallet.api.main:main"
wallet-test = "tests.test_wallet:test"

View File

@@ -7,6 +7,7 @@ from pathlib import Path
import pytest
import pytest_asyncio
import uvicorn
from fastapi import FastAPI
from uvicorn import Config, Server
from cashu.core.db import Database
@@ -16,16 +17,18 @@ from cashu.lightning.fake import FakeWallet
from cashu.mint import migrations as migrations_mint
from cashu.mint.ledger import Ledger
from cashu.wallet import migrations as migrations_wallet
from cashu.wallet.api.router import router
from cashu.wallet.wallet import Wallet
SERVER_ENDPOINT = "http://localhost:3337"
class UvicornServer(multiprocessing.Process):
def __init__(self, config: Config):
def __init__(self, config: Config, private_key: str = "TEST_PRIVATE_KEY"):
super().__init__()
self.server = Server(config=config)
self.config = config
self.private_key = private_key
def stop(self):
self.terminate()
@@ -35,7 +38,7 @@ class UvicornServer(multiprocessing.Process):
settings.mint_lightning_backend = "FakeWallet"
settings.mint_listen_port = 3337
settings.mint_database = "data/test_mint"
settings.mint_private_key = "TEST_PRIVATE_KEY"
settings.mint_private_key = self.private_key
settings.mint_derivation_path = "0/0/0/0"
dirpath = Path(settings.mint_database)
@@ -86,3 +89,22 @@ async def ledger():
)
await start_mint_init(ledger)
yield ledger
@pytest.fixture(autouse=True, scope="session")
def mint_3338():
settings.mint_listen_port = 3338
settings.port = 3338
settings.mint_url = "http://localhost:3338"
settings.port = settings.mint_listen_port
config = uvicorn.Config(
"cashu.mint.app:app",
port=settings.mint_listen_port,
host="127.0.0.1",
)
server = UvicornServer(config=config, private_key="SECOND_PRIVATE_KEY")
server.start()
time.sleep(1)
yield server
server.stop()

120
tests/test_api.py Normal file
View File

@@ -0,0 +1,120 @@
from fastapi.testclient import TestClient
from cashu.core.settings import settings
from cashu.wallet.api.app import app
def test_invoice(mint):
with TestClient(app) as client:
response = client.post("/invoice?amount=100")
assert response.status_code == 200
if settings.lightning:
assert response.json()["invoice"]
else:
assert response.json()["balance"]
assert response.json()["amount"]
def test_balance():
with TestClient(app) as client:
response = client.get("/balance")
assert response.status_code == 200
assert response.json()["balance"]
assert response.json()["keysets"]
assert response.json()["mints"]
def test_send(mint):
with TestClient(app) as client:
response = client.post("/send?amount=10")
assert response.status_code == 200
assert response.json()["balance"]
def test_pending():
with TestClient(app) as client:
response = client.get("/pending")
assert response.status_code == 200
assert response.json()["0"]
def test_receive_all(mint):
with TestClient(app) as client:
response = client.post("/receive?all=true")
assert response.status_code == 200
assert response.json()["initial_balance"]
assert response.json()["balance"]
def test_burn_all(mint):
with TestClient(app) as client:
response = client.post("/send?amount=20")
assert response.status_code == 200
response = client.post("/burn?all=true")
assert response.status_code == 200
assert response.json()["balance"]
def test_pay():
with TestClient(app) as client:
invoice = (
"lnbc100n1pjzp22cpp58xvjxvagzywky9xz3vurue822aaax"
"735hzc5pj5fg307y58v5znqdq4vdshx6r4ypjx2ur0wd5hgl"
"h6ahauv24wdmac4zk478pmwfzd7sdvm8tje3dmfue3lc2g4l"
"9g40a073h39748uez9p8mxws5vqwjmkqr4wl5l7n4dlhj6z6"
"va963cqvufrs4"
)
response = client.post(f"/pay?invoice={invoice}")
if not settings.lightning:
assert response.status_code == 400
else:
assert response.status_code == 200
def test_lock():
with TestClient(app) as client:
response = client.get("/lock")
assert response.status_code == 200
def test_locks():
with TestClient(app) as client:
response = client.get("/locks")
assert response.status_code == 200
def test_invoices():
with TestClient(app) as client:
response = client.get("/invoices")
assert response.status_code == 200
def test_wallets():
with TestClient(app) as client:
response = client.get("/wallets")
assert response.status_code == 200
def test_info():
with TestClient(app) as client:
response = client.get("/info")
assert response.status_code == 200
assert response.json()["version"]
def test_flow(mint):
with TestClient(app) as client:
if not settings.lightning:
response = client.get("/balance")
initial_balance = response.json()["balance"]
response = client.post("/invoice?amount=100")
assert response.json()["balance"] == initial_balance + 100
response = client.post("/send?amount=50")
assert response.json()["balance"] == initial_balance + 50
response = client.post("/send?amount=50")
assert response.json()["balance"] == initial_balance
response = client.get("/pending")
token = response.json()["0"]["token"]
amount = response.json()["0"]["amount"]
response = client.post(f"/receive?token={token}")
assert response.json()["balance"] == initial_balance + amount