Files
breez-sdk-liquid/lib/core/src/event.rs
yse 6782e8beef feat: add real-time sync service (#629)
* feat(rt-sync): add persistency layer (#555)

* feat(rt-sync): add pull and merge (#556)

* feat(rt-sync): add push logic and run method (#568)

* feat(rt-sync): integrate rt-sync with the sdk

fix: add formatting command to build

feat: add secondary trigger to sync

deps: add tonic tls-webpki-roots

feat: prevent double claiming on status stream handlers

fix: add tx commit to chain swap update (#588)

fix: ensure we pull records before prepare_send

fix: fmt and tests

* fix: set initial pulled state to `Recoverable`

* feat(rt-sync): add `last_derivation_index` to sync service

* Single chain source

* Handle Recoverable state

* fix(rt-sync): chain recovery integration (#590)

Co-authored-by: yse <hydra_yse@proton.me>

* feat(rt-sync): add `pair_fees_json`

* fix(rt-sync): clean already persisted incoming records

* feat: cache wallet info (#591)

* log: add status-stream logging for non-local swaps

* Sync improvements (#598)

* Full sync on new Bitcoin block

* Track the last unconfirmed refund tx

* Trigger synced event on partial sync

* fix: remove `REPLACE` clause from swap insert/update

* fix(rt-sync): update chain swap payer/receiver amount (#604)

* Fix syncing last derivation index

* fix: update bindings and lockfile

* remove logs

* fix(rt-sync): avoid reuse of derivation index update logic (#608)

* Add 5 index buffer to full scan

* Fix storing claim_address (#609)

* Filter incoming MRH txs by swap timestamp

* fix liquid pending timestamp

* persist tx data timestamp

* Update unconfirmed transactions

* feat: add API key (#618)

* Use configured lazy connect channel in sync service

* fix: set lower-case header (#624)

* fix error message

* Store & Sync LNURL info (#617)

* Persist and decrypt LNURL info

* Update Notification Plugin

* Sync payment details

* Update list payments to include chain swaps with only user lockup (#620)

* Update payments query to include chain swaps without txs

* Allow emitting a payment event without tx_id

* fix: bindings

---------

Co-authored-by: Ross Savage <hello@satimoto.com>
Co-authored-by: Ross Savage <551697+dangeross@users.noreply.github.com>
Co-authored-by: Roei Erez <roeierez@gmail.com>
2024-12-24 10:39:43 +01:00

69 lines
1.9 KiB
Rust

use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use anyhow::Result;
use log::{debug, info};
use tokio::sync::{broadcast, RwLock};
use crate::model::{EventListener, SdkEvent};
/// Dispatches [`SdkEvent`]s to registered listeners and to broadcast subscribers.
///
/// Events passed to `notify` are sent on the broadcast channel and handed to
/// every registered listener, unless notifications are currently paused.
pub(crate) struct EventManager {
/// Registered listeners, keyed by the id string returned from `add`.
listeners: RwLock<HashMap<String, Box<dyn EventListener>>>,
/// Sending half of the broadcast channel; `subscribe` creates receivers from it.
notifier: broadcast::Sender<SdkEvent>,
/// When `true`, `notify` only logs the event instead of emitting it.
is_paused: AtomicBool,
}
impl EventManager {
    /// Creates a manager with no listeners, notifications enabled, and a
    /// broadcast channel with a capacity of 100 events.
    pub fn new() -> Self {
        // The initial receiver is dropped; receivers are created on demand via `subscribe`.
        let (notifier, _) = broadcast::channel::<SdkEvent>(100);
        Self {
            listeners: Default::default(),
            notifier,
            is_paused: AtomicBool::new(false),
        }
    }

    /// Registers `listener` and returns the id under which it is stored.
    ///
    /// The id is the upper-case hex encoding of the current Unix time in
    /// milliseconds. If that id is already taken (two listeners added within
    /// the same millisecond), the value is incremented until it is unused, so
    /// an existing listener is never silently replaced.
    ///
    /// # Errors
    ///
    /// Returns an error if the system clock reports a time before the Unix epoch.
    pub async fn add(&self, listener: Box<dyn EventListener>) -> Result<String> {
        let mut stamp = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();

        // Hold the write lock while choosing the id so that the uniqueness
        // check and the insertion happen atomically.
        let mut listeners = self.listeners.write().await;
        let mut id = format!("{stamp:X}");
        while listeners.contains_key(&id) {
            stamp += 1;
            id = format!("{stamp:X}");
        }
        listeners.insert(id.clone(), listener);
        Ok(id)
    }

    /// Unregisters the listener stored under `id`; does nothing if there is none.
    pub async fn remove(&self, id: String) {
        self.listeners.write().await.remove(&id);
    }

    /// Emits `e` to broadcast subscribers and to every registered listener.
    ///
    /// While notifications are paused the event is only logged and then dropped.
    pub async fn notify(&self, e: SdkEvent) {
        if self.is_paused.load(Ordering::SeqCst) {
            info!("Event notifications are paused, not emitting event {e:?}");
            return;
        }

        debug!("Emitting event: {e:?}");
        // The send result is deliberately ignored: having no active broadcast
        // receivers is not a failure for the caller.
        let _ = self.notifier.send(e.clone());
        // The read guard is held for the whole loop, as listeners are borrowed from the map.
        for listener in self.listeners.read().await.values() {
            listener.on_event(e.clone());
        }
    }

    /// Returns a new receiver for events sent on the broadcast channel.
    pub(crate) fn subscribe(&self) -> broadcast::Receiver<SdkEvent> {
        self.notifier.subscribe()
    }

    /// Stops `notify` from emitting events until `resume_notifications` is called.
    pub(crate) fn pause_notifications(&self) {
        info!("Pausing event notifications");
        self.is_paused.store(true, Ordering::SeqCst);
    }

    /// Re-enables event emission after `pause_notifications`.
    pub(crate) fn resume_notifications(&self) {
        info!("Resuming event notifications");
        self.is_paused.store(false, Ordering::SeqCst);
    }
}