mirror of
https://github.com/aljazceru/nutshell.git
synced 2026-02-15 05:34:19 +01:00
introduce basemodel for token
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
from sqlite3 import Row
|
||||
from typing import Any, Dict, List, Union
|
||||
from typing import Any, Dict, List, Union, TypedDict, Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
@@ -212,3 +212,13 @@ class MintKeysets:
|
||||
|
||||
def get_ids(self):
|
||||
return [k for k, _ in self.keysets.items()]
|
||||
|
||||
|
||||
class TokenMintJson(BaseModel):
|
||||
url: str
|
||||
ks: List[str]
|
||||
|
||||
|
||||
class TokenJson(BaseModel):
|
||||
tokens: List[Proof]
|
||||
mints: Optional[Dict[str, TokenMintJson]] = None
|
||||
|
||||
@@ -243,7 +243,6 @@ async def send(ctx, amount: int, lock: str, legacy: bool):
|
||||
)
|
||||
token = await wallet.serialize_proofs(
|
||||
send_proofs,
|
||||
hide_secrets=True if lock and not p2sh else False,
|
||||
include_mints=True,
|
||||
)
|
||||
print(token)
|
||||
@@ -256,7 +255,6 @@ async def send(ctx, amount: int, lock: str, legacy: bool):
|
||||
print("")
|
||||
token = await wallet.serialize_proofs(
|
||||
send_proofs,
|
||||
hide_secrets=True if lock and not p2sh else False,
|
||||
legacy=True,
|
||||
)
|
||||
print(token)
|
||||
@@ -297,7 +295,7 @@ async def receive(ctx, token: str, lock: str):
|
||||
# if there is a `mints` field in the token
|
||||
# we get the mint information in the token and load the keys of each mint
|
||||
# we then redeem the tokens for each keyset individually
|
||||
if "mints" in dtoken:
|
||||
if "mints" in dtoken and dtoken.get("mints") is not None:
|
||||
for mint_id in dtoken.get("mints"):
|
||||
for keyset in set(dtoken["mints"][mint_id]["ks"]):
|
||||
mint_url = dtoken["mints"][mint_id]["url"]
|
||||
@@ -390,16 +388,14 @@ async def pending(ctx):
|
||||
):
|
||||
grouped_proofs = list(value)
|
||||
token = await wallet.serialize_proofs(grouped_proofs)
|
||||
token_hidden_secret = await wallet.serialize_proofs(
|
||||
grouped_proofs, hide_secrets=True
|
||||
)
|
||||
token_hidden_secret = await wallet.serialize_proofs(grouped_proofs)
|
||||
reserved_date = datetime.utcfromtimestamp(
|
||||
int(grouped_proofs[0].time_reserved)
|
||||
).strftime("%Y-%m-%d %H:%M:%S")
|
||||
print(
|
||||
f"#{i} Amount: {sum_proofs(grouped_proofs)} sat Time: {reserved_date} ID: {key}\n"
|
||||
)
|
||||
print(f"With secret: {token}\n\nSecretless: {token_hidden_secret}\n")
|
||||
print(f"{token}\n")
|
||||
print(f"--------------------------\n")
|
||||
wallet.status()
|
||||
|
||||
|
||||
@@ -24,6 +24,8 @@ from cashu.core.base import (
|
||||
Proof,
|
||||
SplitRequest,
|
||||
WalletKeyset,
|
||||
TokenJson,
|
||||
TokenMintJson,
|
||||
)
|
||||
from cashu.core.bolt11 import Invoice as InvoiceBolt11
|
||||
from cashu.core.db import Database
|
||||
@@ -405,6 +407,8 @@ class Wallet(LedgerAPI):
|
||||
async def _get_proofs_per_minturl(self, proofs: List[Proof]):
|
||||
ret = {}
|
||||
for id in set([p.id for p in proofs]):
|
||||
if id is None:
|
||||
continue
|
||||
keyset: WalletKeyset = await get_keyset(id=id, db=self.db)
|
||||
if keyset.mint_url not in ret:
|
||||
ret[keyset.mint_url] = [p for p in proofs if p.id == id]
|
||||
@@ -482,50 +486,49 @@ class Wallet(LedgerAPI):
|
||||
return status["paid"]
|
||||
|
||||
async def serialize_proofs(
|
||||
self, proofs: List[Proof], hide_secrets=False, include_mints=False, legacy=False
|
||||
self, proofs: List[Proof], include_mints=False, legacy=False
|
||||
):
|
||||
"""
|
||||
Produces sharable token with proofs and mint information.
|
||||
"""
|
||||
# serialize the list of proofs, either with secrets included or without secrets
|
||||
if hide_secrets:
|
||||
proofs_serialized = [p.to_dict_no_secret() for p in proofs]
|
||||
else:
|
||||
proofs_serialized = [p.to_dict() for p in proofs]
|
||||
|
||||
if legacy:
|
||||
proofs_serialized = [p.to_dict() for p in proofs]
|
||||
return base64.urlsafe_b64encode(
|
||||
json.dumps(proofs_serialized).encode()
|
||||
).decode()
|
||||
|
||||
token = dict(tokens=proofs_serialized)
|
||||
token = TokenJson(tokens=proofs)
|
||||
# token = dict(tokens=proofs_serialized)
|
||||
|
||||
# add mint information to the token, if requested
|
||||
if include_mints:
|
||||
|
||||
mints = dict()
|
||||
mints: Dict[str, TokenMintJson] = dict()
|
||||
|
||||
# iterate through all proofs and add their keyset to `mints`
|
||||
for proof in proofs:
|
||||
if proof.id:
|
||||
keyset = await get_keyset(id=proof.id, db=self.db)
|
||||
if keyset:
|
||||
if keyset and keyset.mint_url:
|
||||
# TODO: replace this with a mint pubkey
|
||||
placeholder_mint_id = keyset.mint_url
|
||||
if placeholder_mint_id not in mints:
|
||||
id = dict(
|
||||
id = TokenMintJson(
|
||||
url=keyset.mint_url,
|
||||
ks=[keyset.id],
|
||||
)
|
||||
mints[placeholder_mint_id] = id
|
||||
else:
|
||||
if keyset.id not in mints[placeholder_mint_id]["ks"]:
|
||||
mints[placeholder_mint_id]["ks"].append(keyset.id)
|
||||
if mints:
|
||||
token["mints"] = mints
|
||||
if keyset.id not in mints[placeholder_mint_id].ks:
|
||||
mints[placeholder_mint_id].ks.append(keyset.id)
|
||||
if len(mints) > 0:
|
||||
token.mints = mints
|
||||
|
||||
# encode the token as a base64 string
|
||||
token_base64 = base64.urlsafe_b64encode(json.dumps(token).encode()).decode()
|
||||
token_base64 = base64.urlsafe_b64encode(
|
||||
json.dumps(token.dict()).encode()
|
||||
).decode()
|
||||
return token_base64
|
||||
|
||||
async def _get_spendable_proofs(self, proofs: List[Proof]):
|
||||
|
||||
Reference in New Issue
Block a user