Build a Real-time Market Monitor with Bloomberg, xbbg, Redis, FastAPI, WebSocket and ag-Grid

Market Monitor

This article introduces a minimal example of a real-time market monitor built with the frameworks and packages below (source code not included):

  • xbbg is an intuitive Bloomberg Python API.
  • Redis is an open source, in-memory data structure store, used as a database, cache and message broker.
  • FastAPI is a modern, fast web framework for building APIs.
  • WebSocket carries messages between the front-end and the back-end.
  • ag-Grid is a high-performance grid UI that supports plain JavaScript as well as the major frameworks, including React, Vue, and Angular.

Data Feed Pipeline

The core of the design is the Pub/Sub system.
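
To make the Pub/Sub idea concrete before bringing in Redis, here is a minimal in-process sketch. The class InMemoryBroker and its methods are hypothetical stand-ins written for illustration, not part of Redis or of the monitor itself. The sketch only shows the fan-out behaviour: one published message reaches every subscriber of the channel.

```python
import asyncio
from collections import defaultdict


class InMemoryBroker:
    """Hypothetical in-process stand-in for Redis Pub/Sub (illustration only)."""

    def __init__(self):
        # channel name -> list of subscriber queues
        self._subscribers = defaultdict(list)

    def subscribe(self, channel):
        """Register a new subscriber and return its private queue."""
        queue = asyncio.Queue()
        self._subscribers[channel].append(queue)
        return queue

    def publish(self, channel, message):
        """Deliver the message to every current subscriber of the channel."""
        for queue in self._subscribers[channel]:
            queue.put_nowait(message)
        # Like Redis PUBLISH, report how many subscribers received it
        return len(self._subscribers[channel])


async def demo():
    broker = InMemoryBroker()
    grid_a = broker.subscribe('feeds')
    grid_b = broker.subscribe('feeds')
    tick = {'TICKER': 'ESA Index', 'LAST_PRICE': 3600.0}
    delivered = broker.publish('feeds', tick)
    return delivered, await grid_a.get(), await grid_b.get()
```

Running asyncio.run(demo()) returns the subscriber count together with the same tick as seen by each of the two subscribers. In the real pipeline Redis plays the broker role across processes, so the Bloomberg publisher and the web server never need to know about each other.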

First, start the Redis server (check this article for details on how to compile it from source on Windows):

redis-server redis.conf

Use xbbg for the Bloomberg real-time data subscription: blp.live yields a Python dict asynchronously (starting from 0.7.2). tickers is the list of tickers to monitor and info (case insensitive) is the list of fields to watch. Each live data update is serialized with orjson and sent to Redis through publish:

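[Jupyter / IPython console]
import orjson
import redis
from xbbg import blp

r = redis.Redis()
# `tickers`, `info` and `channel` must be defined beforehand;
# top-level `async for` works in the IPython / Jupyter console
async for data in blp.live(tickers, info=info):
    r.publish(channel, orjson.dumps(data))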

In FastAPI, subscribe to the Redis channel and start receiving data:

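[app.py]
import orjson
import redis

async def listen(channel: str):
    # Skip the subscribe confirmation, whose payload is not JSON
    ps = redis.Redis().pubsub(ignore_subscribe_messages=True)
    ps.subscribe(channel)
    # ps.listen() is an endless generator, so no outer loop is needed
    for msg in ps.listen():
        yield orjson.loads(msg['data'])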

and use WebSocket to send the data to the front-end through the path /stream/{channel}. This path opens a two-way communication channel between the FastAPI back-end and the ag-Grid front-end:

[app.py]
from fastapi import FastAPI, WebSocket
from starlette.websockets import WebSocketState

app = FastAPI()

@app.websocket('/stream/{channel}')
async def stream_data(websocket: WebSocket, channel: str):
    await websocket.accept()
    async for data in listen(channel=channel):
        # Compare against the enum; `client_state.CONNECTED` alone is always truthy
        if websocket.client_state == WebSocketState.CONNECTED:
            await websocket.send_json(data)
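
One caveat with the listen() generator above: ps.listen() blocks while it waits for Redis, so no other coroutine can run in the meantime. Below is a minimal sketch of a polling alternative. It assumes redis-py's PubSub.get_message, which returns None when nothing is queued. The name listen_polling and the poll_interval parameter are hypothetical, and the stdlib json module stands in for orjson to keep the sketch free of dependencies.

```python
import asyncio
import json


async def listen_polling(pubsub, poll_interval=0.01):
    """Yield decoded messages from an already-subscribed PubSub object.

    `pubsub` is expected to behave like redis-py's PubSub: `get_message`
    returns a dict with a 'data' key, or None when nothing is waiting.
    """
    while True:
        msg = pubsub.get_message(ignore_subscribe_messages=True)
        if msg is None:
            # Nothing queued: hand control back to the event loop for a moment
            await asyncio.sleep(poll_interval)
            continue
        yield json.loads(msg['data'])
```

To use it, create the PubSub object as before (redis.Redis().pubsub() followed by subscribe(channel)) and iterate with async for data in listen_polling(ps). The polling interval trades latency against idle CPU usage. The value 0.01 is an arbitrary starting point, not a tuned setting.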

FastAPI is not the only choice. Node.js, for example, can achieve the same goal:

[monitor.js]
let subscriber = require('redis').createClient();
const io = require('socket.io')(server, options)
subscriber.on('message', function (channel, message) {
    io.sockets.emit(channel, message)
})
subscriber.subscribe('feeds')

In the final step, the ag-Grid front-end receives data from the WebSocket channel (corresponding to the FastAPI path defined above). ag-Grid uses a list of dictionaries as the row data (column definitions are declared separately). Each row is a dictionary, and the key id (in our case, the ticker) is the unique identifier that tells ag-Grid exactly which row to update. Updates are applied asynchronously to cope with the high-frequency data (not high enough in the context of trading, of course):

[monitor.js]
let ws = new WebSocket(`ws://${host}:${port}/stream/${channel}`)
ws.onmessage = function(event) {
    let data = JSON.parse(event.data)
    gridOptions.api.applyTransactionAsync({ update: [data] })
}
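
One thing to watch when rows are updated by id: a tick may carry only some of the fields. That is an assumption about the feed, as is the idea that an update transaction replaces the row's data object with the one supplied. Under those assumptions, a partial tick would blank the columns it omits. A minimal back-end sketch that merges each tick into the last known row before sending it on is shown below. RowCache is a hypothetical helper, not part of any of the libraries used here.

```python
class RowCache:
    """Hypothetical helper that keeps the latest full row per ticker."""

    def __init__(self, key='TICKER'):
        self.key = key
        self.rows = {}

    def merge(self, tick):
        """Fold a partial tick into the cached row and return a full copy."""
        row = self.rows.setdefault(tick[self.key], {})
        row.update(tick)
        return dict(row)


cache = RowCache()
cache.merge({'TICKER': 'ESA Index', 'LAST_PRICE': 3600.0})
full_row = cache.merge({'TICKER': 'ESA Index', 'RT_PX_CHG_PCT_1D': 0.25})
print(full_row)
# {'TICKER': 'ESA Index', 'LAST_PRICE': 3600.0, 'RT_PX_CHG_PCT_1D': 0.25}
```

The merged row could then be passed to websocket.send_json in place of the raw tick, so the grid always receives every column for the row it is updating.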

Remember to close the connection before the page is closed:

[monitor.js]
window.onbeforeunload = function() {
    ws.onclose = function () {}
    ws.close()
}

Building the Monitor

Having covered the data feed pipeline, the next step is to build the infrastructure of the monitor.

Setting up a server with FastAPI is fairly easy:

  • A Jinja template generates the homepage from an .html file. In our example, it returns main.html in the templates folder.
  • StaticFiles mounts a local directory so that the server can serve it. In main.html, we can then reference local files by paths such as app/scripts/example.js.
  • CORS allows communication between the front-end and the back-end for WebSocket / socket.io.

[app.py]
import os

from fastapi import FastAPI, WebSocket, Request
from fastapi.templating import Jinja2Templates
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware

templates = Jinja2Templates(directory='templates')

app = FastAPI()
# Serve the folder containing this file under /app
app.mount(
    '/app',
    StaticFiles(directory=os.path.dirname(os.path.abspath(__file__))),
    name='app',
)
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)

# Tickers to watch
tickers = ['ESA Index', 'NQA Index', 'COA Comdty', 'UXA Index']

@app.get('/')
async def home(request: Request):
    # Render main.html from the templates folder
    return templates.TemplateResponse('main.html', {
        'request': request,
        'tickers': tickers,
    })

In main.html we need to include the ag-Grid scripts, the placeholder for the monitor <div id='monitor'/>, and the script monitor.js that actually generates the monitor:

[main.html]
<script src='https://unpkg.com/ag-grid-community/dist/ag-grid-community.min.noStyle.js'></script>
<link rel='stylesheet' href='https://unpkg.com/ag-grid-community/dist/styles/ag-grid.min.css'>
<link rel='stylesheet' href='https://unpkg.com/ag-grid-community/dist/styles/ag-theme-alpine.min.css'>
<div id='tickers'>
    <!-- Tickers are passed through Jinja templates -->
    {% for ticker in tickers %}
    <span id='{{ ticker }}' style='display:none'></span>
    {% endfor %}
</div>
<div id='monitor' class='ag-theme-alpine'></div>
<script src='/app/templates/monitor.js'></script>

To generate the monitor, we need to 1) define the columns, 2) initialize the grid with an empty data set, and 3) add a subscriber that updates the real-time data. Steps 1 and 2 are shown below; step 3 is covered in the data pipeline above.

[monitor.js]
let columnDefs = [
    {
        headerName: 'Ticker',
        field: 'TICKER',
    },
    {
        headerName: 'Time',
        field: 'TIME',
    },
    {
        headerName: 'Price',
        field: 'LAST_PRICE',
        cellClass: 'number',
        cellRenderer: 'agAnimateShowChangeCellRenderer',
        valueFormatter: params => params.value.toFixed(2)
            .toString().replace(/\B(?=(\d{3})+(?!\d))/g, ","),
    },
    {
        headerName: 'Change',
        field: 'RT_PX_CHG_PCT_1D',
        cellClass: 'number',
    },
]

let tickers = Array.from(
    document.querySelector('#tickers').querySelectorAll('span')
).map(span => span.id)
let globalRowData = tickers.map(ticker => {
    return {
        TICKER: ticker,
        LAST_PRICE: NaN,
        TIME: '',
        RT_PX_CHG_PCT_1D: NaN,
    }
})

let gridOptions = {
    columnDefs: columnDefs,
    animateRows: true,
    asyncTransactionWaitMillis: 100,
    // Use `TICKER` as row reference for grid updates
    getRowNodeId: function (data) {
        return data.TICKER
    },
    onGridReady: function (params) {
        params.api.setRowData(globalRowData)
    },
}

document.addEventListener('DOMContentLoaded', function() {
    new agGrid.Grid(document.querySelector('#monitor'), gridOptions)
})

Putting everything together, start the FastAPI app:

[cmd]
uvicorn app:app --reload

Test the app by publishing random data to Redis or directly from blp.live as mentioned above:

[Jupyter / IPython console]
import redis
import orjson
redis.Redis().publish('feeds', orjson.dumps({'TICKER': 'ESA Index', 'LAST_PRICE': 3600.0}))
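
For a longer test without a Bloomberg terminal, the single publish call can be extended into a small random feed. This is only a sketch: random_tick and publish_random are hypothetical helper names, and the starting price of 100.0, the ±0.5% move per tick and the 0.2 second pause are arbitrary choices. The stdlib json module is used in place of orjson to avoid an extra dependency.

```python
import json
import random
import time


def random_tick(ticker, last_price, rng=random):
    """Return a fake tick shaped like the rows the grid expects."""
    change_pct = rng.uniform(-0.5, 0.5)
    return {
        'TICKER': ticker,
        'TIME': time.strftime('%H:%M:%S'),
        'LAST_PRICE': round(last_price * (1 + change_pct / 100), 2),
        'RT_PX_CHG_PCT_1D': round(change_pct, 2),
    }


def publish_random(tickers, channel='feeds', n_ticks=100, pause=0.2):
    """Publish n_ticks fake ticks to a local Redis server."""
    import redis  # imported here so random_tick works without redis installed

    client = redis.Redis()
    prices = {ticker: 100.0 for ticker in tickers}  # arbitrary starting level
    for _ in range(n_ticks):
        ticker = random.choice(tickers)
        tick = random_tick(ticker, prices[ticker])
        prices[ticker] = tick['LAST_PRICE']
        client.publish(channel, json.dumps(tick))
        time.sleep(pause)
```

With the Redis server and the FastAPI app running, calling publish_random(['ESA Index', 'NQA Index', 'COA Comdty', 'UXA Index']) should make the grid rows change a few times per second. The numbers are random and carry no market meaning.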