mirror of
https://github.com/aljazceru/python-teos.git
synced 2025-12-17 22:24:23 +01:00
Improves on watcher logic.
Improves the dummy watcher logic. Sets a proper structure. Watcher can now check for potential matches and handle multiple appointments with the same locator (hash collisions).
This commit is contained in:
committed by
Sergi Delgado Segura
parent
21eed576b2
commit
803712e9b1
@@ -1,10 +1,15 @@
|
||||
import threading
|
||||
from multiprocessing.connection import Listener
|
||||
from pisa import *
|
||||
from pisa.watcher import Watcher
|
||||
from pisa.inspector import Inspector
|
||||
from multiprocessing.connection import Listener
|
||||
|
||||
|
||||
def manage_api(debug, host=HOST, port=PORT):
|
||||
listener = Listener((host, port))
|
||||
watcher = Watcher()
|
||||
inspector = Inspector()
|
||||
|
||||
while True:
|
||||
conn = listener.accept()
|
||||
|
||||
@@ -12,20 +17,37 @@ def manage_api(debug, host=HOST, port=PORT):
|
||||
print('Connection accepted from', listener.last_accepted)
|
||||
|
||||
# Maintain metadata up to date.
|
||||
t_serve = threading.Thread(target=serve_data, args=[debug, conn, listener.last_accepted])
|
||||
t_serve = threading.Thread(target=manage_request, args=[conn, listener.last_accepted, inspector, watcher,
|
||||
debug])
|
||||
t_serve.start()
|
||||
|
||||
|
||||
def serve_data(debug, conn, remote_addr):
|
||||
def manage_request(conn, remote_addr, inspector, watcher, debug):
|
||||
while not conn.closed:
|
||||
try:
|
||||
response = "Unknown command"
|
||||
msg = conn.recv()
|
||||
|
||||
if type(msg) == tuple:
|
||||
if len(msg) is 2:
|
||||
command, arg = msg
|
||||
|
||||
print(command, arg)
|
||||
if command == "add_appointment":
|
||||
appointment = inspector.inspect(msg, debug)
|
||||
if appointment:
|
||||
appointment_added = watcher.add_appointment(appointment, debug)
|
||||
|
||||
if appointment_added:
|
||||
response = "Appointment accepted"
|
||||
else:
|
||||
response = "Appointment rejected"
|
||||
else:
|
||||
response = "Appointment rejected"
|
||||
|
||||
# Send response back. Change multiprocessing.connection for an http based connection
|
||||
if debug:
|
||||
print('Sending response and disconnecting:', response, remote_addr)
|
||||
conn.close()
|
||||
|
||||
except (IOError, EOFError):
|
||||
if debug:
|
||||
|
||||
10
pisa-btc/pisa/appointment.py
Normal file
10
pisa-btc/pisa/appointment.py
Normal file
@@ -0,0 +1,10 @@
|
||||
# Basic appointment structure
|
||||
class Appointment:
|
||||
def __init__(self, locator, start_time, end_time, encrypted_blob, cypher):
|
||||
self.locator = locator
|
||||
self.start_time = start_time
|
||||
self.end_time = end_time
|
||||
self.encrypted_blob = encrypted_blob
|
||||
self.cypher = cypher
|
||||
|
||||
|
||||
@@ -2,8 +2,6 @@ from getopt import getopt
|
||||
from sys import argv
|
||||
from threading import Thread
|
||||
from pisa import shared
|
||||
from pisa.zmq_subscriber import run_subscribe
|
||||
from pisa.tx_watcher import watch_txs
|
||||
from pisa.api import manage_api
|
||||
|
||||
|
||||
@@ -16,10 +14,5 @@ if __name__ == '__main__':
|
||||
|
||||
shared.init()
|
||||
|
||||
zmq_thread = Thread(target=run_subscribe, args=[debug])
|
||||
tx_watcher_thread = Thread(target=watch_txs, args=[debug])
|
||||
api_thread = Thread(target=manage_api, args=[debug])
|
||||
|
||||
zmq_thread.start()
|
||||
tx_watcher_thread.start()
|
||||
api_thread.start()
|
||||
|
||||
@@ -2,8 +2,10 @@ import zmq
|
||||
import binascii
|
||||
from queue import Queue
|
||||
from threading import Thread
|
||||
from pisa.tools import decrypt_tx
|
||||
from bitcoinrpc.authproxy import AuthServiceProxy, JSONRPCException
|
||||
from conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, FEED_PROTOCOL, FEED_ADDR, FEED_PORT
|
||||
from conf import BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST, BTC_RPC_PORT, FEED_PROTOCOL, FEED_ADDR, FEED_PORT, \
|
||||
MAX_APPOINTMENTS
|
||||
|
||||
|
||||
class ZMQHandler:
|
||||
@@ -15,7 +17,7 @@ class ZMQHandler:
|
||||
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
|
||||
self.zmqSubSocket.connect("%s://%s:%s" % (FEED_PROTOCOL, FEED_ADDR, FEED_PORT))
|
||||
|
||||
def handle(self, debug, block_queue):
|
||||
def handle(self, block_queue, debug):
|
||||
msg = self.zmqSubSocket.recv_multipart()
|
||||
topic = msg[0]
|
||||
body = msg[1]
|
||||
@@ -29,32 +31,50 @@ class ZMQHandler:
|
||||
|
||||
|
||||
class Watcher:
|
||||
def __init__(self, debug):
|
||||
self.appointments = []
|
||||
self.sleep = True
|
||||
self.debug = debug
|
||||
def __init__(self, max_appointments=MAX_APPOINTMENTS):
|
||||
self.appointments = dict()
|
||||
self.block_queue = Queue()
|
||||
self.sleep = True
|
||||
self.max_appointments = max_appointments
|
||||
|
||||
def add_appointment(self, appointment):
|
||||
def add_appointment(self, appointment, debug):
|
||||
# ToDo: Discuss about validation of input data
|
||||
self.appointments.append(appointment)
|
||||
|
||||
if self.sleep:
|
||||
self.sleep = False
|
||||
zmq_subscriber = Thread(target=self.do_subscribe, args=[self.block_queue])
|
||||
zmq_subscriber.start()
|
||||
self.do_watch()
|
||||
if len(self.appointments) < self.max_appointments:
|
||||
# Appointments are identified by the locator: the most significant 16 bytes of the commitment txid.
|
||||
# While 16-byte hash collisions are not likely, they are possible, so we will store appointments in lists
|
||||
# even if we only have one (so the code logic is simplified from this point on).
|
||||
if not self.appointments.get(appointment.locator):
|
||||
self.appointments[appointment.locator] = []
|
||||
|
||||
# Rationale:
|
||||
# The Watcher will analyze every received block looking for appointment matches. If there is no work
|
||||
# to do the watcher can sleep (appointments = [] and sleep = True) otherwise for every received block
|
||||
# the watcher will get the list of transactions and compare it with the list of appointments
|
||||
# Use an internal id (position in the list) to distinguish between different appointments with the same
|
||||
# locator
|
||||
appointment.id = len(self.appointments[appointment.locator])
|
||||
self.appointments[appointment.locator].append(appointment)
|
||||
|
||||
def do_subscribe(self, block_queue):
|
||||
if self.sleep:
|
||||
self.sleep = False
|
||||
zmq_subscriber = Thread(target=self.do_subscribe, args=[self.block_queue, debug])
|
||||
watcher = Thread(target=self.do_watch, args=[debug])
|
||||
zmq_subscriber.start()
|
||||
watcher.start()
|
||||
|
||||
appointment_added = True
|
||||
|
||||
# Rationale:
|
||||
# The Watcher will analyze every received block looking for appointment matches. If there is no work
|
||||
# to do the watcher can sleep (appointments = [] and sleep = True) otherwise for every received block
|
||||
# the watcher will get the list of transactions and compare it with the list of appointments
|
||||
else:
|
||||
appointment_added = False
|
||||
|
||||
return appointment_added
|
||||
|
||||
def do_subscribe(self, block_queue, debug):
|
||||
daemon = ZMQHandler()
|
||||
daemon.handle(self.debug, block_queue)
|
||||
daemon.handle(block_queue, debug)
|
||||
|
||||
def do_watch(self):
|
||||
def do_watch(self, debug):
|
||||
bitcoin_cli = AuthServiceProxy("http://%s:%s@%s:%d" % (BTC_RPC_USER, BTC_RPC_PASSWD, BTC_RPC_HOST,
|
||||
BTC_RPC_PORT))
|
||||
|
||||
@@ -63,17 +83,51 @@ class Watcher:
|
||||
|
||||
try:
|
||||
block = bitcoin_cli.getblock(block_hash)
|
||||
|
||||
prev_block_id = block.get('previousblockhash')
|
||||
txs = block.get('tx')
|
||||
|
||||
# ToDo: Check for every tx in txs if there is an appointment that matches (MS 16-bytes)
|
||||
|
||||
if self.debug:
|
||||
print("New block received ", block_hash)
|
||||
print("Prev. block hash ", prev_block_id)
|
||||
if debug:
|
||||
print("New block received", block_hash)
|
||||
print("Prev. block hash", prev_block_id)
|
||||
print("List of transactions", txs)
|
||||
|
||||
potential_matches = []
|
||||
|
||||
for k in self.appointments.keys():
|
||||
potential_matches += [(k, tx[32:]) for tx in txs if tx.startswith(k)]
|
||||
|
||||
if debug:
|
||||
if len(potential_matches) > 0:
|
||||
print("List of potential matches", potential_matches)
|
||||
else:
|
||||
print("No potential matches found")
|
||||
|
||||
matches = self.check_potential_matches(potential_matches, bitcoin_cli)
|
||||
|
||||
# ToDo: Handle matches
|
||||
# ToDo: Matches will be empty list if no matches, list of matches otherwise
|
||||
# ToDo: Notify responder with every match.
|
||||
# ToDo: Get rid of appointment? Set appointment to a different state (create appointment state first)?
|
||||
|
||||
except JSONRPCException as e:
|
||||
print(e)
|
||||
continue
|
||||
|
||||
def check_potential_matches(self, potential_matches, bitcoin_cli):
|
||||
matches = []
|
||||
|
||||
for locator, k in potential_matches:
|
||||
for appointment in self.appointments.get(locator):
|
||||
try:
|
||||
decrypted_data = decrypt_tx(appointment.encrypted_blob, k, appointment.cypher)
|
||||
bitcoin_cli.decoderawtransaction(decrypted_data)
|
||||
matches.append((locator, appointment.id, decrypted_data))
|
||||
except JSONRPCException as e:
|
||||
# Tx decode failed returns error code -22, maybe we should be more strict here. Leaving it simple
|
||||
# for the POC
|
||||
print(e)
|
||||
continue
|
||||
|
||||
return matches
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user