generic_websocket: remove daemon thread and add stop function

This commit is contained in:
JacobPlaster
2020-04-14 14:10:34 +01:00
committed by Jacob Plaster
parent 46302e312b
commit 275cd1c75b

View File

@@ -57,15 +57,7 @@ class Socket():
await self.ws.send(data) await self.ws.send(data)
def _start_event_worker(): def _start_event_worker():
def start_loop(loop): return EventEmitter(scheduler=asyncio.ensure_future)
asyncio.set_event_loop(loop)
loop.run_forever()
event_loop = asyncio.new_event_loop()
ee = EventEmitter(scheduler=asyncio.ensure_future)
worker = Thread(target=start_loop, args=(event_loop,))
worker.daemon = True
worker.start()
return ee
class GenericWebsocket: class GenericWebsocket:
""" """
@@ -83,7 +75,7 @@ class GenericWebsocket:
self.max_retries = max_retries self.max_retries = max_retries
self.attempt_retry = True self.attempt_retry = True
self.sockets = {} self.sockets = {}
# start seperate process for the even emitter # start separate process for the even emitter
create_ee = create_event_emitter or _start_event_worker create_ee = create_event_emitter or _start_event_worker
self.events = create_ee() self.events = create_ee()
@@ -166,6 +158,15 @@ class GenericWebsocket:
self.logger.info("Unable to connect to websocket.") self.logger.info("Unable to connect to websocket.")
self._emit('stopped') self._emit('stopped')
async def stop(self):
"""
Stop all websocket connections
"""
self.attempt_retry = False
for key, socket in self.sockets.items():
await socket.ws.close()
self._emit('done')
def remove_all_listeners(self, event): def remove_all_listeners(self, event):
""" """
Remove all listeners from event emitter Remove all listeners from event emitter
@@ -200,13 +201,9 @@ class GenericWebsocket:
async def on_close(self): async def on_close(self):
""" """
On websocket close print and fire event. This is used by the data server. This is used by the HF data server.
""" """
self.logger.info("Websocket closed.") self.stop()
self.attempt_retry = False
for key, socket in self.sockets.items():
await socket.ws.close()
self._emit('done')
async def on_open(self): async def on_open(self):
""" """