Taskhawk documentation

TaskHawk is a replacement for Celery that works on AWS SQS/SNS, while keeping things simple and straightforward. Any unbound function can be converted into a TaskHawk task.

Only Python 3.6+ is supported currently.

This project uses semantic versioning.

Quickstart

Getting started with Taskhawk is easy, but requires a few steps.

Installation

Install the latest taskhawk release via pip:

$ pip install taskhawk

You may also install a specific version:

$ pip install taskhawk==1.0.0

The latest development version can always be found on GitHub.

Configuration

Before you can use Taskhawk, you need to set up a few settings. For Django projects, simply use Django settings to configure Taskhawk. For non-Django projects, you must declare an environment variable called SETTINGS_MODULE that points to a module where settings may be found.

Required settings are:

AWS_ACCESS_KEY = <YOUR AWS KEY>
AWS_ACCOUNT_ID = <YOUR AWS ACCOUNT ID>
AWS_REGION = <YOUR AWS REGION>
AWS_SECRET_KEY = <YOUR AWS SECRET KEY>

TASKHAWK_QUEUE = <YOUR APP TASKHAWK QUEUE>
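For a non-Django project, the settings module might look like the following sketch; all values shown are placeholders, not real credentials:

```python
# settings.py -- example Taskhawk settings module for a non-Django project.
# Point the SETTINGS_MODULE environment variable at this module's dotted path.
AWS_ACCESS_KEY = 'my-access-key'     # placeholder: your AWS access key
AWS_ACCOUNT_ID = '123456789012'      # placeholder: your AWS account id
AWS_REGION = 'us-west-2'             # placeholder: your AWS region
AWS_SECRET_KEY = 'my-secret-key'     # placeholder: your AWS secret key

TASKHAWK_QUEUE = 'DEV-MYAPP'         # placeholder: your app's Taskhawk queue name
```

Then, before starting your app: `export SETTINGS_MODULE=settings`.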

Provisioning

Taskhawk uses SQS and SNS as backing queues. Before you can publish tasks, you need to provision the required infra. This may be done manually, or, preferably, using Terraform. Taskhawk provides tools to make infra configuration easier: see Terraform and Taskhawk Terraform Generator for further details.

Using Taskhawk

To use Taskhawk, simply add the taskhawk.task() decorator to your function:

@taskhawk.task
def send_email(to: str, subject: str, from_email: str = None) -> None:
    ...  # send email

And then dispatch your function asynchronously:

send_email.dispatch('example@email.com', 'Hello!', from_email='example@spammer.com')

Tasks are held in an SQS queue until they’re successfully executed, or until they fail a configurable number of times. Failed tasks are moved to a Dead Letter Queue, where they’re held for 14 days and may be examined for further debugging.

Priority

Taskhawk provides 4 priority queues to use, and the priority may be customized per task, or per message. For more details, see taskhawk.Priority.

Usage

Usage Guide

Tasks

Add the taskhawk.task() decorator to convert any unbound function into an async task, as shown here:

@taskhawk.task
def send_email(to: str, subject: str, from_email: str = None) -> None:
    ...  # send email

Optionally, pass in priority=taskhawk.Priority.high to mark the task as a high priority task.
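For example, a minimal sketch of a high-priority task (the task name here is hypothetical, and this assumes taskhawk is installed):

```python
import taskhawk

# Route this task to the high priority queue instead of the default one
@taskhawk.task(priority=taskhawk.Priority.high)
def send_password_reset(to: str) -> None:
    ...  # send the password reset email
```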

If your task function accepts a kwarg called metadata (of type dict) or **kwargs, the function will be called with a metadata parameter, a dict with the following attributes:

id: task identifier. This represents a run of a task.

priority: the priority a task was dispatched with. This will be the same as the task’s priority, unless priority was customized on dispatch.

receipt: SQS receipt for the task. This may be used with taskhawk.extend_visibility_timeout to extend message visibility if the task is running longer than expected.

timestamp: task dispatch epoch timestamp (milliseconds)

version: message format version. Currently can only be 1.

If your task function accepts a kwarg called headers (of type dict) or **kwargs, the function will be called with a headers parameter, a dict of the headers that the task was dispatched with.
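A sketch of a task that accepts both, assuming taskhawk is installed (the task name and the request_id header are hypothetical):

```python
import taskhawk

@taskhawk.task
def process_report(report_id: str, metadata: dict = None, headers: dict = None) -> None:
    # Taskhawk fills `metadata` (id, priority, receipt, timestamp, version)
    # and `headers` (the headers the task was dispatched with) at call time.
    run_id = (metadata or {}).get('id')
    request_id = (headers or {}).get('request_id')
    ...  # process the report, logging run_id and request_id for traceability
```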

Publisher

You can run tasks asynchronously like so:

send_email.dispatch('example@email.com', 'Hello!', from_email='example@spammer.com')

If you want to include custom headers with the message (for example, a request_id field for cross-application tracing), or you want to customize priority, you can customize a particular task invocation using chaining like so:

send_email.with_headers(request_id='1234')\
          .with_priority(taskhawk.Priority.high)\
          .dispatch('example@email.com')

Consumer

A consumer for SQS based workers can be started as follows:

taskhawk.listen_for_messages(taskhawk.Priority.high)

This is a blocking function, so if you want to listen to multiple priority queues, you’ll need to run these on separate processes (don’t use threads since this library is NOT guaranteed to be thread-safe).
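Since listen_for_messages() blocks, one way to consume several priority queues is one worker process per queue, using the standard library’s multiprocessing. This pattern is a sketch, not a library-prescribed one:

```python
import multiprocessing

import taskhawk

def worker(priority: taskhawk.Priority) -> None:
    # Blocks forever, polling the SQS queue for this priority level
    taskhawk.listen_for_messages(priority)

if __name__ == '__main__':
    processes = [
        multiprocessing.Process(target=worker, args=(priority,))
        for priority in (taskhawk.Priority.default, taskhawk.Priority.high)
    ]
    for process in processes:
        process.start()
    for process in processes:
        process.join()
```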

A consumer for Lambda based workers can be started as follows:

taskhawk.process_messages_for_lambda_consumer(lambda_event)

where lambda_event is the SNS event object that AWS passes to your Lambda function on invocation.
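A minimal Lambda handler sketch wrapping this call (the handler name is whatever your Lambda is configured with):

```python
import taskhawk

def lambda_handler(event, context):
    # AWS invokes this with an SNS event; Taskhawk unwraps the records
    # and executes the corresponding tasks synchronously.
    taskhawk.process_messages_for_lambda_consumer(event)
```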

Configuration

Add appropriate configuration to the app. If not using a Django app, ensure that SETTINGS_MODULE is set to the path of a module where all settings can be found.

AWS_REGION

AWS region

required; string

AWS_ACCOUNT_ID

AWS account id

required; string

AWS_ACCESS_KEY

AWS access key

required; string

AWS_CONNECT_TIMEOUT_S

AWS connection timeout

optional; int; default: 2

AWS_READ_TIMEOUT_S

AWS read timeout

optional; int; default: 2

AWS_SECRET_KEY

AWS secret key

required; string

AWS_SESSION_TOKEN

AWS session token that represents temporary credentials (for example, for Lambda apps)

optional; string

IS_LAMBDA_APP

Flag indicating if this is a Lambda app

optional; bool; default: False

TASKHAWK_DEFAULT_HEADERS

A function that may be used to inject custom headers into every message, for example, request id. This hook is called right before dispatch, and any headers that are explicitly specified when dispatching may override these headers.

If specified, it’s called with the following arguments:

default_headers(task=task)

where task is the task function, and it’s expected to return a dict of strings.

It’s recommended that this function be declared with **kwargs so it doesn’t break on new versions of the library.

optional; fully-qualified function name
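A minimal sketch of such a function; the request_id header scheme is an assumption, not part of the library:

```python
import uuid

def default_headers(task=None, **kwargs) -> dict:
    # Called by Taskhawk right before dispatch; `task` is the task function.
    # Inject a fresh request id so every message can be traced across services.
    # **kwargs keeps this hook compatible with future library versions.
    return {'request_id': str(uuid.uuid4())}
```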

TASKHAWK_MAX_DB_REUSE_LOOPS

Number of loops before database connections are recycled. Only applies to Django apps.

optional; int; default: 5

TASKHAWK_PRE_PROCESS_HOOK

A function which can be used to plug into the message processing pipeline before any processing happens. This hook may be used to perform initialization, such as setting up a global request id based on message headers. If specified, this will be called with the following arguments for SQS apps:

pre_process_hook(queue_name=queue_name, sqs_queue_message=sqs_queue_message)

where sqs_queue_message is of type boto3.sqs.Message, and for Lambda apps like so:

pre_process_hook(sns_record=record)

where sns_record is a dict of a single record with format as described in lambda_sns_format.

It’s recommended that this function be declared with **kwargs so it doesn’t break on new versions of the library.

optional; fully-qualified function name
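A sketch of an SQS pre-process hook that copies a request id from the message’s SQS attributes into thread-local state; the attribute name and storage choice are assumptions:

```python
import threading

# Thread-local storage so concurrent workers don't clobber each other's context
request_context = threading.local()

def pre_process_hook(queue_name=None, sqs_queue_message=None, **kwargs):
    # boto3 SQS messages expose attributes via .message_attributes;
    # fall back to None if the publisher didn't set the header.
    attributes = getattr(sqs_queue_message, 'message_attributes', None) or {}
    request_context.request_id = attributes.get('request_id', {}).get('StringValue')
```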

TASKHAWK_QUEUE

The name of the taskhawk queue (exclude the TASKHAWK- prefix).

required; string

TASKHAWK_SYNC

Flag indicating if Taskhawk should work synchronously. This is similar to Celery’s Eager mode and is helpful for integration testing.

optional; default False

TASKHAWK_TASK_CLASS

The name of a class to use as Task class rather than the default taskhawk.Task. This may be used to customize the behavior of tasks.

optional; fully-qualified class name

API reference

Release Notes

Current version: v1.0.5-dev

v1.0

  • Initial version

Taskhawk Migration Guide

CELERY → v1

Assuming publishers and workers are completely independent processes:

  1. Remove all Celery task decorators from your task functions and replace them with taskhawk.task().
  2. Remove all Celery-related settings from your project.
  3. Provision infra required for taskhawk using taskhawk_terraform and taskhawk_terraform_generator, or manually.
  4. Add new processes for workers on each priority queue that your app publishes to (not all queues may be relevant for your app).
  5. Deploy Taskhawk worker processes (not publishers).
  6. Verify that Taskhawk workers pick up messages by sending a test message.
  7. Deploy publisher processes.
  8. Let Celery queues drain to 0.
  9. Terminate Celery worker processes.

If Celery workers also publish async tasks:

  1. Remove all Celery task decorators from your task functions and replace them with taskhawk.task().
  2. Remove all Celery-related settings from your project.
  3. Provision infra required for taskhawk using taskhawk_terraform and taskhawk_terraform_generator, or manually.
  4. Add new processes for workers on each priority queue that your app publishes to (not all queues may be relevant for your app).
  5. Deploy a test TaskHawk worker process.
  6. Verify that Taskhawk workers pick up messages by sending a test message.
  7. Double publish to both Taskhawk and Celery in Celery workers.
  8. Deploy Taskhawk worker processes (not other publishers).
  9. Deploy other publisher processes.
  10. Remove double publish in Celery workers.
  11. Deploy Celery workers.
  12. Let Celery queues drain to 0.
  13. Terminate Celery worker processes.
