A Simple Priority Queue With Redis in Ruby
At Reverb, we do a lot of search indexing. Anytime a listing is updated, added, or removed, we have to do an index operation to our Elastic Search Cluster. For a while, we were able to do this at near real-time by pushing our indexing work to a Sidekiq background jobs.
As we got larger, Sidekiq wasn’t working for us because of the sheer amount of index operations we were getting. Some sellers were importing 1000s of listings at a time and it was backing up other jobs in Sidekiq that had a lower priority (like sending out emails). And then there were situations where we wanted to re-index all of our listings because we, say, added another field to the Elastic Search document. Sidekiq was processing the jobs like a boss, but when you have 10s of thousands of Elastic Search index jobs suddenly inserted into Sidekiq, it takes some time to drain, which backs up all of the queues for the entire site.
What we really needed was a single queue for re-indexing our listings. But, not all listings are equal. A listing that was just updated by a user should be indexed ASAP, but listings that need to be indexed because the development team is doing a full re-index can probably wait. Fortunately, using Redis, we can create a simple priority queue to put our listings into.
For our Redis data structure, we want to use a sorted set. A sorted set is essentially like a set, but with a score. A sorted set has the following properties:
- Members are unique. In other words, if “foo” is a member and I put “foo” in again, “foo” is only in the set once.
- Each member has a score associated with it.
- When members are listed, members with lower scores are returned first.
Additionally for our queues, we have some requirements:
- When adding a new member, if there is already an existing member, we want to keep the one with the most important priority. In other words, if “foo” is put in the set score in Redis of -1 and then later I want to put “foo” in the set with a score of +1, I want to keep the score at -1 (lower scores are returned first). By default, Redis will just change the score, but we do not want that.
- We should be able to atomically pop members out of the Redis set.
For #1, we have to do some manual checking to make sure that we are not overriding the priority.
Note that there is a slight possibility here of a race condition but this is acceptable in our situation.
For #2, we just use the zpop command. Oh, wait, Redis doesn’t have a zpop command? You can’t just pop items from a sorted set in order? I guess it’s time to implement our own:
Here, we use WATCH to guarantee atomicity and to avoid the race condition of removing an element right after we add it. WATCH essentially means that if the key changes while we are watching (if something is added or removed from it), it will abort the transaction (here, anything in the multi block). This guarantees us atomicity.
Putting it all together, we have our priority queue:
Now all you need is some daemon or other process to poll the queue and to process its members. I’ll leave that exercise up to the reader :)