mirror of
https://github.com/aljazceru/nutshell.git
synced 2026-01-06 10:24:21 +01:00
lightning
This commit is contained in:
3
lightning/__init__.py
Normal file
3
lightning/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from lightning.lnbits import LNbitsWallet
|
||||
|
||||
WALLET = LNbitsWallet()
|
||||
88
lightning/base.py
Normal file
88
lightning/base.py
Normal file
@@ -0,0 +1,88 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import AsyncGenerator, Coroutine, NamedTuple, Optional
|
||||
|
||||
|
||||
class StatusResponse(NamedTuple):
|
||||
error_message: Optional[str]
|
||||
balance_msat: int
|
||||
|
||||
|
||||
class InvoiceResponse(NamedTuple):
|
||||
ok: bool
|
||||
checking_id: Optional[str] = None # payment_hash, rpc_id
|
||||
payment_request: Optional[str] = None
|
||||
error_message: Optional[str] = None
|
||||
|
||||
|
||||
class PaymentResponse(NamedTuple):
|
||||
# when ok is None it means we don't know if this succeeded
|
||||
ok: Optional[bool] = None
|
||||
checking_id: Optional[str] = None # payment_hash, rcp_id
|
||||
fee_msat: Optional[int] = None
|
||||
preimage: Optional[str] = None
|
||||
error_message: Optional[str] = None
|
||||
|
||||
|
||||
class PaymentStatus(NamedTuple):
|
||||
paid: Optional[bool] = None
|
||||
fee_msat: Optional[int] = None
|
||||
preimage: Optional[str] = None
|
||||
|
||||
@property
|
||||
def pending(self) -> bool:
|
||||
return self.paid is not True
|
||||
|
||||
@property
|
||||
def failed(self) -> bool:
|
||||
return self.paid == False
|
||||
|
||||
def __str__(self) -> str:
|
||||
if self.paid == True:
|
||||
return "settled"
|
||||
elif self.paid == False:
|
||||
return "failed"
|
||||
elif self.paid == None:
|
||||
return "still pending"
|
||||
else:
|
||||
return "unknown (should never happen)"
|
||||
|
||||
|
||||
class Wallet(ABC):
|
||||
@abstractmethod
|
||||
def status(self) -> Coroutine[None, None, StatusResponse]:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def create_invoice(
|
||||
self,
|
||||
amount: int,
|
||||
memo: Optional[str] = None,
|
||||
description_hash: Optional[bytes] = None,
|
||||
) -> Coroutine[None, None, InvoiceResponse]:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def pay_invoice(
|
||||
self, bolt11: str, fee_limit_msat: int
|
||||
) -> Coroutine[None, None, PaymentResponse]:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_invoice_status(
|
||||
self, checking_id: str
|
||||
) -> Coroutine[None, None, PaymentStatus]:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_payment_status(
|
||||
self, checking_id: str
|
||||
) -> Coroutine[None, None, PaymentStatus]:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def paid_invoices_stream(self) -> AsyncGenerator[str, None]:
|
||||
pass
|
||||
|
||||
|
||||
class Unsupported(Exception):
|
||||
pass
|
||||
150
lightning/lnbits.py
Normal file
150
lightning/lnbits.py
Normal file
@@ -0,0 +1,150 @@
|
||||
import asyncio
|
||||
import hashlib
|
||||
import json
|
||||
from os import getenv
|
||||
from typing import AsyncGenerator, Dict, Optional
|
||||
|
||||
import requests
|
||||
|
||||
from .base import (
|
||||
InvoiceResponse,
|
||||
PaymentResponse,
|
||||
PaymentStatus,
|
||||
StatusResponse,
|
||||
Wallet,
|
||||
)
|
||||
|
||||
|
||||
class LNbitsWallet(Wallet):
|
||||
"""https://github.com/lnbits/lnbits"""
|
||||
|
||||
def __init__(self):
|
||||
self.endpoint = getenv("LNBITS_ENDPOINT")
|
||||
|
||||
key = getenv("LNBITS_KEY")
|
||||
self.key = {"X-Api-Key": key}
|
||||
self.s = requests.Session()
|
||||
self.s.auth = ("user", "pass")
|
||||
self.s.headers.update({"X-Api-Key": key})
|
||||
|
||||
async def status(self) -> StatusResponse:
|
||||
try:
|
||||
r = self.s.get(url=f"{self.endpoint}/api/v1/wallet", timeout=15)
|
||||
except Exception as exc:
|
||||
return StatusResponse(
|
||||
f"Failed to connect to {self.endpoint} due to: {exc}", 0
|
||||
)
|
||||
|
||||
try:
|
||||
data = r.json()
|
||||
except:
|
||||
return StatusResponse(
|
||||
f"Failed to connect to {self.endpoint}, got: '{r.text[:200]}...'", 0
|
||||
)
|
||||
if "detail" in data:
|
||||
return StatusResponse(f"LNbits error: {data['detail']}", 0)
|
||||
return StatusResponse(None, data["balance"])
|
||||
|
||||
async def create_invoice(
|
||||
self,
|
||||
amount: int,
|
||||
memo: Optional[str] = None,
|
||||
description_hash: Optional[bytes] = None,
|
||||
unhashed_description: Optional[bytes] = None,
|
||||
) -> InvoiceResponse:
|
||||
data: Dict = {"out": False, "amount": amount}
|
||||
if description_hash:
|
||||
data["description_hash"] = description_hash.hex()
|
||||
if unhashed_description:
|
||||
data["unhashed_description"] = unhashed_description.hex()
|
||||
|
||||
data["memo"] = memo or ""
|
||||
try:
|
||||
r = self.s.post(url=f"{self.endpoint}/api/v1/payments", json=data)
|
||||
except:
|
||||
return InvoiceResponse(False, None, None, r.json()["detail"])
|
||||
ok, checking_id, payment_request, error_message = (
|
||||
True,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
|
||||
data = r.json()
|
||||
checking_id, payment_request = data["checking_id"], data["payment_request"]
|
||||
|
||||
return InvoiceResponse(ok, checking_id, payment_request, error_message)
|
||||
|
||||
async def pay_invoice(self, bolt11: str, fee_limit_msat: int) -> PaymentResponse:
|
||||
try:
|
||||
r = self.s.post(
|
||||
url=f"{self.endpoint}/api/v1/payments",
|
||||
json={"out": True, "bolt11": bolt11},
|
||||
timeout=None,
|
||||
)
|
||||
except:
|
||||
error_message = r.json()["detail"]
|
||||
return PaymentResponse(None, None, None, None, error_message)
|
||||
ok, checking_id, fee_msat, preimage, error_message = (
|
||||
True,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
|
||||
data = r.json()
|
||||
checking_id = data["payment_hash"]
|
||||
|
||||
# we do this to get the fee and preimage
|
||||
payment: PaymentStatus = await self.get_payment_status(checking_id)
|
||||
|
||||
return PaymentResponse(ok, checking_id, payment.fee_msat, payment.preimage)
|
||||
|
||||
async def get_invoice_status(self, checking_id: str) -> PaymentStatus:
|
||||
try:
|
||||
|
||||
r = self.s.get(
|
||||
url=f"{self.endpoint}/api/v1/payments/{checking_id}",
|
||||
headers=self.key,
|
||||
)
|
||||
except:
|
||||
return PaymentStatus(None)
|
||||
return PaymentStatus(r.json()["paid"])
|
||||
|
||||
async def get_payment_status(self, checking_id: str) -> PaymentStatus:
|
||||
try:
|
||||
r = self.s.get(
|
||||
url=f"{self.endpoint}/api/v1/payments/{checking_id}", headers=self.key
|
||||
)
|
||||
except:
|
||||
return PaymentStatus(None)
|
||||
data = r.json()
|
||||
if "paid" not in data and "details" not in data:
|
||||
return PaymentStatus(None)
|
||||
|
||||
return PaymentStatus(data["paid"], data["details"]["fee"], data["preimage"])
|
||||
|
||||
async def paid_invoices_stream(self) -> AsyncGenerator[str, None]:
|
||||
url = f"{self.endpoint}/api/v1/payments/sse"
|
||||
|
||||
while True:
|
||||
try:
|
||||
async with requests.stream("GET", url) as r:
|
||||
async for line in r.aiter_lines():
|
||||
if line.startswith("data:"):
|
||||
try:
|
||||
data = json.loads(line[5:])
|
||||
except json.decoder.JSONDecodeError:
|
||||
continue
|
||||
|
||||
if type(data) is not dict:
|
||||
continue
|
||||
|
||||
yield data["payment_hash"] # payment_hash
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
print("lost connection to lnbits /payments/sse, retrying in 5 seconds")
|
||||
await asyncio.sleep(5)
|
||||
Reference in New Issue
Block a user