Real-time aggregate of multiple connectors¶

1- Import the data¶

The data was collected locally over about 2 days, for a couple of pairs across multiple exchanges, using the CCXT watch_ticker API. As an improvement, watch_trades would be more appropriate, since it would also track the volume.

In [1]:
import pandas as pd

# Load the recorded tickers (one row per ticker update) from the local parquet dump.
PARQUET_PATH = "conector_latency_and_true_price.parquet"
data = pd.read_parquet(PARQUET_PATH)

2- Data overview (5 minutes downsample)¶

In [2]:
import datetime
import plotly.express as px

# Keep only the last observation of every 5-minute bucket for each
# (connector, symbol) pair so the overview plots stay light.
# "5min" is the supported spelling of the deprecated "5T" offset alias
# (same frequency). drop=False keeps "timestamp" available as a column
# for plotting.
downsampled = (
    data.set_index("timestamp", drop=False)
    .groupby(["connector", "symbol"])
    .resample("5min")
    .last()
)

# One line chart per symbol, with one coloured line per connector.
for symbol in data.symbol.unique():
    symbol_data = downsampled[downsampled.symbol == symbol]
    display(
        px.line(
            symbol_data,
            x="timestamp",
            y="price",
            color="connector",
            title=symbol,
        )
    )

3- Latency¶

In [3]:
import plotly.express as px

# Per-connector latency: delay, in milliseconds, between the ticker's
# "timestamp" and the moment it was fetched ("fetch_timestamp").
fetch_delay = data["fetch_timestamp"] - data["timestamp"]
data["connector_latency_ms"] = fetch_delay.dt.total_seconds() * 1000

# display(
#     px.box(
#         data,
#         x="connector",
#         y="connector_latency_ms",
#         log_y=True,
#         title="Fetch latency by connector, log scale",
#         labels={
#             "connector": "Exchange",
#             "connector_latency_ms": "Fetch latency (ms)",
#         },
#         points=False,
#         width=800,
#         height=400
#     )
# )
# NOTE: this plot is not included because it is very large (in MB): it embeds every data point (px.box is pretty inefficient).
In [4]:
# DB insertion latency over time (max, 95th percentile and median).

# Time, in milliseconds, between fetching a ticker ("fetch_timestamp")
# and its "update_timestamp".
data["db_insertion_latency"] = (
    data["update_timestamp"] - data["fetch_timestamp"]
).dt.total_seconds() * 1000

# Bucket the latencies into 10-minute windows on the ticker timestamp.
# "10min" is the supported spelling of the deprecated "10T" offset alias
# (same frequency).
db_insertion_latency_resample = data.set_index("timestamp")[
    "db_insertion_latency"
].resample("10min")

# One column per statistic, one row per 10-minute bucket.
db_insertion_latency = pd.DataFrame(
    {
        "max": db_insertion_latency_resample.max(),
        "percentile_95": db_insertion_latency_resample.quantile(0.95),
        "median": db_insertion_latency_resample.median(),
    }
)

# Log scale on y because the three statistics are plotted together.
fig = px.scatter(
    db_insertion_latency,
    x=db_insertion_latency.index,
    y=db_insertion_latency.columns,
    log_y=True,
)
fig.update_layout(
    title="Database insertion latencies over time",
    yaxis_title="DB insertion latency (ms)",
    xaxis_title="Timestamp (10min buckets)",
    hovermode="x unified",
)
# NOTE(review): the figure is built with px.scatter, so this bar-only
# selector presumably matches no trace -- confirm whether it can be dropped.
fig.update_traces(marker_line_width=0, selector=dict(type="bar"))
display(fig)

4- Analysis of CRV/USDT w/ aggregate algorithm¶

The algorithm uses the spread and frequency of ticker changes by connector to estimate the weight each connector should have. It then uses a weighted average of the last prices to output the true price.

In [5]:
# Restrict the analysis to one symbol and a one-hour window
# (both bounds are exclusive).
SYMBOL = "CRV/USDT"
from_index = datetime.datetime(2023, 8, 2, 15, 0)
to_index = datetime.datetime(2023, 8, 2, 16, 0)

is_symbol = data.symbol == SYMBOL
in_window = (data.timestamp > from_index) & (data.timestamp < to_index)
data_symbol = data.loc[is_symbol & in_window]
In [6]:
import numpy as np

# Weight of each connector: inverse of the variance of its absolute price
# changes, ignoring updates where the price did not change.
weight_by_connector = {}
for connector in data_symbol["connector"].unique():
    from_connector = data_symbol["connector"] == connector
    connector_prices = data_symbol.loc[from_connector].copy()
    connector_prices["price_diff"] = connector_prices["price"].diff().abs()
    price_moved = connector_prices["price_diff"] != 0
    connector_prices = connector_prices.loc[price_moved]
    diff = connector_prices["price_diff"]
    weight_by_connector[connector] = 1.0 / diff.apply(float).var()

# Two views of the same prices, one column per connector:
#   * "real" prices are indexed by the server-provided timestamp;
#   * "latency" prices are indexed by update_timestamp (websocket latency +
#     database insertion latency) and feed the true-price computation, so the
#     true price carries a realistic, connector-dependent latency.
# Gaps are forward-filled so every row holds each connector's last known price.
real_prices_by_connector = data_symbol.pivot_table(
    values="price",
    index="timestamp",
    columns="connector",
).ffill()

latency_prices_by_connector = data_symbol.pivot_table(
    values="price",
    index="update_timestamp",
    columns="connector",
).ffill()


def row_avg(row):
    """Return the weighted average price of one row of connector prices.

    Args:
        row: Series of prices indexed by connector name. Missing prices
            are dropped before averaging.

    Returns:
        The average of the remaining prices, weighted by the module-level
        ``weight_by_connector`` mapping. Connectors whose weight is null
        are ignored. NaN is returned when no connector in the row has both
        a price and a non-null weight.
    """
    row = row.dropna()
    weights = pd.Series(
        [weight_by_connector[connector] for connector in row.index],
        index=row.index,
    )
    usable = weights.notnull()
    if not usable.any():
        # With nothing left to average, np.average would raise
        # ZeroDivisionError (weights sum to zero) and abort the whole
        # DataFrame.apply; report the row as missing instead.
        return np.nan
    return np.average(row[usable].to_numpy(), weights=weights[usable])


# Aggregate each row of connector prices into a single weighted price.
latency_true_price = latency_prices_by_connector.apply(row_avg, axis="columns")
In [7]:
# Base figure: the aggregated price series, titled with the symbol.
latency_true_price_fig = px.line(
    data_frame=latency_true_price,
    x=latency_true_price.index,
    y=latency_true_price,
    title=SYMBOL,
)
# Helper figure: one line per connector. It is only used as a source of
# traces and is never displayed on its own.
connectors_fig = px.line(
    data_frame=real_prices_by_connector,
    x=real_prices_by_connector.index,
    y=real_prices_by_connector.columns,
)


def hex_string_to_rgba(hex_string: str, alpha: str) -> str:
    """Convert a hex colour such as ``"#636efa"`` to an ``rgba(...)`` string.

    Args:
        hex_string: Colour written as ``#rrggbb`` (the leading ``#`` is
            optional). The shorthand forms ``#rgb`` / ``#rgba`` are expanded
            by doubling each digit. For ``#rrggbbaa`` the embedded alpha
            digits are ignored in favour of *alpha*.
        alpha: Opacity, inserted verbatim as the fourth component
            (for example ``"0.2"``).

    Returns:
        A string of the form ``"rgba(r, g, b, alpha)"`` with decimal
        red/green/blue channels.

    Raises:
        ValueError: If *hex_string* does not have a valid number of digits
            or contains non-hexadecimal characters.
    """
    digits = hex_string.lstrip("#")
    if len(digits) in (3, 4):
        # Shorthand notation: "#0af" means "#00aaff".
        digits = "".join(digit * 2 for digit in digits)
    if len(digits) not in (6, 8):
        raise ValueError(f"Invalid hex colour: {hex_string!r}")
    # Only the first three byte pairs (red, green, blue) are used.
    rgb = tuple(int(digits[i : i + 2], 16) for i in (0, 2, 4))
    color_rgb_str = ", ".join(str(channel) for channel in rgb)
    return f"rgba({color_rgb_str}, {alpha})"


# Overlay every connector line on the aggregated-price figure, faded to
# 20% opacity so the aggregated price stays readable.
for trace in connectors_fig.data:
    trace.line.color = hex_string_to_rgba(trace.line.color, "0.2")
    latency_true_price_fig.add_trace(trace)

display(latency_true_price_fig)