Problem and constraints
PostHog provides Apps for data imports, exports and transformation purposes.
App metrics helps users of apps want to know whether the apps are reliable and have tooling to debug errors.
When designing the schema, we needed ingestion of these stats to be as 'cheap' as possible. On the flip side, queries against the data did not need to support much beyond time-range filtering.
Schema
CREATE TABLE sharded_app_metrics(team_id Int64,timestamp DateTime64(6, 'UTC'),plugin_config_id Int64,category LowCardinality(String),job_id String,successes SimpleAggregateFunction(sum, Int64),successes_on_retry SimpleAggregateFunction(sum, Int64),failures SimpleAggregateFunction(sum, Int64),error_uuid UUID,error_type String,error_details String CODEC(ZSTD(3)),_timestamp DateTime,_offset UInt64,_partition UInt64)ENGINE = ReplicatedAggregatingMergeTree('/clickhouse/tables/{shard}/posthog.sharded_app_metrics', '{replica}')PARTITION BY toYYYYMM(timestamp)ORDER BY (team_id, plugin_config_id, job_id, category, toStartOfHour(timestamp), error_type, error_uuid)
Click here to see supporting tables
CREATE TABLE app_metrics(team_id Int64,timestamp DateTime64(6, 'UTC'),plugin_config_id Int64,category LowCardinality(String),job_id String,successes SimpleAggregateFunction(sum, Int64),successes_on_retry SimpleAggregateFunction(sum, Int64),failures SimpleAggregateFunction(sum, Int64),error_uuid UUID,error_type String,error_details String CODEC(ZSTD(3)),_timestamp DateTime,_offset UInt64,_partition UInt64)ENGINE=Distributed('posthog', 'posthog', 'sharded_app_metrics', rand())CREATE MATERIALIZED VIEW app_metrics_mvTO posthog.sharded_app_metricsAS SELECTteam_id,timestamp,plugin_config_id,category,job_id,successes,successes_on_retry,failures,error_uuid,error_type,error_detailsFROM posthog.kafka_app_metricsCREATE TABLE kafka_app_metrics(team_id Int64,timestamp DateTime64(6, 'UTC'),plugin_config_id Int64,category LowCardinality(String),job_id String,successes Int64,successes_on_retry Int64,failures Int64,error_uuid UUID,error_type String,error_details String CODEC(ZSTD(3)))ENGINE=Kafka('kafka:9092', 'clickhouse_app_metrics_test', 'group1', 'JSONEachRow')
Decision: Store errors in the same table as metrics
Error tracking is fundamentally different from metrics, but we wanted to avoid "failure counts" and errors we have data on going out of sync.
For this reason, the two are stored in the same table, with error_details
column containing JSON-encoded
metadata about the error including the relevant event payload, stack trace.
This runs the risk of data storage increasing significantly if a lot of large errors occur.
For this reason error_details
column uses ZSTD(3) codec.
Sorting by error_type also has a significance: error_details
of a given error type should be similar and compress well.
In the future, we might introduce TTL
s for the error columns if storage becomes a problem or periodically
wipe error data in other ways.
Decision: pre-aggregate metrics in app in memory
Apps act on events as they're processed and users might have dozens of apps installed at the same time.
For this reason, emitting a Kafka message per app per event ingested ends up being too expensive. We instead aggregate metrics (and errors) in memory and only periodically flush data to Kafka.
This runs the trade-off of counts being subtly off after deploys or restarts. If this becomes a significant user concern, we may reduce the precision of the numbers shown in the UI.
Decision: using AggregatingMergeTree
To make ingesting and storing this data cheaper, AggregatingMergeTree
is used.
Each time two parts are merged, rows with identical ORDER BY
values are collapsed into a single row in the new part.
In this setup, this means that:
- we aggregate data up to an hours granularity (since
toStartOfHour(timestamp)
is in theORDER BY
) - successes, successes_on_retry and failures columns get summed up for each unique value of
ORDER BY
- errors are not aggregated at all (since
error_uuid
is in theORDER BY
)
Even with all of this we still need to sum values in queries as merges may never occur.
Decision: sharding
To make data cheaper to store, this table is sharded.
Results
On US Cloud, the disk size of this table was 6 MB after aggregating nearly 2 billion metrics. For comparison, storage similar number of events can require hundreds of gigabytes.
Queries against this schema are also usually measured in milliseconds.
The reason we were able to leverage pre-aggregation to this extent was since we only needed to answer a few questions:
- What is the number of successes and failures in a given time range per day or hour, diced by plugin, method and job?
- How many errors of each type did we see in that time range?
- What are some examples of specific errors we saw?
These queries all lend itself well to pre-aggregation, meaning an expert schema could store this data very cheaply at the cost of some flexibility.
Next schema in the ClickHouse manual: person_distinct_id