tests - unit tests LocatorCache reorg protection
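The new test below inspects three LocatorCache attributes: ``blocks``, an insertion-ordered map from block hash to that block's ``{locator: txid}`` entries (oldest block first, chain tip last); ``cache``, a flat locator-to-txid map covering every cached block; and ``cache_size``, the maximum number of blocks kept. A minimal sketch of that layout, inferred from the assertions in the diff (the class name is illustrative, not the real teos class):

    from collections import OrderedDict

    class LocatorCacheLayout:
        # Sketch of the state the test inspects; the field names match the test,
        # everything else is an assumption.
        def __init__(self, cache_size):
            self.cache = {}               # locator -> txid, flat view over all cached blocks
            self.blocks = OrderedDict()   # block_hash -> {locator: txid}, tip stored last
            self.cache_size = cache_size  # maximum number of blocks kept at once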
@@ -1,6 +1,7 @@
 import pytest
 from uuid import uuid4
 from shutil import rmtree
+from copy import deepcopy
 from threading import Thread
 from coincurve import PrivateKey
 
@@ -136,6 +137,54 @@ def test_locator_cache_init(block_processor):
         assert block_processor.get_block(k)
 
 
+def test_fix_cache(block_processor):
+    # This tests how a reorg will create a new version of the cache
+    # Let's start by setting up a full cache. We'll mine ``cache_size`` blocks to be sure it's full
+    generate_blocks_w_delay(config.get("LOCATOR_CACHE_SIZE"))
+
+    locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))
+    locator_cache.init(block_processor.get_best_block_hash(), block_processor)
+    assert len(locator_cache.blocks) == locator_cache.cache_size
+
+    # Now let's fake a reorg of less than ``cache_size``. We'll go two blocks into the past.
+    current_tip = block_processor.get_best_block_hash()
+    current_tip_locators = list(locator_cache.blocks[current_tip].keys())
+    current_tip_parent = block_processor.get_block(current_tip).get("previousblockhash")
+    current_tip_parent_locators = list(locator_cache.blocks[current_tip_parent].keys())
+    fake_tip = block_processor.get_block(current_tip_parent).get("previousblockhash")
+    locator_cache.fix_cache(fake_tip, block_processor)
+
+    # The last two blocks are no longer in the cache, nor are any of their locators
+    assert current_tip not in locator_cache.blocks and current_tip_parent not in locator_cache.blocks
+    for locator in current_tip_parent_locators + current_tip_locators:
+        assert locator not in locator_cache.cache
+
+    # The fake tip is the new tip, and two additional blocks are at the bottom
+    assert fake_tip in locator_cache.blocks and list(locator_cache.blocks.keys())[-1] == fake_tip
+    assert len(locator_cache.blocks) == locator_cache.cache_size
+
+    # Test the same for a full-cache reorg. We can simulate this by adding more blocks than the cache can fit and
+    # triggering a fix. We'll use a new cache to compare with the old one.
+    new_cache = deepcopy(locator_cache)
+
+    generate_blocks_w_delay(config.get("LOCATOR_CACHE_SIZE") * 2)
+    new_cache.fix_cache(block_processor.get_best_block_hash(), block_processor)
+
+    # None of the data from the old cache is in the new cache
+    for block_hash, data in locator_cache.blocks.items():
+        assert block_hash not in new_cache.blocks
+        for locator, txid in data.items():
+            assert locator not in new_cache.cache
+
+    # The data in the new cache corresponds to the last ``cache_size`` blocks.
+    block_count = block_processor.get_block_count()
+    for i in range(block_count, block_count - locator_cache.cache_size, -1):
+        block_hash = bitcoin_cli(bitcoind_connect_params).getblockhash(i - 1)
+        assert block_hash in new_cache.blocks
+        for locator, _ in new_cache.blocks[block_hash].items():
+            assert locator in new_cache.cache
+
+
 def test_locator_cache_is_full(block_processor):
     # Empty cache
     locator_cache = LocatorCache(config.get("LOCATOR_CACHE_SIZE"))
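Since the diff only adds tests, here is a rough sketch of the ``fix_cache`` behavior those assertions pin down: a reorg shallower than the cache rolls the cache back to the new tip and backfills older blocks, while a reorg deeper than the cache rebuilds it from scratch. This is an illustration consistent with the test, not the python-teos implementation; the class name and the ``_block_locators`` helper are hypothetical, and it only assumes what the test itself uses (``block_processor.get_block`` returning a dict with a ``previousblockhash`` field).

    from collections import OrderedDict

    class LocatorCacheSketch:
        def __init__(self, cache_size):
            self.cache = {}               # locator -> txid across all cached blocks
            self.blocks = OrderedDict()   # block_hash -> {locator: txid}, oldest first, tip last
            self.cache_size = cache_size

        def _block_locators(self, block_hash, block_processor):
            # Hypothetical helper: build the {locator: txid} map for one block
            # (the real code derives locators from the block's transaction ids).
            raise NotImplementedError

        def init(self, tip, block_processor):
            # Cache the last ``cache_size`` blocks of the chain, ending at the given tip.
            chain, current = [], tip
            while current is not None and len(chain) < self.cache_size:
                chain.append(current)
                current = block_processor.get_block(current).get("previousblockhash")
            for block_hash in reversed(chain):  # insert oldest first so the tip ends up last
                locators = self._block_locators(block_hash, block_processor)
                self.blocks[block_hash] = locators
                self.cache.update(locators)

        def fix_cache(self, tip, block_processor):
            if tip in self.blocks:
                # Shallow reorg: the new tip is already cached. Drop every block newer
                # than it, together with those blocks' locators...
                while next(reversed(self.blocks)) != tip:
                    _, stale_locators = self.blocks.popitem(last=True)
                    for locator in stale_locators:
                        self.cache.pop(locator, None)
                # ...and backfill older blocks at the bottom until the cache is full again.
                oldest = next(iter(self.blocks))
                while len(self.blocks) < self.cache_size:
                    oldest = block_processor.get_block(oldest).get("previousblockhash")
                    if oldest is None:
                        break
                    locators = self._block_locators(oldest, block_processor)
                    self.blocks[oldest] = locators
                    self.blocks.move_to_end(oldest, last=False)  # keep the tip at the end
                    self.cache.update(locators)
            else:
                # Reorg deeper than the cache: nothing cached is on the new chain,
                # so wipe everything and rebuild starting from the new tip.
                self.blocks.clear()
                self.cache.clear()
                self.init(tip, block_processor)

This mirrors what the test checks: after the two-block fake reorg the dropped tip and its parent (and their locators) are gone, ``fake_tip`` is the newest cached block, and the cache is full again; after a reorg larger than ``cache_size`` none of the old data survives and the cache holds exactly the last ``cache_size`` blocks.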