Hangfire - A Tale of Several Queues
If you've used Hangfire you know it's a really quick and easy way to give your app a queue of durable background jobs, with automatic retrying and a very nifty dashboard to let you see what's happening right now. Jobs can trigger further jobs and so a complex series of processing stages can be decoupled and spill out into a queue of little units of work.
You can set up one database (such as Redis) to store the state of all your jobs, and then multiple identical workers can attach to that database and munch through the jobs, taking them through the lifecycle:
[Enqueued] -> [Processing] -> [Succeeded]
Or maybe:
[Enqueued] -> [Processing] -> [Failed] -> [Enqueued] -> [Processing] -> [Succeeded]
It's all good. Once enqueued, either the job finishes or you'll be able to see it in the dashboard and read all the lovely exceptions.
I recently hit the problem of having multiple tenants (totally separate customers in this case) executing jobs in the same Hangfire instance. This works fine until a low-priority tenant floods the queue with jobs and stops higher-priority work from getting done.
Under-the-Sea Analogy
You run a service that washes sea creatures. A whale stopped by because he wants the krill cleaned out of his mouth and also his tail could use a service. It's not that urgent though and you aren't charging him as he's a friend. You get started on that, but then a shark comes along (he happens to be one of your best customers) and he needs his teeth cleaned for a photo session starting in ten minutes. Oh no!
What you need are two different queues, like a fast lane and slow lane. I could have just said it's like a grocery shop with a "5 items or fewer" sign at the checkout, but I've already written all that stuff about the sea creatures and I'm not deleting it now.
Hangfire queues and their limitations
So how can Hangfire help us here? It has the concept of queues. When you start a "server" (nothing to do with a physical box or even a process: you can start multiple of them inside one process) you specify what queue(s) it will read from and how many dedicated worker threads it has. So you could start separate servers for fast and slow, each dedicated to their one queue.
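As a rough sketch of that idea, here is how two servers might be started inside one process, each reading a single queue. It assumes job storage has already been configured elsewhere (so JobStorage.Current is set); the class name, server names and worker counts are made up for illustration.

```csharp
using Hangfire;

// Hypothetical helper: names and worker counts are illustrative only.
public static class WashingServers
{
    // A server that only reads the "fast" queue, with plenty of workers
    // so urgent jobs (the shark) are never stuck behind slow ones.
    public static BackgroundJobServer StartFastLane() =>
        new BackgroundJobServer(new BackgroundJobServerOptions
        {
            ServerName = "fast-lane",
            Queues = new[] { "fast" },
            WorkerCount = 8
        });

    // A server that only reads the "slow" queue, with a small worker pool
    // for the low-priority work (the whale).
    public static BackgroundJobServer StartSlowLane() =>
        new BackgroundJobServer(new BackgroundJobServerOptions
        {
            ServerName = "slow-lane",
            Queues = new[] { "slow" },
            WorkerCount = 2
        });
}
```

Both servers are disposable, so whatever hosts them should dispose them on shutdown to let in-flight jobs stop cleanly.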
But there's a problem. By design, Hangfire doesn't assign jobs to queues. Rather, when a job is enqueued, a queue name such as fast can (optionally) be specified. The choice of queue is not stamped on the job, but stored as a property inside the state object representing the Enqueued state. This means that when the job transitions to Processing it will be picked up by the fast server, but from then on nothing is storing the queue name. Why is that a problem?
Well, jobs fail sometimes, and they need to be retried. In fact this is part of what makes a system like Hangfire so valuable; occasional transient problems (a database glitch) don't stop your work from getting done. But obviously when that happens to our jobs, we want them to be enqueued for their second attempt on the same queue as last time.
Extension points
Hangfire supports several extension points, and the relevant ones here are:
- You can add your own custom properties to jobs. So it is possible to stamp the queue name on the job in a way that persists throughout its life.
- You can install a filter called an IClientFilter that lets you hook into the point where a job is created at the client, get the queue name from its initial state and copy it into a custom property. This is the part that stashes the queue name.
- There's another kind of filter called an IElectStateFilter that lets you hook into the event of a job changing state. If it's changing to the Enqueued state, you can copy the queue name from the custom property you stashed in the previous step and fix the state to have the queue name in it, thus forcing it into the right queue.
By combining these we can basically brute-force jobs into running on the queue we've stamped them with.
So the IClientFilter might look like this:
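using Hangfire.Client;
using Hangfire.States;

public class QueueNameRecorder : IClientFilter
{
    public void OnCreating(CreatingContext filterContext)
    {
        // Only jobs created directly in the Enqueued state carry a queue name.
        var enqueuedState = filterContext.InitialState as EnqueuedState;
        if (!string.IsNullOrWhiteSpace(enqueuedState?.Queue))
        {
            // Stash the queue name in a job parameter so it outlives the Enqueued state.
            filterContext.SetJobParameter("stashedQueueName", enqueuedState.Queue);
        }
    }

    public void OnCreated(CreatedContext filterContext) { }
}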
And in the start-up of your clients:
GlobalJobFilters.Filters.Add(new QueueNameRecorder());
This means that if we enqueue a job on the fast queue:
_backgroundJobs.Create<Toothbrush>(j => j.CleanTeeth(), new EnqueuedState("fast"));
the above OnCreating handler will kick in and store the value fast in a custom property stashedQueueName, permanently associating the job with that queue.
Meanwhile in the worker processes, you need the IElectStateFilter:
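using Hangfire.States;

public class QueueNameFixer : IElectStateFilter
{
    public void OnStateElection(ElectStateContext context)
    {
        // Null if the job was created without QueueNameRecorder installed.
        var queueName = context.GetJobParameter<string>("stashedQueueName");

        if (context.CandidateState.Name == EnqueuedState.StateName &&
            !string.IsNullOrWhiteSpace(queueName))
        {
            // Swap in an Enqueued state that points at the stashed queue.
            context.CandidateState = new EnqueuedState(queueName);
        }
    }
}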
… registered in GlobalJobFilters exactly the same way as before, but in worker processes. OnStateElection will fire whenever any job is changing to any new state. Here we're only interested in jobs that are becoming Enqueued and that have a stashedQueueName.
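For completeness, a worker's start-up might look something like the sketch below. It relies on the QueueNameFixer class from this post and assumes job storage is already configured; the WorkerStartup class and its method are hypothetical names for illustration.

```csharp
using Hangfire;

// Hypothetical start-up helper for a worker process.
public static class WorkerStartup
{
    public static BackgroundJobServer Start(params string[] queues)
    {
        // Register the filter before the server starts, so every state
        // change this worker performs passes through QueueNameFixer.
        GlobalJobFilters.Filters.Add(new QueueNameFixer());

        // Start a server that reads only from the queues it was given.
        return new BackgroundJobServer(new BackgroundJobServerOptions
        {
            Queues = queues
        });
    }
}
```

A worker dedicated to the shark would then call WorkerStartup.Start("fast").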
What you then see in your Hangfire dashboard is a bit odd - the job will first be enqueued on the DEFAULT queue and then immediately enqueued a second time on the correct queue.
Now you're all set to clean a fish, or whatever it was I said before!