I’ve always wanted a JSON event data warehouse where I only pay for storage and query processing. A place where I can just dump Segment-like event data without thinking about it, knowing that when I need to build off that data I have it all in a super-efficient columnar format.
IceDB is a cloud-native, JSON-first, serverless analytical database designed for low cost and arbitrary read capacity while maintaining the benefits of columnar data formats. It leverages object storage, serverless compute, and serverless databases to provide an infinitely scalable data warehouse for JSON data in which you only pay for storage when idle. It is called “IceDB” because it is designed for cold starts: going from idle to high-performance querying in milliseconds.
You can follow development at https://github.com/danthegoodman1/icedb
IceDB is:
- Truly serverless, like AWS Lambda serverless, not Snowflake serverless
- Highly cost- and storage-efficient (thanks to using object storage and serverless runtimes)
- Simple but performant
IceDB is not:
- A direct replacement for Athena or other “query files you already have” serverless engines (you must tell IceDB about your data)
- A direct replacement for dedicated ClickHouse (or similar) nodes
- Trying to be “better” or “faster” than other OLAP solutions
- Meant to be used for data generating hundreds of GBs per day
And not at the managed service level; I’m talking about the self-hosted level.
We do this by optimizing IceDB to run on serverless runtimes like AWS Lambda, and
The purpose of IceDB is to be a general data warehouse for your JSON event data. Whether that be Segment (or similar) data, logs, or anything else that is JSON.
An evolving schema is expected, and should feel transparent.
While we do a bit of Hive and MergeTree reinventing (discussed later), I did not want to reinvent query engines like Spark.
DuckDB was the natural fit. With extensive features, a great community, and silly-fast performance, it’s what makes IceDB possible.
A massive shout out to the DuckDB team, the contributors of essential Parquet processing features, and the folks in the DuckDB Discord who let me bother them with a massive number of questions :P
Parquet was the obvious choice here. It has native compression and allows DuckDB to only read the parts of the file that contain the columns of interest.
Because object storage, serverless compute, and serverless databases all scale out, you can query the DB as much as you want: read concurrency is limited only by your cloud quotas.
DBs like Athena and BigQuery are not meant to be queried by end users, and it shows if you ever try to set them up that way.
Costs are also kept very low by keeping all data within the cloud provider. GCP doesn’t even charge for in-region object storage egress (c’mon AWS).
When it’s not being queried, you only pay for storage. This is done by having the query processing done on serverless runtimes like AWS Lambda and Google Cloud Run.
I hate when companies call managed nodes “serverless”… that’s still a bunch of servers. If I am incurring costs when I am not actively using a resource (like processing a request), then it’s not really serverless; you’re just using a marketing buzzword.
The added benefit of serverless runtimes is that you get dedicated resources for every query.
`SELECT 1` gets the same resources as `SELECT count(distinct) FROM my_table`, on demand with lightning-quick start times. If you give your Lambda 10GB RAM, then you get 10GB of RAM for every query you process!
While some queries can take advantage of a 256 vCPU, 2TB RAM ClickHouse cluster, you rarely get all of those resources for a single query.
By using the Parquet file format, IceDB uses as little storage and network bandwidth as possible.
It also means that you can read the data from external systems as well (more on that later).
Using a novel JSON-flattening technique, IceDB turns every possible path into a useful column (and reverts to a JSON string when impossible).
Even if you have arrays of nested objects that have arrays of arrays of objects with arrays in those objects, IceDB flattens those out to a consistent column name so it can take advantage of columnar speed and compression.
By flattening at write time, we avoid expensive JSON transformations at query time and end up with far less ugly queries.
In fact, without a columnar format like Parquet we’d be reading ALL of the data in a file every time rather than just the columns we need, which gets more wasteful the larger and more complex your JSON becomes.
Because it’s just Parquet files, with the “available” files tracked in CockroachDB (CRDB), you can read it from anywhere.
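To make the flattening idea concrete, here’s a minimal Python sketch of deterministic flattening. It is not IceDB’s actual algorithm: the `_` separator, the rule that arrays of objects become one list column per key, and the fallback to a JSON string for arrays of arrays or mixed arrays are all assumptions for illustration.

```python
import json
from typing import Any

SEP = "_"  # hypothetical separator between path segments


def flatten(obj: dict[str, Any], prefix: str = "") -> dict[str, Any]:
    """Flatten a JSON object into {column_name: value} deterministically."""
    out: dict[str, Any] = {}
    for key in sorted(obj):  # sorted keys -> same column order for every event
        path = f"{prefix}{SEP}{key}" if prefix else key
        value = obj[key]
        if isinstance(value, dict):
            out.update(flatten(value, path))  # nested object: extend the path
        elif isinstance(value, list):
            out.update(flatten_array(value, path))
        else:
            out[path] = value  # scalar (str, number, bool, None)
    return out


def flatten_array(values: list[Any], path: str) -> dict[str, Any]:
    if all(not isinstance(v, (dict, list)) for v in values):
        return {path: values}  # array of scalars (or empty): one list column
    if all(isinstance(v, dict) for v in values):
        # Array of objects: flatten each element, then gather one list per
        # column, using None where an element lacks that column.
        items = [flatten(v) for v in values]
        columns = sorted({col for item in items for col in item})
        return {f"{path}{SEP}{col}": [item.get(col) for item in items] for col in columns}
    # Arrays of arrays or mixed arrays: revert to a JSON string.
    return {path: json.dumps(values, sort_keys=True)}


event = {
    "user": {"id": 7, "name": "a"},
    "tags": ["x", "y"],
    "items": [{"sku": "A", "qty": 1}, {"sku": "B"}],
    "matrix": [[1, 2], [3]],
}
flat = flatten(event)
# {'items_qty': [1, None], 'items_sku': ['A', 'B'], 'matrix': '[[1, 2], [3]]',
#  'tags': ['x', 'y'], 'user_id': 7, 'user_name': 'a'}
```

Sorting keys at every level is what keeps the output deterministic: the same event always yields the same column names in the same order, regardless of how the producer ordered its keys.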
You can easily spin up a 2TB RAM EC2 instance, pull all the files you need, and do massive data analysis, then kill the machine.
Query it through IceDB, pull Parquet files directly from your code, or download the whole thing into another system.
When writing data, all JSON is flattened using a deterministic flattening algorithm. It is then transformed into Parquet files, which are written to object storage and recorded in CockroachDB.
Hive partitioning is used to narrow down which files need to be pulled before any data is fetched.
New files are always created with k-sortable random suffixes, and multiple files will be written if multiple partitions are present in the provided rows. It is best to use partitioning that changes gradually over time (like current time).
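A rough sketch of that write-side bookkeeping, assuming a caller-supplied `partition_of` function and a hypothetical `namespace/partition/<millis>_<random>.parquet` naming scheme (IceDB’s real file layout may differ). Converting the rows to Parquet and uploading them is left out.

```python
import os
import time
from collections import defaultdict
from typing import Any, Callable

Row = dict[str, Any]


def ksortable_id() -> str:
    """Zero-padded millisecond timestamp plus a random hex suffix.

    String order follows creation time (to the millisecond); the random
    suffix avoids collisions between concurrent writers.
    """
    millis = int(time.time() * 1000)
    return f"{millis:013d}_{os.urandom(4).hex()}"


def plan_files(
    rows: list[Row], partition_of: Callable[[Row], str], namespace: str
) -> dict[str, list[Row]]:
    """Group rows by partition and assign one new file path per partition."""
    groups: dict[str, list[Row]] = defaultdict(list)
    for row in rows:
        groups[partition_of(row)].append(row)
    return {
        f"{namespace}/{partition}/{ksortable_id()}.parquet": part_rows
        for partition, part_rows in groups.items()
    }
```

Because the millisecond prefix is zero-padded to a fixed width, plain string ordering of file names follows creation time, and an insert whose rows span two partitions produces exactly two new files.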
This means that IceDB is generally “write optimized”: the efficiency of queries is not the writer’s problem (though the writer does help a bit by recording new files in the metastore).
Write throughput is virtually unlimited: as long as you don’t run into limits on the metastore or on requests per second to cloud storage, you can insert as fast and as often as you want.
The more rows you insert per request, the more performant queries will be (or rather, the less merging is required to maintain performance). Having lots of small files as opposed to fewer larger ones is far less efficient, just like with ClickHouse MergeTrees; however, IceDB can ingest them at the same speed regardless (a dangerous illusion of performance).
Write batching is left to you to implement; Kafka/RedPanda, Kinesis, DB batching, and more all work well.
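If you don’t already have a queue in front of IceDB, a simple in-process buffer is one option. This hypothetical `MicroBatcher` hands rows to a flush function once it reaches a row count or once the oldest buffered row gets too old. Unlike Kafka or Kinesis it is not durable: anything still buffered is lost if the process dies.

```python
import time
from typing import Any, Callable, Optional

Row = dict[str, Any]


class MicroBatcher:
    """Buffers rows and hands them to `flush_fn` as one batch when the buffer
    reaches `max_rows` rows or its oldest row is `max_age_s` seconds old."""

    def __init__(
        self,
        flush_fn: Callable[[list[Row]], None],
        max_rows: int = 10_000,
        max_age_s: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._flush_fn = flush_fn
        self._max_rows = max_rows
        self._max_age_s = max_age_s
        self._clock = clock
        self._rows: list[Row] = []
        self._oldest: Optional[float] = None  # when the first buffered row arrived

    def add(self, row: Row) -> None:
        if not self._rows:
            self._oldest = self._clock()
        self._rows.append(row)
        too_old = self._clock() - self._oldest >= self._max_age_s
        if len(self._rows) >= self._max_rows or too_old:
            self.flush()

    def flush(self) -> None:
        """Send whatever is buffered (no-op when empty)."""
        if not self._rows:
            return
        batch, self._rows, self._oldest = self._rows, [], None
        self._flush_fn(batch)
```

In a real service you would also call `flush()` on a timer and at shutdown, since the age check here only runs when a new row arrives.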
For reading, DuckDB is used to process the Parquet files. IceDB creates a view for each table with all of the files provided.
Creating this view automatically has 2 benefits:
- It feels like querying a normal table
- It can leverage Hive partitioning before even pulling files to filter down the data set
DuckDB is only used for query processing. At the time of writing, the parameter version of UNION ALL BY NAME does not support reading only the columns necessary for the query. This negates Parquet’s ability to read in only the data a query needs, but once added it will be a game-changer for performance. I’ve already opened a discussion for this change.
CRDB serves as the remote serverless metastore that tracks which files in object storage are live, so queries never read duplicate rows while files are being merged.
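A sketch of how such a view could be generated, assuming DuckDB’s `read_parquet` table function with its `hive_partitioning` and `union_by_name` options. The helper names are hypothetical, and in practice the file list would come from the metastore.

```python
def sql_string(value: str) -> str:
    """Quote a SQL string literal, doubling any embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def sql_identifier(name: str) -> str:
    """Quote a SQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def create_view_sql(view: str, files: list[str]) -> str:
    """Build a DuckDB statement exposing the given Parquet files as one view."""
    if not files:
        raise ValueError("a view needs at least one file")
    file_list = ", ".join(sql_string(f) for f in files)
    return (
        f"CREATE OR REPLACE VIEW {sql_identifier(view)} AS "
        f"SELECT * FROM read_parquet([{file_list}], "
        # hive_partitioning: expose path segments like d=2023-01-01 as columns
        # union_by_name: line up columns by name across files with different schemas
        "hive_partitioning = true, union_by_name = true)"
    )


sql = create_view_sql("events", ["s3://b/events/d=1/a.parquet", "s3://b/events/d=2/b.parquet"])
```

With the `duckdb` Python package, the resulting string could be run through `duckdb.connect().execute(sql)` (reading `s3://` paths also needs DuckDB’s httpfs extension), after which the view can be queried like a normal table.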
Without the store, we’d have many duplicate rows when files go to merge, or we could never merge files.
It’s also used to avoid unnecessary S3 ListObjects API calls (which return at most 1,000 objects per call) before filtering down to the files of interest.
As a ClickHouse enthusiast, I spent a TON of time learning how the MergeTree engine works under the hood. I would encourage any curious data nerd to do the same.
The same concepts of parts, merging, and consistency inspired the model that IceDB uses. In fact, many MergeTree terms map directly to IceDB concepts:
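Here’s a minimal sketch of answering “which files should this query read?” from the metastore instead of S3 listings. It uses Python’s built-in `sqlite3` as a stand-in for CockroachDB, and the `known_files` table and its columns are hypothetical.

```python
import sqlite3


def open_metastore() -> sqlite3.Connection:
    con = sqlite3.connect(":memory:")  # stand-in for a CockroachDB connection
    con.execute(
        """CREATE TABLE known_files (
               namespace TEXT NOT NULL,
               part_path TEXT NOT NULL,           -- e.g. 'y=2023/m=01/d=05'
               file_path TEXT NOT NULL,
               active    INTEGER NOT NULL DEFAULT 1,
               PRIMARY KEY (namespace, file_path))"""
    )
    return con


def record_file(con: sqlite3.Connection, namespace: str, part_path: str, file_path: str) -> None:
    with con:  # commit (or roll back) as one transaction
        con.execute(
            "INSERT INTO known_files (namespace, part_path, file_path) VALUES (?, ?, ?)",
            (namespace, part_path, file_path),
        )


def active_files(con: sqlite3.Connection, namespace: str, min_part: str, max_part: str) -> list[str]:
    """Active files whose partition path falls in [min_part, max_part]."""
    rows = con.execute(
        """SELECT file_path FROM known_files
           WHERE namespace = ? AND active = 1 AND part_path BETWEEN ? AND ?
           ORDER BY file_path""",
        (namespace, min_part, max_part),
    )
    return [r[0] for r in rows]
```

Zero-padded Hive paths (`m=01` rather than `m=1`) make a simple string range on `part_path` line up with a date range, so one indexed query replaces paging through object listings.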
- Part → Parquet File
- Partition → Hive Partitioning Path
I originally wanted to use `clickhouse-local` as the query processing engine, but in comparison to DuckDB it lacks both features and performance. I have personally opened many issues in ClickHouse’s GitHub repo, and I hope in the future I can offer ClickHouse as an alternative execution engine.
Notice how we don’t delete data immediately; that is very important in case a query was started before a merge happens. A configurable deletion timeout for unavailable files lets IceDB prune files from object storage once it’s confident no queries still need them.
How quickly merging should happen depends on how often you write and how much you write each time. The faster you can compact files the better, but you don’t want to use too many resources (DB, Lambda, S3) doing so.
Like ClickHouse, merging does not actually delete any data, so in-flight and retried queries are not affected. Furthermore, it gives a “time-travel”-like capability, so you can query what the data looked like in the past.
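A sketch of the metastore side of a merge, again with `sqlite3` standing in for CockroachDB and a hypothetical schema. Writing the merged Parquet file is assumed to have already happened; the transaction only swaps which files are active and stamps the replaced ones with a tombstone time, and a separate cleanup step finds files whose tombstone is older than the deletion timeout.

```python
import sqlite3


def open_metastore() -> sqlite3.Connection:
    con = sqlite3.connect(":memory:")  # stand-in for CockroachDB
    con.execute(
        """CREATE TABLE known_files (
               file_path     TEXT PRIMARY KEY,
               part_path     TEXT NOT NULL,
               active        INTEGER NOT NULL DEFAULT 1,
               tombstoned_ms INTEGER)"""  # set when a merge replaces the file
    )
    return con


def commit_merge(
    con: sqlite3.Connection, part_path: str, old_files: list[str], merged_file: str, now_ms: int
) -> None:
    """Atomically make merged_file active and old_files inactive. Deletes nothing."""
    with con:  # single transaction: readers see either the old files or the merged one
        con.execute(
            "INSERT INTO known_files (file_path, part_path) VALUES (?, ?)",
            (merged_file, part_path),
        )
        cur = con.executemany(
            "UPDATE known_files SET active = 0, tombstoned_ms = ? "
            "WHERE file_path = ? AND active = 1",
            [(now_ms, f) for f in old_files],
        )
        if cur.rowcount != len(old_files):
            # Another merge already replaced some input; raising rolls back the insert too.
            raise RuntimeError("input files are no longer active; merge aborted")


def files_safe_to_delete(con: sqlite3.Connection, now_ms: int, timeout_ms: int) -> list[str]:
    """Inactive files tombstoned at least timeout_ms ago, so no query should still read them."""
    rows = con.execute(
        "SELECT file_path FROM known_files "
        "WHERE active = 0 AND tombstoned_ms <= ? ORDER BY file_path",
        (now_ms - timeout_ms,),
    )
    return [r[0] for r in rows]
```

The `rowcount` check makes the swap fail, and roll back, if another merger already claimed one of the input files, so the same rows never end up active in two files at once.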
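Assuming the metastore records, for each file, when it became active and when a merge replaced it (hypothetical `created_ms` and `tombstoned_ms` fields), choosing the files for a point-in-time query is a simple filter:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FileRecord:
    path: str
    created_ms: int               # when the file became active
    tombstoned_ms: Optional[int]  # when a merge replaced it; None while still active


def files_as_of(files: list[FileRecord], at_ms: int) -> list[str]:
    """Paths of the files that made up the table at time `at_ms`."""
    return sorted(
        f.path
        for f in files
        if f.created_ms <= at_ms and (f.tombstoned_ms is None or f.tombstoned_ms > at_ms)
    )


history = [
    FileRecord("a.parquet", created_ms=100, tombstoned_ms=300),
    FileRecord("b.parquet", created_ms=200, tombstoned_ms=300),
    FileRecord("merged.parquet", created_ms=300, tombstoned_ms=None),
]
print(files_as_of(history, 250))  # ['a.parquet', 'b.parquet']
print(files_as_of(history, 300))  # ['merged.parquet']
```

This only reaches back as far as files that haven’t been pruned yet, so the unavailable-file deletion timeout also bounds how far back you can time-travel.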
Partitions are immutable within a Namespace (basically a “table” in traditional OLAP). To calculate the partition, every insert request passes in a partition plan: A set of functions that calculate the partition for a given row.
The partition is determined for each row, and a file is created on insert for every partition.
IceDB (potentially dangerously) gives the writer responsibility for ensuring that rows within a namespace are written using the same partition plan; otherwise, merging opportunities may be missed. This avoids an expensive lookup of the partition scheme on every write. A few tens of extra bytes per write request < an extra DB lookup per write request.
Generally it’s good to partition with something that either has a bounded set (like modulo over a number), or something that doesn’t change very often (like “year, month, day”).
It is best practice to partition down as far as reasonable so that your queries can pull the least amount of data possible.
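A sketch of a partition plan as an ordered list of `(name, function)` pairs, where each function derives one Hive path segment from a row. The plan shape, the `ts` (epoch milliseconds) and `user_id` fields, and the four-way bucket are assumptions for illustration.

```python
from datetime import datetime, timezone
from typing import Any, Callable

Row = dict[str, Any]
PartitionPlan = list[tuple[str, Callable[[Row], Any]]]


def partition_path(row: Row, plan: PartitionPlan) -> str:
    """Apply each plan function in order and join the results as a Hive path."""
    return "/".join(f"{name}={fn(row)}" for name, fn in plan)


def event_time(row: Row) -> datetime:
    return datetime.fromtimestamp(row["ts"] / 1000, tz=timezone.utc)  # ts in epoch ms


EXAMPLE_PLAN: PartitionPlan = [
    ("y", lambda r: f"{event_time(r).year:04d}"),   # slowly changing: year/month/day
    ("m", lambda r: f"{event_time(r).month:02d}"),
    ("d", lambda r: f"{event_time(r).day:02d}"),
    ("bucket", lambda r: r["user_id"] % 4),         # bounded set: only 0-3
]

print(partition_path({"ts": 1672531200000, "user_id": 10}, EXAMPLE_PLAN))
# y=2023/m=01/d=01/bucket=2
```

Zero-padding the month and day keeps the lexicographic order of paths consistent with chronological order, and the modulo keeps the `bucket` segment within a bounded set of four values.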
CockroachDB is used as a consistent datastore that allows us to determine exactly what files are available.
While formats like Hudi and Delta Lake exist, their complexity makes them undesirable for anyone but the largest of data wielders. It’s also far faster to query CRDB than a file in S3.
Using CockroachDB enables a few important things:
- Serverless option
- Consistency at scale
- General SQL access (so you can read the metadata too!)
Other serverless database offerings exist, but they have annoying limitations or are not cloud-agnostic.
That said, CockroachDB is the choice for small scale. If you are running hundreds of queries per second, I might actually use Postgres for this because of read replicas. While follower reads exist in CRDB, their data is more stale than Postgres read replicas (especially AWS Aurora), and they still can’t scale beyond the replication factor the way read replicas can.
Due to the nature of distributed Parquet files with potentially evolving schemas, there is no native way to track the schema of a namespace. To solve this, on every write the insert processor attempts to write a row for each column it found for a namespace, indicating its type (`list(x)` due to the lack of JSON distinction).
This allows users to look up the schema of the namespace and determine what columns are available in what types before querying.
This could in theory become a bottleneck for insert requests per second, but unless you’re inserting into the same namespace ~100 times per second it should not be an issue.
IceDB is currently very experimental and under heavy development. If you find any issues or have ideas for improvement, please open a GitHub issue!
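A sketch of that per-write schema recording, with `sqlite3` standing in for CockroachDB. The `namespace_columns` table, the type names, and the `duck_type` mapping are hypothetical; `list(x)` mirrors the notation used for arrays.

```python
import sqlite3
from typing import Any

Row = dict[str, Any]


def open_schema_store() -> sqlite3.Connection:
    con = sqlite3.connect(":memory:")  # stand-in for CockroachDB
    con.execute(
        """CREATE TABLE namespace_columns (
               namespace TEXT NOT NULL,
               col       TEXT NOT NULL,
               col_type  TEXT NOT NULL,
               PRIMARY KEY (namespace, col, col_type))"""
    )
    return con


def duck_type(value: Any) -> str:
    """Hypothetical mapping from a flattened JSON value to a type name."""
    if isinstance(value, bool):  # check before int: bool is a subclass of int
        return "BOOLEAN"
    if isinstance(value, int):
        return "BIGINT"
    if isinstance(value, float):
        return "DOUBLE"
    if isinstance(value, list):
        inner = {duck_type(v) for v in value if v is not None}
        # Arrays become list(x); empty or mixed arrays fall back to list(VARCHAR).
        return f"list({inner.pop()})" if len(inner) == 1 else "list(VARCHAR)"
    return "VARCHAR"


def record_schema(con: sqlite3.Connection, namespace: str, rows: list[Row]) -> None:
    """Record every (column, type) pair seen in this write; known pairs are ignored."""
    seen = {(col, duck_type(v)) for row in rows for col, v in row.items() if v is not None}
    with con:
        con.executemany(
            "INSERT OR IGNORE INTO namespace_columns (namespace, col, col_type) VALUES (?, ?, ?)",
            [(namespace, col, typ) for col, typ in sorted(seen)],
        )


def namespace_schema(con: sqlite3.Connection, namespace: str) -> list[tuple[str, str]]:
    return con.execute(
        "SELECT col, col_type FROM namespace_columns WHERE namespace = ? ORDER BY col, col_type",
        (namespace,),
    ).fetchall()
```

Keying on `(namespace, col, col_type)` means a column that shows up with a new type adds a second row instead of overwriting the first, so schema drift stays visible. In CockroachDB or Postgres, `INSERT ... ON CONFLICT DO NOTHING` plays the role of SQLite’s `INSERT OR IGNORE`.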