IceDB v2 🧊 - An in-process Parquet merge engine to build dirt-cheap OLAPIceDB v2 🧊 - An in-process Parquet merge engine to build dirt-cheap OLAP

IceDB v2 🧊 - An in-process Parquet merge engine to build dirt-cheap OLAP

Tags
databases
Infrastructure
Published
Published June 10, 2023
Author
IceDB v2 is a massive work in progress. Check out the Github repo here: https://github.com/danthegoodman1/icedb
And a production example here: https://github.com/danthegoodman1/IceDBSegment

What is IceDB?

IceDB is an in-process parquet merge engine for S3. Inserts and merges are powered by Python and DuckDB, with querying portable to any language that can query Postgres and any DB/language that can parse parquet.
It enables truly serverless OLAP/data lake hybrid, optimized for event-based data. By combining DuckDB, Parquet files, and S3 storage to fully decouple storage and compute, you only have to pay for compute during query execution time.
It takes the concept of decoupling storage and compute a step further, and decouples meta data as well.

The benefits of IceDB

Serverless

People throw the word “serverless” around with far too many different meanings, but IceDB serverless is serverless in the classical AWS Lambda sense.
While it’s not required to be, it can run on stateless runtimes because all state is stored in the meta store. This results in two major benefits:
The first benefit is that we are only paying for compute time during query execution. If you only query or insert every few seconds or so, this already provides savings.
The second benefit is that each query can get its own resources. Say you use AWS Lambda with 10GB of ram per function, or use fly.io machines with 16vCPU 128GB ram. You can give each query all of those resources, whether it’s selecting a few columns or multiple joins with window functions. This results in predictable performance no matter how much load the data set is under.
And with the data being stored in S3 (or something more optimal like Cloudflare R2), the store costs are low and the read throughput is massive!
Being serverless means that IceDB can scale up and down with you, start with 1 user, and scale to 1 million!

Powered by DuckDB

Inserting, merging, and querying is primarily powered by DuckDB (with exceptions, more on that in the next section). This means that we get the lightning fast multi-threaded performance of DuckDB, from the simple and feature-rich SQL and Python interfaces.

Based on Parquet

By using the widely-used parquet file format, this means that other tools can process the files created IceDB, not just DuckDB.
Bind a Python function to ClickHouse for listing the active files in the meta store, and consume them there.
Query the meta store from python and consume the parquet files in Pandas with read_parquet, IceDB doesn’t care how you consume the parquet files!

Why not other solutions?

IceDB has evolved to be an in-process framework for storing, managing, and querying event data for both OLAP and large-scale analysis use cases. There are some great alternatives that exist, but they each have something that can cause pain at various scales.

ClickHouse

It’s no secret that I love ClickHouse. Like, a lot. Amazingly fast, amazing features and query language, amazing development velocity, what’s not to like?
Combining the Altinity operator with S3 backed merged trees is my favorite way to run it, but there is one issue with the architecture that can be hard for a certain use case.
If you are spending most of your time inserting, then running large queries infrequently, then ClickHouse’s relationship of decoupled storage and compute does not fit perfectly. This means that you either have to have massive amounts of resources waiting for a large query, being far under-utilized, or you need to scale the cluster when you want to start querying so you aren’t billing massive machines needlessly.
However, as mentioned previously, you can still use ClickHouse as the query execution engine for IceDB! This was intentional, as sometimes you just need the nice parametric aggregation functions 😄
In fact, the astute reader might realize in this post that IceDB handles parquet files nearly identically to how ClickHouse manages a part!

Spark, Flink, Hadoop, etc.

Oh boy, I hope I don’t make anyone angry…
I think platforms like Hudi and DeltaLake are really cool, the problem is that they are Spark-based.
The problem with MapReduce systems like Spark is that even if they are cloud-hosted, you need a team of 5 engineers (7 if you self-host) that all have PhD’s in Java stack traces to be able to run it.
As amazing as the ecosystem is, it’s a total mess. It has a horrible DX, scares people off from powerful tools, and is only accessible to those who have massive experience.

Presto and Trino

(See above)

Managed Solutions like BigQuery and Athena

If your budget is infinite, sure!
There are a ton of gotchas beyond the high data processing fees, like the fact that BigQuery bills you on uncompressed data size 😥
Beyond the insane costs, there’s a lot to be gained from open source that you can actively review, fix bugs, and add features too.

A cold shower 🥶: Why IceDB v1 was terrible

I did a lot wrong with the first version of IceDB.
I wanted to have dynamically changing schema so that every column could be filtered on, despite the little benefit and enormous pain around that feature.
I wanted to use Go, despite Python being clearly more appropriate for data manipulation (especially when using DuckDB for it).
I tracked far too much information in the meta store to be useful, which would result in immense scaling pains and redesign down the road.
I tried to use open formats to maintain cross-compatibility, yet designed the data within the parquet files such that many systems would not be able to handle such complex structures.

Why are you making this again?

I’ve never really been satisfied with the events storage world.
Jamming events in S3 feels super inefficient, considering that with just a bit of compute involved we can make it hundreds of times more efficient and queryable.
Existing serverless solutions are expensive, slow, and/or use some esoteric query language.
I’ve already mentioned the issues I see with ClickHouse (I still love you) and the Spark ecosystem.
I wanted to build a solution that could be used for event-based OLAP, storing arbitrary amounts of event data (think a segment.com sink), leverages the massive scale of object storage, and only requires compute to insert and query.
When you first set up event collection with something like like segment.com, it’s crucial you get it connected to a data store that you can SQL query to fill the holes in analysis from products with limited features (e.g. Mixpanel), or when you decide to add a new tool and need your historical event data.
💡
An aside on segment.com, their data warehouses only sync every 24 or 12 hours depending on your plan, but if you do something like the webhook sink connected to a system like IceDB, then you get the events within seconds!
I’m also highly interested in making a “replacement” for product analytics such as Mixpanel/Posthog that allow you to make dashboards in a UI builder, SQL, and/or with Python Notebooks. These tools often have unpredictable math and lack of transparency in how things are calculated. As much as I love Mixpanel, they love to put 40% at the bottom of the X axis. This hurts my heart:
Despite solutions such as Metabase and Apache Superset existing, they still miss something that I want… Python Notebooks in dashboards!
Anyway, I don’t need to validate my desire to build something, we know well by now that I’m easily nerd sniped.
notion imagenotion image

Redesign

The high-level concepts stay the same:
  1. Events come in as JSON, and are written as Parquet files
  1. Parquet files are merged together to reduce file count and optimize query performance
  1. Parquet files are immutable, and kept track of in a consistent meta store (e.g. CockroachDB)
  1. Avoid S3 ListObject calls
 
What has changed is the implementation:
  1. Indexing, row sorting, and file naming
  1. Make it a framework that is embeddable - make a thin wrapper as an API or deeply integrate it into your stack. This allows us to make configuration easier and more flexible by writing some Python istead of arcane configuration files and SQL statements
  1. Data formatting that anything can query - DuckDB, ClickHouse, Pandas, anything that can read parquet can read IceDB.

Indexing, row sorting, and file naming

This sounds boring, but is actually incredibly interesting!
How we name files, and sort said rows within parquet files, makes a MASSIVE impact on query performance.

Elevator pitch on parquet files

Parquet files are a sort of “hybrid” row/columnar file format that supports strong typing and nested structures.
They do this by having the concept of “Row Groups”.
Row Groups are a chunk of rows that track a smallest and largest value for each column in the row group. Each column from each row group can be read individually, but the row groups allow for more fine-grained access. The more rows in a row group, the more performant selective queries are but less performant large aggregations are. You can tune this balance depending on what is needed.
Parquet files store this meta data separately in the file from the rows so that we can read tiny amounts of data in each file before we start ripping through it to determine whether this file potentially has any data of interest to us.
This post by Micahel Berk does a great job at explaining them in a bit more detail.

Hive partitioning

Before we crack open a parquet file, we first need to know which files are of interest.
We can do this even before opening parquet files with clever naming.
For example, if we name files in the format: files/y=2023/m=06/d=05/uuid.parquet, we can easily get all files for a given time range from the meta store, and support ordered S3 ListObject calls as needed!

The Meta Store

The crucial feature of IceDB that makes it work is the meta store. This is very similar to the Hive store for the Hadoop ecosystem. The meta store keeps track of the “active” files in the data set.
Taking inspiration from the ClickHouse MergeTree engine family, the meta store allows us to use a database with serializable transactions (such as Postgres or CockroachDB) to give us an instant consistent snapshot of the data set.
The means we can merge files while queries are in-flight, without harming currently running queries because we did something silly like deleting a file.
It also means that when we merge files, we never risk that a query sees two of the same row across merged files because we have already taken a consistent snapshot of the “available” files at that time for the given query.
The final crucial feature is that we can avoid expensive (both in cost and time) S3 ListObject calls by simply querying the known list of files that potentially have the data we are interested in.
“Postgres and CockroachDB aren’t serverless though!”
I counter this attack with links!
But there’s also a good chance you are using CockroachDB or Postgres already (I certainly am!), so in that case you don’t even need to worry 😄

Tying it together

Now that we know how to cleverly sort data in the file, and how to cleverly name them, we can tie them together to make for highly performant querying without wasting too much data. Because we are optimizing for cost and low file counts, the strategy is the following:
  • We keep track of higher-level information within the file name, such as some namespace, year, month, day, etc. Depending on how much data you have you may choose to omit parts like day, and have a handful of files for the month
  • Inside of the file, we have columns for more granular querying. For example we might sort by event_id, then timestamp within the parquet file. This way we can more efficiently query for single events, which is so commonly done with event-based systems (or joining multiple queries of different events)
  • If you know your queries are always going to be on some other dimension (like event_id), we can move that to the first Hive partition in the file name as well. This flexibility is analogous to primary keys in OLTP, or partition and ordering keys in ClickHouse
  • We can choose how and when merging happens - for example we can merge all files that have a matching y, m, d in the hive partition
  • When inserting, we can control how aggressively we want to batch, and write each insert as one file. This is the same as systems like ClickHouse
  • We always add a unique row_id so that in the event we don’t use the meta store, we can deduplicate rows on that column
  • We can dynamically change the row_group_size of the parquet files depending on how many rows exist, how many unique values are present in a column, and more

Merging Files

IceDB is designed so that on some interval, you can scan for known files that are well under the specified size limit, and merge them together to reduce the number of files. Not only does this make querying faster, but it also means that you use less storage as larger files compress more efficiently!
By having IceDB manage the file merging, we can coordinate it through the meta store to ensure that we never are reading duplicate rows.
Merges use ... SKIP LOCKED to pass over files that are already locked, so we know there will never be an issue with duplicate merges. This also means we can run concurrent merges!
💡
Note that while merging files are locked from being selected, which can cause queries to have to wait until the merge is complete. If you are not able to merge quickly enough then your queries could time out or take substantially longer. Make sure to balance max file size with the performance you can get with merging!
Merging works like this:
First, we start by listing the files in order of how you want to merge (whether you are optimizing old files, or quickly merging newer partitions):
select partition, filename, filesize from known_files where active = true and filesize < {} order by partition {}
We can keep parsing through that list, summing the file size and count until we hit some exit condition for a single partition.
Then we begin merging.
First, we need to lock the files so that nobody else can tamper with them:
select filename from known_files where active = true and partition = '{}' and filename in ({}) for update
Next, we need to merge the files using DuckDB to create a single merged file in S3:
q = ''' COPY ( select * from read_parquet([{}], hive_partitioning=1) ) TO 's3://testbucket/{}' '''.format(','.join(list(map(lambda x: "'s3://testbucket/{}/{}'".format(x[0], x[1]), buf))), new_f_path) ddb.execute(q)
From there, we can update the old files as no longer active, and insert the new merged file as active:
update known_files set active = false where active = true and partition = '{}' and filename in ({}); insert into known_files (filename, filesize, partition) VALUES ('{}', {}, '{}');
You might now be thinking:
“If you never delete files, isn’t that wasteful?”
Yes! And we can manage that by having some background process that lists active=false files from the meta store that have not been updated in some long duration (say 7 days), and delete them from the meta store, then S3. With a time range like 7 days we can reasonably say there is no active query in progress 😄.

Custom Merge Queries

For advanced users, you can customize merge queries to get the AggregatingMergeTree and ReplacingMergeTree-like functionality from ClickHouse in IceDB!
The best part, you can combine this with double-writing to get materialized view functionality as well!
This results in super fast queries, and super small data.
For example, maybe you want to count the number of clicks a user does on your site. Rather than store each row and have that grow unbound over time, when merging you can write a custom merge query that aggregates the number of clicks:
select user_id, sum(clicks) as clicks, gen_random_uuid()::TEXT as _row_id from read_parquet(?, hive_partitioning=1) group by user_id
Now each merge only generates one row per user, meaning that over time you actually REDUCE that amount of data you store and process, despite inserting more rows! The amortized number of rows is just N, where N is the number of unique users. Without this it would be N*C, where C is the number of clicks they do.
You can also use this technique to effectively “update” data, by only accepting the newest row for a given user_id when merging, dropping older rows.

Inserting Rows

Similar to ClickHouse, inserting data is immutable. This means that data is not deleted or updated by modifying existing rows.
Inserting rows follows a similar format to merging.
First, we must break down the incoming rows into their respective partitions:
partmap = {} for row in rows: # merge the rows into same parts rowTime = datetime.utcfromtimestamp(row['ts']/1000) part = 'y={}/m={}/d={}'.format('{}'.format(rowTime.year).zfill(4), '{}'.format(rowTime.month).zfill(2), '{}'.format(rowTime.day).zfill(2)) if part not in partmap: partmap[part] = [] partmap[part].append(row)
Next, we can copy these partitions each into their own file in S3:
final_files = [] for part in partmap: # upload parquet file filename = '{}.parquet'.format(uuid4()) fullpath = part + '/' + filename final_files.append(fullpath) partrows = partmap[part] # use a DF for inserting into duckdb df = pa.Table.from_pydict({ 'ts': map(lambda row: row['ts'], partrows), 'event': map(lambda row: row['event'], partrows), 'properties': map(lambda row: json.dumps(row['properties']), partrows), # turn the properties into string 'row_id': map(lambda row: str(uuid4()), partrows) # give every row a unique id for dedupe }) ddb.sql(''' copy (select * from df order by event, ts) to '{}' '''.format('s3://testbucket/' + fullpath))
Finally, we can store each of those new files in the meta store:
# get file metadata obj = s3.head_object( Bucket='testbucket', Key=fullpath ) fileSize = obj['ContentLength'] # insert into meta store rowTime = datetime.utcfromtimestamp(partrows[0]['ts'] / 1000) # this is janky with conn.cursor() as cursor: cursor.execute(''' insert into known_files (filename, filesize, partition) VALUES ('{}', {}, '{}') '''.format(filename, fileSize, part))
Now these rows will show in any subsequent queries!
Because inserts create their own files (just like parts in ClickHouse), the more you can batch inserts the more efficient they become. A large number of small files will result in slower queries than a small number of large files, as well as less efficient compression and more rows in the meta store.

Querying IceDB

Because they are just paruqet files in S3, we can query from anything that can read parquet files. We can then dedupliate rows on the row_id to ensure that we prevent duplicate rows from being analyzed.
However if we are using an in-process framework like Pandas, an in-process DB like DuckDB, or a standalone OLAP beast like ClickHouse, we can bind functions for listing the available files and read consistent snapshots of the data!
For example if we are using DuckDB, we might bind a function that looks like:
select sum((properties::JSON->>'numtime')::int64) as agg, uid from icedb(start_month:=2, end_month:=8) where event = 'page_load' group by uid order by agg desc
Now how did this query process our data?
  1. First, it asked the meta store what files exist between 2023-02-01 and 2023-08-01, so that even if we had files for a million years, we’d only get the ones in the time range we were interested in
  1. Second, DuckDB read the metadata of all known parquet files in that time range from S3, and checked if any row_groups had the page_load event in them
  1. Third, DuckDB started reading the row_groups that might have the page_load event in them, stream the rows into memory just like normal query processing, reading 1 row_group per core concurrently.
  1. Finally, DuckDB gives us the query result in tens of milliseconds, even if the sum of the files in that range were in TBs, it knew how to shred down the read data efficiently to only read hundreds of MB at most.
Because we don’t delete files after a merge (at least not for a long time, say 3 days), with this snapshot of files we can still query even if they have been merged during the query, or 3 hours after we started analyzing them in a python notebook.
FWIW, the icedb table macro is defined like this:
def get_files(syear: int, smonth: int, sday: int, eyear: int, emonth: int, eday: int) -> list[str]: with conn.cursor() as mycur: q = ''' select partition, filename from known_files where active = true AND partition >= 'y={}/m={}/d={}' AND partition <= 'y={}/m={}/d={}' '''.format('{}'.format(syear).zfill(4), '{}'.format(smonth).zfill(2), '{}'.format(sday).zfill(2),'{}'.format(eyear).zfill(4), '{}'.format(emonth).zfill(2), '{}'.format(eday).zfill(2)) mycur.execute(q) rows = mycur.fetchall() return list(map(lambda x: 's3://testbucket/{}/{}'.format(x[0], x[1]), rows)) ddb.create_function('get_files', get_files, [ty.INTEGER, ty.INTEGER, ty.INTEGER, ty.INTEGER, ty.INTEGER, ty.INTEGER], list[str]) ddb.sql(''' create macro if not exists get_f(start_year:=2023, start_month:=1, start_day:=1, end_year:=2023, end_month:=1, end_day:=1) as get_files(start_year, start_month, start_day, end_year, end_month, end_day) ''') # the `select *` gets overridden by the select on the macro, and has the same explain plan as if you did the `read_parquet` select directly. It effectively drops the unused columns. ddb.sql(''' create macro if not exists icedb(start_year:=2023, start_month:=1, start_day:=1, end_year:=2023, end_month:=1, end_day:=1) as table select * from read_parquet(get_files(start_year, start_month, start_day, end_year, end_month, end_day), hive_partitioning=1) ''')
You can take that get_files function and use it anywhere: In ClickHouse, in Pandas, and more!
Because the meta store table schema is so simple, there is nothing stopping you from, for example, binding a Golang binary to ClickHouse instead!

Future Work

IceDB is actively being transformed from a PoC into a usable Python package. You can follow the development at https://github.com/danthegoodman1/icedb.
It proves the model, and shows that it can scale, but in this demo the data must strictly conform to the pre-defined data structure.
The flexibility of this system is not to be lost: For example perhaps you insert product events through a Python AWS Lambda with relatively low resources accessed through a JSON REST API, merge with a fleet of fly.io machines, and query from Python notebooks running in Google Colab.

Multi-Tenancy

The model IceDB provides also makes for a perfect database for multi-tenant OLAP workloads. For example the data store behind a product analytics tool like Mixpanel or PostHog.
IceDB can easily have dedicated query resources for each customer to provide predictable performance, your users aren’t wasting your instance time when they aren’t actively querying, and you can have granular permissions for file listing in the meta store so that users can only see their files!

Conclusion

IceDB allows us to store massive amounts of data at extremely low cost, yet maintaining the extreme performance that is expected from modern OLAP and data lake solutions.
It does so by building on the shoulders of giants, and gives developers working on projects of all sizes the ability to have extremely performant and cost-efficient analytics that can scale from 1 user to 1 million.
Because it’s designed to be a framework, keep in mind you’ll still have to do a little bit of extra work, like scheduling inserts, merges and query executions.
One might say I rewrote all of the work that the Spark, Hive, Hadoop ecosystem did but has less features, blah blah… I say that I took the amazing architecture that is so similar, and did it in <300 lines of Python. I’d say that I gave far more people that ability, plus it can run on a more cost-efficient serverless runtime.
notion imagenotion image