Infrastructure
Market Data Infrastructure: WebSocket Patterns That Scale
Deep dive into WebSocket reliability, sequence gap detection, Kubernetes patterns, and monitoring for multi-exchange market data systems.
Exchanges send orderbook updates at microsecond intervals. If your infrastructure can’t keep up, your trading system is flying blind.
I’ve built market data infrastructure handling 100K+ messages/second across 12+ exchanges. The difference between “it works” and “production-ready” is infrastructure reliability, not algorithm cleverness.
This post covers the infrastructure patterns: WebSocket reliability, Kubernetes deployment, monitoring, and recovery.
The Problem {#the-problem}
Market data infrastructure challenges:
| Challenge | Impact |
|---|---|
| WebSocket disconnect | Silent, stale data |
| Sequence gaps | Wrong orderbook state |
| Rate limits | Forced reconnection |
| Multi-exchange | 12+ connections, each different |
| High message rates | 10K-100K msgs/sec peak |
The mistake: Teams focus on parsing speed while WebSocket connections silently die. Your 1µs parser is useless if the data is 10 seconds stale.
For kernel-level optimization, see Network Deep Dive. For monitoring patterns, see Trading Metrics.
WebSocket Reliability Patterns {#websocket}
The Core Problem
WebSockets silently disconnect. TCP keepalive isn’t reliable for application-level health. Exchanges can:
- Rate-limit you
- Drop connections without FIN (socket-level mitigation sketched below)
- Send stale data during issues
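For the no-FIN case specifically, Linux can declare the peer dead at the socket layer via `TCP_USER_TIMEOUT`. A sketch, with the caveat that it is Linux-only and assumes the `websockets` library exposes the underlying transport as shown; the heartbeat pattern below remains the primary defense:

```python
import socket

import websockets


async def connect_with_tcp_timeout(url: str, timeout_ms: int = 10_000):
    ws = await websockets.connect(url)
    sock = ws.transport.get_extra_info('socket')
    if sock is not None and hasattr(socket, 'TCP_USER_TIMEOUT'):
        # Kernel aborts the connection if sent data stays unacknowledged this long
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_USER_TIMEOUT, timeout_ms)
    return ws
```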
Pattern 1: Heartbeat Monitoring
```python
import asyncio
import time

import websockets
from prometheus_client import Counter, Gauge

WS_LATENCY = Gauge('md_heartbeat_latency_ms', 'WebSocket heartbeat latency', ['exchange'])
LAST_MESSAGE = Gauge('md_last_message_timestamp', 'Last message Unix timestamp', ['exchange'])
RECONNECTS = Counter('md_reconnects_total', 'WebSocket reconnections', ['exchange'])


class ResilientWebSocket:
    def __init__(self, url: str, exchange: str, heartbeat_interval: float = 30):
        self.url = url
        self.exchange = exchange
        self.heartbeat_interval = heartbeat_interval
        self.ws = None
        self.last_message_time = 0.0
        self._reconnect_attempts = 0

    async def connect(self):
        while True:
            try:
                async with websockets.connect(
                    self.url,
                    ping_interval=20,
                    ping_timeout=10,
                    close_timeout=5,
                ) as ws:
                    self.ws = ws
                    self._reconnect_attempts = 0
                    await self._run_loop()
            except Exception:
                RECONNECTS.labels(exchange=self.exchange).inc()
                await asyncio.sleep(self._backoff())
                self._reconnect_attempts += 1

    async def _run_loop(self):
        # Reset the staleness clock so a fresh connection isn't flagged immediately
        self.last_message_time = time.time()
        tasks = [
            asyncio.create_task(self._read_loop()),
            asyncio.create_task(self._heartbeat_loop()),
            asyncio.create_task(self._staleness_check()),
        ]
        try:
            # Return as soon as any task exits: a raised exception (heartbeat
            # timeout, stale feed, closed socket) propagates to connect(),
            # which backs off and reconnects.
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                task.result()
        finally:
            for task in tasks:
                task.cancel()

    async def _read_loop(self):
        async for message in self.ws:
            self.last_message_time = time.time()
            LAST_MESSAGE.labels(exchange=self.exchange).set(self.last_message_time)
            await self._process_message(message)

    async def _heartbeat_loop(self):
        while True:
            await asyncio.sleep(self.heartbeat_interval)
            start = time.time()
            pong = await self.ws.ping()
            # Raises asyncio.TimeoutError if no pong arrives in time
            await asyncio.wait_for(pong, timeout=10)
            WS_LATENCY.labels(exchange=self.exchange).set((time.time() - start) * 1000)

    async def _staleness_check(self):
        while True:
            await asyncio.sleep(5)
            if time.time() - self.last_message_time > 10:
                raise RuntimeError(f"{self.exchange}: no messages for 10s, forcing reconnect")

    async def _process_message(self, message):
        # Exchange-specific parsing and fan-out goes here
        raise NotImplementedError

    def _backoff(self) -> float:
        # Exponential backoff: 1s, 2s, 4s, 8s... capped at 30s
        return min(30, 2 ** self._reconnect_attempts)
```
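Using it means subclassing and implementing `_process_message`; a minimal sketch (the Binance trade-stream URL here is just an example):

```python
class BinanceTradeSocket(ResilientWebSocket):
    async def _process_message(self, message):
        # Real code would parse JSON and hand off to the orderbook manager
        print(message[:80])


async def main():
    url = "wss://stream.binance.com:9443/ws/btcusdt@trade"  # example stream
    await BinanceTradeSocket(url, "binance").connect()


asyncio.run(main())
```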
Pattern 2: Connection Pool
Multiple connections for redundancy:
```python
class ConnectionPool:
    def __init__(self, url: str, exchange: str, pool_size: int = 2):
        # One ResilientWebSocket per slot, labelled "binance-0", "binance-1", ...
        # so their metrics stay distinguishable
        self.connections = [
            ResilientWebSocket(url, f"{exchange}-{i}")
            for i in range(pool_size)
        ]

    async def start(self):
        tasks = [conn.connect() for conn in self.connections]
        await asyncio.gather(*tasks)

    def get_best_message(self, messages: list):
        # Use the message with the highest sequence number
        return max(messages, key=lambda m: m.get('sequence', 0))
```
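The pool only pays off if downstream consumers drop the duplicates it produces. A minimal sketch of that filter, tracking the highest sequence number already forwarded (assumes messages carry a monotonically increasing `sequence`):

```python
class SequenceDeduplicator:
    """Forward each sequence number once, however many connections deliver it."""

    def __init__(self):
        self.last_forwarded = 0

    def accept(self, msg: dict) -> bool:
        seq = msg.get('sequence', 0)
        if seq <= self.last_forwarded:
            return False  # duplicate, or older than what downstream already saw
        self.last_forwarded = seq
        return True
```

Each connection pushes into a shared queue; the deduplicator sits between that queue and the consumers.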
Kubernetes Deployment {#kubernetes}
Fault Isolation: Per-Exchange Deployments
```yaml
# binance-connector.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: md-binance
  namespace: market-data
spec:
  replicas: 2  # Hot standby
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 0  # Zero downtime
      maxSurge: 1
  selector:
    matchLabels:
      app: market-data
      exchange: binance
  template:
    metadata:
      labels:
        app: market-data
        exchange: binance
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9090"
    spec:
      terminationGracePeriodSeconds: 30
      containers:
        - name: connector
          image: market-data-connector:v1.2.3
          env:
            - name: EXCHANGE
              value: "binance"
            - name: SYMBOLS
              value: "BTCUSDT,ETHUSDT,SOLUSDT"
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
          resources:
            requests:
              cpu: "500m"
              memory: "512Mi"
            limits:
              cpu: "1000m"
              memory: "1Gi"
          ports:
            - containerPort: 9090
              name: metrics
          readinessProbe:
            httpGet:
              path: /ready
              port: 9090
            periodSeconds: 2
            failureThreshold: 3
          livenessProbe:
            httpGet:
              path: /health
              port: 9090
            initialDelaySeconds: 30
            periodSeconds: 10
```
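The probes hit `/ready` and `/health` on the metrics port. One way to back them, sketched here with aiohttp (the module-level freshness timestamp stands in for whatever state the real connector keeps):

```python
import time

from aiohttp import web
from prometheus_client import CONTENT_TYPE_LATEST, generate_latest

# Updated by the connector's read loop; module-level only to keep the sketch short
last_message_time = 0.0


async def ready(request):
    # Ready only if the feed produced a message in the last 10 seconds,
    # so a silently dead WebSocket drops the pod out of the Service
    fresh = (time.time() - last_message_time) < 10
    return web.Response(status=200 if fresh else 503)


async def health(request):
    # Liveness: the process is up and the event loop is responsive
    return web.Response(text="ok")


async def metrics(request):
    return web.Response(body=generate_latest(), headers={"Content-Type": CONTENT_TYPE_LATEST})


def build_app() -> web.Application:
    app = web.Application()
    app.router.add_get("/ready", ready)
    app.router.add_get("/health", health)
    app.router.add_get("/metrics", metrics)
    return app
```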
Terraform for Multi-Exchange
```hcl
locals {
  exchanges = {
    binance  = { ws = "wss://stream.binance.com:9443", symbols = ["BTCUSDT", "ETHUSDT"] }
    coinbase = { ws = "wss://ws-feed.exchange.coinbase.com", symbols = ["BTC-USD", "ETH-USD"] }
    kraken   = { ws = "wss://ws.kraken.com", symbols = ["XBT/USD", "ETH/USD"] }
  }
}

resource "kubernetes_deployment" "market_data" {
  for_each = local.exchanges

  metadata {
    name      = "md-${each.key}"
    namespace = "market-data"
  }

  spec {
    replicas = 2

    selector {
      match_labels = {
        app      = "market-data"
        exchange = each.key
      }
    }

    template {
      metadata {
        labels = {
          app      = "market-data"
          exchange = each.key
        }
      }

      spec {
        container {
          name  = "connector"
          image = "market-data-connector:v1.2.3"

          env {
            name  = "EXCHANGE"
            value = each.key
          }
          env {
            name  = "WS_ENDPOINT"
            value = each.value.ws
          }
          env {
            name  = "SYMBOLS"
            value = join(",", each.value.symbols)
          }
        }
      }
    }
  }
}
```
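With `for_each`, onboarding another venue is a one-line change to the map; the OKX entry below is illustrative (check the venue's docs for the current endpoint):

```hcl
okx = { ws = "wss://ws.okx.com:8443/ws/v5/public", symbols = ["BTC-USDT", "ETH-USDT"] }
```

`terraform apply` then creates `md-okx` without touching the existing deployments.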
For StatefulSet patterns (when needed), see Kubernetes for Trading.
Sequence Gap Detection {#sequence}
The Problem
Missing a delta update means wrong orderbook state. You buy based on a stale price.
Implementation
```python
from prometheus_client import Counter

SEQUENCE_GAPS = Counter('md_sequence_gaps_total', 'Sequence gap events', ['exchange', 'symbol'])


class OrderbookManager:
    def __init__(self, exchange: str, symbol: str, rest_client):
        self.exchange = exchange
        self.symbol = symbol
        self.client = rest_client  # REST client used to fetch snapshots on resync
        self.expected_sequence = 0
        self.book = {'bids': {}, 'asks': {}}
        self.recovering = False

    async def handle_message(self, msg: dict):
        # Exchanges name the sequence field differently (sequence, u, lastUpdateId)
        seq = msg.get('sequence', msg.get('u', msg.get('lastUpdateId')))
        if seq is None:
            return  # subscription acks, heartbeats, etc.

        if self.expected_sequence == 0:
            # First message: treat it as the baseline
            self.expected_sequence = seq
            return await self._apply_snapshot(msg)

        if seq != self.expected_sequence + 1:
            # Gap detected: the local book can no longer be trusted
            SEQUENCE_GAPS.labels(
                exchange=self.exchange,
                symbol=self.symbol
            ).inc()
            await self._resync()
            return

        self._apply_delta(msg)
        self.expected_sequence = seq

    async def _resync(self):
        if self.recovering:
            return
        self.recovering = True
        try:
            snapshot = await self.client.get_depth(self.symbol)
            self.book = self._parse_snapshot(snapshot)
            self.expected_sequence = snapshot['lastUpdateId']
        finally:
            self.recovering = False

    def _apply_delta(self, delta: dict):
        for price, qty in delta.get('bids', []):
            if qty == 0:
                self.book['bids'].pop(price, None)
            else:
                self.book['bids'][price] = qty
        for price, qty in delta.get('asks', []):
            if qty == 0:
                self.book['asks'].pop(price, None)
            else:
                self.book['asks'][price] = qty
```
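`_apply_snapshot` and `_parse_snapshot` above are exchange-specific. A sketch of the parser for a Binance-style REST depth response (`{"lastUpdateId": ..., "bids": [[price, qty], ...], "asks": [...]}`); other venues need their own variants:

```python
# Belongs on OrderbookManager
def _parse_snapshot(self, snapshot: dict) -> dict:
    return {
        'bids': {price: qty for price, qty in snapshot.get('bids', [])},
        'asks': {price: qty for price, qty in snapshot.get('asks', [])},
    }
```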
Recovery Strategies
| Strategy | When to Use |
|---|---|
| Full snapshot | Single gap, quick recovery |
| Reconnect | Persistent gaps |
| Switch exchange | Exchange issues |
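One way to act on that table is a small escalation policy: snapshot first, reconnect if gaps keep coming, fail over if the venue itself looks unhealthy. A sketch with illustrative thresholds (not tuned production values):

```python
class GapRecoveryPolicy:
    """Escalate recovery as sequence gaps accumulate."""

    def __init__(self, reconnect_after: int = 3, failover_after: int = 10):
        self.gaps = 0
        self.reconnect_after = reconnect_after
        self.failover_after = failover_after

    def next_action(self) -> str:
        self.gaps += 1
        if self.gaps >= self.failover_after:
            return "switch_exchange"
        if self.gaps >= self.reconnect_after:
            return "reconnect"
        return "snapshot"

    def reset(self):
        # Call after a sustained clean stretch without gaps
        self.gaps = 0
```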
Monitoring {#monitoring}
Essential Metrics
```python
from prometheus_client import Counter, Gauge, Histogram

# Message processing latency
MSG_LATENCY = Histogram(
    'md_message_latency_seconds',
    'Message processing time',
    ['exchange'],
    buckets=[0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05]
)

# Message rate
MSG_RATE = Counter(
    'md_messages_total',
    'Total messages processed',
    ['exchange', 'type']
)

# Connection state
CONNECTION_STATE = Gauge(
    'md_connection_state',
    'WebSocket state (1=connected, 0=disconnected)',
    ['exchange']
)

# Orderbook depth
BOOK_DEPTH = Gauge(
    'md_orderbook_depth',
    'Number of price levels',
    ['exchange', 'symbol', 'side']
)
```
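In the hot path these are typically exercised together; a sketch of the message handler (the `'e'` field as the message type is a Binance convention, used here only as a placeholder):

```python
import json


async def handle_raw_message(exchange: str, raw: bytes, book_manager):
    with MSG_LATENCY.labels(exchange=exchange).time():
        msg = json.loads(raw)
        MSG_RATE.labels(exchange=exchange, type=msg.get('e', 'unknown')).inc()
        await book_manager.handle_message(msg)
```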
Grafana Dashboard
```json
{
  "title": "Market Data Infrastructure",
  "panels": [
    {
      "title": "Message Rate by Exchange",
      "type": "graph",
      "targets": [{ "expr": "rate(md_messages_total[1m])" }]
    },
    {
      "title": "Sequence Gaps (1h)",
      "type": "stat",
      "targets": [{ "expr": "increase(md_sequence_gaps_total[1h])" }]
    },
    {
      "title": "Connection State",
      "type": "table",
      "targets": [{ "expr": "md_connection_state" }]
    },
    {
      "title": "Message Latency P99",
      "type": "stat",
      "targets": [{ "expr": "histogram_quantile(0.99, rate(md_message_latency_seconds_bucket[5m]))" }]
    }
  ]
}
```
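Dashboards are for humans; paging needs alert rules. A minimal Prometheus rules sketch built on the metrics above (thresholds are illustrative):

```yaml
groups:
  - name: market-data
    rules:
      - alert: MarketDataStale
        expr: time() - md_last_message_timestamp > 10
        for: 30s
        labels:
          severity: page
        annotations:
          summary: "{{ $labels.exchange }} feed stale for more than 10s"
      - alert: MarketDataSequenceGaps
        expr: increase(md_sequence_gaps_total[5m]) > 5
        labels:
          severity: warn
        annotations:
          summary: "{{ $labels.exchange }}/{{ $labels.symbol }} gapping repeatedly"
```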
For complete monitoring patterns, see Trading Metrics.
AWS Network Optimization {#aws}
Placement for Low Latency
resource "aws_placement_group" "market_data" {
name = "market-data-cluster"
strategy = "cluster"
}
resource "aws_eks_node_group" "market_data" {
cluster_name = aws_eks_cluster.main.name
node_group_name = "market-data"
instance_types = ["c6in.xlarge"] # Network-optimized
scaling_config {
desired_size = 3
min_size = 2
max_size = 5
}
# Use placement group
launch_template {
id = aws_launch_template.market_data.id
version = "$Latest"
}
}
resource "aws_launch_template" "market_data" {
name_prefix = "market-data-"
instance_type = "c6in.xlarge"
placement {
group_name = aws_placement_group.market_data.name
}
}
For kernel-level network tuning, see Network Deep Dive.
Design Philosophy {#design-philosophy}
Reliability Over Speed
Market data parsing can be sub-microsecond. None of that matters if:
- WebSocket disconnected 10 seconds ago
- Sequence gaps corrupted your orderbook
- Rate limits prevented reconnection
Priority order:
1. Reliability: data is current and correct
2. Completeness: no gaps in sequence
3. Speed: fast processing
Fault Isolation
Each exchange is different:
- Different rate limits
- Different message formats
- Different failure modes
Design principle: Binance down shouldn’t affect Coinbase. Separate deployments, separate failure domains.
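A PodDisruptionBudget per exchange backs this up on the Kubernetes side: voluntary disruptions (node drains, cluster upgrades) can never take both replicas of one feed down at once. A sketch matching the labels used earlier:

```yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: md-binance-pdb
  namespace: market-data
spec:
  minAvailable: 1
  selector:
    matchLabels:
      app: market-data
      exchange: binance
```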
Audit Your Infrastructure
Sub-millisecond orderbook reconstruction requires a properly tuned kernel. Run latency-audit to check your Linux settings before optimizing your matching engine.
```bash
pip install latency-audit && latency-audit
```

Up Next in Linux Infrastructure Deep Dives
Measuring Latency in the Browser: A Meta Demo
The RTT badge on this site measures real latency. Here's how, using the Performance API, Vercel edge, and the same principles that apply to trading.
Reading Path
Continue exploring with these related deep dives:
| Topic | Next Post |
|---|---|
| NIC offloads, IRQ affinity, kernel bypass | Network Optimization: Kernel Bypass and the Art of Busy Polling |
| THP, huge pages, memory locking, pre-allocation | Memory Tuning for Low-Latency: The THP Trap and HugePage Mastery |
| Measuring without overhead using eBPF | eBPF Profiling: Nanoseconds Without Adding Any |
| Design philosophy & architecture decisions | Trading Infrastructure: First Principles That Scale |
| SLOs, metrics that matter, alerting | Trading Metrics: What SRE Dashboards Miss |