The Age of Micro-Stream ProcessingThe Age of Micro-Stream Processing

The Age of Micro-Stream Processing

Tags
development
stream processing
Published
Published January 12, 2023
Author

Abstract

Micro-stream processing is a kind of data processing pipeline that more granularly partitions data and colocated compute and storage enabling small compute workers to operate quickly on small subsets of the overall data set.
notion imagenotion image

The problem with stream processing

If you’re like me, then you think stream processing is a particularly tricky field: Mountains of infrastructure projects at great complexities exist to tackle the rivers of data users generate.
Many companies have converted to stream processing as the only way to handle such massive data at such low latency. But this has historically come at a cost. You’ll seldom find a small team building a EMR cluster to run Flink connected to Kafka fed by a DB’s changefeeds.
Want to process streaming data? Well you need to add a streaming data store (e.g. Kafka, RedPanda), something to run aggreations (e.g. Flink, Materialize), send that data back to the stream again, and then have some consumer handle the final results (your code). Furthermore many of those services need other services like Zookeeper just to wrangle them…
You can start to see what I mean 😅
notion imagenotion image
Not a particularly elegant world, requiring 2 or 3 completely new services just to connect your code to. Let’s not get started on either running them yourselves, prepare to pay thousands per month in cloud hosting fees for setups like these.
Don’t be deceived, this a greatly simplified diagram. It’s actually more of a rats nest than this in practice!
Hot take: Needing Zookeeper = Couldn’t bother to implement Raft 😬 (sorry Clickhouse I still love you)
To be clear, I love many solutions like RedPanda, I think they are great and have served many well at immense scales!
Every dev team knows that scaling policies, monitoring, and billing management become an issue as you introduce more technologies and vendors into your stack.
With the advent of Cloudflare’s Durable Objects, a new style of stream processing has been enabled with near-zero infrastructure overhead and low complexity: Micro-stream processing!

Introducing micro-stream processing

I have been a quiet fan of Durable Objects for a long time. While Durable Objects can achieve massive feats in the realm of coordiation, it can also serve for massive data processing workloads for data that can be granularly siloed into small partitions (for example: by user).
A cascading fan-out architecture allows for both low-latency and horizontally scalable writing and processing
Detailed example diagram of partition and processor mappings for micro-stream processingDetailed example diagram of partition and processor mappings for micro-stream processing
Detailed example diagram of partition and processor mappings for micro-stream processing
The method is simple:
When we create a new processor, we subscribe it to a set of events that it is interested in.
Next, we quickly write to a random partition in the “durability layer” so our event producers can get back to doing other things. How you handle this could be random, by event name, by tenant, which ever makes the most sense for your data.
From there, we say “Hey who is interested in event X, Y, or Z?“ by looking up the subscribers (processing layer) of various events, and route the events to those subscribers. Those will be partitioned by some processor ID (say “churn_detection”), and something like a User ID.
Finally, the Durable Objects (processing layer) then process the small streams of data. They can maintain a materialized state if needed, or at such small scales reasonably choose to re-process all of the rows every invocation until some condition is met. This processing is triggered when a new batch of rows is written, or on some interval post-insert (using alarms).
This method of handling data leverages the full power of Cloudflare’s network and colocated data and compute. By creating 2 colocated tiers, one for quick durability and one for partition processing, we have only the limits of Cloudflare to concern ourselves with.
The best part? Not even thinking about scale, monitoring, or infrastructure! Just bump that partition key range to modulo over!
Oh yeah, and only needing a handful of JS to achieve this is sensational!
We also now have the full power of JS thanks to the v8-based runtime. While this could be dangerous for those who don’t know how to efficiently handle data. At the micro-scales of a single granular data silo, it’s hardly consequential.
You can use this flexibility to customize the guarantees. Want to guarantee in-order processing? Persist historical events to allow new processors to read a history? All up to you! You can choose to trade features for simplicity how ever you’d like.
Now many developers who’d typically never be able to interface with such stream processing technologies now have an amazing new and simple option.

Micro-stream processing in action

So what can we do with micro-stream processing? A lot!
Now that we have a small subset of data processed per Durable Object and the flexibility of JS, we can define small state machines, fan-out queues.

Touching-base with users who are at risk of churning

In our pretend SaaS platform, we want to look out for users who are at risk of churning. We can create a simple processor that listens to the user_signin event.
We then check for when a user has not logged in for at least 7 days, and send them an email to re-engage them.
notion imagenotion image
The JS for this processor might look something like:
async fetch() { // ... if (this.storage.get("neverRunAgain")) { // we already ran, just shut down return } if (this.getAlarm()) { // cancel exiting alarm this.deleteAlarm() } // set our alarm for in the future this.setAlarm(new Date().getTime() + DAYS * 7) } // ... async alarm() { if (this.storage.get("neverRunAgain")) { // we already ran, just shut down return } // send the email await sendUserEmail() // mark to never run again this.storage.put("neverRunAgain", 1) }

Celebrating with users when they reach milestones

Many platforms like customer.io and braze can look for churning users, so let’s look at something they can’t do.
Now we want to send an “achievement” email for users that have earned at least $100 on our platform. We can do so by hooking into the events user_created and sale_complete.
It’s important to note that we only start processing for a user when we first get a user_created, so that in the event of adding this processor after our first users have registered we don’t send them potentially incorrect achievements (e.g. they’ve already earned $100).
notion imagenotion image
For this example we sum the event.price property of all sale_complete events until it exceeds $100, then we mark the processor to shut down and never worry about processing future rows, effectively retiring that instance.

A true horizontally scalable push queue

Segment’s Centrifuge was the inspiration for my first distributed queue design. The major advantage of this design is how quickly and efficiently data can be re-organized to prevent any form of blocking.
But could we design the same architecture of a push queue with Durable Objects?
Of course! Why would I say that if we couldn’t?
Pretending we are segment, we want to deliver events from certain sources to certain destinations (kind of like the relationship between layers 1 and 2 in the system design!). For a given tenant, we can store this mapping within the database, that we can refresh per-instance on some interval to determine what events need to be delivered to which destinations.
We can then create Durable Objects with the key schema (tenant_id, partition_id) where partition_id is [0, n) based on the throughput of that tenant, randomly selecting a partition to write a batch of data to as needed.
Within a partition (Durable Object) we can setup an alarm that fires on an interval to kick-off writing, batch events per destination, attempt delivery, and enter events into a back-off queue (on disk) if writes fail to happen within the allowed time period, continuing to retry them with some backoff in the future.
notion imagenotion image
Look, Mom, no durability layer needed for this one!
We write to a random partition for a tenant, for which each partition writes to destionations (based on some mapping, say in D1) on an interval. If the destionation refuses that write, then we can re-queue them without blocking delivery to other destinations by maintaining virtual sub-queues.

Weaknesses

There are many more uses for micro-stream processing, but they don’t cover all areas where stream processing is useful.
The ability to replay the log is one place where this system becomes more complex than traditional systems like Kafka and RedPanda.
Another weakness is that you need to build the fanout system based on your needs. While this is definitely also a benefit, sometimes you just want to move fast.
Finally, these processors stick around forever. They can be retired to never process data again, but as long as that processor definition exists listening for certain events, those events will be routed.
With thousands of processors definitions this can get a little chatty, but Cloudflare can definitely handle the scale :P

Parting Notes

Before you go building your own mesh of Durable Objects, there are a few more things to note:

Is this a replacement for existing streaming solutions?

No.
As mentioned above there are many situations where a typical RedPanda/Flink setup will do far better. For example looking at whole-product aggregations like the realtime total number of sales. That level of volume could easily overwhelm a Durable Object at scale.
Where a Durable Object based system would be better is if you were keeping track of real-time results for individual users.

Where should this be used?

Attached to something like Segment, where you are listening for all events from all systems, and performing said micro-stream processing on those event streams!
The convenience of hooking into Segment is immense, but you could surely send your own events manually to the system!

Similarity to Inngest

Many of these example use case can be achieved with Inngest. A few major benefits to the Durable Objects mesh are:
  1. Self-hosting is ridiculously easy to boot
  1. Far simpler infrastructure that can probably achieve greater scale with just a single knob
  1. Not needing to deliver the entire workflow history to the worker to rebuild linearizable state
  1. Able to fetch the current state of the machine
  1. We can’t drop objects while processing like Inngest currently does
At smaller scales with no risk of dropping events, Inngest can crush it, but I don’t know where the bottlenecks are are massive scale. Looking at their architecture diagram in Github, it doesn’t look to dissimilar to a traditional stream processing setup.
And to be precise: What micro-stream processing compares to is not the code you write for Inngest but the underlying infrastructure and services that manage and deliver the events to your workers.
That being said I think poorly of them at all. Inngest is sublime and run by intelligent folks, and I use Inngest in my projects because I know that I can save time not building a mesh of Durable Objects. But man is that mesh cool!
For example, here is some code for event stream processing with Inngest might look like:
// The user signup event "starts" the aggregation and waits for sales inngest.createStepFunction("$100 email", "user_signup", ({ event, tools }) => { let sum = 0; // Set the end threshold while (sum < 10000) { // Wait for the next sale where the user.id matches the // original event let nextEvent = tools.waitForEvent("sale_completed", { timeout: "7d", // abandon if we don't get anything for 7d match: "user.id", }) sum += nextEvent.data.price; } tool.run("Send email", () => { emailAPI.send("...") }) })
Thanks to Dan @ Inngest for the example :)
As you can see the code is extremely simple, and like micro-stream processing you could feed event sources like Segment right into it. Inngest has done a lot of the infra work so you can just write the functionality… Maybe I need to write some generic wrapper for the Durable Object based system 🤔

Update Jan 28th 2023

I hacked a version of this together over the weekend that uses arbitrary endpoints for the eval and exec handlers, and oh boy… it feels like magic.
1 worker, 2 durable object classes, KV, and D1 with <1000 lines of TypeScript.