
A prioritised micro-batch scheduler in rust

source link: https://njk.onl/blog/gaffer.html

2021-11-25

In this entry I'm going to go through the scheduling problem we had with routing at Surve and how I went about solving it by building the micro-batching scheduler library gaffer.

The problem

Surve Mobility provides full-service fleet operations for shared mobility providers. We receive tasks from our customers for things like charging, cleaning and refilling consumables. Depending on the customer and the task, these arrive one by one over the course of the day, in daily batches, or in rare cases in monthly batches. Our agents then travel around the city on foot, in customer vehicles and by public transport to fulfil these tasks. Surve's app delivers tasks to agents, navigates them to the tasks, and provides access to actions such as unlocking the doors.

We have a routing algorithm which plans tours of tasks for each active agent, optimising for distance to travel, task priority and deadline, and maintaining invariants such as the tasks an agent is equipped to do, what time of the day they can be done, and whether resources such as charging points are available to use.

Much of the time we can allow the tours to be a bit stale, but after some operations the user is waiting to see the result. In cases such as changing priority levels and rejecting tasks, the user is left looking at stale data until a reroute completes.

The routing algorithm can take a while to run, with around 10% of runs taking more than 0.5 seconds, and it puts quite a bit of load on our database. Because of the long transaction and the large number of rows being written and read, the algorithm was run under a global lock to avoid serialisation failures. This worked while the number of tasks and agents was low and the algorithm was simple. Then we grew: we got more tasks and more agents, and the algorithm became more sophisticated. The global lock was now making the app too slow in those cases where the user is waiting.

The opportunities

There are some optimisation opportunities with this problem which meant that by improving the scheduling we could obtain much more reliable performance and we could (in the short term) avoid having to worry about optimising the routing algorithm itself.

Sharding

The first thing to notice about this problem is that it shards well by business area (city). When an agent starts work in Hamburg, the effect is limited to tours in Hamburg, so we can run multiple reroutes at the same time as long as they are in different cities. Sharding the problem means that app performance won't degrade as we service more cities.

(Micro-)batching

Our routing algorithm is not an incremental one where new tasks are inserted into existing tours; it looks at all the tasks and optimises all tours to service them. So we only need to run the routing often enough that the tours people see are up to date. If we get 100 changes to the state in one city, we only need one run of the algorithm for the tours to be correct. This is particularly useful when we get a large batch of thousands of orders, or if several users are changing things whilst a slow reroute is running: they only need to wait until the end of the next reroute.
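To make the batching idea concrete, here is a minimal sketch of how pending changes collapse into one reroute per city. The `Change` type and the `reroutes_needed` function are hypothetical names for illustration only, not part of gaffer or of Surve's code.

```rust
use std::collections::BTreeSet;

/// A state change that invalidates the tours of one city
/// (hypothetical type, for illustration only).
#[derive(Debug, Clone, PartialEq)]
pub struct Change {
    pub city: String,
    pub description: String,
}

/// Collapses any number of pending changes into the set of cities
/// which each need exactly one reroute.
pub fn reroutes_needed(pending: &[Change]) -> BTreeSet<String> {
    pending.iter().map(|change| change.city.clone()).collect()
}
```

With 100 pending changes for Hamburg and one for Berlin, this yields two reroutes rather than 101.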

Differing priority levels

User waiting: The first priority is for situations where a coordinator has prioritised a task, or an agent has marked a task as needing to be skipped. In these cases the coordinator or agent can't continue their job until they see the outcome of the reroute, so it should be run as soon as possible. To make that possible we can keep compute capacity reserved for this priority level, so that when a reroute at this level is needed it can usually be scheduled immediately.

Invalidated data: The second priority is for new information which needs to be taken into account, but which no one is specifically waiting on. This covers things like new tasks arriving from automated external systems. They may send more tasks directly afterwards, so we can wait a little.

Timeout: The last priority just picks up any other changes in the input which don't trigger the above. It sets an upper bound on how stale the data can be.
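The three levels and the reserved capacity could be modelled roughly as follows. This is a sketch under my own assumptions: the `Priority` enum, `concurrency_limit` and `may_start` are illustrative names, and reserving exactly one worker is an arbitrary choice, not something gaffer prescribes.

```rust
/// The priority levels described above. Later variants compare as
/// greater, so `UserWaiting` is the highest.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
    Timeout,
    InvalidatedData,
    UserWaiting,
}

/// Hypothetical throttle: the number of busy workers at which jobs of
/// this priority stop being started. `None` means no extra limit.
pub fn concurrency_limit(priority: Priority, total_workers: u8) -> Option<u8> {
    match priority {
        Priority::UserWaiting => None,
        // keep one worker in reserve when there are at least two
        Priority::InvalidatedData | Priority::Timeout => {
            Some(total_workers.saturating_sub(1).max(1))
        }
    }
}

/// Whether a job of this priority may start, given how many workers
/// are currently busy.
pub fn may_start(priority: Priority, busy_workers: u8, total_workers: u8) -> bool {
    if busy_workers >= total_workers {
        return false;
    }
    match concurrency_limit(priority, total_workers) {
        Some(limit) => busy_workers < limit,
        None => true,
    }
}
```

With four workers of which three are busy, an `InvalidatedData` job has to wait while a `UserWaiting` job can take the last worker.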

The solution

I looked at existing schedulers but didn't find any with features that would let us take advantage of these optimisation opportunities. So I took this as an opportunity to build a new scheduler and publish my first Rust crate, deciding that it needed this set of features:

  • Job enqueuing : jobs can be enqueued from other threads using a Clone + Send sender.
  • Recurring jobs : in case a job is not enqueued by some maximum timeout, a job can be automatically enqueued.
  • Futures : other threads which enqueue jobs should be able to await the results.
  • Prioritisation : the job queue is executed in priority order, then in submission order.
  • Job merging : any jobs with the same effect should be merged in the queue to avoid unnecessary load. This is probably the most important feature and it brings with it some extra challenges for the other features:
    • merged jobs with differing priority levels must be merged into the highest priority level.
    • merged jobs with futures awaiting them need to wake all of them with the result of the combined job, so the result needs to implement `Clone` so that it can be provided to each.
  • Parallel execution : multiple worker threads process the jobs.
  • Concurrent exclusion : but some jobs need to acquire some lock and so can't run at the same time as each other
  • Priority throttling : and to leave capacity ready for higher priority jobs we can limit the number of threads which run the lower priority jobs.
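The interaction between merging and prioritisation can be sketched with a small queue. This is not gaffer's implementation: `Job`, `MergingQueue` and the rule "same city means same effect" are assumptions made for the example.

```rust
/// Hypothetical job: two jobs for the same city have the same effect.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Job {
    pub city: String,
    /// higher values run first
    pub priority: u8,
}

/// Queue kept sorted by priority (highest first), then submission order.
#[derive(Default)]
pub struct MergingQueue {
    jobs: Vec<Job>,
}

impl MergingQueue {
    pub fn enqueue(&mut self, job: Job) {
        if let Some(pos) = self.jobs.iter().position(|j| j.city == job.city) {
            if self.jobs[pos].priority >= job.priority {
                // merged: the queued job already has the higher priority
                return;
            }
            // merged: drop the queued job and re-insert at the raised priority
            self.jobs.remove(pos);
        }
        // insert behind every job of equal or higher priority
        let at = self
            .jobs
            .iter()
            .position(|j| j.priority < job.priority)
            .unwrap_or(self.jobs.len());
        self.jobs.insert(at, job);
    }

    pub fn dequeue(&mut self) -> Option<Job> {
        if self.jobs.is_empty() {
            None
        } else {
            Some(self.jobs.remove(0))
        }
    }

    pub fn len(&self) -> usize {
        self.jobs.len()
    }
}
```

A linear scan over a `Vec` keeps the sketch short. It says nothing about the data structure a real queue should use.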

The result was gaffer; check out the documentation for specific examples of how to use it. Here I'll talk more about some of the challenges of combining these features, as well as other design decisions.

SkipIterator

The core part was a PriorityQueue with a merge function. The additional requirement for concurrent exclusion means that the job at the front of the queue might not be allowed to run while another job further back might be. We need to inspect each item in the queue until we find one that is not excluded, but we can't remove items and put them back again, as they would then be in the wrong order, so we want to borrow them. For this I found it a useful abstraction to have an iterator which can either consume or merely borrow each item, decided item by item.


use std::ops::Deref;

/// Like an iterator, call `.maybe_next()` to get the next item.
/// Unlike an iterator, only removes the item from the backing
/// collection when calling `.into_inner()` on the returned
/// `SkipableNext`. This way you can combine the behaviour of
/// `.iter()` and `.into_iter()`, `SkipIterator` is equivalent
/// to `.iter()` if `.into_inner()` is never called and
/// equivalent to `into_iter()` if `into_inner()` is always
/// called.
pub trait SkipIterator {
    type Item;
    // as `maybe_next` holds onto the mutable borrow (`'_`) it
    // can't be called again until the `SkipableNext` is dropped
    fn maybe_next(&mut self) -> Option<SkipableNext<'_, Self>> {
        if self.has_next() {
            Some(SkipableNext {
                iter: self,
                taken: false,
            })
        } else {
            None
        }
    }

    /// does the iterator have a next item, if true, `peek()`
    /// and `take()` must return `Some(T)`
    fn has_next(&self) -> bool;
    /// a reference to the next item
    fn peek(&self) -> Option<&Self::Item>;
    /// remove and return the next item
    fn take(&mut self) -> Option<Self::Item>;
    /// skip the next item
    fn skip(&mut self);
}

/// Wraps an item produced in the iteration
pub struct SkipableNext<'i, I: SkipIterator + ?Sized> {
    iter: &'i mut I,
    taken: bool,
}

/// `Drop` checks whether the element was taken, if not it will
/// skip
impl<'i, I: SkipIterator + ?Sized> Drop for SkipableNext<'i, I>
{
    fn drop(&mut self) {
        if !self.taken {
            self.iter.skip()
        }
    }
}

/// `Deref` allows handy borrow access to the item so we can
/// inspect it
impl<'i, I> Deref for SkipableNext<'i, I>
where
    I: SkipIterator,
{
    type Target = I::Item;
    fn deref(&self) -> &I::Item {
        self.iter.peek().unwrap()
        // `SkipableNext` is only created after a `has_next()`
        // check, so safe to unwrap
    }
}

impl<'i, I: SkipIterator> SkipableNext<'i, I> {
    /// `into_inner` To consume the item
    pub fn into_inner(mut self) -> I::Item {
        self.taken = true;
        self.iter.take().unwrap() 
        // `SkipableNext` is only created after a `has_next()`
        // check, so safe to unwrap
    }
}
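For comparison, the behaviour the trait is after (inspect items in order, take only the first one that may run, leave the rest where they are) can be sketched without the trait for a single lookup. `take_first_runnable` is a hypothetical helper, not part of gaffer, and it uses a plain `VecDeque` in place of the priority queue.

```rust
use std::collections::VecDeque;

/// Removes and returns the first queued item which `can_run` accepts.
/// Skipped items stay in the queue in their original order.
pub fn take_first_runnable<T>(
    queue: &mut VecDeque<T>,
    mut can_run: impl FnMut(&T) -> bool,
) -> Option<T> {
    let index = queue.iter().position(|item| can_run(item))?;
    queue.remove(index)
}
```

This restarts the scan from the front on every call, whereas the `SkipIterator` continues from where it left off within one pass.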
    

Queue Synchronisation

The queue needs to be used by multiple producers and a consumer, but the queue itself doesn't allow concurrent mutation. So I used a crossbeam channel as the input to the queue, in a multi-producer, single-consumer arrangement: the crossbeam sender can be cloned and shared with various threads, and the producers are not blocked when sending to the queue. This meant that I could leave all the queue processing to the consumer thread. There is no need for multiple threads to be consuming at the same time, so a Mutex is perfect for this. There are 2 cases to consider from the side of the consumer (the job runner):

  • If there is already something in the queue that could be worked on, then we should just check the crossbeam channel to see if there is anything else ready on it to be added to the queue which might be a higher priority
  • If there is nothing to work on in the queue, then the consumer actually wants to block until something arrives (or some timeout passes)

impl<T: Prioritised> Receiver<T> {
    /// Waits up to `timeout` for the first message, if none are
    /// currently available, if some are available (and
    /// `wait_for_new` is false) it returns immediately
    pub fn process_queue_timeout(
        &mut self,
        timeout: Duration,
        wait_for_new: bool,
        mut cb: impl FnMut(&T),
    ) {
        let has_new = self.process_queue_ready(&mut cb);
        if !has_new &&
            (wait_for_new || self.queue.lock().is_empty()) {

            match self.recv.recv_timeout(timeout) {
                Ok(item) => {
                    cb(&item);
                    self.queue.lock().enqueue(item);
                }
                Err(RecvTimeoutError::Timeout) => {}
                Err(RecvTimeoutError::Disconnected) => {
                    thread::sleep(timeout);
                }
            }
        }
    }

    /// Processes things ready in the queue without blocking
    pub fn process_queue_ready(
            &mut self,
            mut cb: impl FnMut(&T)
    ) -> bool {
        let mut has_new = false;
        let mut queue = self.queue.lock();
        for item in self.recv.try_iter() {
            cb(&item);
            queue.enqueue(item);
            has_new = true;
        }
        has_new
    }

    /// iterator over the currently available messages in
    /// priority order, any items not iterated when the iterator
    /// is dropped are left (uses the `SkipIterator`)
    pub fn drain(
        &mut self
    ) -> super::Drain<T, MutexGuard<'_, PriorityQueue<T>>> {
        PriorityQueue::drain_deref(self.queue.lock())
    }
}
    

The supervisor

Because of the concurrent exclusion and the priority throttling, we couldn't use parallel job scheduling patterns such as work-stealing; instead we need serial job scheduling with parallel runners. To me, this meant that a single supervisor would need to pull from the queue and distribute the work among several workers. Wanting to follow Rust's zero-cost ideal, I didn't like the idea of an extra supervisor thread which wouldn't be used for actually running jobs, so I went for a design in which at most one worker at a time can become the supervisor when it has no work to do. This is natural: when all the workers are busy, there is no supervision to do. When a worker becomes the supervisor, it waits for a job to arrive if there isn't one already, then assigns jobs to any available workers. Once all other workers are occupied, it becomes a worker again and assigns a job to itself.

There is a lot of code involved in the supervisor state machine, but a core part is the assignment of jobs, which ties together several of gaffer's features.


// using the `SkipIterator` from above
while let Some(job) = jobs.maybe_next() {
    // check concurrency limit for this job's priority level for
    // the priority throttling feature
    if let Some(max_concurrency) =
            (self.concurrency_limit)(job.priority()) {
        if working_count as u8 >= max_concurrency {
            continue;
        }
    }
    // check exclusions
    if exclusions.contains(&job.exclusion()) {
        continue;
    }
    // the job will be scheduled now
    working_count += 1;
    exclusions.push(job.exclusion());
    let mut job = job.into_inner();
    // find an available worker
    loop {
        if let Some(worker) = workers_iter.next() {
            if let WorkerState::Available(send) = worker {
                let exclusion = job.exclusion();
                // send job to worker
                if let Err(SendError(returned_job)) =
                        send.send(job) {
                    // if a worker has died, the rest of the
                    // workers can continue
                    job = returned_job;
                } else {
                    // update state machine about worker
                    *worker = WorkerState::Working(exclusion);
                    break;
                }
            } else {
                continue;
            }
        } else {
            // no available worker for this job, supervisor to
            // become worker
            workers[self.worker_index] =
                WorkerState::Working(job.exclusion());
            return Some(job);
        }
    }
}
    

Panic recovery

The last bit of code I'd like to show is a simple panic recovery. If a job panics, I don't want it to kill a runner, so the runner has a Drop implementation which clones the runner and spawns a new thread to run it.


impl<J, R> Drop for Runner<J, R>
where J: Job + 'static,
      R: RecurringJob<J> + Send + 'static {
    fn drop(&mut self) {
        if thread::panicking() {
            // spawn another thread to take over
            let runner = self.clone();
            thread::Builder::new()
                .name(
                    format!("gaffer#{}", self.state.worker_index))
                .spawn(move || {
                    runner.panic_recover();
                })
                .unwrap();
        }
    }
}
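The same pattern can be tried in isolation with a small standalone sketch. `Worker`, `run` and `demo` are hypothetical stand-ins for the runner above; here the worker only reports over a channel instead of running jobs.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical stand-in for a runner.
#[derive(Clone)]
pub struct Worker {
    pub id: usize,
    pub events: Sender<String>,
}

impl Worker {
    pub fn run(self, should_panic: bool) {
        if should_panic {
            panic!("job failed");
        }
        let _ = self.events.send(format!("worker {} running", self.id));
    }
}

impl Drop for Worker {
    fn drop(&mut self) {
        // only respawn when being dropped because of a panic
        if thread::panicking() {
            let replacement = self.clone();
            thread::Builder::new()
                .name(format!("worker#{}", self.id))
                .spawn(move || replacement.run(false))
                .expect("failed to spawn replacement thread");
        }
    }
}

/// Starts a worker which panics and returns the message sent by the
/// replacement spawned from `Drop`.
pub fn demo() -> String {
    let (events, received) = channel();
    let worker = Worker { id: 0, events };
    let handle = thread::spawn(move || worker.run(true));
    // the first thread ends with a panic
    assert!(handle.join().is_err());
    received.recv().unwrap()
}
```

The replacement is dropped normally at the end of `run`, so `thread::panicking()` is false at that point and no further thread is spawned.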
    

The retrospective

The number of features in this scheduler is quite large, and it was particularly useful to us because we wanted to use all of them. Some of the features also interact with each other in complex ways, which means that if there is something you don't want to use, this might not be a good solution for you. I prefer it when software can be composed, rather than being an all-in system, so possible future work would be to break this up into a few separate things.

One part would be the priority queue with merging. You would use this if you had a use for job merging; if not, you might want to use another priority queue, or a Vec if you don't need prioritisation.

Another part would be scheduling the recurring jobs. In our case we already have a timer in our app which could be used to enqueue these jobs; we would just need to reset the timer each time a job was enqueued.
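One way such a resetting timer could look, assuming jobs arrive over a standard-library channel rather than through our app's timer: `Next` and `next_job` are hypothetical names used only for this sketch.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// What the consumer should do next (hypothetical type).
#[derive(Debug, PartialEq)]
pub enum Next<J> {
    /// a job was enqueued before the deadline
    Enqueued(J),
    /// nothing arrived in time, so run the recurring job
    Recurring,
    /// all senders are gone
    Closed,
}

/// Waits up to `max_staleness` for a job. Every call starts a fresh
/// wait, so each enqueued job effectively resets the timer.
pub fn next_job<J>(jobs: &Receiver<J>, max_staleness: Duration) -> Next<J> {
    match jobs.recv_timeout(max_staleness) {
        Ok(job) => Next::Enqueued(job),
        Err(RecvTimeoutError::Timeout) => Next::Recurring,
        Err(RecvTimeoutError::Disconnected) => Next::Closed,
    }
}
```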

The futures used here are promises. Apart from the support for merging, they are not specific to this use case and could be generally useful for other purposes, so they possibly belong somewhere else.

For the runner, it would be nice to be able to run on existing threads in your application, or to run on an async scheduler if that's what your application is using. The runner would need to take care of the concurrent exclusion and priority throttling, but these are a separate problem from the other parts.

Check it out on GitHub, crates.io or docs.rs.

