mirror of
https://github.com/aljazceru/lightning.git
synced 2025-12-19 15:14:23 +01:00
pyln: Use a fair FS lock to throttle node startups
We were getting a couple of starvations, so we need a fair filelock. I also wasn't too happy with the lock as is, so I hand-coded it quickly. Should be correct, but the overall timeout will tell us how well we are doing on CI.
This commit is contained in:
committed by
Rusty Russell
parent
8cc62d76e4
commit
4c3ee04bb7
@@ -198,8 +198,8 @@ def teardown_checks(request):
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def throttler():
|
def throttler(test_base_dir):
|
||||||
yield Throttler()
|
yield Throttler(test_base_dir)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
|
|||||||
@@ -1,12 +1,13 @@
|
|||||||
from bitcoin.core import COIN # type: ignore
|
from bitcoin.core import COIN # type: ignore
|
||||||
from bitcoin.rpc import RawProxy as BitcoinProxy # type: ignore
|
from bitcoin.rpc import RawProxy as BitcoinProxy # type: ignore
|
||||||
from bitcoin.rpc import JSONRPCError
|
from bitcoin.rpc import JSONRPCError
|
||||||
|
from contextlib import contextmanager
|
||||||
|
from pathlib import Path
|
||||||
from pyln.client import RpcError
|
from pyln.client import RpcError
|
||||||
from pyln.testing.btcproxy import BitcoinRpcProxy
|
from pyln.testing.btcproxy import BitcoinRpcProxy
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
from ephemeral_port_reserve import reserve # type: ignore
|
from ephemeral_port_reserve import reserve # type: ignore
|
||||||
from filelock import FileLock # type: ignore
|
|
||||||
from pyln.client import LightningRpc
|
from pyln.client import LightningRpc
|
||||||
from pyln.client import Millisatoshi
|
from pyln.client import Millisatoshi
|
||||||
|
|
||||||
@@ -1064,6 +1065,43 @@ class LightningNode(object):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def flock(directory: Path):
|
||||||
|
"""A fair filelock, based on atomic fs operations.
|
||||||
|
"""
|
||||||
|
if not isinstance(directory, Path):
|
||||||
|
directory = Path(directory)
|
||||||
|
d = directory / Path(".locks")
|
||||||
|
os.makedirs(str(d), exist_ok=True)
|
||||||
|
fname = None
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# Try until we find a filename that doesn't exist yet.
|
||||||
|
try:
|
||||||
|
fname = d / Path("lock-{}".format(time.time()))
|
||||||
|
fd = os.open(str(fname), flags=os.O_CREAT | os.O_EXCL)
|
||||||
|
os.close(fd)
|
||||||
|
break
|
||||||
|
except FileExistsError:
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
# So now we have a position in the lock, let's check if we are the
|
||||||
|
# next one to go:
|
||||||
|
while True:
|
||||||
|
files = sorted([f.resolve() for f in d.iterdir() if f.is_file()])
|
||||||
|
# We're queued, so it should at least have us.
|
||||||
|
assert len(files) >= 1
|
||||||
|
if files[0] == fname:
|
||||||
|
break
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
# We can continue
|
||||||
|
yield fname
|
||||||
|
|
||||||
|
# Remove our file, so the next one can go ahead.
|
||||||
|
fname.unlink()
|
||||||
|
|
||||||
|
|
||||||
class Throttler(object):
|
class Throttler(object):
|
||||||
"""Throttles the creation of system-processes to avoid overload.
|
"""Throttles the creation of system-processes to avoid overload.
|
||||||
|
|
||||||
@@ -1082,26 +1120,24 @@ class Throttler(object):
|
|||||||
tests, which could cause more timeouts.
|
tests, which could cause more timeouts.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
def __init__(self, target: float = 75):
|
def __init__(self, directory: str, target: float = 75):
|
||||||
"""If specified we try to stick to a load of target (in percent).
|
"""If specified we try to stick to a load of target (in percent).
|
||||||
"""
|
"""
|
||||||
self.target = target
|
self.target = target
|
||||||
self.lock = FileLock("/tmp/ltest.lock")
|
|
||||||
self.current_load = self.target # Start slow
|
self.current_load = self.target # Start slow
|
||||||
psutil.cpu_percent() # Prime the internal load metric
|
psutil.cpu_percent() # Prime the internal load metric
|
||||||
|
self.directory = directory
|
||||||
|
|
||||||
def wait(self):
|
def wait(self):
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
with self.lock.acquire(poll_intervall=0.250):
|
with flock(self.directory):
|
||||||
# We just got the lock, assume someone else just released it
|
# We just got the lock, assume someone else just released it
|
||||||
self.current_load = 100
|
self.current_load = 100
|
||||||
while self.load() >= self.target:
|
while self.load() >= self.target:
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
delay = time.time() - start_time
|
|
||||||
with open("/tmp/ltest-throttler.csv", "a") as f:
|
|
||||||
f.write("{}, {}, {}, {}\n".format(time.time(), self.load(), self.target, delay))
|
|
||||||
self.current_load = 100 # Back off slightly to avoid triggering right away
|
self.current_load = 100 # Back off slightly to avoid triggering right away
|
||||||
|
print("Throttler delayed startup for {} seconds".format(time.time() - start_time))
|
||||||
|
|
||||||
def load(self):
|
def load(self):
|
||||||
"""An exponential moving average of the load
|
"""An exponential moving average of the load
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
Flask==1.1.*
|
Flask==1.1.*
|
||||||
cheroot==8.5.*
|
cheroot==8.5.*
|
||||||
ephemeral-port-reserve==1.1.1
|
ephemeral-port-reserve==1.1.1
|
||||||
filelock==3.0.*
|
|
||||||
flaky ~= 3.7.0
|
flaky ~= 3.7.0
|
||||||
psutil==5.7.*
|
psutil==5.7.*
|
||||||
psycopg2-binary==2.8.*
|
psycopg2-binary==2.8.*
|
||||||
|
|||||||
Reference in New Issue
Block a user