Tracking Cryptocurrency Exchange Trades with Google Cloud Platform in Real-Time
Today’s financial world is complex, and the old technology used for constructing financial data pipelines isn’t keeping up. With multiple financial exchanges operating around the world and global user demand, these data pipelines have to be fast, reliable and scalable.
Currently, using an econometric approach—applying models to financial data to forecast future trends—doesn’t work for real-time financial predictions. And data that’s old, inaccurate or from a single source doesn’t translate into dependable data for financial institutions to use. But building pipelines with Google Cloud Platform (GCP) can solve some of these key challenges. In this post, we’ll describe how to build a pipeline to predict financial trends in microseconds. We’ll walk through how to set up and configure a pipeline for ingesting real-time, time-series data from various financial exchanges and how to design a suitable data model, which facilitates querying and graphing at scale.
You’ll find a tutorial below on setting up and deploying the proposed architecture using GCP, particularly these products:
- Cloud Dataflow, for a scalable data ingestion system that can handle late data
- Cloud Bigtable, our scalable, low-latency time series database that has reached 40 million transactions per second on 3,500 nodes
Bonus: a scalable ML pipeline using TensorFlow Extended (TFX), while not part of this tutorial, is a logical next step.
The tutorial explains how to establish a connection to multiple exchanges, subscribe to their trade feeds, and extract and transform these trades into a flexible format that is stored in Cloud Bigtable, where it can be graphed and analyzed.
This will also set the foundation for ML online learning predictions at scale. You’ll see how to graph the trades, volume, and time delta from trade execution until it reaches our system (an indicator of how close to real time we can get the data). You can find more details on GitHub too.
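The time delta mentioned above is the gap between the execution time reported by the exchange and the moment the trade arrives in our system. Here is a minimal sketch of that calculation, assuming both values are epoch milliseconds. The class and method names are hypothetical and are not taken from the lab's repository.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration only; not part of the lab's code.
final class TradeLatency {

    // Time between trade execution (as reported by the exchange) and arrival in our system.
    // Assumes both clocks are reasonably synchronized; clock skew can make the result negative.
    static Duration delta(long tradeEpochMillis, long receivedEpochMillis) {
        return Duration.between(
                Instant.ofEpochMilli(tradeEpochMillis),
                Instant.ofEpochMilli(receivedEpochMillis));
    }

    public static void main(String[] args) {
        long executedAt = 1546547940918L;   // timestamp carried by the trade
        long receivedAt = executedAt + 120; // pretend the trade arrived 120 ms later
        System.out.println(delta(executedAt, receivedAt).toMillis() + " ms"); // prints "120 ms"
    }
}
```

In a live pipeline the second argument would typically be `System.currentTimeMillis()` taken when the trade is read from the feed.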
Requirements / Solutions
The typical requirement for trading systems is low-latency data ingestion. This lab extends that requirement with near real-time data storage and querying at scale. You will learn how to do the following:
- Ingest real-time trading data with low latency from globally scattered data sources and exchanges, with the option to adapt the location of the data ingest worker pipeline and to easily add trading pairs and exchanges. Solution: Dataflow + XChange Reactive WebSockets Framework
- Demonstrate an unbounded streaming source that runs on multiple runners. Solution: Apache Beam
- Get strong consistency, linear scalability, and very low latency when querying the trading data. Solution: Bigtable
Architecture/How it works
The code can be divided into the following main framework units:
- Data ingestion: the XChange Stream framework (GitHub link), a Java library that provides a simple and consistent streaming API for interacting with Bitcoin and other cryptocurrency exchanges over the WebSocket protocol. The library adds streaming interfaces to XChange, and users subscribe to live updates through RxJava reactive streams. We use this Java 8 framework to connect to and configure several exchanges (Bitfinex, Poloniex, Bitstamp, OkCoin, Gemini, HitBTC, Binance...). Link to the exchange / trading pair configuration code
- Parallel processing: Apache Beam (GitHub link), an open source unified programming model for defining and executing data processing pipelines, including ETL, batch, and stream (continuous) processing. Supported runners: Apache Apex, Apache Flink, Apache Gearpump, Apache Samza, Apache Spark, and Google Cloud Dataflow. You will learn how to create an unbounded streaming source/reader and manage basic watermarking, checkpointing, and record IDs for data ingestion. Link to the bridge between Beam and the XChange Stream framework
- Bigtable sink: Cloud Bigtable with Beam using the HBase API (GitHub link), the connector and writer to Bigtable. You will see how to create a row key and a Bigtable mutation function prior to writing to Bigtable. Link to the Bigtable key creation / mutation function
- Real-time API endpoint: a Flask web server on port 5000 plus a Bigtable client (GitHub link), used to query Bigtable and serve as the API endpoint. Link to the Bigtable query builder + results retrieval and sampling
The Flask web server runs on the GCP VM instance.
A separate pipeline instance is created for every exchange and trading pair. Each pipeline consists of three steps:
- UnboundedStreamingSource, which contains the 'Unbounded Streaming Source Reader' (bitStamp2)
- Bigtable pre-writing mutation and key definition (ETH-USD Mut2)
- Bigtable write step (ETH-USD2)
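To make the "one pipeline per exchange and trading pair" idea concrete, here is a small sketch that expands a configuration into the list of pipeline instances to launch. It uses plain Java collections instead of Beam, and the class name, the configuration shape, and the `exchange:pair` naming are assumptions made for illustration; the lab's own configuration code may look different.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical planner for illustration; it launches nothing and does not use Beam.
final class PipelinePlan {

    // Returns one entry per exchange + trading pair.
    // In the lab, each such combination gets its own source, mutation, and write steps.
    static List<String> instances(Map<String, List<String>> pairsByExchange) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : pairsByExchange.entrySet()) {
            for (String pair : entry.getValue()) {
                result.add(entry.getKey() + ":" + pair);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> config = new LinkedHashMap<>();
        config.put("Bitfinex", Arrays.asList("BTC/USD", "ETH/USD"));
        config.put("Bitstamp", Arrays.asList("BTC/USD"));
        for (String instance : instances(config)) {
            System.out.println("would start pipeline for " + instance);
        }
    }
}
```

Adding an exchange or a trading pair then only means adding an entry to the configuration map.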
Bigtable row key design decisions
The DTO for this lab looks like this:
The row key is formed by joining four fields with the # separator: trading pair, exchange, epoch timestamp, and system nano time.
For example, a row key might look like BTC/USD#Bitfinex#1546547940918#63187358085, where:
- BTC/USD is the trading pair
- Bitfinex is the exchange
- 1546547940918 is the epoch timestamp in milliseconds
- 63187358085 is the system nano time
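The key layout above can be sketched as a small helper that joins the four fields with `#`. This is an illustration under assumptions: the class and method names are hypothetical, and the lab's actual key creation lives in its Bigtable mutation function.

```java
// Hypothetical helper illustrating the row key layout; names are not from the lab's code.
final class RowKeys {

    // Joins trading pair, exchange, epoch milliseconds, and nano time with '#'.
    static String build(String tradingPair, String exchange, long epochMillis, long nanoTime) {
        return String.join("#",
                tradingPair,
                exchange,
                Long.toString(epochMillis),
                Long.toString(nanoTime));
    }

    public static void main(String[] args) {
        // System.nanoTime() has an arbitrary origin, so it is only useful as a tie-breaker here.
        System.out.println(build("BTC/USD", "Bitfinex", System.currentTimeMillis(), System.nanoTime()));
    }
}
```

Calling `RowKeys.build("BTC/USD", "Bitfinex", 1546547940918L, 63187358085L)` reproduces the example key shown above.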
Why is nano time added at the end of the key?
Nano time is used to avoid writing multiple versions to the same row for different trades. If a sequence of TradeLoad DTOs streams in quickly, two DoFn mutations might execute within the same epoch millisecond.
Appending the nano time splits each millisecond into a further one million values.
In your own environment, if this is not enough, you can hash the volume / price ratio and attach the hash at the end of the row key.
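If you do need that extra suffix, one possible shape is sketched below. The source does not name a hash function, so using `Double.hashCode` here is an assumption, as are the class and method names. Any hash can still collide, so treat this as a tie-breaker, not a guarantee of uniqueness.

```java
// Hypothetical sketch of the optional hash suffix; the hash function is an assumption.
final class KeySuffix {

    // Hex-encoded hash of the volume / price ratio.
    static String ratioHash(double volume, double price) {
        return Integer.toHexString(Double.hashCode(volume / price));
    }

    // Appends the ratio hash as a fifth '#'-separated field.
    static String withSuffix(String rowKey, double volume, double price) {
        return rowKey + "#" + ratioHash(volume, price);
    }

    public static void main(String[] args) {
        // Two trades that ended up with the same base key but differ in volume and price.
        String base = "BTC/USD#Bitfinex#1546547940918#63187358085";
        System.out.println(withSuffix(base, 0.5, 3800.0));
        System.out.println(withSuffix(base, 1.25, 3801.5));
    }
}
```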
Row cells contain an exact schema replica of the exchange TradeLoad DTO (see the DTO above). Ordering the key from the most specific fields (trading pair, then exchange) to the less specific ones (timestamp, then nano time) helps you avoid hotspots when you query the data.
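Because rows are stored in sorted key order, putting the trading pair and exchange first means all trades for one pair on one exchange sit next to each other and can be read with a single prefix scan. The sketch below imitates that with an in-memory `TreeMap` as a stand-in for the table. It does not use the Bigtable client, and the class name and sample values are hypothetical.

```java
import java.util.SortedMap;
import java.util.TreeMap;

// In-memory stand-in for a sorted key-value table; not the Bigtable client.
final class PrefixScanDemo {

    // Returns all entries whose key starts with the given prefix, relying on sorted key order.
    static SortedMap<String, String> scan(TreeMap<String, String> table, String prefix) {
        // '\uffff' sorts after every character used in these keys, so it closes the range.
        return table.subMap(prefix, prefix + '\uffff');
    }

    public static void main(String[] args) {
        TreeMap<String, String> table = new TreeMap<>();
        table.put("BTC/USD#Bitfinex#1546547940918#63187358085", "trade A");
        table.put("BTC/USD#Bitfinex#1546547940920#63187900000", "trade B");
        table.put("BTC/USD#Bitstamp#1546547940919#63187400000", "trade C");
        table.put("ETH/USD#Bitfinex#1546547940918#63187360000", "trade D");

        // All BTC/USD trades on Bitfinex, in timestamp order.
        System.out.println(scan(table, "BTC/USD#Bitfinex#").values()); // prints [trade A, trade B]
    }
}
```

Note that decimal timestamps only sort chronologically as strings while they have the same number of digits, which holds for 13-digit epoch milliseconds.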
- Create a virtual machine to build the pipeline and to host your website.
- Install Java, Git, Maven, pip, Python 2.7, and the Cloud Bigtable command-line tool.
- Create the Bigtable instance.
- Create a Google Cloud Storage bucket.
- Run the Dataflow pipeline.
- Create a firewall rule to allow tcp:5000 for visualization.