diff --git a/bfxapi/client.py b/bfxapi/client.py index 0ac9fd6..8dabd64 100644 --- a/bfxapi/client.py +++ b/bfxapi/client.py @@ -19,11 +19,10 @@ class Client: """ def __init__(self, API_KEY=None, API_SECRET=None, rest_host=REST_HOST, - ws_host=WS_HOST, loop=None, logLevel='INFO', dead_man_switch=False, + ws_host=WS_HOST, create_event_emitter=None, logLevel='INFO', dead_man_switch=False, ws_capacity=25, *args, **kwargs): - self.loop = loop or asyncio.get_event_loop() self.ws = BfxWebsocket(API_KEY=API_KEY, API_SECRET=API_SECRET, host=ws_host, - loop=self.loop, logLevel=logLevel, dead_man_switch=dead_man_switch, - ws_capacity=ws_capacity, *args, **kwargs) + logLevel=logLevel, dead_man_switch=dead_man_switch, + ws_capacity=ws_capacity, create_event_emitter=create_event_emitter, *args, **kwargs) self.rest = BfxRest(API_KEY=API_KEY, API_SECRET=API_SECRET, host=rest_host, - loop=self.loop, logLevel=logLevel, *args, **kwargs) + logLevel=logLevel, *args, **kwargs) diff --git a/bfxapi/tests/helpers.py b/bfxapi/tests/helpers.py index 6edcf66..1e76139 100644 --- a/bfxapi/tests/helpers.py +++ b/bfxapi/tests/helpers.py @@ -1,6 +1,7 @@ import time import json import asyncio +from pyee import EventEmitter from .. import Client, BfxWebsocket, Socket @@ -8,7 +9,7 @@ def get_now(): return int(round(time.time() * 1000)) def ev_worker_override(): - return asyncio.get_event_loop() + return EventEmitter() class StubbedWebsocket(BfxWebsocket): diff --git a/bfxapi/websockets/generic_websocket.py b/bfxapi/websockets/generic_websocket.py index 1f4db35..6c59a81 100644 --- a/bfxapi/websockets/generic_websocket.py +++ b/bfxapi/websockets/generic_websocket.py @@ -60,7 +60,8 @@ def _start_event_worker(): event_loop = asyncio.new_event_loop() worker = Thread(target=start_loop, args=(event_loop,)) worker.start() - return event_loop + ee = EventEmitter(scheduler=asyncio.ensure_future, loop=event_loop) + return ee class GenericWebsocket: """ @@ -68,11 +69,9 @@ class GenericWebsocket: Inlcudes an event emitter and a standard websocket client. """ - def __init__(self, host, logLevel='INFO', loop=None, max_retries=5, - create_event_emitter=_start_event_worker): + def __init__(self, host, logLevel='INFO', max_retries=5, create_event_emitter=None): self.host = host self.logger = CustomLogger('BfxWebsocket', logLevel=logLevel) - self.loop = loop or asyncio.get_event_loop() # overide 'error' event to stop it raising an exception # self.events.on('error', self.on_error) self.ws = None @@ -80,8 +79,8 @@ class GenericWebsocket: self.attempt_retry = True self.sockets = {} # start seperate process for the even emitter - eventLoop = create_event_emitter() - self.events = EventEmitter(scheduler=asyncio.ensure_future, loop=eventLoop) + create_ee = create_event_emitter or _start_event_worker + self.events = create_ee() def run(self): """ @@ -198,7 +197,8 @@ class GenericWebsocket: """ self.logger.info("Websocket closed.") self.attempt_retry = False - await self.ws.close() + for key, socket in self.sockets.items(): + await socket.ws.close() self._emit('done') async def on_open(self):