The django-carrot consumer backend

This module provides a backend API for creating Consumers and Consumer Sets

class carrot.consumer.Consumer(host, queue, logger, name, durable=True, queue_arguments=None, exchange_arguments=None)

An individual Consumer object. This class is run on a detached thread and watches a specific RabbitMQ queue for messages, and consumes them when they appear. Multiple Consumers can be linked to the same queue using a ConsumerSet object.

close_connection()

This method closes the connection to RabbitMQ.

connect()

Connects to the broker

Return type:pika.SelectConnection
fail(log, err)

This function is called if there is any kind of error with the .consume() function

Parameters:
  • log (MessageLog) – the associated MessageLog object
  • err (str) – the exception

The exception message is logged, and the MessageLog is updated with the result

get_message_log(properties, body)

Finds and returns the carrot.models.MessageLog object associated with a RabbitMQ message

By default, carrot finds this retrieving the MessageLog UUID from the RabbitMQ message properties.message_id attribute.

This method can be extended by custom consumers. For example, if you are attempting to consume from a RabbitMQ queue containing messages that do not come from your Carrot instance, you may want to extend this method to create, instead of get, a MessageLog object

Parameters:
  • properties – the message properties
  • body – the message body. This is not used by default, but is included so that the function can be extended in custom consumers.
Return type:

class:carrot.models.MessageLog or None

In order to avoid different consumers picking up the same message, MessageLogs are only .. note:

This method does not use self.get_task_type as the intention is to get the MessageLog object before the
**consume** method tries to do anything else. This means that if any later part of the process fails,
the traceback and exception information can be stored to the MessageLog object for easier debugging.

Warning

If this method fails to find a matching MessageLog object, then the RabbitMQ message will be rejected. Depending on the configuration of your RabbitMQ queue, this may cause a loss of data. If you are implementing a custom consumer, then you should use dead letter exchange to preserve your message content

get_task_type(properties, body)

Identifies the task type, by looking up the attribute self.task_type in the message properties

Parameters:
  • properties – the message properties
  • body – the message body. Not used by default, but provided so that the method can be extended if necessary
Returns:

The task type as a string, e.g. myapp.mymodule.mytask

on_bind(*args)

Invoked when the queue has been successfully bound to the exchange

Parameters are provided by Pika but not required by Carrot

on_cancel(*args)

Invoked when the channel cancel is completed.

Parameters provided by Pika but not required by Carrot

on_channel_closed(channel, reply_code, reply_text)

Called when the channel is closed. Raises a warning and closes the connection

Parameters are provided by Pika but not required by Carrot

on_channel_open(channel)

This function is invoked when the channel is established. It adds a callback in case of channel closure, and establishes the exchange

Parameters:channel (pika.channel.Channel) – The channel object
on_connection_closed(*args)

Callback that gets called when the connection is closed. Checks for the self.shutdown_requested parameter first, which is used to idenfity whether the shutdown has been requested by the user or not. If not, carrot attempts to reconnect

All arguments sent to this callback come from Pika but are not required by Carrot

on_connection_open(connection)

Callback that gets called when the connection is opened. Adds callback in case of a closed connection, and establishes the connection channel

Parameters:connection (pika.SelectConnection) – Sent by default by pika but not used by carrot
on_consumer_cancelled(method_frame)

Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer receiving messages.

Parameters:method_frame (pika.frame.Method) – The Basic.Cancel frame
on_exchange_declare(*args)

Invoked when the exchange has been successfully established

Parameters are provided by Pika but not required by Carrot

on_message(channel, method_frame, properties, body)

The process that takes a single message from RabbitMQ, converts it into a python executable and runs it, logging the output back to the assoicated carrot.models.MessageLog

Parameters:
  • channel (pika.channel.Channel) – not used
  • method_frame (pika.Spec.Basic.Deliver) – contains the delivery tag
  • properties (pika.Spec.BasicProperties) – the message properties
  • body (bytes) – The message body
on_queue_declare(*args)

Invoked when the queue has been successfully declared

Parameters are provided by Pika but not required by Carrot

reconnect()

Reconnect to the broker in case of accidental disconnection

run()

Process starts here

start_consuming()

The main consumer process. Attaches a callback to be invoked whenever there is a new message added to the queue

stop()

Cleanly exit the Consumer

stop_consuming()

Stops the consumer and cancels the channel

class carrot.consumer.ConsumerSet(host, queue, logger, concurrency=1, name='consumer', consumer_class='carrot.consumer.Consumer')

Creates and starts a number of Consumer objects. All consumers must belong to the same queue

Parameters:
  • host – The virtual host where the queue belongs
  • queue – The queue name
  • concurrency – the number of consumers to create. Defaults to 1
  • name – the name to assign to the individual consumers. Will be rendered as Consumer-1, Consumer-2, etc.
  • logfile – the path to the log file. Defaults to carrot.log
  • loglevel – the logging level. Defaults to logging.DEBUG
start_consuming()

Creates a thread for each concurrency level, e.g. if concurrency is set to 5, 5 threads are created.

A Consumer is attached to each thread and is started

stop_consuming()

Stops all running threads. Loops through the threads twice - firstly, to set the signal to False on all threads, secondly to wait for them all to finish

If a single loop was used here, the latter threads could still consume new tasks while the parent process waited for the earlier threads to finish. The second loop allows for quicker consumer stoppage and stops all consumers from consuming new tasks from the moment the signal is received

class carrot.consumer.ListHandler(thread_name, level)

A logging.Handler that records each log entry to a python list object, provided that the entry is coming from the correct thread.

Allows for task-specific logging

emit(record)

Do whatever it takes to actually log the specified logging record.

This version is intended to be implemented by subclasses and so raises a NotImplementedError.

class carrot.consumer.LoggingTask(task, logger, thread_name, *args, **kwargs)

Turns a function into a class with run() method, and attaches a ListHandler logging handler