Build a Real-time Market Monitor with Bloomberg, xbbg, Redis, FastAPI, WebSocket and ag-Grid
This article introduces a minimum example of a real-time market monitor developed by below frameworks / packages (source code not included):
xbbg
is an intuitive Bloomberg python API.- Redis is an open source, in-memory data structure store, used as a database, cache and message broker.
- FastAPI is a modern, fast, web framework to build APIs.
- WebSocket communicates between the front-end and the back-end.
- ag-Grid is a high-performance grid UI with all major Javascript frameworks supported, including Javascript, React, Vue, and Angular.
Data Feed Pipeline
The core of the design is the Pub/Sub system.
First, start the Redis server (check this article for details of how to compile from source in Windows):
redis-server redis.conf
Use xbbg
for Bloomberg real-time data subscription: blp.live
yields a python dict
asynchronously (starting from 0.7.2
). tickers
is the list of tickers to monitor and info
(case insensitive) is the list fields to watch. Live data feeds will be serialized by orjson
and sent to Redis thru publish
:
[Jupyter / IPython console]
import redis
import orjson
from xbbg import blpr = redis.Redis()
async for data in blp.live(tickers, info=info):
r.publish(channel, orjson.dumps(data))
In FastAPI, subscribe to the Redis channel and start receiving data:
[app.py]
import redisasync def listen():
ps = redis.Redis().pubsub()
ps.subscribe(channel)
while True:
for msg in ps.listen():
yield orjson.loads(msg['data'])
and use WebSocket to sent to the back-end thru path /stream/{channel}
. This path enables a two-way communication channel between the FastAPI back-end and the ag-Grid front-end:
[app.py]
from fastapi import FastAPI, WebSocketapp = FastAPI()@app.websocket('/stream/{channel}')
async def stream_data(websocket: WebSocket, channel: str):
await websocket.accept()
async for data in listen(channel=channel):
if websocket.client_state.CONNECTED:
await websocket.send_json(data)
FastAPI is not only choice — Node.js for example can achieve the same goal:
[monitor.js]
let subscriber = require('redis').createClient();
const io = require('socket.io')(server, options)subscriber.on('message', function (channel, message) {
io.sockets.emit(channel, message)
})
subscriber.subscribe('feeds')
The final step, ag-Grid front-end receives data from the WebSocket channel (corresponded to the FastAPI path defined above). ag-Grid uses a list of dictionaries to generate the row data (column definitions are declared separately). Each row is a dictionary, with the key id
(in our case, ticker) as the unique identifier for ag-Grid — it knows which exact row to update. If the the. The update is done asynchronously for the high-frequency data (not high enough in the context of trading of course):
[monitor.js]
let ws = new WebSocket(`ws://${host}:${port}/stream/${channel}`)
ws.onmessage = function(event) {
let data = JSON.parse(event.data)
gridOptions.api.applyTransactionAsync({ update: [data] })
}
Remember to close connection before closing:
[monitor.js]
window.onbeforeunload = function() {
ws.onclose = function () {}
ws.close()
}
Building the Monitor
Having learnt the pipeline of data feeds, next is to build the infrastructure of the monitor.
FastAPI is fairly easy to setup a server:
- Jinja template is used to generate homepage from .html file. In our example, it returns
main.html
in thetemplates
folder. StaticFiles
mounts the local drive for server to recognize — in themain.html
, we can reference local files byapp/scripts/example.js
.- CORS allows the communications between the front-end and back-end for WebSocket / socket.io.
[app.py]
from fastapi import FastAPI, WebSocket, Request
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddlewareimport ostemplates = Jinja2Templates(directory='templates')app = FastAPI()
app.mount(
'/app',
StaticFiles(
directory='/'.join(
os.path.abspath(__file__)
.replace('\\', '/')
.split('/')[:-1]
)
),
name='app',
)
app.add_middleware(
CORSMiddleware,
allow_origins=['*'],
allow_credentials=True,
allow_methods=['*'],
allow_headers=['*'],
)# Tickers to watch
tickers = ['ESA Index', 'NQA Index', 'COA Comdty', 'UXA Index']@app.get('/')
async def home(request: Request):
return templates.TemplateResponse('markets.html', {
'request': request,
'tickers': tickers,
})
In main.html
we need to include the ag-Grid scripts, the placeholder for the monitor <div id='monitor'/>
, and the scripts monitor.js
that actually generates the monitor:
[main.html]
<script src='https://unpkg.com/ag-grid-community/dist/ag-grid-community.min.noStyle.js'></script>
<link rel='stylesheet' href='https://unpkg.com/ag-grid-community/dist/styles/ag-grid.min.css'>
<link rel='stylesheet' href='https://unpkg.com/ag-grid-community/dist/styles/ag-theme-alpine.min.css'><div id='tickers'>
<!-- Tickers is passed thru Jinja templates -->
{% for ticker in tickers %}
<span id='{{ ticker }}' style='display:None'></span>
{% endfor %}
</div><div id='monitor' class='ag-theme-alpine'></div>
<script src='/app/templates/monitor.js'></script>
To generate the monitor, we need to 1) define the columns, 2) initiate with empty data set, and 3) add subscriber to update the real-time data. Step 1&2 is below and Step 3 is mentioned in the data pipeline above.
[monitor.js]
let columnDefs = [
{
headerName: 'Ticker',
field: 'TICKER',
},
{
headerName: 'Time',
field: 'TIME',
},
{
headerName: 'Price',
field: 'LAST_PRICE',
cellClass: 'number',
cellRenderer: 'agAnimateShowChangeCellRenderer',
valueFormatter: params => params.value.toFixed(2)
.toString().replace(/\B(?=(\d{3})+(?!\d))/g, ","),
},
{
headerName: 'Change',
field: 'RT_PX_CHG_PCT_1D',
cellClass: 'number',
},
]
let tickers = Array.from(
document.querySelector('#tickers').querySelectorAll('span')
).map(span => span.id)
let globalRowData = tickers.map(ticker => {
return {
TICKER: ticker,
LAST_PRICE: NaN,
TIME: '',
RT_PX_CHG_PCT_1D: NaN,
}
})
let gridOptions = {
columnDefs: columnDefs,
animateRows: true,
asyncTransactionWaitMillis: 100,
// Use `TICKER` as row reference for grid updates
getRowNodeId: function (data) {
return data.TICKER
},
onGridReady: function (params) {
params.api.setRowData(globalRowData)
},
}
document.addEventListener('DOMContentLoaded', function() {
new agGrid.Grid(document.querySelector('#monitor'), gridOptions)
})
Putting everything together, start the FastAPI app:
[cmd]
uvicorn app:app --reload
Test the app by publishing random data to Redis or directly from blp.live
as mentioned above:
[Jupyter / IPython console]
import redis
import orjsonredis.Redis().publish('feeds', orjson.dumps({'TICKER': 'ESA Index', 'LAST_PRICE': 3600.0}))
Title graph credits: Meet MAT, Low Poly Tree, Low Poly Fox, Low Poly Rock, Low Poly Cloud, Shiba, and Among Us Astronaut.