mirror of
https://github.com/aljazceru/plugins.git
synced 2025-12-20 06:34:20 +01:00
probe: Poll for probe completion instead of waiting
This just polls pending probes and calls a callback upon completion. Signed-off-by: Christian Decker <decker.christian@gmail.com>
This commit is contained in:
@@ -112,10 +112,16 @@ def probe(request, plugin, node_id=None, **kwargs):
|
|||||||
|
|
||||||
s.commit()
|
s.commit()
|
||||||
plugin.rpc.sendpay(route, p.payment_hash)
|
plugin.rpc.sendpay(route, p.payment_hash)
|
||||||
complete_probe(plugin, request, p.id)
|
plugin.pending_probes.append({
|
||||||
|
'request': request,
|
||||||
|
'probe_id': p.id,
|
||||||
|
'payment_hash': p.payment_hash,
|
||||||
|
'callback': complete_probe,
|
||||||
|
'plugin': plugin,
|
||||||
|
})
|
||||||
|
|
||||||
|
|
||||||
def complete_probe(plugin, request, probe_id):
|
def complete_probe(plugin, request, probe_id, payment_hash):
|
||||||
s = plugin.Session()
|
s = plugin.Session()
|
||||||
p = s.query(Probe).get(probe_id)
|
p = s.query(Probe).get(probe_id)
|
||||||
try:
|
try:
|
||||||
@@ -148,6 +154,19 @@ def complete_probe(plugin, request, probe_id):
|
|||||||
request.set_result(res)
|
request.set_result(res)
|
||||||
|
|
||||||
|
|
||||||
|
def poll_payments(plugin):
|
||||||
|
"""Iterate through all probes and complete the finalized ones.
|
||||||
|
"""
|
||||||
|
for probe in plugin.pending_probes:
|
||||||
|
p = plugin.rpc.listpayments(None, payment_hash=probe['payment_hash'])
|
||||||
|
if p['payments'][0]['status'] == 'pending':
|
||||||
|
continue
|
||||||
|
|
||||||
|
plugin.pending_probes.remove(probe)
|
||||||
|
cb = probe['callback']
|
||||||
|
del probe['callback']
|
||||||
|
cb(**probe)
|
||||||
|
|
||||||
def clear_temporary_exclusion(plugin):
|
def clear_temporary_exclusion(plugin):
|
||||||
timed_out = [k for k, v in temporary_exclusions.items() if v < time()]
|
timed_out = [k for k, v in temporary_exclusions.items() if v < time()]
|
||||||
for k in timed_out:
|
for k in timed_out:
|
||||||
@@ -162,7 +181,8 @@ def schedule(plugin):
|
|||||||
# List of scheduled calls with next runtime, function and interval
|
# List of scheduled calls with next runtime, function and interval
|
||||||
next_runs = [
|
next_runs = [
|
||||||
(time() + 300, clear_temporary_exclusion, 300),
|
(time() + 300, clear_temporary_exclusion, 300),
|
||||||
(time() + plugin.probe_interval, start_probe, plugin.probe_interval)
|
(time() + plugin.probe_interval, start_probe, plugin.probe_interval),
|
||||||
|
(time() + 1, poll_payments, 1),
|
||||||
]
|
]
|
||||||
heapq.heapify(next_runs)
|
heapq.heapify(next_runs)
|
||||||
|
|
||||||
@@ -196,6 +216,9 @@ def init(configuration, options, plugin):
|
|||||||
t.daemon = True
|
t.daemon = True
|
||||||
t.start()
|
t.start()
|
||||||
|
|
||||||
|
# Probes that are still pending and need to be checked against.
|
||||||
|
plugin.pending_probes = []
|
||||||
|
|
||||||
|
|
||||||
plugin.add_option(
|
plugin.add_option(
|
||||||
'probe-interval',
|
'probe-interval',
|
||||||
|
|||||||
Reference in New Issue
Block a user