This deployment has been open-sourced: https://github.com/danthegoodman1/IceDBSegment
The (good) problem
Tangia started scaling fast.
Like, really fast.
Here are some numbers of end-viewers tracked with Segment. I had to disable the source on the 7th because our bill jumped 10x from its baseline:
On top of that, we also started tracking new events from our streamers to implement better automations like churn risk, conversion segmentation, etc.
Knowing that this event was sent a lot while streamers were online, I initially sampled it at 33%, and immediately dropped it to 5% as it consumed our monthly capacity in 2 days:
IceDB to the rescue
I designed IceDB exactly for this.
IceDB is a truly serverless OLAP/data lake hybrid in-process database framework optimized for event-based data. It combines DuckDB, Parquet files, and S3 storage to fully decouple storage and compute, while only paying for compute during query execution time.
You can check it out on GitHub here: https://github.com/danthegoodman1/icedb
IceDB would now serve as our data store for events from end-viewers, and analysis would be done in Python notebooks (until my next project for turning those into dashboards was done).
The Parquet files live in Cloudflare R2. With its recent speed boosts, it’s now the best way to manage object storage.
Designing the data model
IceDB benefits from a carefully designed data model, so to ensure that we were getting maximum performance and efficiency it was crucial to consider access patterns.
Access will be time-based, with queries focusing on a single event (with some focusing on a few). We are never interested in querying per-user, but being able to aggregate by user (sum, avg, etc.) is crucial. We also need to be able to join it on other data sets.
The rows have this schema:
- event: string, name of the event
- ts: int, UTC milliseconds
- uid: string, the user ID
- properties: string, arbitrary JSON encoded into a string (for more efficient extraction of event properties)
- og_payload: string, the full original JSON body of the request
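As a rough illustration of this schema, here is how an incoming event might be flattened into a row. The input field names (event, userId, timestamp, properties) are assumptions modeled on a Segment-style track payload, and `to_row` is a hypothetical helper rather than part of IceDB.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_row(payload: dict) -> dict:
    """Flatten a Segment-style payload (assumed shape) into the row schema."""
    # Assumes an ISO 8601 timestamp with a zone, e.g. "2023-06-07T12:00:00.123Z".
    parsed = datetime.fromisoformat(payload["timestamp"].replace("Z", "+00:00"))
    return {
        "event": payload["event"],
        # Integer division by a timedelta avoids float rounding on the milliseconds.
        "ts": (parsed - EPOCH) // timedelta(milliseconds=1),
        "uid": payload["userId"],
        # Properties are kept as a JSON string rather than a nested type.
        "properties": json.dumps(payload.get("properties", {})),
        # The full original body, stored untouched as a JSON string.
        "og_payload": json.dumps(payload),
    }
```

Keeping og_payload alongside the extracted columns means a row can be re-derived later if the extraction logic changes.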
The sorting order
Much like in ClickHouse, the way data is sorted within a Parquet file has a massive impact on performance.
Because the partitions already let us pull files for a time range (generally based on days), it’s better for us to first sort by event within each file. This means we can look at fewer row groups. From there we will further scope down by time, but at this point it doesn’t really matter.
Our final sort order looks like event, ts where ts is a timestamp in milliseconds.
The partition scheme
The partition scheme for the Parquet files looks like this:
table={table_name}/y=YYYY/m=MM/d=DD
This allows us to quickly grab time ranges, and let DuckDB shred down the actual columns and rows to pull from said files based on the sorting order.
We use tables for each source as well to keep lookups even more efficient.
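To make the partition scheme and sort order concrete, here is a minimal sketch. `partition_for` and `sort_rows` are hypothetical helpers written for illustration; how they would be wired into IceDB is not shown here.

```python
from datetime import datetime, timezone


def partition_for(table_name: str, ts_ms: int) -> str:
    """Build the table={table_name}/y=YYYY/m=MM/d=DD partition for a row."""
    day = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
    return f"table={table_name}/y={day:%Y}/m={day:%m}/d={day:%d}"


def sort_rows(rows: list) -> list:
    """Order rows by event first, then by timestamp, before writing a file."""
    return sorted(rows, key=lambda row: (row["event"], row["ts"]))
```

With rows ordered this way, all rows for one event sit next to each other in a file, which is what lets a query for a single event skip the row groups that only contain other events.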
Inserting events
Ingestion was done by batching inserts up in our API pods running on Kubernetes, and flushing them to a serverless function running in AWS Lambda as an IceDB insert worker.
The Cloudflare Pages functions that emitted the events to our API pods did not wait for flushing to IceDB, as we have a high risk tolerance for lost events. The only case where we’d lose those events is if the pods crashed, and in the worst case we’d only lose a few seconds of events.
By batching them in our API, we reduce the number of files being generated by IceDB, meaning we can be less aggressive with merging on the current partition.
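The batching idea can be sketched as a small in-memory buffer that flushes on size or age. This is a single-threaded illustration only: the class name, the thresholds, and the `flush` callback are all assumptions, and the actual API pods may be written in another language and work differently.

```python
import time
from typing import Callable


class EventBatcher:
    """Buffer rows in memory and hand them to `flush` as one batch."""

    def __init__(self, flush: Callable[[list], None],
                 max_rows: int = 1000, max_age_s: float = 3.0):
        self._flush = flush          # e.g. a call to the insert worker
        self._max_rows = max_rows    # hypothetical size threshold
        self._max_age_s = max_age_s  # hypothetical age threshold
        self._rows = []
        self._oldest = 0.0

    def add(self, row: dict) -> None:
        if not self._rows:
            self._oldest = time.monotonic()
        self._rows.append(row)
        # Age is only checked when a row arrives; a real service would also
        # flush from a timer so a quiet period does not strand buffered rows.
        too_old = time.monotonic() - self._oldest >= self._max_age_s
        if len(self._rows) >= self._max_rows or too_old:
            self.flush()

    def flush(self) -> None:
        if not self._rows:
            return
        batch, self._rows = self._rows, []
        self._flush(batch)
```

Anything still sitting in the buffer when a pod crashes is lost, which matches the "few seconds of events" worst case described above.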
Merging files
Merging was done with 2 workers: A DESC worker and an ASC worker.
The DESC worker runs frequently, and does small merges on the current partitions. This keeps queries on young data fast by reducing the file counts, but keeps a low probability of lock contention on the meta store. It runs every 30 seconds, because we aren’t inserting that frequently as described in the section above.
The ASC worker runs far less frequently, every 10 minutes to be exact. This worker is far more aggressive in terms of both file size and file count. It merges each partition into files of up to 100MB to ensure that we only ever have a handful of files per partition (day). This helps keep queries that access more historical data very fast.
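The difference between the two workers comes down to the direction in which partitions are walked and how large a merge is allowed to get. The sketch below shows one way that selection could work; it is not IceDB's actual merge logic, and the function name, tuple layout, and limits are assumptions.

```python
from itertools import groupby


def pick_merge_candidates(files, max_bytes, max_count, ascending):
    """Return (partition, paths) for the first partition worth merging.

    `files` is a list of (partition, path, size_bytes) tuples. Partitions are
    walked oldest-first when `ascending` is True and newest-first otherwise.
    """
    ordered = sorted(files, key=lambda f: f[0], reverse=not ascending)
    for partition, group in groupby(ordered, key=lambda f: f[0]):
        picked, total = [], 0
        for _, path, size in group:
            if len(picked) == max_count:
                break
            # Skip files that would push the merged output over the size cap.
            if total + size <= max_bytes:
                picked.append(path)
                total += size
        # A merge only makes sense with at least two input files.
        if len(picked) > 1:
            return partition, picked
    return None
```

Under these assumptions, the DESC worker would call this with `ascending=False` and small limits every 30 seconds, while the ASC worker would call it with `ascending=True` and a 100MB size cap every 10 minutes.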
We also wrote a custom merge query that deduplicates on the messageId property of the event, which was captured as the insert row’s _row_id:
select
    any_value(user_id) as user_id,
    any_value(event) as event,
    any_value(properties) as properties,
    any_value(og_payload) as og_payload,
    any_value(ts) as ts,
    _row_id
from source_files
group by _row_id
This makes it so that even if we send the same event multiple times, we can easily deduplicate it, with deduplication automatically taking place every time we merge.
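For readers less familiar with SQL, the effect of grouping by _row_id can be mimicked in plain Python. This is only an analogy for what the merge query achieves, not how IceDB executes it, and `dedupe_by_row_id` is a hypothetical name.

```python
def dedupe_by_row_id(rows: list) -> list:
    """Keep one row per _row_id, similar in effect to `group by _row_id`."""
    unique = {}
    for row in rows:
        # Duplicates of one event carry the same values, so which copy
        # survives does not matter; this sketch keeps the first one seen.
        unique.setdefault(row["_row_id"], row)
    return list(unique.values())
```

Because every merge rewrites its input files through this step, duplicates that land in separate files disappear once those files are merged together.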
These workers actually run on GCP Cloud Run. I chose this because:
- I prefer Cloud Run to AWS Lambda (deployment, logging, etc.)
- There were no bandwidth cost differences between Lambda and Cloud Run for this case (insert bandwidth was cheaper with Lambda as we only leave the cloud provider once)
- We already have many services running in Cloud Run
Querying
We query IceDB from 2 places.
The first place is within Colab Python notebooks. Within these notebooks, we initialize IceDB just like in the insert and merging containers, except we only run the get_files method to list the files for a given time range. The result is used both as a list of files for Pandas to parse and as the source of a table function in DuckDB for easy accessibility. DuckDB is orders of magnitude faster for analysis in Colab notebooks than Pandas, so when I can do something in SQL first, I prefer to. I know SQL better as well, so oftentimes the code is faster to write and far more terse.
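As a sketch of the DuckDB side, a list of file paths can be turned into a query over DuckDB's read_parquet function. The helper below only builds the SQL string. `build_event_query` is a hypothetical name, the file list is assumed to already be in hand, the per-user count is just an example aggregation, and it uses read_parquet directly rather than the table function binding mentioned above.

```python
def build_event_query(files: list, event: str, start_ms: int, end_ms: int) -> str:
    """Build a DuckDB query that counts one event per user over given files."""

    def quote(value: str) -> str:
        # Double up single quotes so the value is safe inside a SQL literal.
        return "'" + value.replace("'", "''") + "'"

    file_list = ", ".join(quote(path) for path in files)
    return (
        "select uid, count(*) as events "
        f"from read_parquet([{file_list}]) "
        f"where event = {quote(event)} "
        f"and ts >= {int(start_ms)} and ts < {int(end_ms)} "
        "group by uid"
    )
```

With the duckdb package installed and object storage access configured, the resulting string could be run with something like `duckdb.sql(query).df()` to get a Pandas DataFrame back.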
Performance
After running this setup for a few weeks, we can start to benchmark some queries against IceDB.
For these tests, I used the DuckDB table macros and SQL for clarity and simplicity. They were run on dedicated DigitalOcean instances with 4 vCPUs and 16GB of RAM, as Colab can be extremely variable in performance.