This is currently an open exploration of ideas for solving a subset of sharding-related issues with data at scale.
Problem
Resharding a system like Kafka/RedPanda or ClickHouse is a pain because you have to rewrite all the data. There are solutions for single-shard methods (e.g. S3-backed MergeTrees), but they are less than ideal: we still need traditional sharding at some level.
Systems like CockroachDB and Cassandra/Scylla keep much smaller “shards” so that moving data around is much simpler during events such as adding or removing nodes from a cluster. They also tend to do this automatically.
The issue for systems like Kafka/RedPanda and ClickHouse is that changing the number of shards also changes which server data is stored on, because shards tend to map to physical nodes rather than virtual ones (e.g. ranges in CRDB or vNodes in Cassandra/Scylla).
Solution
Find a hashing method such that we can determine which hash parameters to use (in particular, how many shards to modulo by) from a timestamp.
High Level Overview
- At t=0, some hashing algorithm is used, with the number of nodes as the modulo (N=5)
- Some k-sortable ID (such as a KSUID) that allows a reverse lookup of its creation time, which we will call a “hash-space” (K1), is mapped to shard S1 using this hash and modulo
- At t=10, an additional node is added to the cluster. All nodes now use N=5 for any value to hash where t<10, and N=6 where t≥10
- A row for K1 is inserted again; it is mapped to S1 using N=5 for the modulo
- A row for hash-space K2 is inserted; K2 was created at t>10, so N=6 is used for the modulo
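The timeline above can be sketched as follows. This is a minimal illustration, not a spec: the epoch table, the timestamps, and the choice of SHA-256 are all hypothetical stand-ins (any stable hash and any agreed-upon epoch store would do).

```python
import bisect
import hashlib

# Hypothetical epoch table: (effective_timestamp, shard_count).
# Every routing node must see an identical copy of this table.
EPOCHS = [
    (0, 5),   # t=0: N=5
    (10, 6),  # t=10: a sixth node is added
]

def shard_count_at(t):
    """Return the shard count N in effect for a key created at time t."""
    # Last epoch whose effective timestamp is <= t.
    idx = bisect.bisect_right([eff for eff, _ in EPOCHS], t) - 1
    return EPOCHS[idx][1]

def shard_for(key, created_at):
    """Map a key to a shard using the modulo in effect when the key was created."""
    n = shard_count_at(created_at)
    # Any stable hash works; SHA-256 truncated to 64 bits is just an example.
    h = int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")
    return h % n

# K1 (created at t=3) is hashed with N=5 forever, even after the cluster grows;
# K2 (created at t=12) is hashed with N=6.
```

In practice `created_at` would be recovered from the k-sortable ID itself (e.g. the timestamp component of a KSUID) rather than passed in separately.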
Limitations
This only applies where related data is sharded by the same identifier, such as a tenant ID, namespace, org, or user.
Because this system is not designed to re-allocate data, tricks will have to be played to weight more of the newer shards to newer machines that do not have as much storage consumed.
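One such trick, sketched under assumed names: keep the shard-slot-to-node assignment as an explicit per-epoch table, so the slots of a new epoch can be deliberately over-weighted toward freshly added, mostly empty machines. The node names and the N=7 epoch below are hypothetical, purely to show the weighting.

```python
# Hypothetical placement table: for each epoch, which physical node hosts
# each shard slot (slot = hash % N). Old epochs never change; new epochs
# can place more slots on new machines.
PLACEMENT = {
    0: ["node-a", "node-b", "node-c", "node-d", "node-e"],  # N=5
    1: ["node-a", "node-b", "node-c", "node-d", "node-e",
        "node-f", "node-f"],  # N=7: the new node takes 2/7 of new traffic
}

def node_for(epoch, slot):
    """Resolve a (epoch, shard slot) pair to the physical node hosting it."""
    return PLACEMENT[epoch][slot]
```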
Older shards can fill up, because their data is never re-divided. In theory, individual keys could be remapped to a future timestamp, but there’s simply nothing to be done about a single hash value becoming too large. This makes it important to project current usage per hash-space.
The number of shards cannot be reduced (but I don’t think you ever could anyway).
For something like ClickHouse, an external system inserting data should insert directly into the Replicated tables rather than through a Distributed table, as ClickHouse does some extra hashing on top which would ruin the scheme. This means the inserting nodes need to be aware of all shards, and of which N value to use based on t.
Partition rebalancing becomes essential over time; this could cause issues on a system like ClickHouse, where you’d effectively have to re-insert all the data for a given shard. However, this can be done per-shard when done manually.
Reads for inserts need to be consistent with the number of active shards, but we can cache values by time: the cache only needs refreshing when a value comes in at a t greater than the last cache time. In a system where new users are created infrequently (say, less than once per second) this is not an issue, especially if inserts are done in batches.

For high-frequency new values (e.g. time series data), manual updating at the inserting level (e.g. hard-coding in the “shard log”) might be required, where the timestamp is generated when the inserter boots, persisted to local disk, and restored on start. But coordinating that time across multiple hosts is hard, so it may be better to instead pick a time well in the future, so that all hosts have an agreed time and are online before inserts start going to the new shards. Then even a remote store can be used, as long as the caching interval is well below the offset at which new shards start to apply.
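The future-activation idea above can be sketched as a small cache. All names and constants here are hypothetical; the only real invariant is that the activation lead time must comfortably exceed the cache refresh interval, so every inserter sees a new epoch before it takes effect.

```python
import time

CACHE_TTL = 60          # seconds between refreshes of the epoch table
ACTIVATION_LEAD = 600   # new epochs only take effect this far in the future

class EpochCache:
    """Cached copy of the (effective_t, N) epoch table from a remote store.

    Because new epochs are always scheduled at least ACTIVATION_LEAD seconds
    into the future, a cache refreshed every CACHE_TTL seconds never routes
    with a stale shard count: every node fetches the new row long before it
    starts to apply.
    """

    def __init__(self, fetch):
        self._fetch = fetch            # callable returning sorted [(t, N), ...]
        self._epochs = fetch()
        self._fetched_at = time.time()

    def shard_count(self, t):
        """Shard count in effect for a key created at time t."""
        if time.time() - self._fetched_at > CACHE_TTL:
            self._epochs = self._fetch()
            self._fetched_at = time.time()
        n = self._epochs[0][1]
        for effective, count in self._epochs:
            if effective <= t:
                n = count
            else:
                break
        return n
```

Whoever adds a node would append `(now + ACTIVATION_LEAD, new_N)` to the remote table; inserters keep routing with the old N until that timestamp passes.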