From 3b4f5b56a042fe04c7b3e6044b50f1c66e7ca7e8 Mon Sep 17 00:00:00 2001 From: lollerfirst <43107113+lollerfirst@users.noreply.github.com> Date: Mon, 8 Sep 2025 18:20:21 +0200 Subject: [PATCH] fix: subscription re-init bug (#781) * fix subscription re-initialization bug * test for regression issue * format * Update cashu/mint/events/client.py Co-authored-by: callebtc <93376500+callebtc@users.noreply.github.com> --------- Co-authored-by: callebtc <93376500+callebtc@users.noreply.github.com> --- cashu/mint/events/client.py | 10 ++-- tests/wallet/test_wallet_subscription.py | 70 +++++++++++++++++++++++- 2 files changed, 71 insertions(+), 9 deletions(-) diff --git a/cashu/mint/events/client.py b/cashu/mint/events/client.py index 06959fc..b561cdd 100644 --- a/cashu/mint/events/client.py +++ b/cashu/mint/events/client.py @@ -184,13 +184,11 @@ class LedgerEventClientManager: if len(self.subscriptions[kind]) >= self.max_subscriptions: raise ValueError("Max subscriptions reached") - for filter in filters: - if filter not in self.subscriptions: - self.subscriptions[kind][filter] = [] - logger.debug(f"Adding subscription {subId} for filter {filter}") - self.subscriptions[kind][filter].append(subId) + for f in filters: + logger.debug(f"Adding subscription {subId} for filter {f}") + self.subscriptions[kind].setdefault(f, []).append(subId) # Initialize the subscription - asyncio.create_task(self._init_subscription(subId, filter, kind)) + asyncio.create_task(self._init_subscription(subId, f, kind)) def remove_subscription(self, subId: str) -> None: for kind, sub_filters in self.subscriptions.items(): diff --git a/tests/wallet/test_wallet_subscription.py b/tests/wallet/test_wallet_subscription.py index 2c8dd6d..286ce55 100644 --- a/tests/wallet/test_wallet_subscription.py +++ b/tests/wallet/test_wallet_subscription.py @@ -1,16 +1,16 @@ import asyncio +import threading import pytest import pytest_asyncio from cashu.core.base import Method, MintQuoteState, ProofState -from cashu.core.json_rpc.base import JSONRPCNotficationParams +from cashu.core.json_rpc.base import JSONRPCNotficationParams, JSONRPCSubscriptionKinds from cashu.core.nuts.nuts import WEBSOCKETS_NUT from cashu.core.settings import settings from cashu.wallet.wallet import Wallet from tests.conftest import SERVER_ENDPOINT from tests.helpers import ( - is_fake, pay_if_regtest, ) @@ -27,7 +27,6 @@ async def wallet(mint): @pytest.mark.asyncio -@pytest.mark.skipif(is_fake, reason="only regtest") async def test_wallet_subscription_mint(wallet: Wallet): if not wallet.mint_info.supports_nut(WEBSOCKETS_NUT): pytest.skip("No websocket support") @@ -110,3 +109,68 @@ async def test_wallet_subscription_swap(wallet: Wallet): for msg in spent_stack: proof_state = ProofState.parse_obj(msg.payload) assert proof_state.spent + + +@pytest.mark.asyncio +async def test_wallet_subscription_multiple_listeners_receive_updates(wallet: Wallet): + """Regression test: ensure multiple subscriptions for the same quote receive updates. + + We open two websocket subscriptions for the same mint quote and verify that + both listeners receive the initial (unpaid) state and the subsequent paid update. + """ + if not wallet.mint_info.supports_nut(WEBSOCKETS_NUT): + pytest.skip("No websocket support") + + if not wallet.mint_info.supports_websocket_mint_quote( + Method["bolt11"], wallet.unit + ): + pytest.skip("No websocket support for bolt11_mint_quote") + + # Request a quote without auto-subscribing + mint_quote = await wallet.request_mint(123) + + # Manually create a SubscriptionManager and subscribe twice to the same quote + from cashu.wallet.subscriptions import SubscriptionManager + + subs = SubscriptionManager(wallet.url) + threading.Thread(target=subs.connect, name="SubscriptionManager", daemon=True).start() + + stack1: list[JSONRPCNotficationParams] = [] + stack2: list[JSONRPCNotficationParams] = [] + + def cb1(msg: JSONRPCNotficationParams): + stack1.append(msg) + + def cb2(msg: JSONRPCNotficationParams): + stack2.append(msg) + + subs.subscribe( + kind=JSONRPCSubscriptionKinds.BOLT11_MINT_QUOTE, + filters=[mint_quote.quote], + callback=cb1, + ) + subs.subscribe( + kind=JSONRPCSubscriptionKinds.BOLT11_MINT_QUOTE, + filters=[mint_quote.quote], + callback=cb2, + ) + + # Allow time for the initial snapshot to arrive on both subscriptions + await asyncio.sleep(0.5) + + assert len(stack1) >= 1 and len(stack2) >= 1 + assert stack1[0].payload["state"] == MintQuoteState.unpaid.value + assert stack2[0].payload["state"] == MintQuoteState.unpaid.value + + # Pay the invoice and wait for the paid update to be pushed to both listeners + await pay_if_regtest(mint_quote.request) + + wait = (settings.fakewallet_delay_incoming_payment or 1) + 1 + await asyncio.sleep(wait) + + # Verify that both listeners received a paid update + assert any(m.payload["state"] == MintQuoteState.paid.value for m in stack1) + assert any(m.payload["state"] == MintQuoteState.paid.value for m in stack2) + + # Cleanup the websocket + subs.close()