[Wip] [Wallet] NUT-09: Get mint info and add many type annotations (#262)

* get mint info and add many type annotations

* tests

* make format
This commit is contained in:
callebtc
2023-06-25 02:21:40 +02:00
committed by GitHub
parent 339c3fb066
commit 77278127ae
4 changed files with 154 additions and 20 deletions

View File

@@ -620,9 +620,12 @@ async def wallets(ctx):
@cli.command("info", help="Information about Cashu wallet.") @cli.command("info", help="Information about Cashu wallet.")
@click.option(
"--mint", "-m", default=False, is_flag=True, help="Fetch mint information."
)
@click.pass_context @click.pass_context
@coro @coro
async def info(ctx: Context): async def info(ctx: Context, mint: bool):
print(f"Version: {settings.version}") print(f"Version: {settings.version}")
print(f"Wallet: {ctx.obj['WALLET_NAME']}") print(f"Wallet: {ctx.obj['WALLET_NAME']}")
if settings.debug: if settings.debug:
@@ -642,4 +645,25 @@ async def info(ctx: Context):
if settings.socks_host: if settings.socks_host:
print(f"Socks proxy: {settings.socks_host}:{settings.socks_port}") print(f"Socks proxy: {settings.socks_host}:{settings.socks_port}")
print(f"Mint URL: {ctx.obj['HOST']}") print(f"Mint URL: {ctx.obj['HOST']}")
if mint:
wallet: Wallet = ctx.obj["WALLET"]
mint_info: dict = (await wallet._load_mint_info()).dict()
print("")
print("Mint information:")
print("")
if mint_info:
print(f"Mint name: {mint_info['name']}")
if mint_info["description"]:
print(f"Description: {mint_info['description']}")
if mint_info["description_long"]:
print(f"Long description: {mint_info['description_long']}")
if mint_info["contact"]:
print(f"Contact: {mint_info['contact']}")
if mint_info["version"]:
print(f"Version: {mint_info['version']}")
if mint_info["motd"]:
print(f"Message of the day: {mint_info['motd']}")
if mint_info["parameter"]:
print(f"Parameter: {mint_info['parameter']}")
return return

View File

@@ -18,6 +18,7 @@ from ..core.base import (
CheckFeesRequest, CheckFeesRequest,
CheckSpendableRequest, CheckSpendableRequest,
CheckSpendableResponse, CheckSpendableResponse,
GetInfoResponse,
GetMeltResponse, GetMeltResponse,
GetMintResponse, GetMintResponse,
Invoice, Invoice,
@@ -98,6 +99,7 @@ class LedgerAPI:
keys: WalletKeyset # holds current keys of mint keys: WalletKeyset # holds current keys of mint
keyset_id: str # holds id of current keyset keyset_id: str # holds id of current keyset
public_keys: Dict[int, PublicKey] # holds public keys of public_keys: Dict[int, PublicKey] # holds public keys of
mint_info: GetInfoResponse # holds info about mint
tor: TorProxy tor: TorProxy
db: Database db: Database
s: requests.Session s: requests.Session
@@ -113,7 +115,7 @@ class LedgerAPI:
def _construct_proofs( def _construct_proofs(
self, promises: List[BlindedSignature], secrets: List[str], rs: List[PrivateKey] self, promises: List[BlindedSignature], secrets: List[str], rs: List[PrivateKey]
): ) -> List[Proof]:
"""Returns proofs of promise from promises. Wants secrets and blinding factors rs.""" """Returns proofs of promise from promises. Wants secrets and blinding factors rs."""
logger.trace(f"Constructing proofs.") logger.trace(f"Constructing proofs.")
proofs: List[Proof] = [] proofs: List[Proof] = []
@@ -137,7 +139,7 @@ class LedgerAPI:
return proofs return proofs
@staticmethod @staticmethod
def raise_on_error(resp_dict): def raise_on_error(resp_dict) -> None:
if "error" in resp_dict: if "error" in resp_dict:
raise Exception("Mint Error: {}".format(resp_dict["error"])) raise Exception("Mint Error: {}".format(resp_dict["error"]))
@@ -146,7 +148,7 @@ class LedgerAPI:
"""Returns base64 encoded random string.""" """Returns base64 encoded random string."""
return scrts.token_urlsafe(randombits // 8) return scrts.token_urlsafe(randombits // 8)
async def _load_mint_keys(self, keyset_id: str = ""): async def _load_mint_keys(self, keyset_id: str = "") -> WalletKeyset:
assert len( assert len(
self.url self.url
), "Ledger not initialized correctly: mint URL not specified yet. " ), "Ledger not initialized correctly: mint URL not specified yet. "
@@ -177,7 +179,7 @@ class LedgerAPI:
logger.debug(f"Current mint keyset: {self.keys.id}") logger.debug(f"Current mint keyset: {self.keys.id}")
return self.keys return self.keys
async def _load_mint_keysets(self): async def _load_mint_keysets(self) -> List[str]:
# get all active keysets of this mint # get all active keysets of this mint
mint_keysets = [] mint_keysets = []
try: try:
@@ -189,7 +191,13 @@ class LedgerAPI:
logger.debug(f"Mint keysets: {self.keysets}") logger.debug(f"Mint keysets: {self.keysets}")
return self.keysets return self.keysets
async def _load_mint(self, keyset_id: str = ""): async def _load_mint_info(self) -> GetInfoResponse:
"""Loads the mint info from the mint."""
self.mint_info = await self._get_info(self.url)
logger.debug(f"Mint info: {self.mint_info}")
return self.mint_info
async def _load_mint(self, keyset_id: str = "") -> None:
""" """
Loads the public keys of the mint. Either gets the keys for the specified Loads the public keys of the mint. Either gets the keys for the specified
`keyset_id` or gets the keys of the active keyset from the mint. `keyset_id` or gets the keys of the active keyset from the mint.
@@ -197,14 +205,28 @@ class LedgerAPI:
""" """
await self._load_mint_keys(keyset_id) await self._load_mint_keys(keyset_id)
await self._load_mint_keysets() await self._load_mint_keysets()
await self._load_mint_info()
if keyset_id: if keyset_id:
assert keyset_id in self.keysets, f"keyset {keyset_id} not active on mint" assert keyset_id in self.keysets, f"keyset {keyset_id} not active on mint"
@staticmethod @staticmethod
def _construct_outputs(amounts: List[int], secrets: List[str]): def _construct_outputs(
amounts: List[int], secrets: List[str]
) -> Tuple[List[BlindedMessage], List[PrivateKey]]:
"""Takes a list of amounts and secrets and returns outputs. """Takes a list of amounts and secrets and returns outputs.
Outputs are blinded messages `outputs` and blinding factors `rs`""" Outputs are blinded messages `outputs` and blinding factors `rs`
Args:
amounts (List[int]): List of amounts
secrets (List[str]): List of secrets
Returns:
Tuple[List[BlindedMessage], List[PrivateKey]]: Tuple of blinded messages and blinding factors
Raises:
Exception: If len(amounts) != len(secrets)
"""
logger.trace(f"Constructing outputs.") logger.trace(f"Constructing outputs.")
assert len(amounts) == len( assert len(amounts) == len(
secrets secrets
@@ -221,16 +243,31 @@ class LedgerAPI:
logger.trace(f"Constructed {len(outputs)} outputs.") logger.trace(f"Constructed {len(outputs)} outputs.")
return outputs, rs return outputs, rs
async def _check_used_secrets(self, secrets): async def _check_used_secrets(self, secrets) -> None:
"""Checks if any of the secrets have already been used""" """Checks if any of the secrets have already been used
Args:
secrets (List[str]): List of secrets to check
Raises:
Exception: If any of the secrets have already been used
"""
logger.trace("Checking secrets.") logger.trace("Checking secrets.")
for s in secrets: for s in secrets:
if await secret_used(s, db=self.db): if await secret_used(s, db=self.db):
raise Exception(f"secret already used: {s}") raise Exception(f"secret already used: {s}")
logger.trace("Secret check complete.") logger.trace("Secret check complete.")
def generate_secrets(self, secret, n): def generate_secrets(self, secret, n) -> List[str]:
"""`secret` is the base string that will be tweaked n times""" """`secret` is the base string that will be tweaked n times
Args:
secret (str): Base secret
n (int): Number of secrets to generate
Returns:
List[str]: List of secrets
"""
if len(secret.split("P2SH:")) == 2: if len(secret.split("P2SH:")) == 2:
return [f"{secret}:{self._generate_secret()}" for i in range(n)] return [f"{secret}:{self._generate_secret()}" for i in range(n)]
return [f"{i}:{secret}" for i in range(n)] return [f"{i}:{secret}" for i in range(n)]
@@ -240,7 +277,7 @@ class LedgerAPI:
""" """
@async_set_requests @async_set_requests
async def _get_keys(self, url: str): async def _get_keys(self, url: str) -> WalletKeyset:
"""API that gets the current keys of the mint """API that gets the current keys of the mint
Args: Args:
@@ -248,6 +285,9 @@ class LedgerAPI:
Returns: Returns:
WalletKeyset: Current mint keyset WalletKeyset: Current mint keyset
Raises:
Exception: If no keys are received from the mint
""" """
resp = self.s.get( resp = self.s.get(
url + "/keys", url + "/keys",
@@ -263,7 +303,7 @@ class LedgerAPI:
return keyset return keyset
@async_set_requests @async_set_requests
async def _get_keys_of_keyset(self, url: str, keyset_id: str): async def _get_keys_of_keyset(self, url: str, keyset_id: str) -> WalletKeyset:
"""API that gets the keys of a specific keyset from the mint. """API that gets the keys of a specific keyset from the mint.
@@ -273,6 +313,9 @@ class LedgerAPI:
Returns: Returns:
WalletKeyset: Keyset with ID keyset_id WalletKeyset: Keyset with ID keyset_id
Raises:
Exception: If no keys are received from the mint
""" """
keyset_id_urlsafe = keyset_id.replace("+", "-").replace("/", "_") keyset_id_urlsafe = keyset_id.replace("+", "-").replace("/", "_")
resp = self.s.get( resp = self.s.get(
@@ -290,7 +333,7 @@ class LedgerAPI:
return keyset return keyset
@async_set_requests @async_set_requests
async def _get_keyset_ids(self, url: str): async def _get_keyset_ids(self, url: str) -> List[str]:
"""API that gets a list of all active keysets of the mint. """API that gets a list of all active keysets of the mint.
Args: Args:
@@ -298,6 +341,9 @@ class LedgerAPI:
Returns: Returns:
KeysetsResponse (List[str]): List of all active keyset IDs of the mint KeysetsResponse (List[str]): List of all active keyset IDs of the mint
Raises:
Exception: If no keysets are received from the mint
""" """
resp = self.s.get( resp = self.s.get(
url + "/keysets", url + "/keysets",
@@ -309,8 +355,39 @@ class LedgerAPI:
return keysets.keysets return keysets.keysets
@async_set_requests @async_set_requests
async def request_mint(self, amount): async def _get_info(self, url: str) -> GetInfoResponse:
"""Requests a mint from the server and returns Lightning invoice.""" """API that gets the mint info.
Args:
url (str): Mint URL
Returns:
GetInfoResponse: Current mint info
Raises:
Exception: If the mint info request fails
"""
resp = self.s.get(
url + "/info",
)
resp.raise_for_status()
data: dict = resp.json()
mint_info: GetInfoResponse = GetInfoResponse.parse_obj(data)
return mint_info
@async_set_requests
async def request_mint(self, amount) -> Invoice:
"""Requests a mint from the server and returns Lightning invoice.
Args:
amount (int): Amount of tokens to mint
Returns:
Invoice: Lightning invoice
Raises:
Exception: If the mint request fails
"""
logger.trace("Requesting mint: GET /mint") logger.trace("Requesting mint: GET /mint")
resp = self.s.get(self.url + "/mint", params={"amount": amount}) resp = self.s.get(self.url + "/mint", params={"amount": amount})
resp.raise_for_status() resp.raise_for_status()
@@ -320,8 +397,19 @@ class LedgerAPI:
return Invoice(amount=amount, pr=mint_response.pr, hash=mint_response.hash) return Invoice(amount=amount, pr=mint_response.pr, hash=mint_response.hash)
@async_set_requests @async_set_requests
async def mint(self, amounts, hash=None): async def mint(self, amounts, hash=None) -> List[Proof]:
"""Mints new coins and returns a proof of promise.""" """Mints new coins and returns a proof of promise.
Args:
amounts (List[int]): Amounts of tokens to mint
hash (str, optional): Hash of the paid invoice. Defaults to None.
Returns:
list[Proof]: List of proofs.
Raises:
Exception: If the minting fails
"""
secrets = [self._generate_secret() for s in range(len(amounts))] secrets = [self._generate_secret() for s in range(len(amounts))]
await self._check_used_secrets(secrets) await self._check_used_secrets(secrets)
outputs, rs = self._construct_outputs(amounts, secrets) outputs, rs = self._construct_outputs(amounts, secrets)
@@ -348,7 +436,9 @@ class LedgerAPI:
return self._construct_proofs(promises, secrets, rs) return self._construct_proofs(promises, secrets, rs)
@async_set_requests @async_set_requests
async def split(self, proofs, amount, scnd_secret: Optional[str] = None): async def split(
self, proofs, amount, scnd_secret: Optional[str] = None
) -> Tuple[List[Proof], List[Proof]]:
"""Consume proofs and create new promises based on amount split. """Consume proofs and create new promises based on amount split.
If scnd_secret is None, random secrets will be generated for the tokens to keep (frst_outputs) If scnd_secret is None, random secrets will be generated for the tokens to keep (frst_outputs)

View File

@@ -45,6 +45,20 @@ def test_info(cli_prefix):
assert result.exit_code == 0 assert result.exit_code == 0
@pytest.mark.asyncio
def test_info_with_mint(cli_prefix):
runner = CliRunner()
result = runner.invoke(
cli,
[*cli_prefix, "info", "-m"],
)
assert result.exception is None
print("INFO -M")
print(result.output)
assert "Mint name" in result.output
assert result.exit_code == 0
@pytest.mark.asyncio @pytest.mark.asyncio
def test_balance(cli_prefix): def test_balance(cli_prefix):
runner = CliRunner() runner = CliRunner()

View File

@@ -72,6 +72,12 @@ async def test_get_keyset(wallet1: Wallet):
assert len(keys1.public_keys) == len(keys2.public_keys) assert len(keys1.public_keys) == len(keys2.public_keys)
@pytest.mark.asyncio
async def test_get_info(wallet1: Wallet):
info = await wallet1._get_info(wallet1.url)
assert info.name
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_nonexistent_keyset(wallet1: Wallet): async def test_get_nonexistent_keyset(wallet1: Wallet):
await assert_err( await assert_err(