The Point of Job Libraries
The main point of (what I colloquially call) a job library is, essentially, to execute a function (i.e. a job) somewhere else, and potentially at a different time. With a synchronous approach to web services (non-async Django or Flask, say), the limitations of the synchronous IO model essentially require a job library to execute logic outside the context of a single request handler: if you don't want to run the logic in the scope of a request (making the request take longer), you need to run it somewhere else, so you need a job library like Celery. A simple example might be an HTTP interface for sending an email to a large number of recipients. You might not want the request to wait for all the emails to be sent before returning a response, since that could take a long time, so you would instead schedule a job to do the work somewhere else.
Job libraries like Celery basically require you to run special worker processes in addition to your web handler processes; the worker processes use a database to fetch instructions describing which functions to run, and then they run them.
Now, technically, using asyncio means you don't need a job library to run logic independent of an individual request. You could simply spawn a new asyncio task (or use an in-process background job library like aiojobs) and return the request result to the caller without waiting for the task to finish. This might work for very simple cases. However, job libraries have some other benefits too.
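For illustration, fire-and-forget scheduling with plain asyncio might look like this. This is a self-contained sketch, not tied to any framework; the handler and email-sending function are hypothetical stand-ins:

```python
import asyncio

sent = []

async def send_emails(recipients):
    # Pretend to send emails; real code would do network I/O here.
    await asyncio.sleep(0)
    sent.extend(recipients)

async def handle_request():
    # Schedule the work and return immediately, without awaiting it.
    asyncio.create_task(send_emails(["a@example.com", "b@example.com"]))
    return "202 Accepted"

async def main():
    response = await handle_request()
    # Give the background task a chance to finish before the loop closes.
    await asyncio.sleep(0.01)
    return response

print(asyncio.run(main()))  # → 202 Accepted
```

If the process dies before the task finishes, the work is simply lost, which is exactly the gap the rest of this section is about.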
A job library will generally enqueue the job in some kind of database or message queue. This makes the job durable and also allows you to run it at a later time, which is very valuable. It also allows you to re-run the job if it fails, for a set number of tries or until it succeeds. At first blush you might think you don't need a job library with a database to run a function later; you could just asyncio.sleep in the same process until the job needs to run, and then run it. This, however, only works if you never restart your services (for example, when deploying a new version) and they never crash. In other words, in production environments you need a job library.
Arq has two interfaces: one for enqueuing jobs, and one for executing them.
Arq uses Redis as its database. You need an instance of arq.ArqRedis to enqueue jobs, and you get one by calling arq.create_pool with your Redis settings. Assuming you have Redis running on localhost on the default port (6379), you need three lines of code to enqueue a job called add, to be called with the parameters 1 and 2:
>>> from arq import create_pool
>>> p = await create_pool()
>>> await p.enqueue_job('add', 1, 2)
enqueue_job can take some optional parameters:
- _job_id - if omitted, this gets generated randomly, but you can supply your own. It's used to keep a job from executing multiple times concurrently.
- _queue_name - the name of the Redis sorted set to use as the job queue. You need this if several different services share the same Redis instance. Defaults to arq:queue.
- _defer_until, _defer_by - used to schedule jobs for later. If omitted, the job will be executed as soon as possible.
- _expires - skip executing the job if it hasn't run in this many seconds.
- _job_try - manually specify the job try. The job try is a counter Arq uses to track how many times the job has been (re-)run.
arq.ArqRedis also has some other methods, for retrieving job results and iterating over queued jobs, which are less useful in practice and inefficiently implemented:
- some use the KEYS Redis command, which is too inefficient to be used on Redis instances with a lot of keys (i.e. production instances)
- others use asyncio.gather on multiple Redis GETs instead of a single Redis MGET, making them inefficient for more than a handful of items
So, buyer beware.
Ok, now that we know how to enqueue jobs, we need to use the other interface to execute them. The job name (add) is the name of the function to run. We need to create an arq.Worker, providing it the link to Redis and the functions we want it to run for us, and start it.
The function to run is expected to take an additional parameter, ctx, which is a dictionary with some job-specific metadata. This should be the first function parameter.
Here's a simple code snippet to get our worker up and running:

from arq.worker import run_worker

async def add(ctx, a, b):
    return a + b

class WorkerSettings:
    functions = [add]

run_worker(WorkerSettings)
run_worker creates the actual worker and uses asyncio to run it.
Job Queue Implementation
When you enqueue a job (await p.enqueue_job('add', 1, 2)), the following happens:
- A job ID is randomly generated, if not provided. This is a random 128-bit number in hex string form, such as 8ea628b67fab4ff0a115a54aed8b2495.
- A Redis job key is generated from the job ID. This is a string taking the form of arq:job:8ea628b67fab4ff0a115a54aed8b2495.
- A Redis transaction is started, watching the job key. Inside this transaction, Arq checks Redis for the presence of the job key or the result key (the result key is a string like arq:result:8ea628b67fab4ff0a115a54aed8b2495). If either of these exists, the enqueue operation is aborted.
- The job execution time is calculated as an integer containing the number of milliseconds since the UNIX epoch. This is either the current moment, or calculated from _defer_until or _defer_by, if provided.
- The expiry time is calculated, either from the _expires parameter or using the default duration of one day.
- The job is serialized into bytes so it can be stored in Redis, using the provided serializer or pickle by default. The serialized representation is a dictionary containing what you'd expect: the function name, the positional and keyword arguments, the try counter and the enqueue time.
- The serialized job is stored at the job key (with expiry set), and the job ID is added to the queue (which is a Redis sorted set), with the score being the job execution time. The Redis transaction is executed.
- If the transaction executed with no errors, an arq.Job object is returned. Otherwise, the job already existed and None is returned.
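Ignoring Redis specifics, the enqueue bookkeeping above can be sketched with plain dictionaries standing in for the Redis keys and the queue sorted set. This is a toy model of the flow, not arq's actual code, and the dictionary field names are mine:

```python
import pickle
import time
from uuid import uuid4

# Plain dicts standing in for Redis: string keys, and the queue sorted set.
keys = {}   # job/result key -> serialized bytes
queue = {}  # job_id -> score (milliseconds since the epoch)

def enqueue_job(function, *args, _job_id=None, _defer_until_ms=None, **kwargs):
    job_id = _job_id or uuid4().hex  # a random 128-bit hex string
    job_key = f"arq:job:{job_id}"
    result_key = f"arq:result:{job_id}"
    # Abort if the job is already queued or already has a result.
    if job_key in keys or result_key in keys:
        return None
    score = _defer_until_ms or int(time.time() * 1000)
    keys[job_key] = pickle.dumps({
        "f": function, "a": args, "k": kwargs,
        "t": 1, "et": int(time.time() * 1000),
    })
    queue[job_id] = score
    return job_id

job_id = enqueue_job("add", 1, 2)
# Re-enqueuing with the same explicit job ID is a no-op:
assert enqueue_job("add", 1, 2, _job_id=job_id) is None
```

The real implementation does the existence check and the two writes inside a WATCH-based Redis transaction, so two concurrent enqueuers can't both succeed.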
Now let's take a look at how the worker dequeues and executes jobs.
- The worker runs a coroutine, arq.Worker.main(), which basically runs an infinite loop polling Redis.
- Every poll_delay seconds (0.5 by default), the worker does some bookkeeping (checking whether it has capacity to run additional jobs), and if it does, gets the ready jobs (score less than now) from the queue sorted set using zrangebyscore. Then it attempts to start those jobs.
For each job it attempts to start, Arq does the following:
- Calculate the in-progress Redis key, taking the form of arq:in-progress:8ea628b67fab4ff0a115a54aed8b2495.
- Using a Redis transaction watching the in-progress key, check whether the job has already been started somewhere else and whether it is still in the queue (if it's not in the queue, it has already finished). If either is the case, give up on this job and continue to the next one.
- Set the in-progress key and commit the transaction. If the transaction fails (because the watched in-progress key changed in the meantime), continue to the next job, since this job was started elsewhere.
- Start the job in an asyncio task.
In the task, perform the following:
- Fetch the job state from the job key.
- Increment the retry key (arq:retry:8ea628b67fab4ff0a115a54aed8b2495).
- Set the expiration on the retry key.
- If the job is not found under the job key, the job expired. Clean up.
- If the job retries have been exhausted, abort and clean up.
- Create a job context to be passed in as the first argument (ctx), containing some job metadata.
- Run the actual function, using the configured timeout.
- Finish using a Redis transaction:
- If configured to keep the result, store it under the result key (arq:result:8ea628b67fab4ff0a115a54aed8b2495). By default, results are kept for a day.
- Clean up the in-progress key, the retry key and the job key.
- Remove the job from the queue.
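Again as a toy model (dicts instead of Redis, no transactions, and no actual function execution), one iteration of the worker's dequeue logic might be sketched as:

```python
import time

# Stand-ins for Redis state; "abc" is a hypothetical job ID.
keys = {"arq:job:abc": b"...serialized job..."}
in_progress = set()  # job IDs with an in-progress key set
retries = {}         # job ID -> try counter
queue = {"abc": 0}   # job_id -> score; 0 is in the past, so it's ready
max_tries = 5

def poll_iteration(now_ms=None):
    """One worker poll: find ready jobs and 'start' the ones we can claim."""
    now_ms = now_ms or int(time.time() * 1000)
    ready = [jid for jid, score in queue.items() if score <= now_ms]
    started = []
    for job_id in ready:
        # Skip jobs already started elsewhere, or already finished.
        if job_id in in_progress or job_id not in queue:
            continue
        in_progress.add(job_id)
        retries[job_id] = retries.get(job_id, 0) + 1
        if f"arq:job:{job_id}" not in keys:
            continue  # job key expired; would clean up here
        if retries[job_id] > max_tries:
            continue  # retries exhausted; would abort and clean up here
        started.append(job_id)  # real code spawns an asyncio task here
    return started

assert poll_iteration() == ["abc"]
assert poll_iteration() == []  # a second poll finds the job in progress
```

The real worker gets the same exclusivity from the WATCHed in-progress key rather than an in-memory set, which is what makes it safe across multiple worker processes.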
Quite the ceremony. To recap, the Redis keys involved are:
- The job key (arq:job:8ea628b67fab4ff0a115a54aed8b2495). Holds the serialized job. Created when the job is enqueued. Deleted when the job is done, or by Redis when it expires.
- The in-progress key (arq:in-progress:8ea628b67fab4ff0a115a54aed8b2495). This exists while the job is being executed. Set when the job is started. Deleted when the job finishes.
- The retry key (arq:retry:8ea628b67fab4ff0a115a54aed8b2495). This is an atomic counter for job retries. Created/incremented when a job is executed. Deleted when the job finishes, or by Redis when it expires (~1 day).
- The result key (arq:result:8ea628b67fab4ff0a115a54aed8b2495). Holds the result, along with some metadata. Set when the job is done, unless specifically configured not to.
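The four key shapes are simple enough to capture in a few helper functions (the helper names are mine, not arq's):

```python
def job_key(job_id: str) -> str:
    return f"arq:job:{job_id}"

def in_progress_key(job_id: str) -> str:
    return f"arq:in-progress:{job_id}"

def retry_key(job_id: str) -> str:
    return f"arq:retry:{job_id}"

def result_key(job_id: str) -> str:
    return f"arq:result:{job_id}"

print(job_key("8ea628b67fab4ff0a115a54aed8b2495"))
# → arq:job:8ea628b67fab4ff0a115a54aed8b2495
```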
Arq also supports cron jobs. The point of a cron job is to run a function periodically (on a schedule), and not in parallel (so only once per schedule slot). Here's a simple example, running a function once every hour, at the top of the hour:
from arq.cron import cron
from arq.worker import run_worker

async def my_cron_job(ctx):
    print("I run by myself!")

class WorkerSettings:
    cron_jobs = [cron(my_cron_job, minute=0)]

run_worker(WorkerSettings)
The worker maintains a list of cron functions. Every poll iteration (~500 ms), the worker goes through this list and checks if any crons need to be enqueued.
If a cron job needs to be enqueued, a job ID is generated taking the form of cron:my_cron_job:1591743780123, and the job is enqueued using the normal job queuing code path. Cron jobs don't keep results by default. Since the cron job ID is deterministic, the normal Arq machinery will make sure it's executed only once, even if multiple workers are running.
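The deterministic cron job ID can be sketched like so: two workers computing the ID for the same schedule slot produce the same string, so the enqueue-time existence check makes one of them a no-op. The function below is my own illustration of the idea, not arq's code:

```python
from datetime import datetime, timezone

def cron_job_id(name: str, scheduled: datetime) -> str:
    # Deterministic: derived only from the function name and the slot time.
    ms = int(scheduled.timestamp() * 1000)
    return f"cron:{name}:{ms}"

# The same schedule slot, as seen by two independent workers:
slot = datetime(2020, 6, 9, 23, 3, tzinfo=timezone.utc)
worker_a = cron_job_id("my_cron_job", slot)
worker_b = cron_job_id("my_cron_job", slot)

assert worker_a == worker_b  # identical, so the dedup check catches one
print(worker_a)  # → cron:my_cron_job:1591743780000
```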