Update: Dec 2022

v22.2.0 has been released, and with it the SKIP LOCKED wait policy! https://www.cockroachlabs.com/docs/stable/select-for-update.html#wait-policies

Update: Nov 2022

In v22.2.0-rc.1, CRDB added SKIP LOCKED for SELECT ... FOR UPDATE queries: https://www.cockroachlabs.com/docs/releases/v22.2#v22-2-0-rc-1

This means we can get far superior performance with queries like the following, rather than direct UPDATE tasks queries:

UPDATE tasks
SET status = 'locked'
FROM (
    SELECT id, payload FROM tasks
    WHERE status = 'available'
      AND exec_time < NOW()
      AND crdb_internal_exec_time_id_shard_16 = n -- where n is the bucket
    LIMIT 10
    FOR UPDATE  -- manually locks the rows
    SKIP LOCKED -- skips already locked rows instead of blocking, a massive speed boost
) AS locked_tasks
WHERE tasks.id = locked_tasks.id
RETURNING tasks.id, tasks.payload
;
The Problem
Task queues are often hard to scale, especially when you have data sorted by time, and perhaps by priority too. Often, the nature of distributed databases puts you at odds with scale.
On a DB like Postgres, writing to a sequential index such as time is less of a concern: all writes go to the same place anyway, and Postgres is optimized for this.
When you encounter distributed databases like ScyllaDB and CockroachDB, time can hurt in many ways.
One of the most notable ways is hotspots.
When you are writing to a time-based index, the values are by nature sequential. This means that you are probably going to be writing to the same place all the time.
The typical accommodation is to change the partition key to something like the ID of a unique entity, where such entities do not produce significant amounts of data over time (maybe a few writes per second). This allows the time-based writes to be distributed over many parts of many partitions/ranges, because the primary key prefix, the entity ID, should be random (like a UUIDv4 or NanoID).
The issue with queues is that a single queue can easily see thousands of operations per second. Within a partition, the DB starts to develop very hot spots that cause major scaling pains.
So, let’s build a super simple priority task queue that scales linearly, and has at-least-once processing guarantees!
It is important to mention that the definition of ‘priority’ is stretched a bit here to mean ordering in time, not an actual priority number. However, the time value can be adjusted to change priority.
The Hash-sharded Index
CockroachDB typically partitions its data by a method known as “range partitioning”, where values are sequentially grouped and ranges of values are split into their own partitions.
While this works very well for random primary keys, it means that sequential data like a time index would keep writing to the same range, all the time. This turns that index into a single primary writer, with far less performance than something like Postgres.
To combat this, CRDB introduced hash-sharded indexes that allow automatic hashing of values to place them in evenly distributed buckets.
While significant, this is not enough to drive the scale that we need.
When we make a query for a time range, say WHERE ts < NOW(), the database doesn’t know the exact location of all the data like it would with a range index. As a result, CRDB will perform this lookup on every single bucket in the index and join the results before returning.
This means that if you have 16 buckets, the DB will need to perform this on 16 different partitions, potentially across up to 16 nodes, join the result, and send that back.
Clearly as you scale up, this becomes a major problem for latency and performance across the cluster.
To combat this, we use a combination of knowing how many buckets exist and a hidden column that exists in tables with hash-sharded indexes.
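For illustration, here is a hedged sketch using a hypothetical events table whose primary key is hash-sharded on ts with 16 buckets (the table and column names are made up for this example). The first query gives the planner no bucket to target, so it fans out to every bucket; the second pins the scan to a single bucket:

-- Fans out: the planner must scan all 16 bucket spans and merge the results
SELECT * FROM events WHERE ts < NOW();

-- Pinned: only bucket 3's span is scanned
SELECT * FROM events WHERE ts < NOW() AND crdb_internal_ts_shard_16 = 3;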
The Design
Each task consumer for a given queue (defined by table, or by a primary key prefix) will round-robin consume from each shard using the following query:
UPDATE tasks
SET status = 'locked'
WHERE status = 'available'
  AND exec_time < NOW()
  AND crdb_internal_exec_time_id_shard_16 = n -- where n is the bucket
LIMIT 10 -- we are grabbing 10 items at a time
RETURNING id, payload
;
To maintain consistency, we transition the state from available to locked. By requiring this transition, we ensure that only one node will get a given task at a time, due to CRDB’s serializable consistency.

When we are done processing the task, we can delete it like so:
DELETE FROM tasks
WHERE id = $1
  AND exec_time = $2
;
The column crdb_internal_exec_time_id_shard_16 is named following the pattern crdb_internal_{COLUMN NAMES}_shard_{BUCKET COUNT}. You can find it exactly by looking at the output of SHOW CREATE tasks.

You can also customize the number of items to grab at once with the LIMIT clause. Do not make this a very high number!

The best part of this system is how easy it is to implement, meaning that porting it to different languages is trivial.
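For example, the producer side is just a plain INSERT against the same table. A minimal sketch (the payload and delay are made up), using the tasks schema created in the next section:

-- Enqueue a task to run five minutes from now; id and updated_at fall back to their column defaults
INSERT INTO tasks (payload, exec_time, status)
VALUES ('do the thing'::BYTEA, NOW() + INTERVAL '5 minutes', 'available');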
The Testing Procedure
First, we must set up the table:
SET experimental_enable_hash_sharded_indexes = on;

CREATE TABLE tasks (
    id TEXT NOT NULL DEFAULT gen_random_uuid()::TEXT, -- We are going to use int strings in our demo
    payload BYTEA NOT NULL,
    exec_time TIMESTAMPTZ NOT NULL, -- allows sorting by time
    status TEXT NOT NULL, -- 'available', 'locked'
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ON UPDATE NOW(), -- auto update
    PRIMARY KEY (exec_time DESC, id) USING HASH WITH BUCKET_COUNT = 16
)
;
We are also setting up 16 buckets. Buckets are 0-indexed, so we have 0 through 15.
Now this may seem like a strange primary key; however, based on our access pattern it makes perfect sense:

- exec_time is our priority key; we make sure that the oldest items get processed first.
- status allows us to filter by task state (available vs. locked).
- id is a final random ID that ensures we don’t have a primary key collision for rows inserted at the same time.
- USING HASH WITH BUCKET_COUNT = 16 ensures that this index is hash-sharded, so writes aren’t all going to the same range due to the timestamp-based primary key prefix.

We can verify that our query uses the expected index by performing an EXPLAIN on the UPDATE:
EXPLAIN UPDATE tasks
SET status = 'locked'
WHERE status = 'available'
  AND exec_time < NOW()
  AND crdb_internal_exec_time_id_shard_16 = 1
LIMIT 10
RETURNING id, payload, exec_time
;
distribution: local
vectorized: true

• update
│ table: tasks
│ set: status, updated_at
│ auto commit
│
└── • render
    │
    └── • limit
        │ count: 10
        │
        └── • filter
            │ filter: status = 'available'
            │
            └── • scan
                  missing stats
                  table: tasks@primary
                  spans: [/1 - /1/'2022-05-30 12:57:41.845412+00:00']
(20 rows)
Now we can insert some test data and check our round-robin strategy:
INSERT INTO tasks (id, exec_time, payload, status)
SELECT i::TEXT, NOW(), 'a payload'::BYTEA, 'available'
FROM generate_series(1, 1000) AS i;
And we can test that our round-robin works:
SELECT COUNT(*) FROM tasks
WHERE crdb_internal_exec_time_id_shard_16 = 5
;

  count
-------
     62

SELECT COUNT(*) FROM tasks
;

  count
-------
   1000
-- from one shard
SELECT * FROM tasks
WHERE status = 'available'
  AND exec_time < NOW()
  AND crdb_internal_exec_time_id_shard_16 = 1
LIMIT 10
;

 id  |       payload        |           exec_time           |  status   |          updated_at
-----+----------------------+-------------------------------+-----------+--------------------------------
 106 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 115 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 120 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 159 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 16  | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 164 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 173 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 227 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 238 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 241 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00

-- from another shard
SELECT * FROM tasks
WHERE status = 'available'
  AND exec_time < NOW()
  AND crdb_internal_exec_time_id_shard_16 = 2
LIMIT 10
;

 id  |  payload  |           exec_time           |  status   |          updated_at
-----+-----------+-------------------------------+-----------+--------------------------------
 134 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 141 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 152 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 178 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 185 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 196 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 206 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 215 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 233 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 24  | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
(10 rows)
The id field is in a strange order, and we are missing chunks of the sequence! That’s how we know the hashing works. (It also sorts like that because the IDs are strings, so the ordering is lexicographic.)

Resolving Abandoned Locks
If the consumer that has just locked a task dies, we need to be able to recover.
In order to do so, we can set an in-flight timeout for each task, and perform a similar round-robin bucket update to find items whose locks have timed out.
This design also requires that the consumer abandon the task after the timeout, even if the consumer is still healthy. Even if this fails, it fits under the at-least-once processing guarantees.
UPDATE tasks
SET status = 'available'
WHERE status = 'locked'
  AND exec_time < NOW() - INTERVAL '30 seconds' -- the in-flight timeout
  AND crdb_internal_exec_time_id_shard_16 = n
LIMIT 50 -- up to 50 at a time
;
This consumer does not claim the task itself to process; it purely re-publishes the task for later processing. Due to the time-based ordering, it should end up being the next item processed.
You can also adjust the LIMIT and the number of cleanup consumers to match your failover latency tolerance.

Exponential Backoff
Furthermore, you could add a delivery_attempts column that can be used to calculate an exponential backoff. Increment delivery_attempts and update exec_time to fit your backoff strategy within this same UPDATE statement!
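A hedged sketch of what that combined statement could look like, assuming a delivery_attempts INT NOT NULL DEFAULT 0 column has been added to the table (the 30-second base and power-of-two curve are arbitrary choices, not part of the original design):

UPDATE tasks
SET status = 'available',
    delivery_attempts = delivery_attempts + 1,
    -- back off by 30s * 2^attempts; changing exec_time also recomputes the hidden shard column
    exec_time = NOW() + (INTERVAL '30 seconds' * pow(2, delivery_attempts))
WHERE status = 'locked'
  AND exec_time < NOW() - INTERVAL '30 seconds'
  AND crdb_internal_exec_time_id_shard_16 = n -- where n is the bucket
LIMIT 50
;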
Tradeoffs
There is no perfect system, and one as simple as this must have some faults.
Fanout & Pub/Sub
This system does not natively account for any fanout functionality; you must write to multiple tables (queues) to get it. However, the cost of doing so is quite small.
Multi-region complexity
While there are multi-region capabilities with distributed persistence layers, you must intelligently consume from the same region you write to; otherwise you risk greatly increased latencies, with publishers and consumers operating on very distant data. CRDB provides great capabilities for managing this elegantly.
Prioritization of tasks
With the time-based system, there is no way to insert a task with multiple levels of priority. The only real priorities that can be given are “do this next” or not.
You can set the exec_time to very far in the past to have it be the next task consumed (from that partition); however, you otherwise do not have a good gauge for when the task will be consumed, since the number of queued tasks with a lower exec_time is not counted.

This generally should not be an issue due to the “an empty queue is a happy queue” mentality, but it is something to consider if you need granular prioritization control.
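For example, a “do this next” task can be enqueued by backdating exec_time. A minimal sketch (the payload and the one-day backdate are made up):

-- Backdate exec_time so this task sorts ahead of everything already queued in its bucket
INSERT INTO tasks (payload, exec_time, status)
VALUES ('urgent work'::BYTEA, NOW() - INTERVAL '1 day', 'available');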
Consumer Latency Management
As mentioned earlier, it is important to have multiple consumers so that the round-robin process does not result in large latencies between visits to a hash bucket. While the consumers could be as simple as an additional goroutine or process thread, you should probably have at least half as many active consumers as you do buckets, if not more.
Manual Instrumentation
If you need observability into your produce and consume rates, payload sizes, average processing latencies, etc., you will need to instrument these manually. However, this should be fairly simple instrumentation.
Manual Backoff Processing
While this could also be seen as a benefit, if you want any NACK functionality with backoff (or immediate retry), you will need to calculate and UPDATE the task yourself. Otherwise the task will not be processed until an abandoned-lock consumer finds it.

Why CockroachDB?
The design here could easily be replicated for ScyllaDB with manual partition prefixing (say, making the partition key the integers 0 through n-1 for n buckets); however, CRDB handles this very gracefully for us under the hood.
Furthermore, consistency. You know that during a state transition, only one statement successfully claimed that item to process, which would typically invoke a very expensive “lightweight transaction” on ScyllaDB.
If you add more nodes in the future, you can easily alter the number of buckets within the index using ALTER PRIMARY KEY and USING HASH WITH BUCKET_COUNT = num_buckets. Keep in mind that the bucket column name will change, which means that upgrades are manual.

Start with an ALTER command:

ALTER TABLE tasks
ALTER PRIMARY KEY USING COLUMNS (exec_time, id) USING HASH WITH BUCKET_COUNT = 32;
Now we have an additional column in our primary index:
defaultdb=> SHOW INDEXES FROM tasks;
 table_name | index_name | non_unique | seq_in_index |             column_name             | direction | storing | implicit
------------+------------+------------+--------------+-------------------------------------+-----------+---------+----------
 tasks      | primary    | f          | 1            | crdb_internal_exec_time_id_shard_32 | ASC       | f       | t
 tasks      | primary    | f          | 2            | exec_time                           | ASC       | f       | f
 tasks      | primary    | f          | 3            | id                                  | ASC       | f       | f
 tasks      | primary    | f          | 4            | payload                             | N/A       | t       | f
 tasks      | primary    | f          | 5            | status                              | N/A       | t       | f
 tasks      | primary    | f          | 6            | updated_at                          | N/A       | t       | f
 tasks      | primary    | f          | 7            | crdb_internal_exec_time_id_shard_16 | N/A       | t       | f
We may then switch over to using the new crdb_internal_exec_time_id_shard_32 column in our UPDATE statements, then drop the old hash column:

ALTER TABLE tasks DROP COLUMN crdb_internal_exec_time_id_shard_16;
defaultdb=> EXPLAIN UPDATE tasks
SET status = 'locked'
WHERE exec_time < NOW()
  AND status = 'available'
  AND crdb_internal_exec_time_id_shard_32 = 1
LIMIT 10
RETURNING id, payload
;
                                          info
---------------------------------------------------------------------------------------
  distribution: local
  vectorized: true

  • update
  │ table: tasks
  │ set: status, updated_at
  │ auto commit
  │
  └── • render
      │
      └── • limit
          │ count: 10
          │
          └── • filter
              │ filter: status = 'available'
              │
              └── • scan
                    missing stats
                    table: tasks@primary
                    spans: [/1 - /1/'2022-05-30 13:00:09.456268+00:00']
Boom, online upgrade! (Performance will be degraded during the process, but it’s not offline!)
How Well It Scales
The throughput will scale linearly: More tasks? Add nodes and increase the bucket count.
Of course, you will need to add more consumers too; otherwise, the time it takes to visit every partition in round-robin fashion will grow. Adding more consumers reduces the amount of time a partition sits unvisited while it has eligible tasks.
Performance Test
I’ve been sitting on this blog post for a few months now (as of 07/16/2022). The amazing team at Cockroach Labs (creators of CockroachDB) have given me a dedicated cluster to test this hypothesis out on. Due to business and life getting in the way as we got into YC (S22), I’ve had no time to take 3-4 hours and perform the test. I will update this post once I am able to!
From Here…
This is an amazingly simple option that fills most task queue needs, but I still believe there is room for more.
I’ve been working on something a bit exciting!
Over the last few years I’ve seen companies like Segment and Facebook make their own massively scalable task queues. Non-blocking, topics/virtual queues, and priorities.
Why are we constantly re-engineering the same thing? We’ve all agreed that existing open source systems are insufficient, so why has nobody made anything new?
Cloud providers have great proprietary options, but for any reasonable scale one might be paying tens of thousands of dollars a day… plus some of their limits are… weird, like SQS allowing no more than 15 minutes of delay for a task.
What if there was a task queue that:
- Multi-region, with no single point of failure
- Horizontally scalable
- Topics for fanout
- Priorities for optimizing dequeue order
- Did not need complex coordination
- At-least-once processing guarantees with guaranteed durability
- Scaled globally… like truly globally
- Fast failover at any scale, like Singapore going down and Virginia bringing up new nodes in seconds
- Partition self-discovery
- Native load balancing
- Dead-simple scale up and down
- Flexibility to use whatever backing data store you want: disk-based KV, CRDB, ScyllaDB, etc., all without having to compromise on durability or consistency
That doesn’t exist right? Not yet 😉