

River: a Fast, Robust Job Queue for Go + Postgres
Source: https://brandur.org/river


Years ago I wrote about my trouble with a job queue in Postgres, in which table bloat caused by long-running queries slowed down the workers' capacity to lock jobs as they hunted across millions of dead tuples trying to find a live one.
A job queue in a database can have sharp edges, but I'd understated in that writeup the benefits that came with it. When used well, transactions and background jobs are a match made in heaven and completely sidestep a whole host of distributed systems problems that otherwise don't have easy remediations.
Consider:
- In a transaction, a job is emitted to a Redis-based queue and picked up for work, but the transaction that emitted it isn't yet committed, so none of the data it needs is available. The job fails and will need to be retried later.
- A job is emitted from a transaction which then rolls back. The job fails and will also fail every subsequent retry, pointlessly eating resources despite never being able to succeed, eventually landing in the dead letter queue.
- In an attempt to work around the data visibility problem, a job is emitted to Redis after the transaction commits. But there's a brief moment between the commit and the job emit where, if the process crashes or there's a bug, the job is gone, requiring manual intervention to resolve (if it's even noticed).
- If both queue and store are non-transactional, all of the above and more. Instead of data not being visible, it may be in a partially ready state. If a job runs in the interim, all bets are off.
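To make the contrast concrete, here's the transactional pattern sketched with pgx (the table names and ad hoc job row are illustrative, not any particular library's schema):
package main

import (
    "context"
    "encoding/json"

    "github.com/jackc/pgx/v5"
)

// createAccount writes an account and enqueues a welcome email job in
// the same transaction: the job commits atomically with the data it
// needs, or neither exists.
func createAccount(ctx context.Context, tx pgx.Tx, email string) error {
    var accountID int64
    if err := tx.QueryRow(ctx,
        `INSERT INTO account (email) VALUES ($1) RETURNING id`,
        email,
    ).Scan(&accountID); err != nil {
        return err
    }

    args, err := json.Marshal(map[string]int64{"account_id": accountID})
    if err != nil {
        return err
    }

    // Same transaction as the account insert, so none of the failure
    // modes above are possible.
    _, err = tx.Exec(ctx,
        `INSERT INTO job (kind, args) VALUES ('welcome_email', $1)`,
        args,
    )
    return err
}
Because the job row commits or rolls back atomically with the account, none of the failure modes above can happen.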
Work in a transaction has other benefits too. Postgres' NOTIFY respects transactions, so the moment a job is ready to work, a job queue can wake a worker to work it, bringing the mean delay before work happens down to the sub-millisecond level.
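For example, a NOTIFY issued mid-transaction is only delivered at commit, so a listener never wakes for a job whose data isn't visible yet. A minimal sketch with pgx (the connection string, channel name, and payload are made up):
package main

import (
    "context"
    "fmt"
    "log"

    "github.com/jackc/pgx/v5"
)

func main() {
    ctx := context.Background()

    listener, err := pgx.Connect(ctx, "postgres://localhost/app_dev")
    if err != nil {
        log.Fatal(err)
    }
    defer listener.Close(ctx)

    // Wake up whenever a job becomes ready. Channel name is made up.
    if _, err := listener.Exec(ctx, `LISTEN job_ready`); err != nil {
        log.Fatal(err)
    }

    notifier, err := pgx.Connect(ctx, "postgres://localhost/app_dev")
    if err != nil {
        log.Fatal(err)
    }
    defer notifier.Close(ctx)

    tx, err := notifier.Begin(ctx)
    if err != nil {
        log.Fatal(err)
    }
    defer tx.Rollback(ctx)
    // ... insert the job row here, in this same transaction ...

    // Queued, but NOT delivered to listeners until the COMMIT below.
    if _, err := tx.Exec(ctx, `SELECT pg_notify('job_ready', '42')`); err != nil {
        log.Fatal(err)
    }
    if err := tx.Commit(ctx); err != nil {
        log.Fatal(err)
    }

    // Arrives only now, after commit, with the job's data visible.
    n, err := listener.WaitForNotification(ctx)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("channel=%s payload=%s\n", n.Channel, n.Payload)
}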
Despite our operational trouble, we never did replace our database job queue at Heroku. The price of switching would've been high, and despite blemishes, the benefits still outweighed the costs. I then spent the next six years staring into a maelstrom of pure chaos as I worked on a non-transactional data store. No standard for data consistency was too low. Code was a morass of conditional statements to protect against a million possible (and probable) edges where actual state didn't line up with expected state. Job queues "worked" by brute force, bludgeoning jobs through until they could reach a point that could be tacitly called "successful".
I also picked up a Go habit to the point where it's been my language of choice for years now. Working with it professionally during that time, there have been more than a few moments where I wished I had a good framework for transactional background jobs, but I didn't find any that I particularly loved to use.
River is born
So a few months ago, Blake and I did what one should generally never do, and started writing a new job queue project built specifically around Postgres, Go, and our favorite Go driver, pgx. And finally, after long discussions and much consternation around API shapes and implementation approaches, it's ready for beta use.
I'd like to introduce River (GitHub link), a job queue for building fast, airtight applications.
Designed for generics
One of the relatively new features in Go (since 1.18) that we really wanted to take full advantage of was generics. A River worker takes a river.Job[JobArgs] parameter that provides strongly typed access to the arguments within:
type SortWorker struct {
    river.WorkerDefaults[SortArgs]
}

func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
    sort.Strings(job.Args.Strings)
    fmt.Printf("Sorted strings: %+v\n", job.Args.Strings)
    return nil
}
No raw JSON blobs. No json.Unmarshal boilerplate in every job. No type conversions. 100% reflect-free.
Jobs are raw Go structs with no embeds, magic, or shenanigans. Only a Kind implementation that provides a unique, stable string to identify the job as it round trips to and from the database:
type SortArgs struct {
    // Strings is a slice of strings to sort.
    Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort" }
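For completeness, here's roughly what wiring up a client looks like, sketched from the getting started guide (the pool setup and worker count are just examples, and the job table has to be migrated first with River's migration tool):
package main

import (
    "context"
    "log"
    "os"

    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/riverqueue/river"
    "github.com/riverqueue/river/riverdriver/riverpgxv5"
)

func main() {
    ctx := context.Background()

    dbPool, err := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }
    defer dbPool.Close()

    // Register workers, then hand them to the client config.
    workers := river.NewWorkers()
    river.AddWorker(workers, &SortWorker{})

    riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
        Queues: map[string]river.QueueConfig{
            river.QueueDefault: {MaxWorkers: 100},
        },
        Workers: workers,
    })
    if err != nil {
        log.Fatal(err)
    }

    // Start working jobs in the background.
    if err := riverClient.Start(ctx); err != nil {
        log.Fatal(err)
    }

    // Insert a job; a worker picks it up shortly after commit.
    if _, err := riverClient.Insert(ctx, SortArgs{
        Strings: []string{"whale", "tiger", "bear"},
    }, nil); err != nil {
        log.Fatal(err)
    }
}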
Beyond the basics, River supports batch insertion, error and panic handlers, periodic jobs, subscription hooks for telemetry, unique jobs, and a host of other features.
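As a taste of one of those, batch insertion looks something like this (a sketch; with the riverpgxv5 driver the client's transaction type is pgx.Tx, and exact signatures may differ by version):
package main

import (
    "context"

    "github.com/jackc/pgx/v5"
    "github.com/riverqueue/river"
)

// insertSortBatch inserts several jobs in one round trip.
// InsertManyTx is the transactional variant.
func insertSortBatch(ctx context.Context, client *river.Client[pgx.Tx]) error {
    _, err := client.InsertMany(ctx, []river.InsertManyParams{
        {Args: SortArgs{Strings: []string{"c", "a", "b"}}},
        {Args: SortArgs{Strings: []string{"z", "y", "x"}}},
    })
    return err
}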
Job queues are never really done, but we're pretty proud of the API design and initial feature set. Check out the project's README and getting started guide.
With performance in mind
One of the reasons we like to write things in Go is that it's fast. We wanted River to be a good citizen of the ecosystem and designed it to use fast techniques where we could:
- It takes advantage of pgx's implementation of Postgres' binary protocol, avoiding a lot of marshaling to and parsing from strings.
- It minimizes round trips to the database, performing batch selects and updates to amalgamate work.
- Operations like bulk job insertions make use of COPY FROM for efficiency (see the sketch below).
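Here's roughly what the COPY FROM technique looks like in pgx terms (a sketch only; the table and columns are illustrative rather than River's actual schema):
package main

import (
    "context"
    "log"
    "time"

    "github.com/jackc/pgx/v5"
)

func main() {
    ctx := context.Background()

    conn, err := pgx.Connect(ctx, "postgres://localhost/river_dev")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close(ctx)

    // COPY FROM streams all rows in a single command rather than
    // issuing one INSERT per job.
    rows := [][]any{
        {"sort", []byte(`{"strings":["b","a"]}`), time.Now()},
        {"sort", []byte(`{"strings":["d","c"]}`), time.Now()},
    }
    copied, err := conn.CopyFrom(
        ctx,
        pgx.Identifier{"job"},
        []string{"kind", "args", "scheduled_at"},
        pgx.CopyFromRows(rows),
    )
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("inserted %d jobs", copied)
}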
We haven't even begun to optimize it so I won't be showing any benchmarks (which tend to be misleading anyway), but on my commodity MacBook Air it works ~10k trivial jobs a second. It's not slow.
What's different now?
You might be thinking: Brandur, you've had trouble with job queues in databases before. Now you're promoting one. Why?
A few reasons. The first, as described above, is that transactions really are just a good idea. Maybe the best idea in robust service design. For the last few years I've been putting my money where my mouth is and building a service modeled entirely around transactions and strong data constraints. Data inconsistencies are still possible, but especially in a relative sense, they functionally don't exist. The amount of time this saves operators from having to manually mess around in consoles fixing things cannot be overstated. It's the difference between night and day.
Single dependency stacks
Another reason is that dependency minimization is great. I've written previously about how at work we run a single dependency stack. No ElastiCache, no Redis, no bespoke queueing components, just Postgres. If there's a problem with Postgres, we can fix it. No need to develop expertise in how to operate rarely used, black box systems.
This idea isn't unique. An interesting development in Ruby on Rails 7.1 is the addition of Solid Cache, which 37signals uses to cache in the same database that they use for the rest of their data (same database, but different instances of it, of course). Ten years ago this would've made little sense because you'd want a hot cache that'd serve content from memory only, but advancements in disks (SSDs) have been so great that they measured only a double-digit (25-50%) real-world slowdown moving their cache from Redis to MySQL, with a huge increase in cache hits because a disk-based system allows cache space to widen expansively.
Ruby non-parallelism
A big part of our queue problem at Heroku was the design of the specific job system we were using, and Ruby deployment. Because Ruby doesn't support real parallelism, it's commonly deployed with a process forking model to maximize performance, and this was the case for us. Every worker was its own Ruby process operating independently.
This produced a lot of contention and unnecessary work. Running independently, every worker was separately competing to lock every new job. So for every new job to work, every worker contended with every other worker and iterated millions of dead job rows every time. That's a lot of inefficiency.
A River cluster may run with many processes, but there's orders of magnitude more parallel capacity within each as individual jobs are run on goroutines. A producer inside each process consolidates work and locks jobs for all its internal executors, saving a lot of grief. Separate Go processes may still contend with each other, but many fewer of them are needed thanks to superior intra-process concurrency.
Improvements in Postgres
During my last queue problems we would've been using Postgres 9.4. We have the benefits of nine new major versions since then, which have brought a lot of optimizations around performance and indexes.
- The most important for a queue was the addition of SKIP LOCKED in 9.5, which lets transactions find rows to lock with less effort by skipping rows that are already locked. This feature is old (although no less useful) now, but we didn't have it at the time. (A sketch of the pattern follows this list.)
- Postgres 12 brought in REINDEX CONCURRENTLY, allowing queue indexes to be rebuilt periodically to remove detritus and bloat.
- Postgres 13 added B-tree deduplication, letting indexes with low cardinality (of which a job queue has several) be stored much more efficiently.
- Postgres 14 brought in an optimization to skip B-tree splits by removing expired entries as new ones are added. Very helpful for indexes with a lot of churn like a job queue's.
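A minimal sketch of the SKIP LOCKED fetch pattern (illustrative table, columns, and query, not River's actual implementation):
package main

import (
    "context"

    "github.com/jackc/pgx/v5"
)

// fetchAvailableJobs claims up to limit jobs using FOR UPDATE SKIP
// LOCKED, so concurrent workers never block on rows already locked
// by other workers.
func fetchAvailableJobs(ctx context.Context, tx pgx.Tx, limit int) ([]int64, error) {
    rows, err := tx.Query(ctx, `
        UPDATE job
        SET state = 'running'
        WHERE id IN (
            SELECT id FROM job
            WHERE state = 'available'
            ORDER BY id
            LIMIT $1
            FOR UPDATE SKIP LOCKED
        )
        RETURNING id
    `, limit)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var ids []int64
    for rows.Next() {
        var id int64
        if err := rows.Scan(&id); err != nil {
            return nil, err
        }
        ids = append(ids, id)
    }
    return ids, rows.Err()
}
Each worker process claims a batch without ever blocking on, or scanning past the locks of, rows another process has already claimed.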
And I'm sure there are many I've forgotten. Every new Postgres release brings dozens of small improvements and optimizations, and they add up.
Also exciting is the potential addition of a transaction timeout setting. Postgres has timeouts for individual statements (statement_timeout) and for sitting idle in a transaction (idle_in_transaction_session_timeout), but not for the total duration of a transaction. Like with many OLTP operations, long-lived transactions are hazardous for job queues, and it'll be a big improvement to be able to put an upper bound on them.
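For reference, the two existing knobs look like this (real settings; the values and connection string are just examples):
package main

import (
    "context"
    "log"

    "github.com/jackc/pgx/v5"
)

func main() {
    ctx := context.Background()

    conn, err := pgx.Connect(ctx, "postgres://localhost/app_dev")
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close(ctx)

    // Caps any single statement, and time spent idle inside a
    // transaction; nothing yet caps a transaction's total duration.
    if _, err := conn.Exec(ctx, `SET statement_timeout = '30s'`); err != nil {
        log.Fatal(err)
    }
    if _, err := conn.Exec(ctx, `SET idle_in_transaction_session_timeout = '60s'`); err != nil {
        log.Fatal(err)
    }
}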
Try it
Anyway, check out River (see also the GitHub repo and docs) and we'd appreciate it if you helped kick the tires a bit. We prioritized getting the API as polished as we could (we're really trying to avoid a /v2), but are still doing a lot of active development as we refactor internals, optimize, and generally nicen things up.