From 275cd1c75befbacd5bfa28d58d3d359aa79b9b62 Mon Sep 17 00:00:00 2001 From: JacobPlaster Date: Tue, 14 Apr 2020 14:10:34 +0100 Subject: [PATCH] generic_websocket: remove daemon thread and add stop function --- bfxapi/websockets/generic_websocket.py | 29 ++++++++++++-------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/bfxapi/websockets/generic_websocket.py b/bfxapi/websockets/generic_websocket.py index 826d474..134279f 100644 --- a/bfxapi/websockets/generic_websocket.py +++ b/bfxapi/websockets/generic_websocket.py @@ -57,15 +57,7 @@ class Socket(): await self.ws.send(data) def _start_event_worker(): - def start_loop(loop): - asyncio.set_event_loop(loop) - loop.run_forever() - event_loop = asyncio.new_event_loop() - ee = EventEmitter(scheduler=asyncio.ensure_future) - worker = Thread(target=start_loop, args=(event_loop,)) - worker.daemon = True - worker.start() - return ee + return EventEmitter(scheduler=asyncio.ensure_future) class GenericWebsocket: """ @@ -83,7 +75,7 @@ class GenericWebsocket: self.max_retries = max_retries self.attempt_retry = True self.sockets = {} - # start seperate process for the even emitter + # start separate process for the even emitter create_ee = create_event_emitter or _start_event_worker self.events = create_ee() @@ -166,6 +158,15 @@ class GenericWebsocket: self.logger.info("Unable to connect to websocket.") self._emit('stopped') + async def stop(self): + """ + Stop all websocket connections + """ + self.attempt_retry = False + for key, socket in self.sockets.items(): + await socket.ws.close() + self._emit('done') + def remove_all_listeners(self, event): """ Remove all listeners from event emitter @@ -200,13 +201,9 @@ class GenericWebsocket: async def on_close(self): """ - On websocket close print and fire event. This is used by the data server. + This is used by the HF data server. """ - self.logger.info("Websocket closed.") - self.attempt_retry = False - for key, socket in self.sockets.items(): - await socket.ws.close() - self._emit('done') + self.stop() async def on_open(self): """