The person_distinct_id table makes for an interesting case study in how initial schema design flaws were exposed over time and how they were fixed.
Problem being solved
PostHog needs to know which users are associated with each event.
In frontend libraries like posthog-js, persons landing on a site are initially anonymous, identified by a random distinct ID.
As persons log in or sign up, posthog.identify should be called to signal that the anonymous person is actually some logged-in person and that their prior events should be grouped together.
The semantics of this have changed significantly with the person-on-events project.
Schema
CREATE TABLE person_distinct_id
(
    distinct_id VARCHAR,
    person_id UUID,
    team_id Int64,
    _sign Int8 DEFAULT 1,
    is_deleted Int8 ALIAS if(_sign == -1, 1, 0),
    _timestamp DateTime,
    _offset UInt64
)
ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/noshard/posthog.person_distinct_id', '{replica}-{shard}', _sign)
ORDER BY (team_id, distinct_id, person_id)
The is_deleted column is not actually written to; it is calculated dynamically from the _sign column.
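A minimal sketch of how this behaves, using hypothetical rows for team 2: no value is ever stored for is_deleted, it is derived from _sign whenever the column is selected.

-- Hypothetical rows; the second row cancels the first one.
INSERT INTO person_distinct_id (distinct_id, person_id, team_id, _sign, _timestamp, _offset)
VALUES
    ('anon-123', '11111111-1111-1111-1111-111111111111', 2, 1, now(), 0),
    ('anon-123', '11111111-1111-1111-1111-111111111111', 2, -1, now(), 0);

-- is_deleted costs no storage: it is computed from _sign at read time.
SELECT distinct_id, person_id, _sign, is_deleted
FROM person_distinct_id
WHERE team_id = 2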
This table was often queried joined against the events table, along the following lines:
SELECT avg(count())
FROM events
INNER JOIN (
    SELECT distinct_id, argMax(person_id, _timestamp) AS person_id
    FROM (
        SELECT distinct_id, person_id, max(_timestamp) AS _timestamp
        FROM person_distinct_id
        WHERE team_id = 2
        GROUP BY person_id, distinct_id, team_id
        HAVING max(is_deleted) = 0
    )
    GROUP BY distinct_id
) AS pdi ON (pdi.distinct_id = events.distinct_id)
WHERE team_id = 2
GROUP BY pdi.person_id
Design decision: no sharding
Since this table was almost always joined against the events table, it was not sharded.
Sharding it would mean that each shard would need to send all of its events and person_distinct_id sub-query result rows back to the coordinator node to execute queries, which would be expensive and slow.
Design decision: CollapsingMergeTree
The person that a given distinct_id belongs to can change over time as posthog.identify or posthog.alias are called.
For this reason the data needs to be updated constantly, yet updating data in ClickHouse requires rewriting large chunks of data. Rather than rewriting data, we opted to use CollapsingMergeTree.
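For context, the in-place alternative would be a ClickHouse mutation, which rewrites every data part containing matching rows (and ALTER TABLE ... UPDATE cannot modify key columns such as person_id at all). A hypothetical example of the kind of statement this design avoids running for every identify call:

-- Hypothetical mutation; each one rewrites whole data parts, far too heavy
-- to issue every time a distinct_id gets re-pointed to a different person.
ALTER TABLE person_distinct_id
DELETE WHERE team_id = 2 AND distinct_id = 'anon-123'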
CollapsingMergeTree adds special behavior to the ClickHouse merge operation: if rows with identical ORDER BY values are seen during a merge, they are collapsed according to the _sign column:
- If the sum of _sign values is positive, the new row has _sign of 1.
- Otherwise, the rows are removed.
This was used to update via inserts:
- On a change, the old person_id mapping was discarded by emitting a row with _sign of -1
- On a change, the new person_id row was emitted with _sign of 1
- At query time, the resulting rows were aggregated together to find the current state of the world
Due to this logic, both person_id and distinct_id needed to be in the ORDER BY
key.
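A sketch of what an identify call could translate to at the storage layer, with hypothetical IDs. The -1 row only collapses with a +1 row sharing the exact same (team_id, distinct_id, person_id) key, which is why both columns must be part of the ORDER BY; until a merge collapses them, readers have to aggregate, as in the query shown earlier (a simplified sum(_sign) variant is used below).

-- Initial state: 'anon-123' is mapped to an anonymous person (hypothetical IDs).
INSERT INTO person_distinct_id (distinct_id, person_id, team_id, _sign, _timestamp, _offset)
VALUES ('anon-123', '11111111-1111-1111-1111-111111111111', 2, 1, now(), 0);

-- identify(): cancel the old mapping and emit the new one.
INSERT INTO person_distinct_id (distinct_id, person_id, team_id, _sign, _timestamp, _offset)
VALUES
    ('anon-123', '11111111-1111-1111-1111-111111111111', 2, -1, now(), 0),
    ('anon-123', '22222222-2222-2222-2222-222222222222', 2, 1, now(), 0);

-- Simplified read: keep mappings whose +1/-1 rows have not cancelled out.
SELECT distinct_id, person_id
FROM person_distinct_id
WHERE team_id = 2
GROUP BY team_id, distinct_id, person_id
HAVING sum(_sign) > 0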
Problem: CollapsingMergeTree for updates
CollapsingMergeTree is not ideal for frequently updating a single row, as merges occur in a non-deterministic order, which causes trouble if subsequent rows signifying deletes get discarded before being merged with an "insert" row.
For updating column values, ReplacingMergeTree engine tables with an explicit version column have proven to be reliable.
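A toy sketch of that pattern (the table and values below are hypothetical, not PostHog's schema): with an explicit version, the highest-versioned row wins regardless of when merges happen, and argMax(..., version) returns the same answer even before parts are merged.

-- Hypothetical standalone table demonstrating ReplacingMergeTree with a version column.
CREATE TABLE distinct_id_mapping
(
    distinct_id String,
    person_id UUID,
    version UInt64
)
ENGINE = ReplacingMergeTree(version)
ORDER BY distinct_id;

INSERT INTO distinct_id_mapping VALUES ('anon-123', '11111111-1111-1111-1111-111111111111', 1);
INSERT INTO distinct_id_mapping VALUES ('anon-123', '22222222-2222-2222-2222-222222222222', 2);

-- Deterministic result whether or not the background merge has already deduplicated the rows.
SELECT distinct_id, argMax(person_id, version) AS person_id
FROM distinct_id_mapping
GROUP BY distinct_id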
Problem: Expensive queries
In December 2021, PostHog started seeing significant performance problems and out-of-memory errors caused by this schema for its largest users.
The problem was two-fold:
- JOINs are inherently expensive in ClickHouse, as the right-hand side of the join (the person_distinct_id subquery) would be loaded into memory
- The schema was inefficient, emitting multiple rows per person and requiring post-aggregation
Improved schema
To fix both problems, a new table was created:
CREATE TABLE person_distinct_id2
(
    team_id Int64,
    distinct_id VARCHAR,
    person_id UUID,
    is_deleted Int8,
    version Int64 DEFAULT 1,
    _timestamp DateTime,
    _offset UInt64,
    _partition UInt64
)
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/tables/noshard/posthog.person_distinct_id2', '{replica}-{shard}', version)
ORDER BY (team_id, distinct_id)
SETTINGS index_granularity = 512
JOINs with this table look something like this:
SELECT avg(count())
FROM events
INNER JOIN (
    SELECT distinct_id, argMax(person_id, version) AS person_id
    FROM person_distinct_id2
    WHERE team_id = 2
    GROUP BY distinct_id
    HAVING argMax(is_deleted, version) = 0
) AS pdi ON events.distinct_id = pdi.distinct_id
WHERE team_id = 2
GROUP BY pdi.person_id
This schema:
- Was over 2x faster to query for large teams while requiring less memory
- Had explicit versioning logic built in (see the sketch after this list)
- Required fewer Kafka messages and less traffic
- Lowered index_granularity for faster point queries
- Leveraged ReplacingMergeTree to ensure data consistency
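A hedged sketch of the write pattern this enables, with hypothetical values (in practice the version bookkeeping is handled by the ingestion pipeline): every change is a plain insert with a bumped version, and argMax(..., version) in the query above always picks the latest state.

-- Hypothetical rows; in reality these would be written at different points in time.
INSERT INTO person_distinct_id2 (team_id, distinct_id, person_id, is_deleted, version, _timestamp, _offset, _partition)
VALUES
    (2, 'anon-123', '11111111-1111-1111-1111-111111111111', 0, 1, now(), 0, 0),  -- initial anonymous mapping
    (2, 'anon-123', '22222222-2222-2222-2222-222222222222', 0, 2, now(), 0, 0),  -- identify(): re-pointed, version bumped
    (2, 'anon-123', '22222222-2222-2222-2222-222222222222', 1, 3, now(), 0, 0)   -- deletion tombstone, version bumped again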
Closing notes
Even with these improvements, JOINs are still expensive. After the person-on-events project we were able to store a person_id column on the events table to great effect.
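For comparison, a hedged sketch of the join-free shape such queries can take once person_id is denormalized onto events (exact column names may differ):

-- No person_distinct_id join needed once events carry their own person_id.
SELECT person_id, count()
FROM events
WHERE team_id = 2
GROUP BY person_id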