diff --git a/bfxapi/__init__.py b/bfxapi/__init__.py new file mode 100644 index 0000000..c11c9ab --- /dev/null +++ b/bfxapi/__init__.py @@ -0,0 +1 @@ +from .client import Client, Constants \ No newline at end of file diff --git a/bfxapi/client.py b/bfxapi/client.py new file mode 100644 index 0000000..7374251 --- /dev/null +++ b/bfxapi/client.py @@ -0,0 +1,11 @@ +from .websocket import BfxWebsocketClient + +from enum import Enum + +class Constants(str, Enum): + WSS_HOST = "wss://api.bitfinex.com/ws/2" + PUB_WSS_HOST = "wss://api-pub.bitfinex.com/ws/2" + +class Client(object): + def __init__(self, WSS_HOST: str = Constants.WSS_HOST): + self.wss = BfxWebsocketClient(host=WSS_HOST) \ No newline at end of file diff --git a/bfxapi/websocket/BfxWebsocketClient.py b/bfxapi/websocket/BfxWebsocketClient.py new file mode 100644 index 0000000..587576d --- /dev/null +++ b/bfxapi/websocket/BfxWebsocketClient.py @@ -0,0 +1,68 @@ +import json, asyncio, websockets + +from pyee.asyncio import AsyncIOEventEmitter + +from .channels import Channels + +class BfxWebsocketClient(object): + def __init__(self, host, channels=None): + self.host = host + + self.chanIds, self.event_emitter = dict(), AsyncIOEventEmitter() + + self.channels = channels or list() + + def run_forever(self): + asyncio.run(self.connect()) + + async def connect(self): + async for websocket in websockets.connect(self.host): + try: + self.websocket = websocket + + for channel, parameters in self.channels: + await self.subscribe(channel, **parameters) + else: self.event_emitter.emit("open") + + async for message in websocket: + message = json.loads(message) + + if isinstance(message, dict) and message["event"] == "subscribed": + self.chanIds[message["chanId"]] = message + + self.event_emitter.emit("subscribed", message) + + if isinstance(message, list): + chanId, parameters = message[0], message[1:] + + subscription = self.chanIds[chanId] + + if subscription["channel"] == Channels.TICKER: + self.event_emitter.emit("ticker", subscription, parameters[0]) + + if subscription["channel"] == Channels.TRADES: + if len(parameters) == 1: + self.event_emitter.emit("trades_snapshot", subscription, parameters[0]) + + if len(parameters) == 2: + self.event_emitter.emit("trades_update", subscription, parameters[0], parameters[1]) + + if subscription["channel"] == Channels.BOOK: + if all(isinstance(element, list) for element in parameters[0]): + self.event_emitter.emit("book_snapshot", subscription, parameters[0]) + else: self.event_emitter.emit("book_update", subscription, parameters[0]) + except websockets.ConnectionClosed: + continue + + async def subscribe(self, channel, **kwargs): + await self.websocket.send(json.dumps({ + "event": "subscribe", + "channel": channel, + **kwargs + })) + + def on(self, event): + def handler(function): + self.event_emitter.on(event, function) + + return handler diff --git a/bfxapi/websocket/__init__.py b/bfxapi/websocket/__init__.py new file mode 100644 index 0000000..f33aa63 --- /dev/null +++ b/bfxapi/websocket/__init__.py @@ -0,0 +1,3 @@ +from .BfxWebsocketClient import BfxWebsocketClient + +from .channels import Channels \ No newline at end of file diff --git a/bfxapi/websocket/channels.py b/bfxapi/websocket/channels.py new file mode 100644 index 0000000..5db673c --- /dev/null +++ b/bfxapi/websocket/channels.py @@ -0,0 +1,6 @@ +from enum import Enum + +class Channels(str, Enum): + TICKER = "ticker" + TRADES = "trades" + BOOK = "book" \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..78560fa Binary files /dev/null and b/requirements.txt differ