Queueing systems

Queueing systems manage and process queues of jobs. The presence of a queue implies that something cannot be done immediately: one has to wait until some resource becomes available. When you respond to an HTTP request you usually want to do it interactively, that is, you want to respond within some reasonable amount of time. What can prevent you from doing this? Various things: external data sources, long-running tasks. Your request might also consume too much memory, forcing the system to swap something out, which is time-consuming.

From the web app’s point of view, external data sources are out of its control: there is no way to make them respond faster from the application side. For example, if you are unsatisfied with how long a MySQL database takes to answer a certain query, fixing this requires changing something in the database. Rewriting the query would produce a new query, which is a different story.

When there is a long-running task, however, and it cannot be redesigned to run interactively, it can be handed to a queueing system for later processing. Queueing systems allow you to enqueue a task and get control back immediately, so your web app can go on doing other things. When the queueing system finds an available worker suitable for your task, it passes the task’s data to that worker. The worker then does what you requested and returns a result, after which the queueing system marks the task as completed and forgets about it.

Should the worker die while executing a task, the queueing system might notice it through some mechanism: perhaps it keeps a TCP connection open to the worker and restarts the task if the connection is lost.

Another reason for farming out jobs to a queueing system is excessive memory usage. Jobs that require an excessive amount of memory are not scalable: only a limited number of such jobs can be processed simultaneously.

A job that requires an excessive amount of memory is not a show-stopper by itself: even fully loaded web servers usually have a lot of spare RAM available for batch processing. This is where queueing systems come into play: jobs that require an excessive amount of memory are taken out of the web server and processed sequentially, without eating up all available memory.

Now, from theory to practice. The first queueing system I worked with that functions in the way described above is the Oracle Advanced Queuing feature of Oracle Database. It’s an industry-ready system with many interesting features. In Oracle Advanced Queuing, a client that dequeues something becomes a worker; any other client can enqueue something for background processing. A job is not removed from the queue until the worker issues COMMIT, so the entire background job is consistent and re-doable.

In Oracle Advanced Queuing, queues can run on a single server or propagate to other servers via database links. In this way you can build a system of arbitrary complexity.

We are looking for a simpler solution, however, one that doesn’t require an entire Oracle Database server. My system of choice is Gearman: simple, fast, reliable, with few dependencies. Gearman can use different backends as storage for its queues: memory, SQLite, MySQL, Memcached, Postgres.
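
For example, to keep queued jobs across gearmand restarts, the server can be started with a persistent queue backend. A hedged example with the SQLite backend (option names may vary between gearmand versions):

gearmand -q libsqlite3 --libsqlite3-db=/var/lib/gearman/queue.db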

Internally, Gearman is implemented as an event-based server. It has a simple protocol for which you can easily write a custom implementation. Gearman has a simple API for various languages: PHP (via a PEAR module), Perl, C, Java, Python. Without exaggeration, I have tried all of them except Python.

I have tried Gearman in two big projects and a smaller one, and I can say that Gearman is production-ready. Gearman is almost never a problem; it’s a reliable gear in your workflow. Gearman is also scalable: you can set up multiple instances of it and submit jobs to all of them, as the sketch below shows.
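
A minimal sketch of such a setup, assuming two gearmand instances on hypothetical hosts: workers connect to every server in the list, while clients pick a server for each submitted job.

use Gearman::Worker;
use Gearman::Client;

my @servers = ('10.0.0.1:4730', '10.0.0.2:4730');

# the worker listens for jobs on both instances
my $worker = Gearman::Worker->new;
$worker->job_servers(@servers);

# the client spreads submitted jobs across both instances
my $client = Gearman::Client->new;
$client->job_servers(@servers);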

Now let’s try Gearman in action: let’s write a simple Perl script that resizes images to a pre-defined size. Notice that, unlike in web apps, invoking other processes in background jobs is completely okay, so we’ll just invoke the convert utility from the ImageMagick package to do all the work for us.

Suppose we want to resize images to avatar size: 64×64. First, let’s write a subroutine that invokes convert:

use warnings;
use strict;
use Gearman::Worker;

sub resize_image {
    # the job's argument is the path to the original image
    my $path = $_[0]->arg;

    # derive the thumbnail path: foo.png -> foo_thumb.png
    my $outpath = $path;
    $outpath =~ s/^(.*)\.(.*)$/$1_thumb.$2/;

    print "processing $path -> $outpath\n";

    # let ImageMagick do the actual resizing
    my @args = ("/usr/bin/convert", $path, "-resize", "64x64", $outpath);
    system(@args) == 0 or die "system @args failed: $?";
}

Here we grab the job’s argument and assume it’s a path to the original image. We add a “_thumb” suffix to create the thumbnail path and simply invoke convert.

# connect to the Gearman server and serve jobs forever
my $worker = Gearman::Worker->new;
$worker->job_servers('127.0.0.1:4730');
$worker->register_function('resize' => \&resize_image);
$worker->work while 1;

To create a Gearman worker we instantiate a Gearman::Worker object. We use the job_servers method to set the address of the Gearman server. The register_function method registers the code that will be associated with a certain function name. Function names in Gearman are purely symbolic; they don’t have to match function names in your language. Finally, we call $worker->work to start the worker.

To enqueue a job, the Gearman command-line client can be used:

gearman -f resize -b "/home/valery/domain_pict.png"
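
From application code, the same job can be submitted with the Perl client API. A minimal sketch, assuming the server and worker shown above are running:

use Gearman::Client;

my $client = Gearman::Client->new;
$client->job_servers('127.0.0.1:4730');

# dispatch_background returns immediately; the worker picks
# the job up whenever it becomes available
$client->dispatch_background('resize', '/home/valery/domain_pict.png');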

And the worker can be started from the command line:

$ perl resize_worker.pl 
processing /home/valery/domain_pict.png -> /home/valery/domain_pict_thumb.png
processing /home/valery/back.jpg -> /home/valery/back_thumb.jpg
^C

As every Gearman worker can simultaneously be a Gearman client, jobs can be chained. A worker can also split a work unit into smaller ones and submit a job for each of them. Using an external database, it is possible to track the status of individual work units or the progress of the job as a whole.
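
As a sketch of such chaining, a worker can act as a client from inside its own handler and enqueue a follow-up job (the 'upload' function and its worker are hypothetical):

use Gearman::Client;

sub resize_then_upload {
    my $path = $_[0]->arg;

    # ... resize the image as in resize_image above ...

    # acting as a client: enqueue the next stage of the pipeline
    my $client = Gearman::Client->new;
    $client->job_servers('127.0.0.1:4730');
    $client->dispatch_background('upload', $path);
}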

Occasionally you might find that almost all of your jobs have dependent jobs. Lack of dependency tracking is a disadvantage of Gearman, but as I said, it can easily be implemented via an external database.
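
A minimal sketch of such tracking, assuming a SQLite table jobs(id, func, arg, depends_on, done) that is entirely made up for this example: when a job finishes, we mark it done and submit every job that was waiting on it.

use DBI;
use Gearman::Client;

my $dbh = DBI->connect('dbi:SQLite:dbname=jobs.db', '', '',
                       { RaiseError => 1 });

sub finish_and_release {
    my ($id) = @_;

    # mark the finished job as done
    $dbh->do('UPDATE jobs SET done = 1 WHERE id = ?', undef, $id);

    # find jobs that were blocked on it
    my $ready = $dbh->selectall_arrayref(
        'SELECT func, arg FROM jobs WHERE depends_on = ? AND done = 0',
        undef, $id);

    # and hand them over to Gearman
    my $client = Gearman::Client->new;
    $client->job_servers('127.0.0.1:4730');
    $client->dispatch_background($_->[0], $_->[1]) for @$ready;
}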
