# Source code for taskhawk.models
import enum
import time
import typing
import uuid
import arrow
from taskhawk.exceptions import TaskNotFound, ValidationError
if typing.TYPE_CHECKING:
from taskhawk.task_manager import Task # noqa # pragma: no cover
class Message:
    """
    Model for Taskhawk messages. All properties of a message should be considered immutable.
    """

    CURRENT_VERSION = '1.0'
    VERSIONS = ['1.0']

    def __init__(self, data: dict) -> None:
        """
        Build a message from its dict representation and validate it immediately.

        ``data`` will look like this::

            {
                "id": "b1328174-a21c-43d3-b303-964dfcc76efc",
                "metadata": {
                    "priority": "high",
                    "timestamp": 1460868253255,
                    "version": "1.0"
                },
                "headers": {
                    ...
                },
                "task": "tasks.send_email",
                "args": [
                    "email@automatic.com",
                    "Hello!"
                ],
                "kwargs": {
                    "from_email": "spam@example.com"
                }
            }

        :param data: the message payload; all six top-level keys must be present
        :raises KeyError: when a top-level key is absent from ``data``
        :raises exceptions.ValidationError: when the message fails validation
        """
        self._id = data['id']
        # A null/empty metadata value is normalised to a dict so the metadata-backed
        # properties can always call ``.get``.
        self._metadata = data['metadata'] or {}
        self._headers = data['headers']
        self._task_name = data['task']
        self._args = data['args']
        self._kwargs = data['kwargs']
        self.validate()

    def validate(self) -> None:
        """
        Validate that message object contains all the right things.

        As side effects, a string timestamp is replaced in the metadata by integer epoch
        milliseconds, and the task registered under ``task_name`` is looked up and cached.

        :raises exceptions.ValidationError: when message fails validation
        """
        # Imported at call time; the module-level import of Task is only done under
        # TYPE_CHECKING (presumably to avoid a circular import -- confirm).
        from taskhawk.task_manager import Task  # noqa

        # support string datetimes
        if isinstance(self.timestamp, str):
            try:
                self.metadata['timestamp'] = int(arrow.get(self.timestamp).float_timestamp * 1000)
            except (ValueError, arrow.parser.ParserError) as exc:
                raise ValidationError from exc

        if (
            not self.id
            or not self.version
            or self.version not in self.VERSIONS
            or not self.timestamp
            or self.headers is None
            or not self.task_name
            or self.args is None
            or self.kwargs is None
        ):
            raise ValidationError

        try:
            self._task = Task.find_by_name(self.task_name)
        except TaskNotFound as exc:
            raise ValidationError from exc

    @classmethod
    def _create_metadata(cls) -> dict:
        """
        Build the metadata for a freshly created message: default priority, the current
        time in epoch milliseconds and the current message format version.
        """
        return {
            'priority': Priority.default.name,
            'timestamp': int(time.time() * 1000),
            'version': cls.CURRENT_VERSION,
        }

    @classmethod
    def new(
        cls,
        task: str,
        args: typing.Optional[tuple] = None,
        kwargs: typing.Optional[dict] = None,
        msg_id: typing.Optional[str] = None,
        headers: typing.Optional[dict] = None,
    ) -> 'Message':
        """
        Creates Message object given type, schema version and data. This is typically used by the publisher code.

        :param task: The task name
        :param args: The list of args
        :param kwargs: The dict of kwargs
        :param msg_id: Optional message identifier. If unset, a random UUID4 will be generated.
        :param headers: Optional additional headers
        :return: a validated instance of the class this method was called on
        :raises exceptions.ValidationError: when the resulting message fails validation
        """
        # Instantiate through ``cls`` (not ``Message``) so subclasses get their own type back.
        return cls(
            {
                'id': msg_id or str(uuid.uuid4()),
                'metadata': cls._create_metadata(),
                'headers': headers or {},
                'task': task,
                'args': args or [],
                'kwargs': kwargs or {},
            }
        )

    def call_task(self, receipt: typing.Optional[str]) -> None:
        """
        Call the task with this message

        :param receipt: passed through unchanged to the task's ``call``
        """
        self.task.call(self, receipt)

    def __eq__(self, other) -> bool:
        """
        Two messages are equal when their dict representations are equal.
        """
        if not isinstance(other, self.__class__):
            return NotImplemented
        return self.as_dict() == typing.cast(Message, other).as_dict()

    @property
    def id(self) -> str:
        """The message identifier."""
        return self._id

    @property
    def metadata(self) -> dict:
        """The metadata dict (priority, timestamp, version)."""
        return self._metadata

    @property
    def timestamp(self) -> typing.Optional[int]:
        """The ``timestamp`` metadata entry, or None when it is absent."""
        return self._metadata.get('timestamp')

    @property
    def version(self) -> typing.Optional[str]:
        """The ``version`` metadata entry (a string such as ``'1.0'``), or None when it is absent."""
        return self._metadata.get('version')

    @property
    def priority(self) -> 'Priority':
        """The ``priority`` metadata entry, looked up by name in :class:`Priority`."""
        return Priority[self._metadata['priority']]

    @priority.setter
    def priority(self, value: 'Priority') -> None:
        # Only the member name is stored in the metadata dict.
        self._metadata['priority'] = value.name

    @property
    def headers(self) -> dict:
        """The message headers."""
        return self._headers

    @property
    def task(self) -> 'Task':
        """The task object resolved from ``task_name`` during validation."""
        return self._task

    @property
    def task_name(self) -> str:
        """The name of the task this message is addressed to."""
        return self._task_name

    @property
    def args(self) -> list:
        """The positional arguments for the task."""
        return self._args

    @property
    def kwargs(self) -> dict:
        """The keyword arguments for the task."""
        return self._kwargs

    def items(self) -> typing.ItemsView:
        """Return the items view of :meth:`as_dict`."""
        return self.as_dict().items()

    def as_dict(self) -> dict:
        """
        Return the message as a dict with the same top-level keys the constructor accepts.
        """
        return {
            'id': self.id,
            'metadata': self.metadata,
            'headers': self.headers,
            'task': self.task_name,
            'args': self.args,
            'kwargs': self.kwargs,
        }
class Priority(enum.Enum):
    """
    Priority of a task. This may be used to differentiate batch jobs from other tasks for example.

    High and low priority queues provide independent scaling knobs for your use-case.
    """

    default = enum.auto()
    """
    This is the default priority of a task if nothing is specified. In most cases,
    using just the default queue should work fine.
    """

    high = enum.auto()
    low = enum.auto()

    bulk = enum.auto()
    """
    Bulk queue will typically have different monitoring, and may be used for bulk jobs,
    such as sending push notifications to all users. This allows you to effectively
    throttle the tasks.
    """