mirror of
https://github.com/aljazceru/nutshell.git
synced 2025-12-20 10:34:20 +01:00
mint: add seed decrypt (#403)
* mint: add seed decrypt * add mint seed decryoption and migration tool
This commit is contained in:
@@ -9,6 +9,7 @@ from typing import Any, Dict, List, Optional, Union
|
||||
from loguru import logger
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from .crypto.aes import AESCipher
|
||||
from .crypto.keys import (
|
||||
derive_keys,
|
||||
derive_keys_sha256,
|
||||
@@ -693,36 +694,51 @@ class MintKeyset:
|
||||
active: bool
|
||||
unit: Unit
|
||||
derivation_path: str
|
||||
seed: str
|
||||
public_keys: Union[Dict[int, PublicKey], None] = None
|
||||
valid_from: Union[str, None] = None
|
||||
valid_to: Union[str, None] = None
|
||||
first_seen: Union[str, None] = None
|
||||
version: Union[str, None] = None
|
||||
seed: Optional[str] = None
|
||||
encrypted_seed: Optional[str] = None
|
||||
seed_encryption_method: Optional[str] = None
|
||||
public_keys: Optional[Dict[int, PublicKey]] = None
|
||||
valid_from: Optional[str] = None
|
||||
valid_to: Optional[str] = None
|
||||
first_seen: Optional[str] = None
|
||||
version: Optional[str] = None
|
||||
|
||||
duplicate_keyset_id: Optional[str] = None # BACKWARDS COMPATIBILITY < 0.15.0
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
seed: str,
|
||||
derivation_path: str,
|
||||
id="",
|
||||
valid_from=None,
|
||||
valid_to=None,
|
||||
first_seen=None,
|
||||
active=None,
|
||||
seed: Optional[str] = None,
|
||||
encrypted_seed: Optional[str] = None,
|
||||
seed_encryption_method: Optional[str] = None,
|
||||
valid_from: Optional[str] = None,
|
||||
valid_to: Optional[str] = None,
|
||||
first_seen: Optional[str] = None,
|
||||
active: Optional[bool] = None,
|
||||
unit: Optional[str] = None,
|
||||
version: str = "0",
|
||||
version: Optional[str] = None,
|
||||
id: str = "",
|
||||
):
|
||||
self.derivation_path = derivation_path
|
||||
|
||||
if encrypted_seed and not settings.mint_seed_decryption_key:
|
||||
raise Exception("MINT_SEED_DECRYPTION_KEY not set, but seed is encrypted.")
|
||||
if settings.mint_seed_decryption_key and encrypted_seed:
|
||||
self.seed = AESCipher(settings.mint_seed_decryption_key).decrypt(
|
||||
encrypted_seed
|
||||
)
|
||||
else:
|
||||
self.seed = seed
|
||||
|
||||
assert self.seed, "seed not set"
|
||||
|
||||
self.id = id
|
||||
self.valid_from = valid_from
|
||||
self.valid_to = valid_to
|
||||
self.first_seen = first_seen
|
||||
self.active = bool(active) if active is not None else False
|
||||
self.version = version
|
||||
self.version = version or settings.version
|
||||
|
||||
self.version_tuple = tuple(
|
||||
[int(i) for i in self.version.split(".")] if self.version else []
|
||||
@@ -730,7 +746,7 @@ class MintKeyset:
|
||||
|
||||
# infer unit from derivation path
|
||||
if not unit:
|
||||
logger.warning(
|
||||
logger.trace(
|
||||
f"Unit for keyset {self.derivation_path} not set – attempting to parse"
|
||||
" from derivation path"
|
||||
)
|
||||
@@ -738,9 +754,9 @@ class MintKeyset:
|
||||
self.unit = Unit(
|
||||
int(self.derivation_path.split("/")[2].replace("'", ""))
|
||||
)
|
||||
logger.warning(f"Inferred unit: {self.unit.name}")
|
||||
logger.trace(f"Inferred unit: {self.unit.name}")
|
||||
except Exception:
|
||||
logger.warning(
|
||||
logger.trace(
|
||||
"Could not infer unit from derivation path"
|
||||
f" {self.derivation_path} – assuming 'sat'"
|
||||
)
|
||||
@@ -754,7 +770,7 @@ class MintKeyset:
|
||||
|
||||
self.generate_keys()
|
||||
|
||||
logger.debug(f"Keyset id: {self.id} ({self.unit.name})")
|
||||
logger.trace(f"Loaded keyset id: {self.id} ({self.unit.name})")
|
||||
|
||||
@property
|
||||
def public_keys_hex(self) -> Dict[int, str]:
|
||||
@@ -775,14 +791,14 @@ class MintKeyset:
|
||||
self.seed, self.derivation_path
|
||||
)
|
||||
self.public_keys = derive_pubkeys(self.private_keys) # type: ignore
|
||||
logger.warning(
|
||||
logger.trace(
|
||||
f"WARNING: Using weak key derivation for keyset {self.id} (backwards"
|
||||
" compatibility < 0.12)"
|
||||
)
|
||||
self.id = derive_keyset_id_deprecated(self.public_keys) # type: ignore
|
||||
elif self.version_tuple < (0, 15):
|
||||
self.private_keys = derive_keys_sha256(self.seed, self.derivation_path)
|
||||
logger.warning(
|
||||
logger.trace(
|
||||
f"WARNING: Using non-bip32 derivation for keyset {self.id} (backwards"
|
||||
" compatibility < 0.15)"
|
||||
)
|
||||
|
||||
65
cashu/core/crypto/aes.py
Normal file
65
cashu/core/crypto/aes.py
Normal file
@@ -0,0 +1,65 @@
|
||||
import base64
|
||||
from hashlib import sha256
|
||||
|
||||
from Cryptodome import Random
|
||||
from Cryptodome.Cipher import AES
|
||||
|
||||
BLOCK_SIZE = 16
|
||||
|
||||
|
||||
class AESCipher:
|
||||
"""This class is compatible with crypto-js/aes.js
|
||||
|
||||
Encrypt and decrypt in Javascript using:
|
||||
import AES from "crypto-js/aes.js";
|
||||
import Utf8 from "crypto-js/enc-utf8.js";
|
||||
AES.encrypt(decrypted, password).toString()
|
||||
AES.decrypt(encrypted, password).toString(Utf8);
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, key: str, description=""):
|
||||
self.key: str = key
|
||||
self.description = description + " "
|
||||
|
||||
def pad(self, data):
|
||||
length = BLOCK_SIZE - (len(data) % BLOCK_SIZE)
|
||||
return data + (chr(length) * length).encode()
|
||||
|
||||
def unpad(self, data):
|
||||
return data[: -(data[-1] if isinstance(data[-1], int) else ord(data[-1]))]
|
||||
|
||||
def bytes_to_key(self, data, salt, output=48):
|
||||
# extended from https://gist.github.com/gsakkis/4546068
|
||||
assert len(salt) == 8, len(salt)
|
||||
data += salt
|
||||
key = sha256(data).digest()
|
||||
final_key = key
|
||||
while len(final_key) < output:
|
||||
key = sha256(key + data).digest()
|
||||
final_key += key
|
||||
return final_key[:output]
|
||||
|
||||
def decrypt(self, encrypted: str) -> str: # type: ignore
|
||||
"""Decrypts a string using AES-256-CBC."""
|
||||
encrypted = base64.urlsafe_b64decode(encrypted) # type: ignore
|
||||
assert encrypted[0:8] == b"Salted__"
|
||||
salt = encrypted[8:16]
|
||||
key_iv = self.bytes_to_key(self.key.encode(), salt, 32 + 16)
|
||||
key = key_iv[:32]
|
||||
iv = key_iv[32:]
|
||||
aes = AES.new(key, AES.MODE_CBC, iv)
|
||||
try:
|
||||
return self.unpad(aes.decrypt(encrypted[16:])).decode() # type: ignore
|
||||
except UnicodeDecodeError:
|
||||
raise ValueError("Wrong passphrase")
|
||||
|
||||
def encrypt(self, message: bytes) -> str:
|
||||
salt = Random.new().read(8)
|
||||
key_iv = self.bytes_to_key(self.key.encode(), salt, 32 + 16)
|
||||
key = key_iv[:32]
|
||||
iv = key_iv[32:]
|
||||
aes = AES.new(key, AES.MODE_CBC, iv)
|
||||
return base64.urlsafe_b64encode(
|
||||
b"Salted__" + salt + aes.encrypt(self.pad(message))
|
||||
).decode()
|
||||
@@ -17,7 +17,7 @@ def find_env_file():
|
||||
if not os.path.isfile(env_file):
|
||||
env_file = os.path.join(str(Path.home()), ".cashu", ".env")
|
||||
if os.path.isfile(env_file):
|
||||
env.read_env(env_file)
|
||||
env.read_env(env_file, recurse=False, override=True)
|
||||
else:
|
||||
env_file = ""
|
||||
return env_file
|
||||
@@ -49,6 +49,7 @@ class EnvSettings(CashuSettings):
|
||||
|
||||
class MintSettings(CashuSettings):
|
||||
mint_private_key: str = Field(default=None)
|
||||
mint_seed_decryption_key: str = Field(default=None)
|
||||
mint_derivation_path: str = Field(default="m/0'/0'/0'")
|
||||
mint_derivation_path_list: List[str] = Field(default=[])
|
||||
mint_listen_host: str = Field(default="127.0.0.1")
|
||||
|
||||
@@ -518,12 +518,14 @@ class LedgerCrudSqlite(LedgerCrud):
|
||||
await (conn or db).execute( # type: ignore
|
||||
f"""
|
||||
INSERT INTO {table_with_schema(db, 'keysets')}
|
||||
(id, seed, derivation_path, valid_from, valid_to, first_seen, active, version, unit)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
(id, seed, encrypted_seed, seed_encryption_method, derivation_path, valid_from, valid_to, first_seen, active, version, unit)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
keyset.id,
|
||||
keyset.seed,
|
||||
keyset.encrypted_seed,
|
||||
keyset.seed_encryption_method,
|
||||
keyset.derivation_path,
|
||||
keyset.valid_from or timestamp_now(db),
|
||||
keyset.valid_to or timestamp_now(db),
|
||||
|
||||
151
cashu/mint/decrypt.py
Normal file
151
cashu/mint/decrypt.py
Normal file
@@ -0,0 +1,151 @@
|
||||
import click
|
||||
|
||||
try:
|
||||
from ..core.crypto.aes import AESCipher
|
||||
except ImportError:
|
||||
# for the CLI to work
|
||||
from cashu.core.crypto.aes import AESCipher
|
||||
import asyncio
|
||||
from functools import wraps
|
||||
|
||||
from cashu.core.db import Database, table_with_schema
|
||||
from cashu.core.migrations import migrate_databases
|
||||
from cashu.core.settings import settings
|
||||
from cashu.mint import migrations
|
||||
from cashu.mint.crud import LedgerCrudSqlite
|
||||
from cashu.mint.ledger import Ledger
|
||||
|
||||
|
||||
# https://github.com/pallets/click/issues/85#issuecomment-503464628
|
||||
def coro(f):
|
||||
@wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
return asyncio.run(f(*args, **kwargs))
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
@click.group()
|
||||
def cli():
|
||||
"""Ledger Decrypt CLI"""
|
||||
pass
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.option("--message", prompt=True, help="The message to encrypt.")
|
||||
@click.option(
|
||||
"--key",
|
||||
prompt=True,
|
||||
hide_input=True,
|
||||
confirmation_prompt=True,
|
||||
help="The encryption key.",
|
||||
)
|
||||
def encrypt(message, key):
|
||||
"""Encrypt a message."""
|
||||
aes = AESCipher(key)
|
||||
encrypted_message = aes.encrypt(message.encode())
|
||||
click.echo(f"Encrypted message: {encrypted_message}")
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.option("--encrypted", prompt=True, help="The encrypted message to decrypt.")
|
||||
@click.option(
|
||||
"--key",
|
||||
prompt=True,
|
||||
hide_input=True,
|
||||
help="The decryption key.",
|
||||
)
|
||||
def decrypt(encrypted, key):
|
||||
"""Decrypt a message."""
|
||||
aes = AESCipher(key)
|
||||
decrypted_message = aes.decrypt(encrypted)
|
||||
click.echo(f"Decrypted message: {decrypted_message}")
|
||||
|
||||
|
||||
# command to migrate the database to encrypted seeds
|
||||
@cli.command()
|
||||
@coro
|
||||
@click.option("--no-dry-run", is_flag=True, help="Dry run.", default=False)
|
||||
async def migrate(no_dry_run):
|
||||
"""Migrate the database to encrypted seeds."""
|
||||
ledger = Ledger(
|
||||
db=Database("mint", settings.mint_database),
|
||||
seed=settings.mint_private_key,
|
||||
seed_decryption_key=settings.mint_seed_decryption_key,
|
||||
derivation_path=settings.mint_derivation_path,
|
||||
backends={},
|
||||
crud=LedgerCrudSqlite(),
|
||||
)
|
||||
assert settings.mint_seed_decryption_key, "MINT_SEED_DECRYPTION_KEY not set."
|
||||
assert (
|
||||
len(settings.mint_seed_decryption_key) > 12
|
||||
), "MINT_SEED_DECRYPTION_KEY is too short, must be at least 12 characters."
|
||||
click.echo(
|
||||
"Decryption key:"
|
||||
f" {settings.mint_seed_decryption_key[0]}{'*'*10}{settings.mint_seed_decryption_key[-1]}"
|
||||
)
|
||||
|
||||
aes = AESCipher(settings.mint_seed_decryption_key)
|
||||
|
||||
click.echo("Making sure that db is migrated to latest version first.")
|
||||
await migrate_databases(ledger.db, migrations)
|
||||
|
||||
# get all keysets
|
||||
async with ledger.db.connect() as conn:
|
||||
rows = await conn.fetchall(
|
||||
f"SELECT * FROM {table_with_schema(ledger.db, 'keysets')} WHERE seed IS NOT"
|
||||
" NULL"
|
||||
)
|
||||
click.echo(f"Found {len(rows)} keysets in database.")
|
||||
keysets_all = [dict(**row) for row in rows]
|
||||
keysets_migrate = []
|
||||
# encrypt the seeds
|
||||
for keyset_dict in keysets_all:
|
||||
if keyset_dict["seed"] and not keyset_dict["encrypted_seed"]:
|
||||
keyset_dict["encrypted_seed"] = aes.encrypt(keyset_dict["seed"].encode())
|
||||
keyset_dict["seed_encryption_method"] = "aes"
|
||||
keysets_migrate.append(keyset_dict)
|
||||
else:
|
||||
click.echo(f"Skipping keyset {keyset_dict['id']}: already migrated.")
|
||||
|
||||
click.echo(f"There are {len(keysets_migrate)} keysets to migrate.")
|
||||
|
||||
for keyset_dict in keysets_migrate:
|
||||
click.echo(f"Keyset {keyset_dict['id']}")
|
||||
click.echo(f" Encrypted seed: {keyset_dict['encrypted_seed']}")
|
||||
click.echo(f" Encryption method: {keyset_dict['seed_encryption_method']}")
|
||||
decryption_success_str = (
|
||||
"✅"
|
||||
if aes.decrypt(keyset_dict["encrypted_seed"]) == keyset_dict["seed"]
|
||||
else "❌"
|
||||
)
|
||||
click.echo(f" Seed decryption test: {decryption_success_str}")
|
||||
|
||||
if not no_dry_run:
|
||||
click.echo(
|
||||
"This was a dry run. Use --no-dry-run to apply the changes to the database."
|
||||
)
|
||||
if no_dry_run and keysets_migrate:
|
||||
click.confirm(
|
||||
"Are you sure you want to continue? Before you continue, make sure to have"
|
||||
" a backup of your keysets database table.",
|
||||
abort=True,
|
||||
)
|
||||
click.echo("Updating keysets in the database.")
|
||||
async with ledger.db.connect() as conn:
|
||||
for keyset_dict in keysets_migrate:
|
||||
click.echo(f"Updating keyset {keyset_dict['id']}")
|
||||
await conn.execute(
|
||||
f"UPDATE {table_with_schema(ledger.db, 'keysets')} SET seed='',"
|
||||
" encrypted_seed = ?, seed_encryption_method = ? WHERE id = ?",
|
||||
(
|
||||
keyset_dict["encrypted_seed"],
|
||||
keyset_dict["seed_encryption_method"],
|
||||
keyset_dict["id"],
|
||||
),
|
||||
)
|
||||
click.echo("✅ Migration complete.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
cli()
|
||||
@@ -25,6 +25,7 @@ from ..core.base import (
|
||||
Unit,
|
||||
)
|
||||
from ..core.crypto import b_dhke
|
||||
from ..core.crypto.aes import AESCipher
|
||||
from ..core.crypto.keys import (
|
||||
derive_keyset_id,
|
||||
derive_keyset_id_deprecated,
|
||||
@@ -68,10 +69,18 @@ class Ledger(LedgerVerification, LedgerSpendingConditions):
|
||||
db: Database,
|
||||
seed: str,
|
||||
backends: Mapping[Method, Mapping[Unit, LightningBackend]],
|
||||
seed_decryption_key: Optional[str] = None,
|
||||
derivation_path="",
|
||||
crud=LedgerCrudSqlite(),
|
||||
):
|
||||
self.master_key = seed
|
||||
assert seed, "seed not set"
|
||||
|
||||
# decrypt seed if seed_decryption_key is set
|
||||
self.master_key = (
|
||||
AESCipher(seed_decryption_key).decrypt(seed)
|
||||
if seed_decryption_key
|
||||
else seed
|
||||
)
|
||||
self.derivation_path = derivation_path
|
||||
|
||||
self.db = db
|
||||
@@ -101,16 +110,24 @@ class Ledger(LedgerVerification, LedgerSpendingConditions):
|
||||
"""
|
||||
assert derivation_path, "derivation path not set"
|
||||
seed = seed or self.master_key
|
||||
logger.debug(f"Activating keyset for derivation path {derivation_path}")
|
||||
tmp_keyset_local = MintKeyset(
|
||||
seed=seed,
|
||||
derivation_path=derivation_path,
|
||||
version=version or settings.version,
|
||||
)
|
||||
logger.debug(
|
||||
f"Activating keyset for derivation path {derivation_path} with id"
|
||||
f" {tmp_keyset_local.id}."
|
||||
)
|
||||
# load the keyset from db
|
||||
logger.trace(f"crud: loading keyset for {derivation_path}")
|
||||
tmp_keyset_local: List[MintKeyset] = await self.crud.get_keyset(
|
||||
derivation_path=derivation_path, seed=seed, db=self.db
|
||||
tmp_keysets_local: List[MintKeyset] = await self.crud.get_keyset(
|
||||
id=tmp_keyset_local.id, db=self.db
|
||||
)
|
||||
logger.trace(f"crud: loaded {len(tmp_keyset_local)} keysets")
|
||||
if tmp_keyset_local:
|
||||
logger.trace(f"crud: loaded {len(tmp_keysets_local)} keysets")
|
||||
if tmp_keysets_local:
|
||||
# we have a keyset with this derivation path in the database
|
||||
keyset = tmp_keyset_local[0]
|
||||
keyset = tmp_keysets_local[0]
|
||||
else:
|
||||
# no keyset for this derivation path yet
|
||||
# we create a new keyset (keys will be generated at instantiation)
|
||||
@@ -141,7 +158,7 @@ class Ledger(LedgerVerification, LedgerSpendingConditions):
|
||||
|
||||
async def init_keysets(self, autosave=True) -> None:
|
||||
"""Initializes all keysets of the mint from the db. Loads all past keysets from db
|
||||
and generate their keys. Then load the current keyset.
|
||||
and generate their keys. Then activate the current keyset set by self.derivation_path.
|
||||
|
||||
Args:
|
||||
autosave (bool, optional): Whether the current keyset should be saved if it is
|
||||
|
||||
@@ -357,3 +357,53 @@ async def m012_keysets_uniqueness_with_seed(db: Database):
|
||||
f" unit FROM {table_with_schema(db, 'keysets_old')}"
|
||||
)
|
||||
await conn.execute(f"DROP TABLE {table_with_schema(db, 'keysets_old')}")
|
||||
|
||||
|
||||
async def m013_keysets_add_encrypted_seed(db: Database):
|
||||
async with db.connect() as conn:
|
||||
# set keysets table unique constraint to id
|
||||
# copy table keysets to keysets_old, create a new table keysets
|
||||
# with the same columns but with a unique constraint on id
|
||||
# and copy the data from keysets_old to keysets, then drop keysets_old
|
||||
await conn.execute(
|
||||
f"DROP TABLE IF EXISTS {table_with_schema(db, 'keysets_old')}"
|
||||
)
|
||||
await conn.execute(
|
||||
f"CREATE TABLE {table_with_schema(db, 'keysets_old')} AS"
|
||||
f" SELECT * FROM {table_with_schema(db, 'keysets')}"
|
||||
)
|
||||
await conn.execute(f"DROP TABLE {table_with_schema(db, 'keysets')}")
|
||||
await conn.execute(f"""
|
||||
CREATE TABLE IF NOT EXISTS {table_with_schema(db, 'keysets')} (
|
||||
id TEXT NOT NULL,
|
||||
derivation_path TEXT,
|
||||
seed TEXT,
|
||||
valid_from TIMESTAMP,
|
||||
valid_to TIMESTAMP,
|
||||
first_seen TIMESTAMP,
|
||||
active BOOL DEFAULT TRUE,
|
||||
version TEXT,
|
||||
unit TEXT,
|
||||
|
||||
UNIQUE (id)
|
||||
|
||||
);
|
||||
""")
|
||||
await conn.execute(
|
||||
f"INSERT INTO {table_with_schema(db, 'keysets')} (id,"
|
||||
" derivation_path, valid_from, valid_to, first_seen,"
|
||||
" active, version, seed, unit) SELECT id, derivation_path,"
|
||||
" valid_from, valid_to, first_seen, active, version, seed,"
|
||||
f" unit FROM {table_with_schema(db, 'keysets_old')}"
|
||||
)
|
||||
await conn.execute(f"DROP TABLE {table_with_schema(db, 'keysets_old')}")
|
||||
|
||||
# add columns encrypted_seed and seed_encryption_method to keysets
|
||||
await conn.execute(
|
||||
f"ALTER TABLE {table_with_schema(db, 'keysets')} ADD COLUMN encrypted_seed"
|
||||
" TEXT"
|
||||
)
|
||||
await conn.execute(
|
||||
f"ALTER TABLE {table_with_schema(db, 'keysets')} ADD COLUMN"
|
||||
" seed_encryption_method TEXT"
|
||||
)
|
||||
|
||||
@@ -16,6 +16,18 @@ from ..mint.ledger import Ledger
|
||||
|
||||
logger.debug("Enviroment Settings:")
|
||||
for key, value in settings.dict().items():
|
||||
if key in [
|
||||
"mint_private_key",
|
||||
"mint_seed_decryption_key",
|
||||
"nostr_private_key",
|
||||
"mint_lnbits_key",
|
||||
"mint_strike_key",
|
||||
"mint_lnd_rest_macaroon",
|
||||
"mint_lnd_rest_admin_macaroon",
|
||||
"mint_lnd_rest_invoice_macaroon",
|
||||
"mint_corelightning_rest_macaroon",
|
||||
]:
|
||||
value = "********" if value is not None else None
|
||||
logger.debug(f"{key}: {value}")
|
||||
|
||||
wallets_module = importlib.import_module("cashu.lightning")
|
||||
@@ -39,6 +51,7 @@ backends = {
|
||||
ledger = Ledger(
|
||||
db=Database("mint", settings.mint_database),
|
||||
seed=settings.mint_private_key,
|
||||
seed_decryption_key=settings.mint_seed_decryption_key,
|
||||
derivation_path=settings.mint_derivation_path,
|
||||
backends=backends,
|
||||
crud=LedgerCrudSqlite(),
|
||||
|
||||
@@ -38,6 +38,7 @@ settings.mint_database = "./test_data/test_mint"
|
||||
settings.mint_derivation_path = "m/0'/0'/0'"
|
||||
settings.mint_derivation_path_list = []
|
||||
settings.mint_private_key = "TEST_PRIVATE_KEY"
|
||||
settings.mint_seed_decryption_key = ""
|
||||
settings.mint_max_balance = 0
|
||||
|
||||
assert "test" in settings.cashu_dir
|
||||
|
||||
98
tests/test_mint_init.py
Normal file
98
tests/test_mint_init.py
Normal file
@@ -0,0 +1,98 @@
|
||||
from typing import List
|
||||
|
||||
import pytest
|
||||
|
||||
from cashu.core.base import Proof
|
||||
from cashu.core.crypto.aes import AESCipher
|
||||
from cashu.core.db import Database
|
||||
from cashu.core.settings import settings
|
||||
from cashu.mint.crud import LedgerCrudSqlite
|
||||
from cashu.mint.ledger import Ledger
|
||||
|
||||
SEED = "TEST_PRIVATE_KEY"
|
||||
DERIVATION_PATH = "m/0'/0'/0'"
|
||||
DECRYPTON_KEY = "testdecryptionkey"
|
||||
ENCRYPTED_SEED = "U2FsdGVkX1_7UU_-nVBMBWDy_9yDu4KeYb7MH8cJTYQGD4RWl82PALH8j-HKzTrI"
|
||||
|
||||
|
||||
async def assert_err(f, msg):
|
||||
"""Compute f() and expect an error message 'msg'."""
|
||||
try:
|
||||
await f
|
||||
except Exception as exc:
|
||||
assert exc.args[0] == msg, Exception(
|
||||
f"Expected error: {msg}, got: {exc.args[0]}"
|
||||
)
|
||||
|
||||
|
||||
def assert_amt(proofs: List[Proof], expected: int):
|
||||
"""Assert amounts the proofs contain."""
|
||||
assert [p.amount for p in proofs] == expected
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ledger_encrypt():
|
||||
aes = AESCipher(DECRYPTON_KEY)
|
||||
encrypted = aes.encrypt(SEED.encode())
|
||||
assert aes.decrypt(encrypted) == SEED
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ledger_decrypt():
|
||||
aes = AESCipher(DECRYPTON_KEY)
|
||||
assert aes.decrypt(ENCRYPTED_SEED) == SEED
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_decrypt_seed():
|
||||
ledger = Ledger(
|
||||
db=Database("mint", settings.mint_database),
|
||||
seed=SEED,
|
||||
seed_decryption_key=None,
|
||||
derivation_path=DERIVATION_PATH,
|
||||
backends={},
|
||||
crud=LedgerCrudSqlite(),
|
||||
)
|
||||
await ledger.init_keysets()
|
||||
assert ledger.keyset.seed == SEED
|
||||
private_key_1 = (
|
||||
ledger.keysets[list(ledger.keysets.keys())[0]].private_keys[1].serialize()
|
||||
)
|
||||
assert (
|
||||
private_key_1
|
||||
== "8300050453f08e6ead1296bb864e905bd46761beed22b81110fae0751d84604d"
|
||||
)
|
||||
pubkeys = ledger.keysets[list(ledger.keysets.keys())[0]].public_keys
|
||||
assert pubkeys
|
||||
assert (
|
||||
pubkeys[1].serialize().hex()
|
||||
== "02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104"
|
||||
)
|
||||
|
||||
ledger_encrypted = Ledger(
|
||||
db=Database("mint", settings.mint_database),
|
||||
seed=ENCRYPTED_SEED,
|
||||
seed_decryption_key=DECRYPTON_KEY,
|
||||
derivation_path=DERIVATION_PATH,
|
||||
backends={},
|
||||
crud=LedgerCrudSqlite(),
|
||||
)
|
||||
await ledger_encrypted.init_keysets()
|
||||
assert ledger_encrypted.keyset.seed == SEED
|
||||
private_key_1 = (
|
||||
ledger_encrypted.keysets[list(ledger_encrypted.keysets.keys())[0]]
|
||||
.private_keys[1]
|
||||
.serialize()
|
||||
)
|
||||
assert (
|
||||
private_key_1
|
||||
== "8300050453f08e6ead1296bb864e905bd46761beed22b81110fae0751d84604d"
|
||||
)
|
||||
pubkeys_encrypted = ledger_encrypted.keysets[
|
||||
list(ledger_encrypted.keysets.keys())[0]
|
||||
].public_keys
|
||||
assert pubkeys_encrypted
|
||||
assert (
|
||||
pubkeys_encrypted[1].serialize().hex()
|
||||
== "02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104"
|
||||
)
|
||||
@@ -2,9 +2,7 @@ import pytest
|
||||
|
||||
from cashu.core.base import MintKeyset
|
||||
from cashu.core.settings import settings
|
||||
|
||||
SEED = "TEST_PRIVATE_KEY"
|
||||
DERIVATION_PATH = "m/0'/0'/0'"
|
||||
from tests.test_mint_init import DECRYPTON_KEY, DERIVATION_PATH, ENCRYPTED_SEED, SEED
|
||||
|
||||
|
||||
async def assert_err(f, msg):
|
||||
@@ -55,3 +53,21 @@ async def test_keyset_0_11_0():
|
||||
== "026b714529f157d4c3de5a93e3a67618475711889b6434a497ae6ad8ace6682120"
|
||||
)
|
||||
assert keyset.id == "Zkdws9zWxNc4"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keyset_0_15_0_encrypted():
|
||||
settings.mint_seed_decryption_key = DECRYPTON_KEY
|
||||
keyset = MintKeyset(
|
||||
encrypted_seed=ENCRYPTED_SEED,
|
||||
derivation_path=DERIVATION_PATH,
|
||||
version="0.15.0",
|
||||
)
|
||||
assert len(keyset.public_keys_hex) == settings.max_order
|
||||
assert keyset.seed == "TEST_PRIVATE_KEY"
|
||||
assert keyset.derivation_path == "m/0'/0'/0'"
|
||||
assert (
|
||||
keyset.public_keys_hex[1]
|
||||
== "02194603ffa36356f4a56b7df9371fc3192472351453ec7398b8da8117e7c3e104"
|
||||
)
|
||||
assert keyset.id == "009a1f293253e41e"
|
||||
|
||||
Reference in New Issue
Block a user