The django-carrot consumer backend

This module provides a backend API for creating Consumers and Consumer Sets

class carrot.consumer.Consumer(host: carrot.objects.VirtualHost, queue: str, logger: logging.Logger, name: str, durable: bool = True, queue_arguments: dict = None, exchange_arguments: dict = None)

An individual Consumer object. This class is run on a detached thread and watches a specific RabbitMQ queue for messages, and consumes them when they appear. Multiple Consumers can be linked to the same queue using a ConsumerSet object.

add_failure_callback(cb: Callable) → None

Registers a callback that gets called when there is any kind of error with the .consume() method

close_connection() → None

This method closes the connection to RabbitMQ.

connect() → pika.adapters.select_connection.SelectConnection

Connects to the broker

fail(log: carrot.models.MessageLog, err: Union[str, Exception]) → None

This function is called whenever there is a failure executing a specific MessageLog object

The exception message is logged, and the MessageLog is updated with the result

get_message_log(properties: pika.spec.BasicProperties, body: bytes) → Optional[carrot.models.MessageLog]

Finds a MessageLog based on the content of the RabbitMQ message

By default, carrot finds this retrieving the MessageLog UUID from the RabbitMQ message properties.message_id attribute.

This method can be extended by custom consumers. For example, if you are attempting to consume from a RabbitMQ queue containing messages that do not come from your Carrot instance, you may want to extend this method to create, instead of get, a MessageLog object

The body parameter is not used here but is included in as in some cases it is useful for customer Consumer objects

Note

This method does not use self.get_task_type as the intention is to get the MessageLog object before the consume method tries to do anything else. This means that if any later part of the process fails, the traceback and exception information can be stored to the MessageLog object for easier debugging.

Warning

If this method fails to find a matching MessageLog object, then the RabbitMQ message will be rejected. Depending on the configuration of your RabbitMQ queue, this may cause a loss of data. If you are implementing a custom consumer, then you should use dead letter exchange to preserve your message content

get_task_type(properties: pika.spec.BasicProperties, body: bytes) → str

Identifies the task type, by looking up the attribute self.task_type in the message properties

The parameter body is not used here - However, it is included as in some cases it is useful when extending the Consumer class

on_bind(*args) → None

Invoked when the queue has been successfully bound to the exchange

Parameters are require to match the signature used by Pika but are not required by Carrot

on_cancel(*args) → None

Invoked when the channel cancel is completed.

Parameters provided by Pika but not required by Carrot

on_channel_closed(channel: pika.channel.Channel, reply_code: int, reply_text: str) → None

Called when the channel is closed. Raises a warning and closes the connection

Parameters are require to match the signature used by Pika but are not required by Carrot

on_channel_open(channel: pika.channel.Channel) → None

This function is invoked when the channel is established. It adds a callback in case of channel closure, and establishes the exchange

on_connection_closed(*args) → None

Callback that gets called when the connection is closed. Checks for the self.shutdown_requested parameter first, which is used to idenfity whether the shutdown has been requested by the user or not. If not, carrot attempts to reconnect

All arguments sent to this callback come from Pika but are not required by Carrot

on_connection_open(connection: pika.adapters.select_connection.SelectConnection) → None

Callback that gets called when the connection is opened. Adds callback in case of a closed connection, and establishes the connection channel

The connection parameter here is not used, as self.connection is defined elsewhere, but is included so that the signature matches as per Pika’s requirements

on_consumer_cancelled(method_frame: pika.frame.Method) → None

Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer receiving messages.

on_exchange_declare(*args) → None

Invoked when the exchange has been successfully established

Parameters are require to match the signature used by Pika but are not required by Carrot

on_message(channel: pika.channel.Channel, method_frame: pika.frame.Method, properties: pika.spec.BasicProperties, body: bytes) → None

The process that takes a single message from RabbitMQ, converts it into a python executable and runs it, logging the output back to the associated carrot.models.MessageLog

on_queue_declare(*args) → None

Invoked when the queue has been successfully declared

Parameters are require to match the signature used by Pika but are not required by Carrot

reconnect() → None

Reconnect to the broker in case of accidental disconnection

run() → None

Process starts here

serializer

alias of carrot.objects.DefaultMessageSerializer

start_consuming() → None

The main consumer process. Attaches a callback to be invoked whenever there is a new message added to the queue.

This method sets a channel prefetch count of zero to prevent dropouts

stop() → None

Cleanly exit the Consumer

stop_consuming() → None

Stops the consumer and cancels the channel

class carrot.consumer.ConsumerSet(host: carrot.objects.VirtualHost, queue: str, logger: logging.Logger, concurrency: int = 1, name: str = 'consumer', consumer_class: str = 'carrot.consumer.Consumer')

Creates and starts 1 or more .Consumer objects. All consumers must belong to the same queue

static get_consumer_class(consumer_class: str) → Type[carrot.consumer.Consumer]

Returns a Consumer object from a string using dynamic imports

start_consuming() → None

Creates a thread for each concurrency level, e.g. if concurrency is set to 5, 5 threads are created.

A Consumer is attached to each thread and is started

stop_consuming() → None

Stops all running threads. Loops through the threads twice - firstly, to set the signal to False on all threads, secondly to wait for them all to finish

If a single loop was used here, the latter threads could still consume new tasks while the parent process waited for the earlier threads to finish. The second loop allows for quicker consumer stoppage and stops all consumers from consuming new tasks from the moment the signal is received

class carrot.consumer.ListHandler(thread_name: str, level: int)

A logging.Handler that records each log entry to a python list object, provided that the entry is coming from the correct thread.

Allows for task-specific logging

emit(record: logging.LogRecord)

Do whatever it takes to actually log the specified logging record.

This version is intended to be implemented by subclasses and so raises a NotImplementedError.

class carrot.consumer.LoggingTask(task: Callable, logger: logging.Logger, thread_name: str, *args, **kwargs)

Turns a function into a class with run() method, and attaches a ListHandler logging handler