Priority queue in Redis aka ZPOP

 3 years ago
source link: http://charlesnagy.info/it/python/priority-queue-in-redis-aka-zpop

A Redis list works well as a processing queue. So well, in fact, that it has become something of a standard, and many people use it instead of dedicated message queues such as ZeroMQ or RabbitMQ, because Redis also offers other useful features. Its main idea is to provide the data structures you would normally use in any programming language: variables, lists, sets, hashes (dictionaries), etc. But it can do even more than that if you take advantage of the commands it provides.

The use case

Sometimes items should not be processed in insertion order, or an item only needs to be processed once even though it may be put in the queue more than once. I faced this problem when working with Django signals. Let’s say I want to do something with a parent object whenever a child object is saved. That’s a very common use case for the post_save signal. Usually there is more than one child class with a foreign key to the same parent object, and I don’t actually need to process the parent every time a child is saved. If you’re familiar with Django, imagine an admin form with inline related objects. Every time I click save, my main object would be updated N + 1 times in the database (N is the number of related objects), which is absolutely unnecessary. Instead, I can do that update a) in the background and b) only once.

Priority queue

Sorted sets (ZSET) provide a perfect way to remedy both situations. Let’s say I have a process or cron job that picks up items from the queue. Instead of growing the queue every time a child object is saved, each save simply increases the priority (the score) of the parent to be processed. You only need three commands to do that: ZINCRBY, ZREVRANGE and ZREM.

Redis command line
Shell
redis 127.0.0.1:6379[5]> type prio_list
redis 127.0.0.1:6379[5]> ZINCRBY prio_list 1 test1
redis 127.0.0.1:6379[5]> ZINCRBY prio_list 1 test1
redis 127.0.0.1:6379[5]> ZINCRBY prio_list 1 test1
redis 127.0.0.1:6379[5]> ZINCRBY prio_list 1 test2
redis 127.0.0.1:6379[5]> ZINCRBY prio_list 1 test3
redis 127.0.0.1:6379[5]> ZINCRBY prio_list 1 test4
redis 127.0.0.1:6379[5]> ZRANGE prio_list 0 -1
1) "test2"
2) "test3"
3) "test4"
4) "test1"
redis 127.0.0.1:6379[5]> ZREVRANGE prio_list 0 0
1) "test1"
redis 127.0.0.1:6379[5]> ZREM prio_list test1
(integer) 1
redis 127.0.0.1:6379[5]> ZREVRANGE prio_list 0 -1
1) "test4"
2) "test3"
3) "test2"

Now you have a perfect priority queue implementation using basic redis commands.
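
To see why repeated inserts collapse into a single entry, here is a minimal in-memory sketch of the three commands. It is not a Redis client: the `scores` dict and the simplified function signatures are assumptions made for illustration, and only the `-1` form of a negative index is handled.

```python
# In-memory stand-in for the three sorted-set commands (not a Redis client).
scores = {}  # member -> score, plays the role of the "prio_list" key


def zincrby(member, amount=1):
    """Add `amount` to the member's score, creating it at 0 if missing."""
    scores[member] = scores.get(member, 0) + amount
    return scores[member]


def zrevrange(start, stop):
    """Members ordered by (score, member) descending, inclusive indices."""
    ordered = sorted(scores, key=lambda m: (scores[m], m), reverse=True)
    stop = len(ordered) if stop == -1 else stop + 1
    return ordered[start:stop]


def zrem(member):
    """Remove the member; return 1 if it existed, 0 otherwise."""
    return 1 if scores.pop(member, None) is not None else 0


# Replay the session: test1 is "saved" three times, the others once.
for name in ["test1", "test1", "test1", "test2", "test3", "test4"]:
    zincrby(name)

top = zrevrange(0, 0)         # the single highest-priority member
removed = zrem(top[0])        # 1 means we were the one who removed it
remaining = zrevrange(0, -1)  # everything left, highest priority first
```

The three pushes of test1 leave one member with score 3, so it is processed first and only once.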

ZREM vs ZREMRANGEBYRANK

ZREM has the following time complexity (quoting redis.io):

O(M*log(N)) with N being the number of elements in the sorted set and M the number of elements to be removed.

ZREMRANGEBYRANK’s time complexity:

O(log(N)+M) with N being the number of elements in the sorted set and M the number of elements removed by the operation.

So hypothetically the latter is better, because its cost is the sum of log(N) and M while ZREM’s is the product of the two. In our case it doesn’t matter: we only pop one element at a time from the queue (M = 1), so both come down to O(log(N)) and it won’t make any difference.

In Python

Python PriorityQueue implementation
Python
import redis


class PriorityQueue(object):
    def __init__(self, queue):
        self.queue = queue
        self._r = redis.StrictRedis(host='localhost', port=6379, db=5)

    def push(self, item):
        # redis-py >= 3.0 signature: zincrby(name, amount, value).
        # Older releases of StrictRedis took (name, value, amount).
        return self._r.zincrby(self.queue, 1, item)

    def pop(self):
        try:
            # Highest-scored member; raises IndexError on an empty queue
            _item = self._r.zrevrange(self.queue, 0, 0)[0]
            if self._r.zrem(self.queue, _item) == 1:
                # We managed to pop the item from the queue
                return _item
            else:
                # Somebody else also got the same item and removed it before us
                # Try again
                return self.pop()
        except IndexError:
            # Queue is empty
            return None

Notice that we don’t need any special locking or concurrency control, because we check the return value of the zrem command. If it managed to remove the element, it returns 1. If it returns 0, we had a race condition and somebody removed the element before our thread/process/script. In that case we retry.
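
The claim can be checked without a server. The sketch below is a simulation under stated assumptions: `FakeSortedSet` is a hypothetical in-memory stand-in whose methods each hold a lock, mimicking the fact that Redis executes every single command atomically, while nothing protects the gap between reading the top element and removing it. Four threads then drain the same set.

```python
import threading


class FakeSortedSet:
    """In-memory stand-in for one Redis sorted set (an assumption for this demo)."""

    def __init__(self):
        self._scores = {}
        self._lock = threading.Lock()  # one lock per command, never across two

    def zincrby(self, member, amount=1):
        with self._lock:
            self._scores[member] = self._scores.get(member, 0) + amount

    def zrevrange_top(self):
        """Equivalent of ZREVRANGE key 0 0: a list with at most one member."""
        with self._lock:
            if not self._scores:
                return []
            return [max(self._scores, key=lambda m: (self._scores[m], m))]

    def zrem(self, member):
        with self._lock:
            return 1 if self._scores.pop(member, None) is not None else 0


def pop(zset):
    """Read the top member, then claim it; only the caller that gets 1 wins."""
    while True:
        top = zset.zrevrange_top()
        if not top:
            return None      # queue is empty
        if zset.zrem(top[0]) == 1:
            return top[0]    # we removed it, so it is ours
        # zrem returned 0: another worker claimed it first, so retry


def worker(zset, out):
    while True:
        item = pop(zset)
        if item is None:
            break
        out.append(item)


zset = FakeSortedSet()
for i in range(200):
    zset.zincrby("item%d" % i)

results = [[] for _ in range(4)]
threads = [threading.Thread(target=worker, args=(zset, results[n])) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

popped = [item for chunk in results for item in chunk]
```

However the threads interleave, every item ends up in `popped` exactly once, because only one caller can ever receive 1 from zrem for a given member.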

Removing the top element from the sorted set is also referred to as ZPOP. If you prefer a single EVAL command, there is also a Lua implementation available on RedisGreen: https://www.redisgreen.net/library/zpop.html
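
Since this article was written, Redis 5.0 added a native ZPOPMAX command that removes and returns the highest-scored member in one atomic step. A minimal sketch, assuming a redis-py 3.0 or later client with default settings is passed in as `client`; the helper name `pop_highest` is made up for this example:

```python
def pop_highest(client, key):
    """Pop the member with the highest score, or return None if the set is empty.

    `client` is expected to be a redis-py client (redis.Redis). ZPOPMAX is
    executed atomically by the server, so no retry loop is needed.
    """
    popped = client.zpopmax(key)  # list of (member, score) pairs
    if not popped:
        return None
    member, _score = popped[0]
    return member


# Usage against a real server (assumes a local Redis >= 5.0):
#   import redis
#   r = redis.Redis(host='localhost', port=6379, db=5)
#   r.zincrby('prio_list', 1, 'test1')
#   pop_highest(r, 'prio_list')
```

On servers older than 5.0 the command is not available, and the read-then-ZREM approach or the Lua script remains the way to go.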

Update (2016-02-07) based on Itamar’s comment:

The above implementation uses recursion on the assumption that contention will be rare. However, with high concurrency or fast processing times (we did the whole thing to offload some of the heavier updates from web request time to a background process), the probability of such collisions increases, and then a while loop is a better option than a recursive call.

A while loop version would look like this:

While loop version instead of recursion
Python
import redis


class PriorityQueue(object):
    def __init__(self, queue):
        self.queue = u'amq:pq:%s' % queue
        self._r = redis.StrictRedis(host='localhost', port=6379, db=5)

    def push(self, item):
        # redis-py >= 3.0 signature: zincrby(name, amount, value)
        return self._r.zincrby(self.queue, 1, item)

    @property
    def first(self):
        # Raises IndexError when the queue is empty
        return self._r.zrevrange(self.queue, 0, 0)[0]

    def pop(self):
        try:
            _item = self.first
            while self._r.zrem(self.queue, _item) == 0:
                # Somebody else also got the same item and removed it before us
                # Try again
                _item = self.first
            # We managed to pop the item from the queue
            return _item
        except IndexError:
            # Queue is empty
            return None
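
The background process or cron job mentioned earlier could consume the queue with a loop like the one below. This is a sketch under assumptions: `drain` and `handle` are hypothetical names, and `queue` is any object whose pop() returns None once the queue is empty.

```python
def drain(queue, handle):
    """Pop items until the queue reports empty, passing each one to `handle`.

    `queue` is any object with a pop() method that returns None when empty.
    Returns the number of items processed.
    """
    processed = 0
    while True:
        item = queue.pop()
        if item is None:
            break
        handle(item)
        processed += 1
    return processed
```

Note that an item is removed from the sorted set before it is handled, so if `handle` raises, that item is not retried unless it gets pushed again.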
