From a49e587d7c8288e55713d16f17a6576489b9a2df Mon Sep 17 00:00:00 2001 From: Sergi Delgado Segura Date: Fri, 12 Jun 2020 12:48:28 +0200 Subject: [PATCH] watcher - adds RWLocks for the LocatorCache Threads should aboit reading the cache when it is being updated/fixed. The latter is specially relevant since during a reorg most of the cache may change. --- requirements.txt | 3 ++- teos/watcher.py | 32 +++++++++++++++++++++----------- test/teos/unit/test_watcher.py | 16 ++++++++-------- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/requirements.txt b/requirements.txt index 5ea83ae..20406f5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ cryptography==2.8 coincurve pyzbase32 requests -plyvel \ No newline at end of file +plyvel +readerwriterlock \ No newline at end of file diff --git a/teos/watcher.py b/teos/watcher.py index 00d3d81..644bc79 100644 --- a/teos/watcher.py +++ b/teos/watcher.py @@ -1,6 +1,7 @@ from queue import Queue from threading import Thread from collections import OrderedDict +from readerwriterlock import rwlock from common.logger import Logger from common.tools import compute_locator @@ -47,6 +48,7 @@ class LocatorCache: self.cache = dict() self.blocks = OrderedDict() self.cache_size = blocks_in_cache + self.rw_lock = rwlock.RWLockWrite() def init(self, last_known_block, block_processor): """ @@ -87,7 +89,10 @@ class LocatorCache: Returns: :obj:`str` or :obj:`None`: The txid linked to the given locator if found. None otherwise. """ - return self.cache.get(locator) + + with self.rw_lock.gen_rlock(): + locator = self.cache.get(locator) + return locator def update(self, block_hash, locator_txid_map): """ @@ -99,22 +104,26 @@ class LocatorCache: ids. """ - self.cache.update(locator_txid_map) - self.blocks[block_hash] = list(locator_txid_map.keys()) - logger.debug("Block added to cache", block_hash=block_hash) + with self.rw_lock.gen_wlock(): + self.cache.update(locator_txid_map) + self.blocks[block_hash] = list(locator_txid_map.keys()) + logger.debug("Block added to cache", block_hash=block_hash) if self.is_full(): self.remove_oldest_block() def is_full(self): """ Returns whether the cache is full or not """ - return len(self.blocks) > self.cache_size + with self.rw_lock.gen_rlock(): + full = len(self.blocks) > self.cache_size + return full def remove_oldest_block(self): """ Removes the oldest block from the cache """ - block_hash, locators = self.blocks.popitem(last=False) - for locator in locators: - del self.cache[locator] + with self.rw_lock.gen_wlock(): + block_hash, locators = self.blocks.popitem(last=False) + for locator in locators: + del self.cache[locator] logger.debug("Block removed from cache", block_hash=block_hash) @@ -135,15 +144,16 @@ class LocatorCache: for _ in range(tmp_cache.cache_size): target_block = block_processor.get_block(target_block_hash) if target_block: - # Compute the locator:txid par for every transaction in the block and update both the cache and + # Compute the locator:txid pair for every transaction in the block and update both the cache and # the block mapping. locator_txid_map = {compute_locator(txid): txid for txid in target_block.get("tx")} tmp_cache.cache.update(locator_txid_map) tmp_cache.blocks[target_block_hash] = list(locator_txid_map.keys()) target_block_hash = target_block.get("previousblockhash") - self.blocks = OrderedDict(reversed((list(tmp_cache.blocks.items())))) - self.cache = tmp_cache.cache + with self.rw_lock.gen_wlock(): + self.blocks = OrderedDict(reversed((list(tmp_cache.blocks.items())))) + self.cache = tmp_cache.cache class Watcher: diff --git a/test/teos/unit/test_watcher.py b/test/teos/unit/test_watcher.py index d6ef90b..79d7a93 100644 --- a/test/teos/unit/test_watcher.py +++ b/test/teos/unit/test_watcher.py @@ -281,24 +281,24 @@ def test_fix_cache(block_processor): # Test the same for a full cache reorg. We can simulate this by adding more blocks than the cache can fit and # trigger a fix. We'll use a new cache to compare with the old - new_cache = deepcopy(locator_cache) + old_cache_blocks = deepcopy(locator_cache.blocks) generate_blocks_w_delay((config.get("LOCATOR_CACHE_SIZE") * 2)) - new_cache.fix(block_processor.get_best_block_hash(), block_processor) + locator_cache.fix(block_processor.get_best_block_hash(), block_processor) # None of the data from the old cache is in the new cache - for block_hash, locators in locator_cache.blocks.items(): - assert block_hash not in new_cache.blocks + for block_hash, locators in old_cache_blocks.items(): + assert block_hash not in locator_cache.blocks for locator in locators: - assert locator not in new_cache.cache + assert locator not in locator_cache.cache # The data in the new cache corresponds to the last ``cache_size`` blocks. block_count = block_processor.get_block_count() for i in range(block_count, block_count - locator_cache.cache_size, -1): block_hash = bitcoin_cli(bitcoind_connect_params).getblockhash(i - 1) - assert block_hash in new_cache.blocks - for locator in new_cache.blocks[block_hash]: - assert locator in new_cache.cache + assert block_hash in locator_cache.blocks + for locator in locator_cache.blocks[block_hash]: + assert locator in locator_cache.cache def test_watcher_init(watcher):