Design for a Simple, Scalable, Priority Task Queue

Tags
development
databases
queues
distributed systems
engineering

The Problem

Task queues are often hard to scale, especially when the data is sorted by time, and by priority on top of that. The nature of distributed databases often puts you at odds with scale.
On a DB like Postgres, writing to a sequential index such as time is less of a concern: all writes go to the same place, and the engine is optimized for that.
When you encounter distributed databases like ScyllaDB and CockroachDB, time can hurt in many ways.
One of the most notable ways is hotspots.
Fig. A - Hot ranges as a result of bad indexing for task queues
When you write to a time-based index, the values are sequential, which means you are almost always writing to the same place.
The typical accommodation is to change the partition key to something like the ID of a unique entity, where each entity does not produce a significant amount of data over time (maybe a few writes per second). This distributes the time-based writes over many parts of many partitions/ranges, because the primary key prefix, the entity ID, should be random (like a UUIDv4 or NanoID).
The issue with queues is that a single queue can easily see thousands of operations per second. Within a partition, the DB develops very hot spots that cause major scaling pains.
So, let’s build a super simple priority task queue that scales linearly, and has at-least-once processing guarantees!
It is important to mention that the definition of ‘priority’ is stretched a bit here to mean position in time, not an actual priority number. However, the time value can be adjusted to change priority.

The Hash-sharded Index

CockroachDB typically partitions its data using “range partitioning”: values are grouped sequentially, and ranges of values are split into their own partitions.
While this works very well for random primary keys, it means that sequential data like a time index keeps writing to the same range, all the time. That turns the index into a single-writer bottleneck, with far less performance than something like Postgres.
To combat this, CRDB introduced hash-sharded indexes that allow automatic hashing of values to place them in evenly distributed buckets.
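Conceptually, a hash-sharded index just prepends a computed shard column to the key. CRDB derives the shard from a hash of the indexed columns; its actual hash function is internal, so the sketch below (using FNV-1a as a stand-in) is purely illustrative of the idea, not CRDB's implementation:

```python
# Illustrative sketch: how a hash-sharded index spreads sequential
# keys across buckets. CRDB's real hash is internal; FNV-1a here is
# just a stand-in to show the concept.
def fnv1a(data: bytes) -> int:
    h = 0xCBF29CE484222325  # FNV-1a 64-bit offset basis
    for b in data:
        h ^= b
        h = (h * 0x100000001B3) & 0xFFFFFFFFFFFFFFFF  # FNV prime, 64-bit wrap
    return h

def shard_for(key: str, bucket_count: int = 16) -> int:
    # The "hidden column" is effectively this value, stored per row.
    return fnv1a(key.encode()) % bucket_count

# Sequential timestamps land in scattered buckets instead of one range.
buckets = [shard_for(f"2022-05-21 19:04:{s:02d}") for s in range(8)]
```

Because the shard value leads the key, sequential timestamps no longer pile up on one range.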
While significant, this is not enough to drive the scale that we need.
When we make a query for a time range, say WHERE ts < NOW(), the database doesn’t know the exact location of all the data like it would with a range index.
As a result, CRDB will perform this lookup on every single bucket in the index, and join the results before returning.
Fig. B - Efficient publish, suboptimal consume
This means that if you have 16 buckets, the DB will need to perform this on 16 different partitions, potentially across up to 16 nodes, join the result, and send that back.
Clearly as you scale up, this becomes a major problem for latency and performance across the cluster.
To combat this, we combine knowledge of how many buckets exist with a hidden column that CRDB adds to tables with hash-sharded indexes.

The Design

Fig. C - Efficient publish and consume
Each task consumer for a given queue (defined by table, or by a primary key prefix) will round-robin consume from each shard using the following query:
```sql
UPDATE tasks
SET status = 'locked'
WHERE status = 'available'
  AND exec_time < NOW()
  AND crdb_internal_exec_time_id_shard_16 = n -- where n is the bucket
LIMIT 10 -- we are grabbing 10 items at a time
RETURNING id, payload;
```
To maintain consistency, we transition the state from available to locked. Because CRDB offers serializable consistency, requiring this transition ensures that only one node gets a given task at a time.
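As a sketch of how a consumer might drive this round-robin, here is a minimal loop. The `execute` callable is a hypothetical stand-in for your DB driver running the locking UPDATE for one shard; the names are illustrative, not any specific driver's API:

```python
def consume(execute, bucket_count: int = 16, batch_size: int = 10):
    """Round-robin over every hash bucket, claiming up to batch_size
    tasks per bucket via the locking UPDATE ... RETURNING query.

    `execute(shard, limit)` stands in for the DB call and returns the
    (id, payload) rows it managed to lock for that shard.
    """
    claimed = []
    for shard in range(bucket_count):
        claimed.extend(execute(shard, batch_size))
    return claimed

# Stub driver for illustration: pretend each shard holds two tasks.
fake_db = {s: [(f"{s}-a", b"p"), (f"{s}-b", b"p")] for s in range(16)}

def fake_execute(shard, limit):
    rows, fake_db[shard] = fake_db[shard][:limit], fake_db[shard][limit:]
    return rows

tasks = consume(fake_execute)  # visits shards 0..15 once each
```

In a real consumer you would loop this forever, processing and deleting each claimed task before the next sweep.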
When we are done processing the task, we can delete it like so:
```sql
DELETE FROM tasks
WHERE id = $1
  AND exec_time = $2;
```
The column crdb_internal_exec_time_id_shard_16 is named according to the pattern
crdb_internal_{COLUMN NAMES}_shard_{BUCKET COUNT}
You can find its exact name by looking at the output of SHOW CREATE tasks.
You can also customize the amount of items to grab at once with the LIMIT clause. Do not make this a very high number!
The best part of this system is how easy it is to implement, which makes porting it to different languages trivial.

The Testing Procedure

First, we must set up the table:
```sql
SET experimental_enable_hash_sharded_indexes = on;

CREATE TABLE tasks (
    id TEXT NOT NULL DEFAULT gen_random_uuid()::TEXT, -- we will use int strings in our demo
    payload BYTEA NOT NULL,
    exec_time TIMESTAMPTZ NOT NULL, -- allows sorting by time
    status TEXT NOT NULL, -- 'available', 'locked'
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ON UPDATE NOW(), -- auto update
    PRIMARY KEY (exec_time DESC, id) USING HASH WITH BUCKET_COUNT = 16
);
```
We are also setting up 16 buckets. Buckets are 0-indexed, so we have 0 through 15.
Now this may seem like a strange primary key, however based on our access pattern this makes perfect sense:
  • exec_time is our priority key; it makes sure that the oldest items get processed first.
  • status lets us filter to claimable rows (as the EXPLAIN output shows, this is a filter over the scan, since status is not part of the key).
  • id is a final random ID that ensures we don’t have a primary key collision for rows inserted at the same time.
  • USING HASH WITH BUCKET_COUNT = 16 ensures that this index is hash-sharded, so writes aren’t all going to the same range due to the timestamp-based primary key prefix.
We can verify that our query uses the expected index by performing an EXPLAIN on the UPDATE:
```sql
EXPLAIN UPDATE tasks
SET status = 'locked'
WHERE status = 'available'
  AND exec_time < NOW()
  AND crdb_internal_exec_time_id_shard_16 = 1
LIMIT 10
RETURNING id, payload;
```
```
distribution: local
vectorized: true

• update
│ table: tasks
│ set: status, updated_at
│ auto commit
│
└── • render
    │
    └── • limit
        │ count: 10
        │
        └── • filter
            │ filter: status = 'available'
            │
            └── • scan
                  missing stats
                  table: tasks@primary
                  spans: [/1 - /1/'2022-05-30 12:57:41.845412+00:00']
(20 rows)
```
Now we can insert some test data and check our round-robin strategy:
```sql
INSERT INTO tasks (id, exec_time, payload, status)
SELECT i::TEXT, NOW(), 'a payload'::BYTEA, 'available'
FROM generate_series(1, 1000) AS i;
```
And we can test that our round-robin works:
```sql
SELECT COUNT(*) FROM tasks
WHERE crdb_internal_exec_time_id_shard_16 = 5;
```
```
 count
-------
    62
```
```sql
SELECT COUNT(*) FROM tasks;
```
```
 count
-------
  1000
```
```sql
-- from one shard
SELECT * FROM tasks
WHERE status = 'available'
  AND exec_time < NOW()
  AND crdb_internal_exec_time_id_shard_16 = 1
LIMIT 10;
```
```
 id  |       payload        |           exec_time           |  status   |          updated_at
-----+----------------------+-------------------------------+-----------+-------------------------------
 106 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 115 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 120 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 159 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 16  | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 164 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 173 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 227 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 238 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 241 | \x61207061796c6f6164 | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
```
```sql
-- from another shard
SELECT * FROM tasks
WHERE status = 'available'
  AND exec_time < NOW()
  AND crdb_internal_exec_time_id_shard_16 = 2
LIMIT 10;
```
```
 id  |  payload  |           exec_time           |  status   |          updated_at
-----+-----------+-------------------------------+-----------+-------------------------------
 134 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 141 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 152 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 178 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 185 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 196 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 206 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 215 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 233 | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
 24  | a payload | 2022-05-21 19:04:25.664604+00 | available | 2022-05-21 19:04:25.664604+00
(10 rows)
```
The id field is in a strange order, and we are missing chunks of the sequence! That’s how we know the hashing works. The ids also sort oddly because they are TEXT, so they compare lexicographically rather than numerically.
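A quick illustration of that lexicographic ordering (plain Python, just to show why numeric-looking TEXT ids sort this way):

```python
# TEXT ids compare character by character, not numerically,
# so "106" sorts before "16", and "233" before "24".
ids = ["16", "106", "24", "233", "2"]
print(sorted(ids))  # → ['106', '16', '2', '233', '24']
```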

Resolving Abandoned Locks

If the consumer that has just locked a task dies, we need to be able to recover.
To do so, we can set an in-flight timeout for each task and perform a similar round-robin bucket update to find tasks whose locks have timed out.
This design also requires that the consumer always abandon the task after the timeout, even if the consumer is still healthy. Even if that fails, the behavior still fits under the at-least-once processing guarantees.
```sql
UPDATE tasks
SET status = 'available'
WHERE status = 'locked'
  -- the in-flight timeout, measured from when the row was locked
  -- (updated_at is bumped automatically by ON UPDATE NOW())
  AND updated_at < NOW() - INTERVAL '30 seconds'
  AND crdb_internal_exec_time_id_shard_16 = n
LIMIT 50 -- up to 50 at a time
;
```
This consumer does not claim the task to process it; it purely re-publishes the task for later processing. Due to the time ordering, the task should end up being the next item processed.
You can also adjust the LIMIT and number of cleanup consumers to change your failover latency tolerance.

Exponential Backoff

You could additionally add a delivery_attempts column and use it to calculate an exponential backoff: increment delivery_attempts and push exec_time forward to fit your backoff strategy, within this same UPDATE statement!
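For example, a capped exponential backoff with jitter could be computed client-side like this (the base and cap values are illustrative assumptions; tune them to your workload):

```python
import random

def backoff_seconds(attempts: int, base: float = 1.0, cap: float = 300.0) -> float:
    """Capped exponential backoff: base * 2^attempts, at most `cap`,
    with full jitter so retries don't stampede in lockstep."""
    delay = min(cap, base * (2 ** attempts))
    return random.uniform(0, delay)

# The new exec_time would be NOW() + backoff_seconds(delivery_attempts).
```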

Tradeoffs

There is no perfect system, and one as simple as this must have some faults.

Fanout & Pub/Sub

This system does not natively account for any fanout functionality; you must write to multiple tables (queues) to get it. However, the cost of doing so is quite small.

Multi-region complexity

While distributed persistence layers offer multi-region capabilities, you must intelligently consume from the same region you write to; otherwise you risk greatly increased latencies, with publishers and consumers operating on very distant data. CRDB provides great capabilities for managing this elegantly.

Prioritization of tasks

With the time-based system, there is no way to insert a task with multiple levels of priority. The only real priorities that can be given are “do this next” and not.
You can set the exec_time far in the past to have a task be the next one consumed (from that partition), but beyond that you have no good gauge for when a task will be consumed, since the queued tasks with a lower exec_time are not counted.
This generally should not be an issue due to the “an empty queue is a happy queue” mentality, but it is something to consider if you need granular prioritization control.

Consumer Latency Management

As mentioned earlier, it is important to have multiple consumers so that the round-robin process does not leave large gaps between visits to a hash bucket. While a consumer can be as simple as an additional goroutine or thread, you should probably run at least half as many active consumers as you have buckets, if not more.
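A rough back-of-the-envelope for worst-case revisit latency (assuming consumers split the buckets evenly and each poll, including processing, takes roughly poll_seconds; the numbers are illustrative):

```python
def worst_case_revisit_seconds(buckets: int, consumers: int, poll_seconds: float) -> float:
    """If consumers split the buckets evenly, each one cycles through
    buckets/consumers buckets (rounded up for any leftover bucket),
    so a bucket waits that many polls between visits."""
    buckets_per_consumer = -(-buckets // consumers)  # ceiling division
    return buckets_per_consumer * poll_seconds

# 16 buckets, 8 consumers, 50 ms per poll: each bucket is revisited
# roughly every 100 ms in the worst case.
```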

Manual Instrumentation

If you need observability into your produce and consume rates, payload sizes, average processing latencies, etc., you will need to instrument these manually. That instrumentation should be fairly simple, however.

Manual Backoff Processing

While this could also be seen as a benefit, if you want any NACK functionality with backoff (or immediate retry), you will need to calculate the new exec_time and UPDATE the task yourself. Otherwise, the task will not be processed until an abandoned-lock consumer finds it.

Why CockroachDB?

The design here could easily be replicated on ScyllaDB with manual partition prefixing (say, making the partition key the integers 0 through n-1 for n buckets), but CRDB handles this very gracefully for us under the hood.
Furthermore, there is consistency: you know that during a state transition, only one statement successfully claimed the item to process, something that would typically require a very expensive “lightweight transaction” on ScyllaDB.
 
If you add more nodes in the future, you can easily alter the number of buckets in the index using ALTER PRIMARY KEY with USING HASH WITH BUCKET_COUNT = num_buckets. Keep in mind that the bucket column name will change, which means that upgrades are manual:
Start with an ALTER command:
```sql
ALTER TABLE tasks
ALTER PRIMARY KEY USING COLUMNS (exec_time, id)
USING HASH WITH BUCKET_COUNT = 32;
```
Now we have an additional column in our primary index:
```
defaultdb=> SHOW INDEXES FROM tasks;
 table_name | index_name | non_unique | seq_in_index |             column_name             | direction | storing | implicit
------------+------------+------------+--------------+-------------------------------------+-----------+---------+----------
 tasks      | primary    | f          | 1            | crdb_internal_exec_time_id_shard_32 | ASC       | f       | t
 tasks      | primary    | f          | 2            | exec_time                           | ASC       | f       | f
 tasks      | primary    | f          | 3            | id                                  | ASC       | f       | f
 tasks      | primary    | f          | 4            | payload                             | N/A       | t       | f
 tasks      | primary    | f          | 5            | status                              | N/A       | t       | f
 tasks      | primary    | f          | 6            | updated_at                          | N/A       | t       | f
 tasks      | primary    | f          | 7            | crdb_internal_exec_time_id_shard_16 | N/A       | t       | f
```
We may then switch over to using the new crdb_internal_exec_time_id_shard_32 column in our UPDATE statements, then drop the old hash column:
```sql
ALTER TABLE tasks DROP COLUMN crdb_internal_exec_time_id_shard_16;
```
```sql
EXPLAIN UPDATE tasks
SET status = 'locked'
WHERE exec_time < NOW()
  AND status = 'available'
  AND crdb_internal_exec_time_id_shard_32 = 1
LIMIT 10
RETURNING id, payload;
```
```
 info
---------------------------------------------------------------------------------------
 distribution: local
 vectorized: true

 • update
 │ table: tasks
 │ set: status, updated_at
 │ auto commit
 │
 └── • render
     │
     └── • limit
         │ count: 10
         │
         └── • filter
             │ filter: status = 'available'
             │
             └── • scan
                   missing stats
                   table: tasks@primary
                   spans: [/1 - /1/'2022-05-30 13:00:09.456268+00:00']
```
Boom, an online upgrade! (Performance will be degraded during the process, but it’s not offline!)

How Well It Scales

Throughput scales linearly: more tasks? Add nodes and increase the bucket count.
Of course, you will need to add more consumers too; otherwise the time it takes to visit every partition round-robin style will grow. Adding consumers reduces the amount of time a partition sits unvisited while it has eligible tasks.

From Here…

This is an amazingly simple option that fills most task queue needs, but I still believe there is room for more.
I’ve been working on something a bit exciting!
Over the last few years I’ve seen companies like Segment and Facebook build their own massively scalable task queues: non-blocking, with topics/virtual queues and priorities.
Why are we constantly re-engineering the same thing? We’ve all agreed that existing open source systems are insufficient, so why has nobody made anything new?
Cloud providers have great proprietary options, but for any reasonable scale one might be paying tens of thousands of dollars a day… plus some of their limits are… weird, like SQS allowing no more than 15 minutes of delay for a task.
What if there was a task queue that:
  • Was multi-region, with no single point of failure
  • Was horizontally scalable
  • Had topics for fanout
  • Had priorities for optimizing dequeue order
  • Did not need complex coordination
  • Gave at-least-once processing guarantees with guaranteed durability
  • Scaled globally… like truly globally
  • Failed over fast at any scale, like Singapore going down and Virginia bringing up new nodes in seconds
  • Discovered its own partitions
  • Load-balanced natively
  • Was dead-simple to scale up and down
  • Let you use whatever backing data store you want: disk-based KV, CRDB, ScyllaDB, etc., all without compromising on durability or consistency
That doesn’t exist right? Not yet 😉