mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 22:24:23 +01:00
Multiple simulator improvements
The simulator has been updated to work with real transaction structures instead of transaction hashes. It now supports: - Non-SegWit transaction format - Generation of blocks event-wise and time-wise Some small issues have also been fixed. With the new approach, the simulator can be used in a broader range of tests. Moreover tests can run faster since they do not have to wait for blocks. Instead, the generation of new blocks can be triggered by the test.
This commit is contained in:
@@ -1,107 +1,146 @@
|
||||
import re
|
||||
import os
|
||||
import pytest
|
||||
from time import sleep
|
||||
from threading import Thread
|
||||
|
||||
from test.simulator.transaction import TX
|
||||
from test.simulator.bitcoind_sim import run_simulator
|
||||
from pisa.utils.auth_proxy import AuthServiceProxy, JSONRPCException
|
||||
from pisa.conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT
|
||||
from pisa.tools import check_txid_format
|
||||
|
||||
MIXED_VALUES = values = [-1, 500, '', '111', [], 1.1, None, '', "a" * 31, "b" * 33, os.urandom(32).hex()]
|
||||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def run_bitcoind():
|
||||
bitcoind_thread = Thread(target=run_simulator, kwargs={"mode": "event"})
|
||||
bitcoind_thread.daemon = True
|
||||
bitcoind_thread.start()
|
||||
|
||||
# It takes a little bit of time to start the API (otherwise the requests are sent too early and they fail)
|
||||
sleep(0.1)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def genesis_block_hash(run_bitcoind):
|
||||
return bitcoin_cli.getblockhash(0)
|
||||
|
||||
|
||||
def check_hash_format(txid):
|
||||
# TODO: #12-check-txid-regexp
|
||||
return isinstance(txid, str) and re.search(r'^[0-9A-Fa-f]{64}$', txid) is not None
|
||||
|
||||
|
||||
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT))
|
||||
|
||||
# Help should always return 0
|
||||
assert(bitcoin_cli.help() == 0)
|
||||
|
||||
# getblockhash should return a blockid (which matches the txid format)
|
||||
block_hash = bitcoin_cli.getblockhash(0)
|
||||
assert(check_txid_format(block_hash))
|
||||
def test_help(run_bitcoind):
|
||||
# Help should always return 0
|
||||
assert(bitcoin_cli.help() == 0)
|
||||
|
||||
# Check that the values are within range and of the proper format (all should fail)
|
||||
values = [-1, 500, None, '', '111', [], 1.1]
|
||||
print("getblockhash fails ({}):".format(len(values)))
|
||||
|
||||
for v in values:
|
||||
# FIXME: Better assert for the exceptions would be nice (check the returned errno is the expected one)
|
||||
|
||||
def test_getblockhash(genesis_block_hash):
|
||||
# First block
|
||||
assert(check_hash_format(genesis_block_hash))
|
||||
|
||||
# Check that the values are within range and of the proper format (all should fail)
|
||||
for v in MIXED_VALUES:
|
||||
try:
|
||||
bitcoin_cli.getblockhash(v)
|
||||
assert False
|
||||
except JSONRPCException as e:
|
||||
assert True
|
||||
|
||||
|
||||
def test_get_block(genesis_block_hash):
|
||||
# getblock should return a list of transactions and the height
|
||||
block = bitcoin_cli.getblock(genesis_block_hash)
|
||||
assert(isinstance(block.get('tx'), list))
|
||||
assert(len(block.get('tx')) != 0)
|
||||
assert(isinstance(block.get('height'), int))
|
||||
|
||||
# It should fail for wrong data formats and random ids
|
||||
for v in MIXED_VALUES:
|
||||
try:
|
||||
bitcoin_cli.getblock(v)
|
||||
assert False
|
||||
except JSONRPCException as e:
|
||||
assert True
|
||||
|
||||
|
||||
def test_decoderawtransaction(genesis_block_hash):
|
||||
# decoderawtransaction should only return if the given transaction matches a txid format
|
||||
block = bitcoin_cli.getblock(genesis_block_hash)
|
||||
coinbase_txid = block.get('tx')[0]
|
||||
|
||||
coinbase_tx = bitcoin_cli.getrawtransaction(coinbase_txid).get("hex")
|
||||
tx = bitcoin_cli.decoderawtransaction(coinbase_tx)
|
||||
|
||||
assert(isinstance(tx, dict))
|
||||
assert(isinstance(tx.get('txid'), str))
|
||||
assert(check_hash_format(tx.get('txid')))
|
||||
|
||||
# Therefore should also work for a random transaction hex in our simulation
|
||||
random_tx = TX.create_dummy_transaction()
|
||||
tx = bitcoin_cli.decoderawtransaction(random_tx)
|
||||
assert(isinstance(tx, dict))
|
||||
assert(isinstance(tx.get('txid'), str))
|
||||
assert(check_hash_format(tx.get('txid')))
|
||||
|
||||
# But it should fail for not proper formatted one
|
||||
for v in MIXED_VALUES:
|
||||
try:
|
||||
bitcoin_cli.decoderawtransaction(v)
|
||||
assert False
|
||||
except JSONRPCException as e:
|
||||
assert True
|
||||
|
||||
|
||||
def test_sendrawtransaction(genesis_block_hash):
|
||||
# sendrawtransaction should only allow txids that the simulator has not mined yet
|
||||
bitcoin_cli.sendrawtransaction(TX.create_dummy_transaction())
|
||||
|
||||
# Any data not matching the txid format or that matches with an already mined transaction should fail
|
||||
try:
|
||||
block_hash = bitcoin_cli.getblockhash(v)
|
||||
genesis_tx = bitcoin_cli.getblock(genesis_block_hash).get("tx")[0]
|
||||
bitcoin_cli.sendrawtransaction(genesis_tx)
|
||||
assert False
|
||||
|
||||
except JSONRPCException as e:
|
||||
print('\t{}'.format(e))
|
||||
assert True
|
||||
|
||||
# getblock should return a list of transactions and the height
|
||||
block = bitcoin_cli.getblock(block_hash)
|
||||
assert(isinstance(block.get('tx'), list))
|
||||
assert(len(block.get('tx')) != 0)
|
||||
assert(isinstance(block.get('height'), int))
|
||||
for v in MIXED_VALUES:
|
||||
try:
|
||||
bitcoin_cli.sendrawtransaction(v)
|
||||
assert False
|
||||
except JSONRPCException as e:
|
||||
assert True
|
||||
|
||||
# Some fails
|
||||
values += ["a"*64, os.urandom(32).hex()]
|
||||
print("\ngetblock fails ({}):".format(len(values)))
|
||||
|
||||
for v in values:
|
||||
try:
|
||||
block = bitcoin_cli.getblock(v)
|
||||
assert False
|
||||
except JSONRPCException as e:
|
||||
print('\t{}'.format(e))
|
||||
def test_getrawtransaction(genesis_block_hash):
|
||||
# getrawtransaction should work for existing transactions, and fail for non-existing ones
|
||||
genesis_tx = bitcoin_cli.getblock(genesis_block_hash).get("tx")[0]
|
||||
tx = bitcoin_cli.getrawtransaction(genesis_tx)
|
||||
|
||||
# decoderawtransaction should only return if the given transaction matches a txid format
|
||||
coinbase_tx = block.get('tx')[0]
|
||||
tx = bitcoin_cli.decoderawtransaction(coinbase_tx)
|
||||
assert(isinstance(tx, dict))
|
||||
assert(isinstance(tx.get('txid'), str))
|
||||
assert(check_txid_format(tx.get('txid')))
|
||||
assert(isinstance(tx, dict))
|
||||
assert(isinstance(tx.get('confirmations'), int))
|
||||
|
||||
# Therefore should also work for a random formatted 32-byte hex in our simulation
|
||||
random_tx = os.urandom(32).hex()
|
||||
tx = bitcoin_cli.decoderawtransaction(random_tx)
|
||||
assert(isinstance(tx, dict))
|
||||
assert(isinstance(tx.get('txid'), str))
|
||||
assert(check_txid_format(tx.get('txid')))
|
||||
for v in MIXED_VALUES:
|
||||
try:
|
||||
bitcoin_cli.getrawtransaction(v)
|
||||
assert False
|
||||
except JSONRPCException as e:
|
||||
assert True
|
||||
|
||||
# But it should fail for not proper formatted one
|
||||
values = [1, None, '', "a"*63, "b"*65, [], os.urandom(31).hex()]
|
||||
print("\ndecoderawtransaction fails ({}):".format(len(values)))
|
||||
|
||||
for v in values:
|
||||
try:
|
||||
block = bitcoin_cli.decoderawtransaction(v)
|
||||
assert False
|
||||
except JSONRPCException as e:
|
||||
print('\t{}'.format(e))
|
||||
|
||||
# sendrawtransaction should only allow txids that the simulator has not mined yet
|
||||
bitcoin_cli.sendrawtransaction(os.urandom(32).hex())
|
||||
|
||||
# Any data not matching the txid format or that matches with an already mined transaction should fail
|
||||
values += [coinbase_tx]
|
||||
|
||||
print("\nsendrawtransaction fails ({}):".format(len(values)))
|
||||
|
||||
for v in values:
|
||||
try:
|
||||
block = bitcoin_cli.sendrawtransaction(v)
|
||||
assert False
|
||||
except JSONRPCException as e:
|
||||
print('\t{}'.format(e))
|
||||
|
||||
# getrawtransaction should work for existing transactions, and fail for non-existing ones
|
||||
tx = bitcoin_cli.getrawtransaction(coinbase_tx)
|
||||
|
||||
assert(isinstance(tx, dict))
|
||||
assert(isinstance(tx.get('confirmations'), int))
|
||||
|
||||
print("\nsendrawtransaction fails ({}):".format(len(values)))
|
||||
|
||||
for v in values:
|
||||
try:
|
||||
block = bitcoin_cli.sendrawtransaction(v)
|
||||
assert False
|
||||
except JSONRPCException as e:
|
||||
print('\t{}'.format(e))
|
||||
|
||||
# getblockcount should always return a positive integer
|
||||
bc = bitcoin_cli.getblockcount()
|
||||
assert (isinstance(bc, int))
|
||||
assert (bc >= 0)
|
||||
|
||||
print("\nAll tests passed!")
|
||||
def test_getblockcount():
|
||||
# getblockcount should always return a positive integer
|
||||
bc = bitcoin_cli.getblockcount()
|
||||
assert (isinstance(bc, int))
|
||||
assert (bc >= 0)
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -1,29 +1,36 @@
|
||||
import re
|
||||
import os
|
||||
import time
|
||||
import json
|
||||
import logging
|
||||
import binascii
|
||||
from threading import Thread
|
||||
from threading import Thread, Event
|
||||
from flask import Flask, request, Response, abort
|
||||
|
||||
from pisa.rpc_errors import *
|
||||
from test2.simulator.utils import sha256d
|
||||
from pisa.tools import check_txid_format
|
||||
from test2.simulator.transaction import TX
|
||||
from test2.simulator.zmq_publisher import ZMQPublisher
|
||||
from test.simulator.utils import sha256d
|
||||
from test.simulator.transaction import TX
|
||||
from test.simulator.zmq_publisher import ZMQPublisher
|
||||
from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT
|
||||
|
||||
app = Flask(__name__)
|
||||
HOST = 'localhost'
|
||||
PORT = '18443'
|
||||
|
||||
TIME_BETWEEN_BLOCKS = 5
|
||||
|
||||
mempool = []
|
||||
mined_transactions = {}
|
||||
blocks = {}
|
||||
blockchain = []
|
||||
blocks = {}
|
||||
mined_transactions = {}
|
||||
mempool = []
|
||||
|
||||
mine_new_block = Event()
|
||||
|
||||
|
||||
@app.route('/generate', methods=['POST'])
|
||||
def generate():
|
||||
global mine_new_block
|
||||
|
||||
mine_new_block.set()
|
||||
|
||||
return Response(status=200, mimetype='application/json')
|
||||
|
||||
|
||||
@app.route('/', methods=['POST'])
|
||||
@@ -72,9 +79,11 @@ def process_request():
|
||||
if method == "decoderawtransaction":
|
||||
rawtx = get_param(request_data)
|
||||
|
||||
if isinstance(rawtx, str):
|
||||
if isinstance(rawtx, str) and len(rawtx) % 2 is 0:
|
||||
txid = sha256d(rawtx)
|
||||
|
||||
if TX.deserialize(rawtx) is not None:
|
||||
response["result"] = {"txid": rawtx}
|
||||
response["result"] = {"txid": txid}
|
||||
|
||||
else:
|
||||
response["error"] = {"code": RPC_DESERIALIZATION_ERROR, "message": "TX decode failed"}
|
||||
@@ -87,10 +96,13 @@ def process_request():
|
||||
# TODO: A way of rejecting transactions should be added to test edge cases.
|
||||
rawtx = get_param(request_data)
|
||||
|
||||
if isinstance(rawtx, str):
|
||||
if isinstance(rawtx, str) and len(rawtx) % 2 is 0:
|
||||
txid = sha256d(rawtx)
|
||||
|
||||
if TX.deserialize(rawtx) is not None:
|
||||
if rawtx not in list(mined_transactions.keys()):
|
||||
if txid not in list(mined_transactions.keys()):
|
||||
mempool.append(rawtx)
|
||||
response["result"] = {"txid": txid}
|
||||
|
||||
else:
|
||||
response["error"] = {"code": RPC_VERIFY_ALREADY_IN_CHAIN,
|
||||
@@ -107,10 +119,10 @@ def process_request():
|
||||
txid = get_param(request_data)
|
||||
|
||||
if isinstance(txid, str):
|
||||
block = blocks.get(mined_transactions.get(txid))
|
||||
|
||||
if block:
|
||||
response["result"] = {"confirmations": len(blockchain) - block.get('height')}
|
||||
if txid in mined_transactions:
|
||||
block = blocks.get(mined_transactions[txid]["block"])
|
||||
rawtx = mined_transactions[txid].get('tx')
|
||||
response["result"] = {"hex": rawtx, "confirmations": len(blockchain) - block.get('height')}
|
||||
|
||||
elif txid in mempool:
|
||||
response["result"] = {"confirmations": 0}
|
||||
@@ -123,8 +135,6 @@ def process_request():
|
||||
response["error"] = no_param_err
|
||||
response["error"]["message"] = response["error"]["message"].format("string")
|
||||
|
||||
print(response)
|
||||
|
||||
elif method == "getblockcount":
|
||||
response["result"] = len(blockchain)
|
||||
|
||||
@@ -185,43 +195,19 @@ def load_data():
|
||||
pass
|
||||
|
||||
|
||||
def create_dummy_transaction(prev_tx_id=None, prev_out_index=None):
|
||||
tx = TX()
|
||||
|
||||
if prev_tx_id is None:
|
||||
prev_tx_id = os.urandom(32).hex()
|
||||
|
||||
if prev_out_index is None:
|
||||
prev_out_index = 0
|
||||
|
||||
tx.version = 1
|
||||
tx.inputs = 1
|
||||
tx.outputs = 1
|
||||
tx.prev_tx_id = [prev_tx_id]
|
||||
tx.prev_out_index = [prev_out_index]
|
||||
tx.nLockTime = 0
|
||||
tx.scriptSig = ['47304402204e45e16932b8af514961a1d3a1a25fdf3f4f7732e9d624c6c61548ab5fb8cd410220181522ec8eca07de4860'
|
||||
'a4acdd12909d831cc56cbbac4622082221a8768d1d0901']
|
||||
tx.scriptSig_len = [77]
|
||||
tx.nSequence = [4294967295]
|
||||
tx.value = [5000000000]
|
||||
tx.scriptPubKey = ['4104ae1a62fe09c5f51b13905f07f06b99a2f7159b2225f374cd378d71302fa28414e7aab37397f554a7df5f142c21c'
|
||||
'1b7303b8a0626f1baded5c72a704f7e6cd84cac']
|
||||
tx.scriptPubKey_len = [67]
|
||||
|
||||
return tx.serialize()
|
||||
|
||||
|
||||
def simulate_mining():
|
||||
global mempool, mined_transactions, blocks, blockchain
|
||||
def simulate_mining(mode, time_between_blocks):
|
||||
global mempool, mined_transactions, blocks, blockchain, mine_new_block
|
||||
prev_block_hash = None
|
||||
|
||||
mining_simulator = ZMQPublisher(topic=b'hashblock', feed_protocol=FEED_PROTOCOL, feed_addr=FEED_ADDR,
|
||||
feed_port=FEED_PORT)
|
||||
|
||||
while True:
|
||||
# Set the mining event to initialize the blockchain with a block
|
||||
mine_new_block.set()
|
||||
|
||||
while mine_new_block.wait():
|
||||
block_hash = os.urandom(32).hex()
|
||||
coinbase_tx = create_dummy_transaction()
|
||||
coinbase_tx = TX.create_dummy_transaction()
|
||||
coinbase_tx_hash = sha256d(coinbase_tx)
|
||||
|
||||
txs_to_mine = dict({coinbase_tx_hash: coinbase_tx})
|
||||
@@ -248,11 +234,18 @@ def simulate_mining():
|
||||
print("New block mined: {}".format(block_hash))
|
||||
print("\tTransactions: {}".format(list(txs_to_mine.keys())))
|
||||
|
||||
time.sleep(TIME_BETWEEN_BLOCKS)
|
||||
if mode == 'time':
|
||||
time.sleep(time_between_blocks)
|
||||
|
||||
else:
|
||||
mine_new_block.clear()
|
||||
|
||||
|
||||
def run_simulator():
|
||||
mining_thread = Thread(target=simulate_mining)
|
||||
def run_simulator(mode='time', time_between_blocks=5):
|
||||
if mode not in ["time", 'event']:
|
||||
raise ValueError("Node must be time or event")
|
||||
|
||||
mining_thread = Thread(target=simulate_mining, args=[mode, time_between_blocks])
|
||||
mining_thread.start()
|
||||
|
||||
# Setting Flask log to ERROR only so it does not mess with out logging
|
||||
|
||||
150
test/simulator/transaction.py
Normal file
150
test/simulator/transaction.py
Normal file
@@ -0,0 +1,150 @@
|
||||
# Porting some functionality from https://github.com/sr-gi/bitcoin_tools with some modifications <3
|
||||
from os import urandom
|
||||
|
||||
from test.simulator.utils import *
|
||||
|
||||
|
||||
class TX:
|
||||
""" Defines a class TX (transaction) that holds all the modifiable fields of a Bitcoin transaction, such as
|
||||
version, number of inputs, reference to previous transactions, input and output scripts, value, etc.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.version = None
|
||||
self.inputs = None
|
||||
self.outputs = None
|
||||
self.nLockTime = None
|
||||
self.prev_tx_id = []
|
||||
self.prev_out_index = []
|
||||
self.scriptSig = []
|
||||
self.scriptSig_len = []
|
||||
self.nSequence = []
|
||||
self.value = []
|
||||
self.scriptPubKey = []
|
||||
self.scriptPubKey_len = []
|
||||
|
||||
self.offset = 0
|
||||
self.hex = ""
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls, hex_tx):
|
||||
""" Builds a transaction object from the hexadecimal serialization format of a transaction that
|
||||
could be obtained, for example, from a blockexplorer.
|
||||
:param hex_tx: Hexadecimal serialized transaction.
|
||||
:type hex_tx: hex str
|
||||
:return: The transaction build using the provided hex serialized transaction.
|
||||
:rtype: TX
|
||||
"""
|
||||
|
||||
tx = cls()
|
||||
tx.hex = hex_tx
|
||||
|
||||
try:
|
||||
tx.version = int(change_endianness(parse_element(tx, 4)), 16)
|
||||
|
||||
# INPUTS
|
||||
tx.inputs = int(parse_varint(tx), 16)
|
||||
|
||||
for i in range(tx.inputs):
|
||||
tx.prev_tx_id.append(change_endianness(parse_element(tx, 32)))
|
||||
tx.prev_out_index.append(int(change_endianness(parse_element(tx, 4)), 16))
|
||||
# ScriptSig
|
||||
tx.scriptSig_len.append(int(parse_varint(tx), 16))
|
||||
tx.scriptSig.append(parse_element(tx, tx.scriptSig_len[i]))
|
||||
tx.nSequence.append(int(parse_element(tx, 4), 16))
|
||||
|
||||
# OUTPUTS
|
||||
tx.outputs = int(parse_varint(tx), 16)
|
||||
|
||||
for i in range(tx.outputs):
|
||||
tx.value.append(int(change_endianness(parse_element(tx, 8)), 16))
|
||||
# ScriptPubKey
|
||||
tx.scriptPubKey_len.append(int(parse_varint(tx), 16))
|
||||
tx.scriptPubKey.append(parse_element(tx, tx.scriptPubKey_len[i]))
|
||||
|
||||
tx.nLockTime = int(parse_element(tx, 4), 16)
|
||||
|
||||
if tx.offset != len(tx.hex):
|
||||
# There is some error in the serialized transaction passed as input. Transaction can't be built
|
||||
tx = None
|
||||
else:
|
||||
tx.offset = 0
|
||||
|
||||
except ValueError:
|
||||
# If a parsing error occurs, the deserialization stops and None is returned
|
||||
tx = None
|
||||
|
||||
return tx
|
||||
|
||||
def serialize(self, rtype=hex):
|
||||
""" Serialize all the transaction fields arranged in the proper order, resulting in a hexadecimal string
|
||||
ready to be broadcast to the network.
|
||||
:param self: self
|
||||
:type self: TX
|
||||
:param rtype: Whether the serialized transaction is returned as a hex str or a byte array.
|
||||
:type rtype: hex or bool
|
||||
:return: Serialized transaction representation (hexadecimal or bin depending on rtype parameter).
|
||||
:rtype: hex str / bin
|
||||
"""
|
||||
|
||||
if rtype not in [hex, bin]:
|
||||
raise Exception("Invalid return type (rtype). It should be either hex or bin.")
|
||||
serialized_tx = change_endianness(int2bytes(self.version, 4)) # 4-byte version number (LE).
|
||||
|
||||
# INPUTS
|
||||
serialized_tx += encode_varint(self.inputs) # Varint number of inputs.
|
||||
|
||||
for i in range(self.inputs):
|
||||
serialized_tx += change_endianness(self.prev_tx_id[i]) # 32-byte hash of the previous transaction (LE).
|
||||
serialized_tx += change_endianness(int2bytes(self.prev_out_index[i], 4)) # 4-byte output index (LE)
|
||||
serialized_tx += encode_varint(len(self.scriptSig[i]) // 2) # Varint input script length.
|
||||
# ScriptSig
|
||||
serialized_tx += self.scriptSig[i] # Input script.
|
||||
serialized_tx += int2bytes(self.nSequence[i], 4) # 4-byte sequence number.
|
||||
|
||||
# OUTPUTS
|
||||
serialized_tx += encode_varint(self.outputs) # Varint number of outputs.
|
||||
|
||||
if self.outputs != 0:
|
||||
for i in range(self.outputs):
|
||||
serialized_tx += change_endianness(int2bytes(self.value[i], 8)) # 8-byte field Satoshi value (LE)
|
||||
# ScriptPubKey
|
||||
serialized_tx += encode_varint(len(self.scriptPubKey[i]) // 2) # Varint Output script length.
|
||||
serialized_tx += self.scriptPubKey[i] # Output script.
|
||||
|
||||
serialized_tx += int2bytes(self.nLockTime, 4) # 4-byte lock time field
|
||||
|
||||
# If return type has been set to binary, the serialized transaction is converted.
|
||||
if rtype is bin:
|
||||
serialized_tx = unhexlify(serialized_tx)
|
||||
|
||||
return serialized_tx
|
||||
|
||||
@staticmethod
|
||||
def create_dummy_transaction(prev_tx_id=None, prev_out_index=None):
|
||||
tx = TX()
|
||||
|
||||
if prev_tx_id is None:
|
||||
prev_tx_id = urandom(32).hex()
|
||||
|
||||
if prev_out_index is None:
|
||||
prev_out_index = 0
|
||||
|
||||
tx.version = 1
|
||||
tx.inputs = 1
|
||||
tx.outputs = 1
|
||||
tx.prev_tx_id = [prev_tx_id]
|
||||
tx.prev_out_index = [prev_out_index]
|
||||
tx.nLockTime = 0
|
||||
tx.scriptSig = [
|
||||
'47304402204e45e16932b8af514961a1d3a1a25fdf3f4f7732e9d624c6c61548ab5fb8cd410220181522ec8eca07de4860'
|
||||
'a4acdd12909d831cc56cbbac4622082221a8768d1d0901']
|
||||
tx.scriptSig_len = [77]
|
||||
tx.nSequence = [4294967295]
|
||||
tx.value = [5000000000]
|
||||
tx.scriptPubKey = [
|
||||
'4104ae1a62fe09c5f51b13905f07f06b99a2f7159b2225f374cd378d71302fa28414e7aab37397f554a7df5f142c21c'
|
||||
'1b7303b8a0626f1baded5c72a704f7e6cd84cac']
|
||||
tx.scriptPubKey_len = [67]
|
||||
|
||||
return tx.serialize()
|
||||
128
test/simulator/utils.py
Normal file
128
test/simulator/utils.py
Normal file
@@ -0,0 +1,128 @@
|
||||
# Porting some functionality from https://github.com/sr-gi/bitcoin_tools with some modifications <3
|
||||
from hashlib import sha256
|
||||
from binascii import unhexlify
|
||||
|
||||
|
||||
def change_endianness(x):
|
||||
""" Changes the endianness (from BE to LE and vice versa) of a given value.
|
||||
:param x: Given value which endianness will be changed.
|
||||
:type x: hex str
|
||||
:return: The opposite endianness representation of the given value.
|
||||
:rtype: hex str
|
||||
"""
|
||||
|
||||
# If there is an odd number of elements, we make it even by adding a 0
|
||||
if (len(x) % 2) == 1:
|
||||
x += "0"
|
||||
|
||||
y = bytes(x, 'utf-8')
|
||||
z = y[::-1]
|
||||
return z.decode('utf-8')
|
||||
|
||||
|
||||
def parse_varint(tx):
|
||||
""" Parses a given transaction for extracting an encoded varint element.
|
||||
:param tx: Transaction where the element will be extracted.
|
||||
:type tx: TX
|
||||
:return: The b-bytes representation of the given value (a) in hex format.
|
||||
:rtype: hex str
|
||||
"""
|
||||
|
||||
# First of all, the offset of the hex transaction if moved to the proper position (i.e where the varint should be
|
||||
# located) and the length and format of the data to be analyzed is checked.
|
||||
data = tx.hex[tx.offset:]
|
||||
assert (len(data) > 0)
|
||||
size = int(data[:2], 16)
|
||||
assert (size <= 255)
|
||||
|
||||
# Then, the integer is encoded as a varint using the proper prefix, if needed.
|
||||
if size <= 252: # No prefix
|
||||
storage_length = 1
|
||||
elif size == 253: # 0xFD
|
||||
storage_length = 3
|
||||
elif size == 254: # 0xFE
|
||||
storage_length = 5
|
||||
elif size == 255: # 0xFF
|
||||
storage_length = 9
|
||||
else:
|
||||
raise Exception("Wrong input data size")
|
||||
|
||||
# Finally, the storage length is used to extract the proper number of bytes from the transaction hex and the
|
||||
# transaction offset is updated.
|
||||
varint = data[:storage_length * 2]
|
||||
tx.offset += storage_length * 2
|
||||
|
||||
return varint
|
||||
|
||||
|
||||
def parse_element(tx, size):
|
||||
""" Parses a given transaction to extract an element of a given size.
|
||||
:param tx: Transaction where the element will be extracted.
|
||||
:type tx: TX
|
||||
:param size: Size of the parameter to be extracted.
|
||||
:type size: int
|
||||
:return: The extracted element.
|
||||
:rtype: hex str
|
||||
"""
|
||||
|
||||
element = tx.hex[tx.offset:tx.offset + size * 2]
|
||||
tx.offset += size * 2
|
||||
return element
|
||||
|
||||
|
||||
def encode_varint(value):
|
||||
""" Encodes a given integer value to a varint. It only used the four varint representation cases used by bitcoin:
|
||||
1-byte, 2-byte, 4-byte or 8-byte integers.
|
||||
:param value: The integer value that will be encoded into varint.
|
||||
:type value: int
|
||||
:return: The varint representation of the given integer value.
|
||||
:rtype: str
|
||||
"""
|
||||
|
||||
# The value is checked in order to choose the size of its final representation.
|
||||
# 0xFD(253), 0xFE(254) and 0xFF(255) are special cases, since are the prefixes defined for 2-byte, 4-byte
|
||||
# and 8-byte long values respectively.
|
||||
if value < pow(2, 8) - 3:
|
||||
size = 1
|
||||
varint = int2bytes(value, size) # No prefix
|
||||
else:
|
||||
if value < pow(2, 16):
|
||||
size = 2
|
||||
prefix = 253 # 0xFD
|
||||
elif value < pow(2, 32):
|
||||
size = 4
|
||||
prefix = 254 # 0xFE
|
||||
elif value < pow(2, 64):
|
||||
size = 8
|
||||
prefix = 255 # 0xFF
|
||||
else:
|
||||
raise Exception("Wrong input data size")
|
||||
varint = format(prefix, 'x') + change_endianness(int2bytes(value, size))
|
||||
|
||||
return varint
|
||||
|
||||
|
||||
def int2bytes(a, b):
|
||||
""" Converts a given integer value (a) its b-byte representation, in hex format.
|
||||
:param a: Value to be converted.
|
||||
:type a: int
|
||||
:param b: Byte size to be filled.
|
||||
:type b: int
|
||||
:return: The b-bytes representation of the given value (a) in hex format.
|
||||
:rtype: hex str
|
||||
"""
|
||||
|
||||
m = pow(2, 8*b) - 1
|
||||
if a > m:
|
||||
raise Exception(str(a) + " is too big to be represented with " + str(b) + " bytes. Maximum value is "
|
||||
+ str(m) + ".")
|
||||
|
||||
return ('%0' + str(2 * b) + 'x') % a
|
||||
|
||||
|
||||
def sha256d(hex_data):
|
||||
data = unhexlify(hex_data)
|
||||
double_sha256 = sha256(sha256(data).digest()).hexdigest()
|
||||
|
||||
return change_endianness(double_sha256)
|
||||
|
||||
Reference in New Issue
Block a user