mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-18 14:44:21 +01:00
Adds basic functionality for both zmq_pub and bitcoin_rpc simulators
This commit is contained in:
35
tests/bitcoin_rpc_sim.py
Normal file
35
tests/bitcoin_rpc_sim.py
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
from flask import Flask, request, Response, abort
|
||||||
|
import json
|
||||||
|
|
||||||
|
app = Flask(__name__)
|
||||||
|
HOST = 'localhost'
|
||||||
|
PORT = '18443'
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/', methods=['POST'])
|
||||||
|
def process_request():
|
||||||
|
request_data = request.get_json()
|
||||||
|
method = request_data.get('method')
|
||||||
|
|
||||||
|
if method == "help":
|
||||||
|
pass
|
||||||
|
elif method == "getblockcount":
|
||||||
|
pass
|
||||||
|
elif method == "getblock":
|
||||||
|
pass
|
||||||
|
elif method == "getblockhash":
|
||||||
|
pass
|
||||||
|
elif method == "getrawtransaction":
|
||||||
|
pass
|
||||||
|
elif method == "decoderawtransaction":
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
return abort(500, "Unsupported method")
|
||||||
|
|
||||||
|
response = {"id": 0, "result": 0, "error": None}
|
||||||
|
|
||||||
|
return Response(json.dumps(response), status=200, mimetype='application/json')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
app.run(host=HOST, port=PORT)
|
||||||
25
tests/zmq_pub_sim.py
Normal file
25
tests/zmq_pub_sim.py
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
import zmq
|
||||||
|
import time
|
||||||
|
from binascii import unhexlify
|
||||||
|
from pisa.conf import FEED_PROTOCOL, FEED_ADDR, FEED_PORT
|
||||||
|
|
||||||
|
|
||||||
|
class ZMQServerSimulator:
|
||||||
|
def __init__(self, topic=b'hashblock'):
|
||||||
|
self.topic = topic
|
||||||
|
self.context = zmq.Context()
|
||||||
|
self.socket = self.context.socket(zmq.PUB)
|
||||||
|
self.socket.bind("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT))
|
||||||
|
|
||||||
|
def mine_block(self, block_hash):
|
||||||
|
self.socket.send_multipart([self.topic, block_hash])
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
simulator = ZMQServerSimulator()
|
||||||
|
|
||||||
|
block_hash = unhexlify('0000000000000000000873e8ebc0bb1e61e560a773ec2319457b71f1b4030be0')
|
||||||
|
|
||||||
|
while True:
|
||||||
|
simulator.mine_block(block_hash)
|
||||||
Reference in New Issue
Block a user