Files
nutshell/cashu/wallet/secrets.py
callebtc a1802b2d81 Refactor: P2PKSecret (#323)
* port changes from dleq branch

* adjust tests

* fix tag serialization

* refactor p2pk and secrets into a new Protocol class

* clean up code

* refactor p2pk types from base

* test second refund pubkey
2023-09-21 14:58:42 +02:00

199 lines
7.9 KiB
Python

import base64
import hashlib
from typing import List, Optional, Tuple
from bip32 import BIP32
from loguru import logger
from mnemonic import Mnemonic
from ..core import bolt11 as bolt11
from ..core.crypto.secp import PrivateKey
from ..core.db import Database
from ..core.settings import settings
from ..wallet.crud import (
bump_secret_derivation,
get_seed_and_mnemonic,
store_seed_and_mnemonic,
)
from .protocols import SupportsDb, SupportsKeysets
class WalletSecrets(SupportsDb, SupportsKeysets):
    """Seed handling and deterministic secret derivation for the wallet.

    The class initializes a BIP39 mnemonic/seed (generated, supplied by the
    caller, or loaded from the database), builds a BIP32 tree from it and
    derives per-counter secrets and blinding factors from that tree.
    """

    keyset_id: str
    db: Database

    async def _init_private_key(self, from_mnemonic: Optional[str] = None) -> None:
        """Initializes the private key of the wallet from the mnemonic.

        There are three ways to initialize the private key:
        1. If the database does not contain a seed, and no mnemonic is given,
           a new seed is generated.
        2. If a mnemonic is given, the seed is generated from that mnemonic
           (it takes precedence over a seed that is stored in the database).
        3. Otherwise, if the database contains a seed, it is loaded from there.

        If the database did not contain a seed, the seed and mnemonic that end
        up being used are stored in the database.

        On success this sets `self.seed`, `self.mnemonic`, `self.bip32` and
        `self.private_key`.

        Args:
            from_mnemonic (Optional[str], optional): Mnemonic to use. Defaults to None.

        Raises:
            ValueError: If the mnemonic is not BIP39 compliant or the seed is
                rejected while deriving the private key.
        """
        ret_db = await get_seed_and_mnemonic(self.db)
        mnemo = Mnemonic("english")

        if ret_db is None and from_mnemonic is None:
            # if there is no seed in the database, generate a new one
            mnemonic_str = mnemo.generate()
            # only mention the wallet name if it is not the default one
            is_custom_wallet = settings.wallet_name != "wallet"
            wallet_command_prefix_str = (
                f" --wallet {settings.wallet_name}" if is_custom_wallet else ""
            )
            wallet_name = (
                f' for wallet "{settings.wallet_name}"' if is_custom_wallet else ""
            )
            print(
                f"Generated a new mnemonic{wallet_name}. To view it, run"
                f' "cashu{wallet_command_prefix_str} info --mnemonic".'
            )
        elif from_mnemonic:
            # or use the one provided
            mnemonic_str = from_mnemonic.lower().strip()
        elif ret_db is not None:
            # if there is a seed in the database, use its mnemonic (second
            # element); the seed itself is re-derived from the mnemonic below
            mnemonic_str = ret_db[1]
        else:
            # only reached for a falsy, non-None `from_mnemonic` (e.g. "") with
            # an empty database: nothing is initialized in that case
            logger.debug("No mnemonic provided")
            return

        if not mnemo.check(mnemonic_str):
            raise ValueError("Invalid mnemonic")

        self.seed = mnemo.to_seed(mnemonic_str)
        self.mnemonic = mnemonic_str

        # NOTE(review): these two lines write the wallet's seed and mnemonic to
        # the debug log. Consider removing them or gating them explicitly.
        logger.debug(f"Using seed: {self.seed.hex()}")
        logger.debug(f"Using mnemonic: {mnemonic_str}")

        # if no mnemonic was in the database, store the new one
        if ret_db is None:
            await store_seed_and_mnemonic(
                self.db, seed=self.seed.hex(), mnemonic=mnemonic_str
            )

        try:
            self.bip32 = BIP32.from_seed(self.seed)
            self.private_key = PrivateKey(
                self.bip32.get_privkey_from_path("m/129372'/0'/0'/0'")
            )
        except ValueError as e:
            # keep the original cause attached for debugging
            raise ValueError("Invalid seed") from e
        except Exception as e:
            # best-effort: any other failure is only logged, which means
            # `self.bip32` / `self.private_key` may be left unset
            logger.error(e)

    async def _generate_secret(self, randombits=128) -> str:
        """Returns the next deterministic secret as a hex-encoded SHA-256 digest.

        Bumps the secret counter of the current keyset in the database by its
        default step and derives the secret for the returned counter value.

        NOTE: This method should probably retire after `deterministic_secrets`. We are
        deriving secrets from a counter but don't store the respective blinding factor.
        We won't be able to restore any ecash generated with these secrets.

        Args:
            randombits: Unused. Kept so that existing callers keep working.

        Returns:
            str: Hex digest of the SHA-256 hash of the derived secret.
        """
        secret_counter = await bump_secret_derivation(
            db=self.db, keyset_id=self.keyset_id
        )
        logger.trace(f"secret_counter: {secret_counter}")
        secret, _, _ = await self.generate_determinstic_secret(secret_counter)
        return hashlib.sha256(secret).hexdigest()

    async def generate_determinstic_secret(
        self, counter: int
    ) -> Tuple[bytes, bytes, str]:
        """Deterministically generates two secrets (one as the secret message,
        one as the blinding factor) for the given counter.

        Args:
            counter (int): Counter used as the last hardened path component.

        Returns:
            Tuple[bytes, bytes, str]: Secret, blinding factor and the token
            derivation path both were derived under.

        Raises:
            AssertionError: If `self.bip32` is falsy (not initialized yet).
        """
        assert self.bip32, "BIP32 not initialized yet."
        # integer keyset id modulo max number of bip32 child keys
        # (the modulus must not be changed: it determines the derived secrets)
        keyset_id_int = int.from_bytes(base64.b64decode(self.keyset_id), "big") % (
            2**31 - 1
        )
        logger.trace(f"keyset id: {self.keyset_id} becomes {keyset_id_int}")
        token_derivation_path = f"m/129372'/0'/{keyset_id_int}'/{counter}'"

        # child 0 is the secret
        secret_derivation_path = f"{token_derivation_path}/0"
        logger.trace(f"secret derivation path: {secret_derivation_path}")
        secret = self.bip32.get_privkey_from_path(secret_derivation_path)

        # child 1 is the blinding factor
        r_derivation_path = f"{token_derivation_path}/1"
        logger.trace(f"r derivation path: {r_derivation_path}")
        r = self.bip32.get_privkey_from_path(r_derivation_path)

        return secret, r, token_derivation_path

    async def _derive_secrets_for_counters(
        self, counters: List[int]
    ) -> Tuple[List[str], List[PrivateKey], List[str]]:
        """Derives secrets, blinding factors and derivation paths for `counters`.

        Shared by `generate_n_secrets` and `generate_secrets_from_to`.
        """
        secrets: List[str] = []
        rs: List[PrivateKey] = []
        derivation_paths: List[str] = []
        for counter in counters:
            secret, r, path = await self.generate_determinstic_secret(counter)
            # secrets are supplied as str
            secrets.append(hashlib.sha256(secret).hexdigest())
            # rs are supplied as PrivateKey
            rs.append(PrivateKey(privkey=r, raw=True))
            derivation_paths.append(path)
        return secrets, rs, derivation_paths

    async def generate_n_secrets(
        self, n: int = 1, skip_bump: bool = False
    ) -> Tuple[List[str], List[PrivateKey], List[str]]:
        """Generates n secrets and blinding factors and returns a tuple of secrets,
        blinding factors, and derivation paths.

        Args:
            n (int, optional): Number of secrets to generate. Defaults to 1.
                For `n < 1` three empty lists are returned and the database
                is not touched.
            skip_bump (bool, optional): Skip increment of secret counter in the
                database; forwarded as `skip` to `bump_secret_derivation`.
                Defaults to False.
                NOTE(review): the previous wording recommended setting this to
                *false* when it is unknown whether the following operation
                (like a POST /mint request) will succeed; that reads inverted,
                presumably True is meant. Confirm against the callers.

        Returns:
            Tuple[List[str], List[PrivateKey], List[str]]: Secrets, blinding factors, derivation paths
        """
        if n < 1:
            # nothing to derive; also avoids indexing an empty counter list below
            return [], [], []

        secret_counters_start = await bump_secret_derivation(
            db=self.db, keyset_id=self.keyset_id, by=n, skip=skip_bump
        )
        logger.trace(f"secret_counters_start: {secret_counters_start}")
        secret_counters = list(range(secret_counters_start, secret_counters_start + n))
        logger.trace(
            f"Generating secret nr {secret_counters[0]} to {secret_counters[-1]}."
        )
        return await self._derive_secrets_for_counters(secret_counters)

    async def generate_secrets_from_to(
        self, from_counter: int, to_counter: int
    ) -> Tuple[List[str], List[PrivateKey], List[str]]:
        """Generates secrets and blinding factors from `from_counter` to `to_counter`
        (both inclusive). The secret counter in the database is not changed.

        Args:
            from_counter (int): Start counter
            to_counter (int): End counter (inclusive; may equal `from_counter`)

        Returns:
            Tuple[List[str], List[PrivateKey], List[str]]: Secrets, blinding factors, derivation paths

        Raises:
            AssertionError: If `from_counter` is larger than `to_counter`
        """
        assert (
            from_counter <= to_counter
        ), "from_counter must be smaller than to_counter"
        secret_counters = list(range(from_counter, to_counter + 1))
        return await self._derive_secrets_for_counters(secret_counters)