diff --git a/probe/probe.py b/probe/probe.py index df367ca..c153832 100755 --- a/probe/probe.py +++ b/probe/probe.py @@ -112,10 +112,16 @@ def probe(request, plugin, node_id=None, **kwargs): s.commit() plugin.rpc.sendpay(route, p.payment_hash) - complete_probe(plugin, request, p.id) + plugin.pending_probes.append({ + 'request': request, + 'probe_id': p.id, + 'payment_hash': p.payment_hash, + 'callback': complete_probe, + 'plugin': plugin, + }) -def complete_probe(plugin, request, probe_id): +def complete_probe(plugin, request, probe_id, payment_hash): s = plugin.Session() p = s.query(Probe).get(probe_id) try: @@ -148,6 +154,19 @@ def complete_probe(plugin, request, probe_id): request.set_result(res) +def poll_payments(plugin): + """Iterate through all probes and complete the finalized ones. + """ + for probe in plugin.pending_probes: + p = plugin.rpc.listpayments(None, payment_hash=probe['payment_hash']) + if p['payments'][0]['status'] == 'pending': + continue + + plugin.pending_probes.remove(probe) + cb = probe['callback'] + del probe['callback'] + cb(**probe) + def clear_temporary_exclusion(plugin): timed_out = [k for k, v in temporary_exclusions.items() if v < time()] for k in timed_out: @@ -162,7 +181,8 @@ def schedule(plugin): # List of scheduled calls with next runtime, function and interval next_runs = [ (time() + 300, clear_temporary_exclusion, 300), - (time() + plugin.probe_interval, start_probe, plugin.probe_interval) + (time() + plugin.probe_interval, start_probe, plugin.probe_interval), + (time() + 1, poll_payments, 1), ] heapq.heapify(next_runs) @@ -196,6 +216,9 @@ def init(configuration, options, plugin): t.daemon = True t.start() + # Probes that are still pending and need to be checked against. + plugin.pending_probes = [] + plugin.add_option( 'probe-interval',