
bmoscon / cryptofeed


Cryptocurrency Exchange Websocket Data Feed Handler

License: Other

Python 95.26% Cython 4.61% Shell 0.13%
Topics: python, trading, market-data, cryptocurrency, bitcoin, exchange, websockets, coinbase, binance, influxdb, asyncio, websocket, ftx-exchange, trading-platform, ethereum, cryptocurrencies, btc, crypto, coinbase-api

cryptofeed's Introduction

Cryptocurrency Exchange Feed Handler


Handles multiple cryptocurrency exchange data feeds and returns normalized and standardized results to client-registered callbacks for events like trades, book updates, ticker updates, etc. Utilizes websockets when possible, but can also poll data via REST endpoints if the exchange does not provide a websocket for that data.

Supported exchanges

Basic Usage

Create a FeedHandler object and add subscriptions. For the various data channels that an exchange supports, you can supply callbacks for data events, or use provided backends (described below) to handle the data for you. Start the feed handler and you're done!

from cryptofeed import FeedHandler
from cryptofeed.defines import L2_BOOK, TICKER, TRADES
from cryptofeed.exchanges import Bitfinex, Coinbase, Gemini, Poloniex

fh = FeedHandler()

# ticker, trade, and book are user defined functions that
# will be called when ticker, trade and book updates are received
ticker_cb = {TICKER: ticker}
trade_cb = {TRADES: trade}
gemini_cb = {TRADES: trade, L2_BOOK: book}


fh.add_feed(Coinbase(symbols=['BTC-USD'], channels=[TICKER], callbacks=ticker_cb))
fh.add_feed(Bitfinex(symbols=['BTC-USD'], channels=[TICKER], callbacks=ticker_cb))
fh.add_feed(Poloniex(symbols=['BTC-USDT'], channels=[TRADES], callbacks=trade_cb))
fh.add_feed(Gemini(symbols=['BTC-USD', 'ETH-USD'], channels=[TRADES, L2_BOOK], callbacks=gemini_cb))

fh.run()

Please see the examples for more code samples and the documentation for more information about the library usage.

For an example of a containerized application using cryptofeed to store data to a backend, please see Cryptostore.

National Best Bid/Offer (NBBO)

Cryptofeed also provides a synthetic NBBO (National Best Bid/Offer) feed that aggregates the best bids and asks from the user-specified feeds.

from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase, Gemini, Kraken


def nbbo_update(symbol, bid, bid_size, ask, ask_size, bid_feed, ask_feed):
    print(f'Pair: {symbol} Bid Price: {bid:.2f} Bid Size: {bid_size:.6f} Bid Feed: {bid_feed} Ask Price: {ask:.2f} Ask Size: {ask_size:.6f} Ask Feed: {ask_feed}')


def main():
    f = FeedHandler()
    f.add_nbbo([Coinbase, Kraken, Gemini], ['BTC-USD'], nbbo_update)
    f.run()


if __name__ == '__main__':
    main()

Supported Channels

Cryptofeed supports the following channels from exchanges:

Market Data Channels (Public)

  • L1_BOOK - Top of book
  • L2_BOOK - Price aggregated sizes. Some exchanges provide the entire depth, some provide a subset.
  • L3_BOOK - Price aggregated orders. Like the L2 book, some exchanges may only provide partial depth.
  • TRADES - Note this reports the taker's side, even for exchanges that report the maker side.
  • TICKER
  • FUNDING
  • OPEN_INTEREST - Open interest data.
  • LIQUIDATIONS
  • INDEX
  • CANDLES - Candlestick / K-Line data.

Authenticated Data Channels

  • ORDER_INFO - Order status updates
  • TRANSACTIONS - Real-time updates on account deposits and withdrawals
  • BALANCES - Updates on wallet funds
  • FILLS - User's executed trades

Backends

Cryptofeed supports backend callbacks that will write directly to storage or other interfaces.

Supported Backends:

  • Redis (Streams and Sorted Sets)
  • Arctic
  • ZeroMQ
  • UDP Sockets
  • TCP Sockets
  • Unix Domain Sockets
  • InfluxDB v2
  • MongoDB
  • Kafka
  • RabbitMQ
  • PostgreSQL
  • QuasarDB
  • GCP Pub/Sub
  • QuestDB
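All backends follow the same pattern: an async callable that the feed handler awaits with each update, which then writes to the target store. As a dependency-free illustration of that pattern (the `ListBackend` name and the exact callback signature are assumptions, not cryptofeed API), a minimal "backend" that just accumulates updates might look like:

```python
import asyncio


class ListBackend:
    """Hypothetical minimal backend-style callback: an async callable
    that stores every update it receives. The real backends write to
    Redis, Kafka, InfluxDB, etc. instead of a Python list."""

    def __init__(self):
        self.rows = []

    async def __call__(self, data, receipt_timestamp):
        # the feed handler awaits the callback with the update and a
        # receipt timestamp; a real backend would persist it here
        self.rows.append((receipt_timestamp, data))


backend = ListBackend()
asyncio.run(backend({'symbol': 'BTC-USD', 'price': 100.0}, 1_700_000_000.0))
print(len(backend.rows))
```

A real backend would be passed in the `callbacks` dict exactly like the user-defined callbacks in Basic Usage above.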

Installation

Note: cryptofeed requires Python 3.8+

Cryptofeed can be installed from PyPI. (It's recommended that you install into a virtual environment of your choosing.)

pip install cryptofeed

Cryptofeed has optional dependencies, depending on the backends used. You can install them individually, or all at once. To install Cryptofeed along with all its optional dependencies in one bundle:

pip install 'cryptofeed[all]'

If you wish to clone the repository and install from source, run this command from the root of the cloned repository.

python setup.py install

Alternatively, you can install in 'edit' mode (also called development mode):

python setup.py develop

See more discussion of package installation in INSTALL.md.

Rest API

Cryptofeed supports some REST interfaces for retrieving real-time and historical data, as well as order placement and account management. These are integrated into the exchange classes directly. You can view the supported methods by calling the info() method on any exchange. The methods for interacting with the exchange REST endpoints exist in two flavors: synchronous methods (suffixed with _sync) and asynchronous methods, which can be used with asyncio. For more information see the documentation.

Future Work

There are many new features, exchanges, etc. planned! If you'd like to discuss ongoing development, please join the Slack or open a thread in the GitHub Discussions.

Contributing

Issues and PRs are welcome!

Cryptofeed wouldn't be possible without the help of its many contributors, and they all have my thanks!

Donations / Support

Support and donations are appreciated but not required. You can donate via GitHub Sponsors, or via the addresses below:

  • Bitcoin: bc1qm0kxz8hqacaglku5fjhfe9a5hjnuyfwk02lsyr
  • Ethereum: 0x690709FEe13eEce9E7852089BB2D53Ae5D073154

cryptofeed's People

Contributors

abashinfection, ablochs, agijsberts, akasimo, bmoscon, davidecoldebella, dependabot[bot], dynamikey, evil-maid, globophobe, jinusean, lightcycleresearch, marechj, olibre, pandaxcentric, peedrr, qinghuangchan, quantfiction, quenos, rshtirmer, sheungon, t2o2, toyan, tristan-murfitt-elw, twmeggs, vdbelt, vivikar, wangzhe3224, xiandong79, zahnz


cryptofeed's Issues

Fail to reset the Poloniex order book after disconnection

On two consecutive days, after running the Poloniex feed for more than 5 hours, it could no longer parse incoming messages and kept returning the following error.

2019-01-11 11:33:14,312 : ERROR : POLONIEX: encountered an exception, reconnecting
Traceback (most recent call last):
  File "/root/workspace/cryptofeed/cryptofeed/feedhandler.py", line 156, in _connect
    await self._handler(websocket, feed.message_handler, feed.id)
  File "/root/workspace/cryptofeed/cryptofeed/feedhandler.py", line 174, in _handler
    await handler(message)
  File "/root/workspace/cryptofeed/cryptofeed/poloniex/poloniex.py", line 146, in message_handler
    LOG.warning("%s: missing sequence number. Received %d, expected %d", self.id, seq_no, self.seq_no+1)
TypeError: unsupported operand type(s) for +: 'dict' and 'int'

The issue is due to an incorrectly formatted warning message, and it caused the book reset to fail.
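The traceback shows `self.seq_no + 1` failing because `seq_no` is a dict (keyed by pair), not an int. A sketch of a per-pair sequence check that indexes by pair before doing arithmetic (the `is_gap` helper is hypothetical, for illustration only):

```python
# Hypothetical sketch: self.seq_no is a dict keyed by pair, so it must
# be indexed by pair before arithmetic like `+ 1` is valid.
def is_gap(seq_no_by_pair: dict, pair: str, received: int) -> bool:
    """Return True when the received sequence number skips ahead of
    the last one seen for this pair."""
    expected = seq_no_by_pair.get(pair, received - 1) + 1
    return received != expected


seq = {'BTC-USDT': 41}
print(is_gap(seq, 'BTC-USDT', 43))  # gap: expected 42
```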

SSL issue after updating to current master (both win 10 and mac)

Launching any of the examples, I end up with the following on either mac or win10:

...
File "C:\Python37\lib\ssl.py", line 412, in wrap_socket
session=session
File "C:\Python37\lib\ssl.py", line 853, in _create
self.do_handshake()
File "C:\Python37\lib\ssl.py", line 1117, in do_handshake
self._sslobj.do_handshake()
TimeoutError: [WinError 10060] A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond

Ensure exchanges trades are reported the same way

Example: Gemini reports real-time trades on the websocket based on the maker (the order that supplied the liquidity). A buy from the websocket means someone sold BTC, and the maker's order was used to fill that order. Their REST API is reversed: the buy/sell refers to the taker's order, so a buy from the REST API means a buy order was placed. Need to go through the APIs and make sure all exchanges are consistent. The preferred behavior should be to use the taker's side.
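The normalization described above can be sketched as a small helper that flips a maker-reported side to the taker's side (the function name and flag are illustrative, not cryptofeed API):

```python
# Sketch: normalize a reported trade side to the taker's side.
# Exchanges whose feed reports the maker's side (e.g. Gemini's
# websocket, per the issue) get flipped; others pass through.
def taker_side(reported_side: str, reports_maker: bool) -> str:
    if reported_side not in ('buy', 'sell'):
        raise ValueError(f'unknown side: {reported_side}')
    if not reports_maker:
        return reported_side
    # a maker buy was filled, so the taker sold, and vice versa
    return 'sell' if reported_side == 'buy' else 'buy'


print(taker_side('buy', reports_maker=True))   # maker buy -> taker sell
```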

Feed base class throws error with Gemini

Gemini does not support channels, but passing None to the Feed base class throws an error.

The following code in gemini.py prevents passing a value to the channels parameter:

if channels is not None:
    LOG.error("Gemini does not support different channels")
    raise ValueError("Gemini does not support different channels")

and when the Gemini constructor calls the super constructor, it passes None:

super().__init__('wss://api.gemini.com/v1/marketdata/' + pair_std_to_exchange(self.pair, 'GEMINI'),
                         pairs=None,
                         channels=None,
                         callbacks=callbacks)

but inside the __init__ of the Feed base class,

if FUNDING in channels and self.id == BITFINEX:

throws an error because channels is None and therefore not iterable.
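The fix the issue points toward is making the membership test tolerate None. A minimal sketch of that guard (the `wants_funding` helper is hypothetical; `FUNDING` here is a stand-in for the constant in cryptofeed.defines):

```python
FUNDING = 'funding'  # stand-in for cryptofeed.defines.FUNDING


# Sketch of the guard the Feed base class needs: membership tests on
# `channels` must tolerate None, since Gemini passes channels=None.
def wants_funding(channels) -> bool:
    return channels is not None and FUNDING in channels


print(wants_funding(None))
print(wants_funding([FUNDING]))
```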

Add support of Bitmex indices

Hello, I need to record Bitmex indices, but they are not included in active_symbols. I'm going to open a pull request to solve this.

Multithreading data digest

I might get this wrong, so apologies if I do. During a book retrieval callback you might want to save the data into a DB. The speed of the write routine and the update arrival frequency become important: if writing is too slow or the arrival rate too high, the writing routine becomes a bottleneck.

No messages are lost (great!), due to asyncio.ensure_future() (correct??). But due to slow data digestion a queue starts forming (correct??). If you are dealing with multiple currencies, you might never be able to catch up with the queue. So maybe an idea is to abandon the queue, take a new snapshot and start again. You miss a few updates, but perhaps that's OK for a small trader. Restarting the whole process is not ideal, though.

I guess one way to deal with this is to write a producer/consumer where the consumer (writer) runs on a separate thread. In this way, the main thread (with the book update) runs happily and (in general) no queues are formed, because the writing is on an independent thread and so does not block the main thread. This assumes that you are saving snapshots of the order book, though.
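The producer/consumer split suggested above can be sketched with the standard library: the (fast) book callback only enqueues, and a separate writer thread drains the queue, so slow storage never blocks the event loop. This is a sketch, not cryptofeed code; the list `sink` stands in for a real database write.

```python
import queue
import threading


def writer(q: queue.Queue, sink: list):
    """Consumer: drains the queue on its own thread."""
    while True:
        item = q.get()
        if item is None:          # sentinel: shut down
            break
        sink.append(item)         # pretend this is a slow DB write
        q.task_done()


q = queue.Queue()
stored = []
t = threading.Thread(target=writer, args=(q, stored), daemon=True)
t.start()

# what the book callback (producer) would do: enqueue and return
for update in ({'bid': 100}, {'bid': 101}):
    q.put(update)

q.put(None)                        # signal shutdown
t.join()
print(len(stored))
```

Back-pressure still needs a policy (bounded queue plus snapshot-and-restart, as the issue suggests) if the writer falls permanently behind.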

use sorted dict

Replace the order books with sorted dicts (from sortedcontainers).
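For a dependency-free sketch of what a sorted book side buys you (the issue proposes sortedcontainers.SortedDict; this stand-in uses only the stdlib `bisect` module, and the class name is hypothetical):

```python
import bisect


class SortedBookSide:
    """Keep a sorted price list alongside a dict of sizes, so the best
    level is O(1) and updates are O(log n) lookups plus list shifts."""

    def __init__(self):
        self.prices = []   # sorted ascending
        self.sizes = {}

    def update(self, price, size):
        if size == 0:
            if price in self.sizes:
                del self.sizes[price]
                self.prices.pop(bisect.bisect_left(self.prices, price))
        elif price not in self.sizes:
            bisect.insort(self.prices, price)
            self.sizes[price] = size
        else:
            self.sizes[price] = size

    def best(self, side):
        # best bid is the highest price, best ask the lowest
        return self.prices[-1] if side == 'bid' else self.prices[0]


bids = SortedBookSide()
for p, s in ((100.0, 1.0), (99.5, 2.0), (100.5, 0.3)):
    bids.update(p, s)
print(bids.best('bid'))
```

SortedDict gives the same interface with better asymptotics and less code, which is why the issue proposes it.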

standardize timestamps

Some exchanges return strings, some return timestamps in seconds, some in milliseconds, etc. Going forward, make all timestamps floating-point seconds in UTC.
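A sketch of that normalization (the function and the milliseconds cutoff are assumptions for illustration; 1e11 seconds is around the year 5138, while 1e11 ms is 1973, so values above it are almost certainly milliseconds):

```python
# Sketch of the proposed normalization: accept strings, seconds, or
# milliseconds and always return float seconds (UTC).
def normalize_timestamp(ts) -> float:
    ts = float(ts)            # handles numeric strings too
    if ts > 1e11:             # heuristic: almost certainly milliseconds
        ts /= 1000.0
    return ts


print(normalize_timestamp('1554382096446'))  # ms string -> seconds
print(normalize_timestamp(1554382096.446))   # already seconds
```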

Bitmex support

You've done a great job here and I'm very much looking forward to when BitMEX is added.

documentation

Need to add documentation, both technical (for someone who wants to add features) and high-level (for users of the library).

Kraken L2 crossed bid/ask prices

I'm getting negative spreads subscribing to Kraken exchange using the demo_zmq.py example. I am also subscribing to the trades feed along with Coinbase and Binance feeds.

Here are two subsequent messages from ETH-USD where spread is incorrect. Comparing to Kraken's trading platform, it seems the ask price and ask sizes are updating correctly but the bid price and bid size are not updating.

{'type': 'book', 'feed': 'KRAKEN', 'pair': 'ETH-USD', 'data': {'timestamp': 1555987491.0964444, 'bid': {'171.68000': '0.69084773'}, 'ask': {'171.63000': '9.96411128'}}}
{'type': 'book', 'feed': 'KRAKEN', 'pair': 'ETH-USD', 'data': {'timestamp': 1555987492.615416, 'bid': {'171.68000': '0.69084773'}, 'ask': {'171.63000': '31.89969709'}}}

from multiprocessing import Process
from cryptofeed.backends.zmq import BookZMQ, TradeZMQ
from cryptofeed import FeedHandler
from cryptofeed.exchanges import Coinbase, Kraken, Poloniex, Binance
from cryptofeed.defines import L3_BOOK, L2_BOOK, TRADES


def book_receiver(port):
	import zmq
	import time
	addr = 'tcp://127.0.0.1:{}'.format(port)
	ctx = zmq.Context.instance()
	s = ctx.socket(zmq.PULL)
	s.connect(addr)
	while True:
		data = s.recv_json()
		print(data)
		time.sleep(0.5)


def trade_receiver(port):
	import zmq
	import time
	addr = 'tcp://127.0.0.1:{}'.format(port)
	ctx = zmq.Context.instance()
	s = ctx.socket(zmq.PULL)
	s.connect(addr)
	while True:
		data = s.recv_json()
		# print(data)
		time.sleep(0.5)


def read_cfg(fileName, key=None):
	'''open config file'''
	import yaml

	cfg = {}
	with open(fileName, 'r') as f:
		try:
			config = yaml.safe_load(f)
			# only grab exchange specific config
			if key:
				for k, v in config.items():
					if key in k:
						cfg[k] = config[k]
				return cfg
			else:
				return config
		except yaml.YAMLError as exc:
			print(exc)



def main():
		subscriptions = read_cfg(fileName)
		cbpro_tickers = subscriptions['CBPRO']['TICKERS']
		kraken_tickers = subscriptions['KRAKEN']['TICKERS']
		# poloniex_tickers = subscriptions['POLONIEX']['TICKERS']
		binance_tickers = subscriptions['BINANCE']['TICKERS']

		try:
			# coinbase
			p1 = Process(target=trade_receiver, args=(5555,))
			p2 = Process(target=book_receiver, args=(5556,))
			# kraken
			p3 = Process(target=trade_receiver, args=(5557,))
			p4 = Process(target=book_receiver, args=(5558,))
			# poloniex
			# p5 = Process(target=trade_receiver, args=(5559,))
			# p6 = Process(target=book_receiver, args=(5560,))
			# binance
			p7 = Process(target=trade_receiver, args=(5561,))
			p8 = Process(target=book_receiver, args=(5562,))

			p1.start()
			p2.start()
			p3.start()
			p4.start()
			# p5.start()
			# p6.start()
			p7.start()
			p8.start()

			f = FeedHandler()
			f.add_feed(Coinbase(channels=[L3_BOOK, TRADES], pairs=cbpro_tickers, callbacks={TRADES: TradeZMQ(port=5555), L3_BOOK: BookZMQ(depth=1, port=5556)}))
			f.add_feed(Kraken(channels=[L2_BOOK, TRADES], pairs=kraken_tickers, callbacks={TRADES: TradeZMQ(port=5557), L2_BOOK: BookZMQ(depth=1, port=5558)}))
			# f.add_feed(Poloniex(channels=[L2_BOOK, TRADES], pairs=poloniex_tickers, callbacks={TRADES: TradeZMQ(port=5559), L2_BOOK: BookZMQ(depth=1, port=5560)}))
			f.add_feed(Binance(channels=[L2_BOOK, TRADES], pairs=binance_tickers, callbacks={TRADES: TradeZMQ(port=5561), L2_BOOK: BookZMQ(depth=1, port=5562)}))

			f.run()
		finally:
			p1.terminate()
			p2.terminate()
			p3.terminate()
			p4.terminate()
			# p5.terminate()
			# p6.terminate()
			p7.terminate()
			p8.terminate()

if __name__ == '__main__':
	main()

nbbo source

Build the NBBO from books rather than tickers. It's more accurate, and it would also allow including any feed that supports books in the NBBO.
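The books-based aggregation suggested above can be sketched as: take the top level of each feed's book and pick the highest bid and lowest ask across feeds. The function and the book layout are illustrative assumptions, not cryptofeed's internal representation.

```python
# Sketch: NBBO from L2 books. `books` maps feed name to a book of the
# form {'bid': {price: size}, 'ask': {price: size}}.
def nbbo_from_books(books: dict):
    best_bid = max(((max(b['bid']), feed) for feed, b in books.items() if b['bid']),
                   default=(None, None))
    best_ask = min(((min(b['ask']), feed) for feed, b in books.items() if b['ask']),
                   default=(None, None))
    return best_bid, best_ask


books = {
    'COINBASE': {'bid': {99.9: 1.0}, 'ask': {100.1: 2.0}},
    'KRAKEN':   {'bid': {100.0: 0.5}, 'ask': {100.2: 1.0}},
}
print(nbbo_from_books(books))
```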

Import pair_exchange_to_std throws handshake error for Kraken

The below import throws the error:
"requests.exceptions.SSLError: HTTPSConnectionPool(host='api.kraken.com', port=443): Max retries
exceeded with url: /0/public/AssetPairs (Caused by SSLError(SSLError("bad handshake:
SysCallError(10054, 'WSAECONNRESET')",),))"

I listed all the files I'm using that include the import for the file that throws the error.

from cryptofeed.standards import pair_exchange_to_std :
{
from cryptofeed.feed import Feed
from cryptofeed import FeedHandler
from cryptofeed.feedhandler import FeedHandler
from cryptofeed.coinbase.coinbase import Coinbase
from cryptofeed.exchanges import Coinbase
}

It's very possible this is just my environment, or current connection, but it brought me to a question.

Is there a way to disable attempts to connect with exchanges like Kraken that we might not have an interest in accessing, and thereby negate the error?

timestamp for order book

Is it possible to retrieve the timestamp of the order book? The code below only seems to return bid/ask prices and volumes.

f = FeedHandler()
f.add_feed(GDAX(pairs=['BTC-USD', 'ETH-USD'], channels=['full'], callbacks={'book':BookCallback(book)}))
f.run()

BookDelta doesn't support custom callback

I've just begun experimenting with getting updates for a level 2 order book and I've noticed my custom callback for the BOOK_DELTA is not being called.

Looking through the code in the feed.py file I notice that the update callback is treated differently and not called unless it is an instance of BookUpdateCallback.

if callbacks:
    for cb in callbacks:
        self.callbacks[cb] = callbacks[cb]
        if isinstance(callbacks[cb], BookUpdateCallback):
            self.do_deltas = True

This means I can't supply my own custom callback in a similar way as to how you have created and used the backend callbacks, I can get around it but I was wondering if there was a reason for this? Perhaps something like this would be more flexible?

                          TICKER: Callback(None),
                          L2_BOOK: Callback(None),
                          L3_BOOK: Callback(None),
                          VOLUME: Callback(None),
                          FUNDING: Callback(None),
                          BOOK_DELTA: Callback(None)}

        if callbacks:
            for cb in callbacks:
                self.callbacks[cb] = callbacks[cb]
                if cb == BOOK_DELTA:
                    self.do_deltas = True

binance now supports multiple streams per websocket

need to update the binance subscription code with the new subscription format that allows multiple subscriptions per connection (assuming I've understood their docs correctly):

Combined streams are accessed at /stream?streams=<streamName1>/<streamName2>/<streamName3>
Combined stream events are wrapped as follows: {"stream":"<streamName>","data":<rawPayload>}
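Based on the quoted documentation, the subscription update amounts to building one combined-stream path and unwrapping the envelope on each message. A sketch (helper names are illustrative, not Binance or cryptofeed API):

```python
import json


# Build the combined-stream path from the quoted docs:
# /stream?streams=<streamName1>/<streamName2>/...
def combined_stream_path(streams):
    return '/stream?streams=' + '/'.join(streams)


# Combined stream events arrive wrapped as
# {"stream": "<streamName>", "data": <rawPayload>}; unwrap them.
def unwrap(message: str):
    msg = json.loads(message)
    return msg['stream'], msg['data']


print(combined_stream_path(['btcusdt@trade', 'ethusdt@depth']))
stream, data = unwrap('{"stream": "btcusdt@trade", "data": {"p": "100.0"}}')
print(stream, data)
```

The stream name in the envelope is what lets a single connection route each payload to the right symbol handler.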

GDAX L3 Book Deltas

I'm trying to obtain Book deltas (I want as unprocessed a feed of order book updates as possible). I'm using the following code:

async def bookdelta(feed, pair, delta):
    print('Feed: {} Pair: {} adds: {} dels: {} upd: {}'.format(feed, pair, len(delta[BID][ADD]), len(delta[BID][DEL]), len(delta[BID][UPD]))) 

f.add_feed(GDAX(pairs=['BTC-USD'], channels=[L3_BOOK], callbacks={BOOK_DELTA: BookUpdateCallback(bookdelta)}))

however the callback never seems to get called. Running the L3_BOOK channel with the BookCallback does work but I don't want the updated orderbook, I'm trying to get the individual update messages for the order book.

rest feed api not clear

REST feeds started with a common interface, but recent contributions from others have muddied the waters. Need to clean up the calls and make them standard across exchanges.

Handle ConnectionClosed exceptions

Websocket connections can be closed for a variety of reasons. These should be handled and the connection re-established if possible. Some tracebacks from my server:

Error in data transfer
Traceback (most recent call last):
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/protocol.py", line 496, in transfer_data
    msg = yield from self.read_message()
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/protocol.py", line 526, in read_message
    frame = yield from self.read_data_frame(max_size=self.max_size)
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/protocol.py", line 591, in read_data_frame
    frame = yield from self.read_frame(max_size)
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/protocol.py", line 632, in read_frame
    extensions=self.extensions,
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/framing.py", line 100, in read
    data = yield from reader(2)
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/asyncio/streams.py", line 663, in readexactly
    raise self._exception
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/asyncio/selector_events.py", line 723, in _read_ready
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
Error in data transfer
Traceback (most recent call last):
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/protocol.py", line 496, in transfer_data
    msg = yield from self.read_message()
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/protocol.py", line 526, in read_message
    frame = yield from self.read_data_frame(max_size=self.max_size)
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/protocol.py", line 591, in read_data_frame
    frame = yield from self.read_frame(max_size)
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/protocol.py", line 632, in read_frame
    extensions=self.extensions,
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/framing.py", line 100, in read
    data = yield from reader(2)
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/asyncio/streams.py", line 663, in readexactly
    raise self._exception
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/asyncio/selector_events.py", line 723, in _read_ready
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer
Task exception was never retrieved
future: <Task finished coro=<FeedHandler._connect() done, defined at /home/bryant/workspace/cryptofeed/cryptofeed/feedhandler.py:41> exception=ConnectionClosed('WebSocket connection is closed: code = 1006 (connection closed abnormally [internal]), no reason',)>
Traceback (most recent call last):
  File "/home/bryant/workspace/cryptofeed/cryptofeed/feedhandler.py", line 44, in _connect
    await self._handler(websocket, feed.message_handler)
  File "/home/bryant/workspace/cryptofeed/cryptofeed/feedhandler.py", line 47, in _handler
    async for message in websocket:
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/py36/protocol.py", line 15, in __aiter__
    yield await self.recv()
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/protocol.py", line 323, in recv
    raise ConnectionClosed(self.close_code, self.close_reason)
websockets.exceptions.ConnectionClosed: WebSocket connection is closed: code = 1006 (connection closed abnormally [internal]), no reason
Task exception was never retrieved
future: <Task finished coro=<FeedHandler._connect() done, defined at /home/bryant/workspace/cryptofeed/cryptofeed/feedhandler.py:41> exception=ConnectionClosed('WebSocket connection is closed: code = 1006 (connection closed abnormally [internal]), no reason',)>
Traceback (most recent call last):
  File "/home/bryant/workspace/cryptofeed/cryptofeed/feedhandler.py", line 44, in _connect
    await self._handler(websocket, feed.message_handler)
  File "/home/bryant/workspace/cryptofeed/cryptofeed/feedhandler.py", line 47, in _handler
    async for message in websocket:
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/py36/protocol.py", line 15, in __aiter__
    yield await self.recv()
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/protocol.py", line 323, in recv
    raise ConnectionClosed(self.close_code, self.close_reason)
websockets.exceptions.ConnectionClosed: WebSocket connection is closed: code = 1006 (connection closed abnormally [internal]), no reason
Task exception was never retrieved
future: <Task finished coro=<FeedHandler._connect() done, defined at /home/bryant/workspace/cryptofeed/cryptofeed/feedhandler.py:41> exception=ConnectionClosed('WebSocket connection is closed: code = 1006 (connection closed abnormally [internal]), no reason',)>
Traceback (most recent call last):
  File "/home/bryant/workspace/cryptofeed/cryptofeed/feedhandler.py", line 44, in _connect
    await self._handler(websocket, feed.message_handler)
  File "/home/bryant/workspace/cryptofeed/cryptofeed/feedhandler.py", line 47, in _handler
    async for message in websocket:
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/py36/protocol.py", line 15, in __aiter__
    yield await self.recv()
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/protocol.py", line 323, in recv
    raise ConnectionClosed(self.close_code, self.close_reason)
websockets.exceptions.ConnectionClosed: WebSocket connection is closed: code = 1006 (connection closed abnormally [internal]), no reason
Task exception was never retrieved
future: <Task finished coro=<FeedHandler._connect() done, defined at /home/bryant/workspace/cryptofeed/cryptofeed/feedhandler.py:41> exception=ConnectionClosed('WebSocket connection is closed: code = 1001 (going away), reason = CloudFlare WebSocket proxy restarting',)>
Traceback (most recent call last):
  File "/home/bryant/workspace/cryptofeed/cryptofeed/feedhandler.py", line 44, in _connect
    await self._handler(websocket, feed.message_handler)
  File "/home/bryant/workspace/cryptofeed/cryptofeed/feedhandler.py", line 47, in _handler
    async for message in websocket:
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/py36/protocol.py", line 15, in __aiter__
    yield await self.recv()
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/protocol.py", line 323, in recv
    raise ConnectionClosed(self.close_code, self.close_reason)
websockets.exceptions.ConnectionClosed: WebSocket connection is closed: code = 1001 (going away), reason = CloudFlare WebSocket proxy restarting

Strange bug related to Huobi, only the first trade.detail data can be received

I encountered a very strange bug: after subscribing, trade.detail receives only the subscription success message ({'id': 'id10', 'status': 'ok', 'subbed': 'market.ethusdt.trade.detail', 'ts': 1554382490572}) and the first data message ({'ch': 'market.ethusdt.trade.detail', 'ts': 1554382096458, 'tick': {'id': 100807878420, 'ts': 1554382096446, 'data': [{'id': '10080787842028598565963', 'amount': Decimal('0.2'), 'price': Decimal('161.25'), 'direction': 'buy', 'ts': 1554382096446}]}}), and after that only ping messages ({'ping': 1554382491613}).
Even stranger is the fact that the depth data subscription will receive subsequent messages as normal. I also tried Huobi's official demo https://github.com/huobiapi/Websocket-Python3-demo, and it worked perfectly.

L3 messages feed and storage

If I was looking to store L3 book data (let's assume with Arctic), wouldn't it be more efficient to create and store a stream of standardized delta messages as opposed to the entire book?

I only ask because the book callback takes feed, pair and book as the inputs. Using that callback for book updates would not provide any information about the updates themselves. I guess a user can just define a custom callback for this, but I figured it would make more sense to just have the BookCallback do this if it was meant to be called for book updates as mentioned in the docs. Many exchanges provide multiple updates per second which would result in the entire book being passed around as opposed to just the changed items.

Also, if that change does make sense, then as pointed out in my other recently opened issue regarding dropped messages (#20), we would have to generate the missing messages by diffing our current order book with a fresh snapshot.
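Generating those missing messages amounts to diffing the stale book against a fresh snapshot, which can be sketched as (the function and the adds/deletes/updates split are illustrative assumptions about the delta format):

```python
# Sketch: derive a delta by diffing an old book side against a fresh
# snapshot, as suggested for recovering from dropped messages.
# Books are {price: size} dicts for one side.
def book_delta(old: dict, new: dict):
    adds = {p: s for p, s in new.items() if p not in old}
    dels = [p for p in old if p not in new]
    upds = {p: s for p, s in new.items() if p in old and old[p] != s}
    return adds, dels, upds


old = {100.0: 1.0, 99.5: 2.0}
new = {100.0: 1.5, 100.5: 0.4}
print(book_delta(old, new))
```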

GDAX trades channel and trade ids

Hi,

I think the GDAX trades channel should be emulated from the 'ticker' one, as it contains price, side and size for the last trade, which is pretty much what Bitfinex gives in their trades channels. It's not convenient to also have to add the orderbook. What I propose would be "aggregated trades", similar to what Binance does. The current functionality could be separated out as "granular trades" (Binance has that too).

Side note, and less important: I think it's a shame to exclude ids from messages (trade id, for example); many implementations might need them (we do). Have a look at what the ccxt lib does: IMO it's a good level of abstraction and standardization, but they also leave the raw message in the event for clients that need to specialize.

ConnectionResetError: [Errno 104] Connection Reset by peer

2018-05-11 03:21:20,137 : WARNING : Error in data transfer
Traceback (most recent call last):
  File "/home/ubuntu/.local/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/protocol.py", line 496, in transfer_data
    msg = yield from self.read_message()
  File "/home/ubuntu/.local/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/protocol.py", line 526, in read_message
    frame = yield from self.read_data_frame(max_size=self.max_size)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/protocol.py", line 591, in read_data_frame
    frame = yield from self.read_frame(max_size)
  File "/home/ubuntu/.local/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/protocol.py", line 632, in read_frame
    extensions=self.extensions,
  File "/home/ubuntu/.local/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/framing.py", line 100, in read
    data = yield from reader(2)
  File "/home/ubuntu/anaconda3/lib/python3.6/asyncio/streams.py", line 663, in readexactly
    raise self._exception
  File "/home/ubuntu/anaconda3/lib/python3.6/asyncio/selector_events.py", line 723, in _read_ready
    data = self._sock.recv(self.max_size)
ConnectionResetError: [Errno 104] Connection reset by peer


#16 handled the ConnectionClosed exceptions; however, streaming and writing Bitmex data caused the following error after keeping the connection alive for one and a half days. Some sources say that there is not much that can be done about this error: https://stackoverflow.com/questions/20568216/python-handling-socket-error-errno-104-connection-reset-by-peer?utm_medium=organic&utm_source=google_rich_qa&utm_campaign=google_rich_qa

I was wondering whether another try/except block could be defined in feedhandler.py to handle this.
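As a sketch of what such handling could look like (a hypothetical helper, not the actual feedhandler.py internals; `dial` stands in for the websocket read loop):

```python
import asyncio

async def with_reconnect(dial, retries=5, base_delay=0.01):
    """Re-dial a feed coroutine when the socket is reset.

    Hypothetical helper sketching what feedhandler.py's connect loop
    could do around ConnectionResetError ([Errno 104]).
    """
    for attempt in range(retries):
        try:
            return await dial()
        except ConnectionResetError:
            # back off exponentially before reconnecting
            await asyncio.sleep(base_delay * (2 ** attempt))
    raise ConnectionError(f"gave up after {retries} attempts")
```

Since the reset can happen at any point, the delay cap and retry count would need tuning for long-lived feeds.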

Fix to provide the correct pair in the Binance book feed

Currently the order book from the Binance feed may report the full subscription list rather than the updated instrument, due to #50:

    await self.callbacks[L2_BOOK](feed=self.id, pair=self.pairs, book=self.l2_book, timestamp=time.time() * 1000)

According to the Binance websocket documentation, the instrument name is provided in the "stream" field of each message.
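A minimal sketch of resolving the instrument from the "stream" field of a Binance combined-stream message (the `pair_map` translating exchange symbols back to normalized pairs is an assumption for illustration, not cryptofeed's actual mapping):

```python
def pair_from_stream(msg, pair_map):
    """Extract the updated instrument from a Binance combined-stream
    message instead of reporting the full subscription list.

    Combined streams wrap payloads as {"stream": "btcusdt@depth",
    "data": {...}}, so the symbol is the part before the '@'.
    """
    stream = msg['stream']                 # e.g. 'btcusdt@depth'
    symbol = stream.split('@')[0].upper()  # e.g. 'BTCUSDT'
    return pair_map[symbol]
```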

Kraken L2 delta issue

It looks like the L2 book delta is not implemented for Kraken (websocket version).
When running the following line from the demo_book_delta.py example:

    f.add_feed(Kraken(book_interval=100, pairs=['BTC-USD'], channels=[L2_BOOK], callbacks={L2_BOOK: BookCallback(book), BOOK_DELTA: BookUpdateCallback(delta)}))

I'm getting the following exception:

  File "C:\Users...\cryptofeed\cryptofeed\feedhandler.py", line 159, in _connect
    await self._handler(websocket, feed.message_handler, feed.id)
  File "C:\Users...\cryptofeed\cryptofeed\feedhandler.py", line 177, in _handler
    await handler(message)
  File "C:\Users...\cryptofeed\cryptofeed\kraken\kraken.py", line 105, in message_handler
    await self._book(msg, self.channel_map[msg[0]][1])
  File "C:\Users...\cryptofeed\cryptofeed\kraken\kraken.py", line 94, in _book
    await self.book_callback(pair, L2_BOOK, False, False, time.time())
  File "C:\Users...\cryptofeed\cryptofeed\feed.py", line 61, in book_callback
    await self.callbacks[BOOK_DELTA](feed=self.id, pair=pair, delta=delta, timestamp=timestamp)
  File "C:\Users...\cryptofeed\cryptofeed\callback.py", line 82, in call
    await self.callback(feed, pair, delta, timestamp)
  File "C:/Users/.../cryptofeed/examples/demo_book_delta.py", line 49, in delta
    print('delta', len(update['ask']), len(update['bid']))

It looks like the cause is the final statement of Kraken._book:

    async def _book(self, msg, pair):
        ...
        await self.book_callback(pair, L2_BOOK, False, False, time.time())

while the signature being filled is

    async def book_callback(self, pair, book_type, forced, delta, timestamp):

so the second False lands in the delta parameter instead of the delta dict.
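A minimal reproduction of the mismatch, with the callback simplified from demo_book_delta.py (names shortened; not the actual cryptofeed classes):

```python
# The BOOK_DELTA callback indexes into the update it receives, as in the
# demo's `print('delta', len(update['ask']), len(update['bid']))`.
def delta_cb(update):
    return len(update['ask']), len(update['bid'])

delta = {'ask': [(100.0, 1.0)], 'bid': [(99.5, 2.0)]}

# Buggy call path: the delta position is filled with False, so the
# callback raises TypeError ('bool' object is not subscriptable).
try:
    delta_cb(False)
    buggy_raises = False
except TypeError:
    buggy_raises = True

# Corrected call path: the accumulated delta dict is forwarded.
ok = delta_cb(delta)
```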

full channel queue

The GDAX help page for the full channel over websocket recommends to:

  1. subscribe to the channel
  2. queue messages
  3. take a REST snapshot
  4. discard queued messages with a sequence id lower than the REST snapshot's

Just wondering: do you do the discarding?

thank you
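The discard step the GDAX help page recommends can be sketched as a one-liner (the message shape with a 'sequence' key follows the full-channel payloads; the helper name is hypothetical):

```python
def discard_stale(queued, snapshot_sequence):
    """Drop any queued full-channel message whose sequence is at or
    below the REST snapshot's sequence number; the snapshot already
    reflects those messages, so replaying them would corrupt the book.
    """
    return [m for m in queued if m['sequence'] > snapshot_sequence]
```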

Trading endpoints

  • Poloniex: buy, sell (buy and sell can be in a single interface), cancel order, return order status, return trade history, account balances
  • Gemini: new, cancel, status, past trades, account balances
  • Coinbase: place order, cancel order, order status, accounts (need the balance information)
  • Kraken: add order, cancel order, trade history, order info, get account balance
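Since these endpoints overlap heavily across exchanges, a common surface could look like the following sketch (method names are illustrative only, not cryptofeed's API):

```python
from abc import ABC, abstractmethod

class TradingAPI(ABC):
    """Hypothetical normalized trading interface covering the per-exchange
    REST endpoints listed above (order placement/cancel/status, trade
    history, balances)."""

    @abstractmethod
    def place_order(self, pair, side, price, size): ...

    @abstractmethod
    def cancel_order(self, order_id): ...

    @abstractmethod
    def order_status(self, order_id): ...

    @abstractmethod
    def trade_history(self, pair): ...

    @abstractmethod
    def balances(self): ...
```

Each exchange module would then subclass this and translate the normalized calls to its own REST endpoints.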

demo.py causes error using XBTUSD pair with Funding callback

The following line in the demo.py script under the examples directory causes an error:

f.add_feed(Bitmex(pairs=['XBTUSD'], channels=[FUNDING, TRADES], callbacks={FUNDING: FundingCallback(funding), TRADES: TradeCallback(trade)}))

It appears to only accept pairs that start with a lowercase 'f':

# feed.py
if channels is not None and FUNDING in channels and self.id == BITFINEX:
    if any(map(lambda x: x[0] != 'f', pairs)):
        raise ValueError("Funding channel on bitfinex can be used with funding pairs only")

although I don't see any pairs in the listed Bitfinex trading pairs that start with an 'f':

bitfinex_trading_pairs = {
    'tBTCEUR', 'tBTCUSD', 'tXRPUSD', 'tXRPBTC', 'tETHUSD', 'tETHBTC',
    'tIOTAEUR', 'tIOTAUSD', 'tIOTABTC', 'tIOTAETH', 'tBCHUSD', 'tBCHBTC',
    'tBCHETH', 'tLTCUSD', 'tLTCBTC', 'tEOSUSD', 'tEOSBTC', 'tEOSETH',
    'tSNTUSD', 'tSNTBTC', 'tSNTETH', 'tNEOUSD', 'tNEOBTC', 'tNEOETH',
    'tOMGUSD', 'tOMGBTC', 'tOMGETH', 'tETCUSD', 'tETCBTC', 'tXMRUSD',
    'tXMRBTC', 'tDASHUSD', 'tDASHBTC', 'tZECUSD', 'tZECBTC', 'tBTGUSD',
    'tBTGBTC', 'tGNTUSD', 'tGNTBTC', 'tGNTETH', 'tQTUMUSD', 'tQTUMBTC',
    'tQTUMETH', 'tDATAUSD', 'tDATABTC', 'tDATAETH', 'tSANUSD', 'tSANBTC',
    'tSANETH', 'tYYWUSD', 'tYYWBTC', 'tYYWETH', 'tETPUSD', 'tETPBTC',
    'tETPETH', 'tEDOUSD', 'tEDOBTC', 'tEDOETH', 'tQASHUSD', 'tQASHBTC',
    'tQASHETH', 'tAVTUSD', 'tAVTBTC', 'tAVTETH', 'tRRTUSD', 'tRRTBTC'
}

I'm also curious about the decision to put this check for Bitfinex in feed.py rather than in bitfinex.py. It seems the same check could be done there?
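For what it's worth, the same guard expressed as a standalone check that could live in bitfinex.py (a sketch with simplified names; the real code uses the FUNDING constant):

```python
def validate_funding_pairs(channels, pairs):
    """Bitfinex funding symbols are 'f'-prefixed (e.g. 'fUSD'), while
    trading symbols are 't'-prefixed (e.g. 'tBTCUSD'), so the funding
    channel must reject trading pairs."""
    if 'funding' in channels and any(not p.startswith('f') for p in pairs):
        raise ValueError("Funding channel on Bitfinex can be used with funding pairs only")
```

Keeping it in the exchange module would also avoid feed.py needing exchange-specific branches.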

dropped messages

What about dropped messages? GDAX mentions:

While a websocket connection is over TCP, the websocket servers receive market data in a manner which can result in dropped messages. Your feed consumer should either be designed to expect and handle sequence gaps and out-of-order messages, or use channels that guarantee delivery of messages.

Certain other exchanges make similar warnings about out of order/dropped messages, although I cannot currently recall which ones exactly.

The message_handler currently only checks if msg_seq < last_seen_seq. Unless I am missing something, it would seem this can result in an inaccurate order book if msg_seq > last_seen_seq + 1. The simple solution here is to call _book_snapshot upon detecting this.

So in message_handler:

            last_seen_seq = self.seq_no[msg['product_id']]
            msg_seq = msg['sequence']
            if msg_seq < last_seen_seq:
                # out of order message, ignore
                return
            elif msg_seq > (last_seen_seq + 1):
                LOG.warning(
                    'Missing messages for {} (Last seq: {}, received seq: {}). Requesting new snapshot'.format(
                        msg['product_id'],
                        last_seen_seq,
                        msg_seq
                    )
                )
                await self._book_snapshot()  # rebuild book after missing msg detected
                return

This may also have to be considered in the case of dropped connections which result in missed messages, as referenced in #16 .

rest api will retry forever

Two issues currently:

  • the REST API will retry forever; a max retry count needs to be specifiable
  • the timeout is unspecified (currently requests only time out when the SSL connection times out); the user should be able to specify a timeout
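A sketch of bounding the retries, with the HTTP transport abstracted away so only the retry policy is visible (names are hypothetical; the real request should also pass an explicit timeout):

```python
import time

def with_retries(call, max_retries=5, backoff=0.01):
    """Re-raise after max_retries instead of looping forever.

    `call` stands in for the actual REST request; OSError covers
    socket timeouts and resets raised by the transport.
    """
    last_exc = None
    for attempt in range(max_retries):
        try:
            return call()
        except OSError as e:
            last_exc = e
            time.sleep(backoff * (2 ** attempt))  # exponential backoff
    raise last_exc
```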

Async I/O Pausing Feed

I'm currently working on some code to save level 2 information to a local Postgres database. Each order book update carries a lot of information, so I definitely need to work on compressing it and speeding up the I/O operation, but I have noticed that the feed does not accept or queue updates while the operation is being processed.

I've discovered this comment:

# Examples of some handlers for different updates. These currently don't do much.
# Handlers should conform to the patterns/signatures in callback.py
# Handlers can be normal methods/functions or async. The feedhandler is paused
# while the callbacks are being handled (unless they in turn await other functions or I/O)
# so they should be as lightweight as possible

https://github.com/bmoscon/cryptofeed/blob/master/examples/demo.py

I was wondering what is meant by "(unless they in turn await other functions or I/O)"? My code uses asyncpg to write to the database. The feed and the method calling it are also async, so am I right in thinking it should not be paused?

Apologies for what is most likely a basic question; I'm fairly new to Python. It seems Coinbase updates arrive multiple times a second, so I might have to save a snapshot of the order book to the database on a more feasible schedule.
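One common pattern is to keep the callback itself cheap and hand each row to a background writer task via an asyncio.Queue; a self-contained sketch (the callback signature approximates cryptofeed's book callback, and the writer just collects rows where asyncpg would perform the INSERT):

```python
import asyncio

async def db_writer(queue):
    """Hypothetical consumer: drain updates and persist them. With
    asyncpg this is where you would await conn.executemany(...); here
    we just collect rows so the sketch is self-contained."""
    rows = []
    while True:
        item = await queue.get()
        if item is None:        # sentinel: flush and stop
            return rows
        rows.append(item)

async def on_book(queue, feed, pair, book, timestamp):
    """The feed callback stays lightweight: enqueue and return at once,
    so the feed handler is not paused while the database write runs."""
    queue.put_nowait((feed, pair, timestamp))
```

Batching several queued rows per INSERT would further cut the per-update I/O cost.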

more granular configurations

Currently you can only specify channels and pairs per exchange, and every channel is configured with all the specified pairs. This should be modified so you can configure, as an example, trade data for BTC-USD and ETH-USD plus book data for BTC-USD only on one exchange, with a single exchange class.
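One possible shape for such a configuration, purely as a sketch (cryptofeed does not currently accept this; channel names simplified):

```python
# Map each channel to its own pair list, so one exchange class can carry
# different pairs per channel instead of the cross-product of all pairs
# with all channels.
subscription = {
    'trades':  ['BTC-USD', 'ETH-USD'],
    'l2_book': ['BTC-USD'],
}

def pairs_for(subscription, channel):
    """Resolve the pairs configured for a single channel."""
    return subscription.get(channel, [])
```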

bitmex reconnect errors

2018-05-17 08:11:58,571 : WARNING : Feed BITMEX encountered connection issue WebSocket connection is closed: code = 1006 (connection closed abnormally [internal]), no reason - reconnecting...
2018-05-17 16:55:50,276 : WARNING : Feed BITMEX encountered connection issue WebSocket connection is closed: code = 1006 (connection closed abnormally [internal]), no reason - reconnecting...
2018-05-17 16:55:55,165 : ERROR : Task exception was never retrieved
future: <Task finished coro=<FeedHandler._connect() done, defined at /home/bryant/workspace/cryptofeed/cryptofeed/feedhandler.py:57> exception=KeyError(8799142950,)>
Traceback (most recent call last):
  File "/home/bryant/workspace/cryptofeed/cryptofeed/feedhandler.py", line 64, in _connect
    await self._handler(websocket, feed.message_handler)
  File "/home/bryant/workspace/cryptofeed/cryptofeed/feedhandler.py", line 74, in _handler
    await handler(message)
  File "/home/bryant/workspace/cryptofeed/cryptofeed/bitmex/bitmex.py", line 116, in message_handler
    await self._book(msg)
  File "/home/bryant/workspace/cryptofeed/cryptofeed/bitmex/bitmex.py", line 84, in _book
    price, _ = self.order_id[pair][data['id']]
KeyError: 8799142950
2018-05-17 16:55:55,169 : ERROR : Task exception was never retrieved
future: <Task finished coro=<FeedHandler._connect() done, defined at /home/bryant/workspace/cryptofeed/cryptofeed/feedhandler.py:57> exception=InvalidStatusCode('Status code not 101: 503',)>
Traceback (most recent call last):
  File "/home/bryant/workspace/cryptofeed/cryptofeed/feedhandler.py", line 62, in _connect
    async with websockets.connect(feed.address) as websocket:
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/client.py", line 386, in __aenter__
    return (yield from self)
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/client.py", line 400, in __await__
    extra_headers=protocol.extra_headers,
  File "/home/bryant/anaconda3/envs/cryptofeed/lib/python3.6/site-packages/websockets-4.0.1-py3.6-linux-x86_64.egg/websockets/client.py", line 263, in handshake
    raise InvalidStatusCode(status_code)
websockets.exceptions.InvalidStatusCode: Status code not 101: 503
