This module provides a backend API for creating Consumers and ConsumerSets.
carrot.consumer.
Consumer
(host: carrot.objects.VirtualHost, queue: str, logger: logging.Logger, name: str, durable: bool = True, queue_arguments: dict = None, exchange_arguments: dict = None)¶An individual Consumer object. This class is run on a detached thread and watches a specific RabbitMQ queue for
messages, and consumes them when they appear. Multiple Consumers can be linked to the same queue using a
ConsumerSet
object.
add_failure_callback
(cb: Callable) → None¶Registers a callback that gets called when there is any kind of error with the .consume() method
close_connection
() → None¶This method closes the connection to RabbitMQ.
connect
() → pika.adapters.select_connection.SelectConnection¶Connects to the broker
fail
(log: carrot.models.MessageLog, err: Union[str, Exception]) → None¶This function is called whenever there is a failure executing a specific MessageLog object
The exception message is logged, and the MessageLog is updated with the result
get_message_log
(properties: pika.spec.BasicProperties, body: bytes) → Optional[carrot.models.MessageLog]¶Finds a MessageLog based on the content of the RabbitMQ message
By default, carrot finds this by retrieving the MessageLog UUID from the message_id attribute of the RabbitMQ message properties.
This method can be extended by custom consumers. For example, if you are attempting to consume from a RabbitMQ queue containing messages that do not come from your Carrot instance, you may want to extend this method to create, instead of get, a MessageLog object
The body parameter is not used here, but it is included because it can be useful for custom Consumer objects
Note
This method does not use self.get_task_type as the intention is to get the MessageLog object before the consume method tries to do anything else. This means that if any later part of the process fails, the traceback and exception information can be stored to the MessageLog object for easier debugging.
Warning
If this method fails to find a matching MessageLog object, then the RabbitMQ message will be rejected. Depending on the configuration of your RabbitMQ queue, this may cause a loss of data. If you are implementing a custom consumer, you should use a dead letter exchange to preserve your message content
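The difference between getting and creating a log entry, as described for get_message_log above, can be sketched as follows. This is a minimal, self-contained illustration and not carrot's implementation: FakeProperties, FakeMessageLog, STORE, get_log and get_or_create_log are all hypothetical stand-ins for pika.spec.BasicProperties, carrot.models.MessageLog and the database lookup.

```python
import uuid
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class FakeProperties:
    """Hypothetical stand-in for pika.spec.BasicProperties (message_id only)."""
    message_id: Optional[str] = None


@dataclass
class FakeMessageLog:
    """Hypothetical stand-in for carrot.models.MessageLog."""
    uuid: str
    content: str = ""


# Hypothetical in-memory replacement for the MessageLog table.
STORE: Dict[str, FakeMessageLog] = {"abc-123": FakeMessageLog("abc-123", "known")}


def get_log(properties: FakeProperties, body: bytes) -> Optional[FakeMessageLog]:
    """Get-only lookup: returns None when no log matches the message_id."""
    return STORE.get(properties.message_id)


def get_or_create_log(properties: FakeProperties, body: bytes) -> FakeMessageLog:
    """Extended lookup: creates a log for messages that did not come from us."""
    log = get_log(properties, body)
    if log is None:
        # Foreign messages may carry no message_id at all, so generate one.
        key = properties.message_id or str(uuid.uuid4())
        log = FakeMessageLog(key, body.decode("utf-8", errors="replace"))
        STORE[key] = log
    return log


known = get_log(FakeProperties("abc-123"), b"")
missing = get_log(FakeProperties("no-such-id"), b"payload")
created = get_or_create_log(FakeProperties("no-such-id"), b"payload")
```

With the get-only lookup, the second message has no matching log and would be rejected; the extended lookup keeps its body in a newly created log instead.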
get_task_type
(properties: pika.spec.BasicProperties, body: bytes) → str¶Identifies the task type, by looking up the attribute self.task_type in the message properties
The body parameter is not used here. However, it is included because it can be useful when extending the Consumer class
on_bind
(*args) → None¶Invoked when the queue has been successfully bound to the exchange
Parameters are required to match the signature used by Pika but are not required by Carrot
on_cancel
(*args) → None¶Invoked when the channel cancel is completed.
Parameters are provided by Pika but are not required by Carrot
on_channel_closed
(channel: pika.channel.Channel, reply_code: int, reply_text: str) → None¶Called when the channel is closed. Raises a warning and closes the connection
Parameters are required to match the signature used by Pika but are not required by Carrot
on_channel_open
(channel: pika.channel.Channel) → None¶This function is invoked when the channel is established. It adds a callback in case of channel closure, and establishes the exchange
on_connection_closed
(*args) → None¶Callback that gets called when the connection is closed. Checks the self.shutdown_requested parameter first, which is used to identify whether the shutdown has been requested by the user or not. If not, carrot attempts to reconnect
All arguments sent to this callback come from Pika but are not required by Carrot
on_connection_open
(connection: pika.adapters.select_connection.SelectConnection) → None¶Callback that gets called when the connection is opened. Adds callback in case of a closed connection, and establishes the connection channel
The connection parameter here is not used, as self.connection is defined elsewhere, but is included so that the signature matches as per Pika’s requirements
on_consumer_cancelled
(method_frame: pika.frame.Method) → None¶Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer receiving messages.
on_exchange_declare
(*args) → None¶Invoked when the exchange has been successfully established
Parameters are required to match the signature used by Pika but are not required by Carrot
on_message
(channel: pika.channel.Channel, method_frame: pika.frame.Method, properties: pika.spec.BasicProperties, body: bytes) → None¶The process that takes a single message from RabbitMQ, converts it into a python executable and runs it,
logging the output back to the associated carrot.models.MessageLog
on_queue_declare
(*args) → None¶Invoked when the queue has been successfully declared
Parameters are required to match the signature used by Pika but are not required by Carrot
reconnect
() → None¶Reconnect to the broker in case of accidental disconnection
run
() → None¶Process starts here
serializer¶
start_consuming
() → None¶The main consumer process. Attaches a callback to be invoked whenever there is a new message added to the queue.
This method sets a channel prefetch count of zero to prevent dropouts
stop
() → None¶Cleanly exit the Consumer
stop_consuming
() → None¶Stops the consumer and cancels the channel
carrot.consumer.
ConsumerSet
(host: carrot.objects.VirtualHost, queue: str, logger: logging.Logger, concurrency: int = 1, name: str = 'consumer', consumer_class: str = 'carrot.consumer.Consumer')¶Creates and starts one or more Consumer objects. All consumers must belong to the same queue
get_consumer_class
(consumer_class: str) → Type[carrot.consumer.Consumer]¶Returns a Consumer class from its dotted-path string using dynamic imports
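Resolving a class from a dotted-path string, as get_consumer_class is described as doing, is commonly done with importlib. The sketch below shows that general technique only; load_class is a hypothetical helper name, and carrot's own implementation may differ.

```python
import importlib


def load_class(dotted_path: str) -> type:
    """Resolve 'package.module.ClassName' to the class object it names."""
    module_path, _, class_name = dotted_path.rpartition(".")
    if not module_path:
        raise ImportError(f"{dotted_path!r} is not a dotted path")
    module = importlib.import_module(module_path)
    try:
        return getattr(module, class_name)
    except AttributeError as exc:
        raise ImportError(
            f"module {module_path!r} has no attribute {class_name!r}"
        ) from exc


# Example with a standard-library class, so the sketch runs without carrot.
handler_class = load_class("logging.Handler")
```

Note that the result is the class itself, not an instance, which matches the Type[...] return annotation in the signature above.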
start_consuming
() → None¶Creates a thread for each concurrency level, e.g. if concurrency is set to 5, 5 threads are created.
A Consumer
is attached to each thread and is started
stop_consuming
() → None¶Stops all running threads. Loops through the threads twice: first to set the signal to False on all threads, then to wait for them all to finish
If a single loop was used here, the latter threads could still consume new tasks while the parent process waited for the earlier threads to finish. The second loop allows for quicker consumer stoppage and stops all consumers from consuming new tasks from the moment the signal is received
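The two-pass shutdown described above can be illustrated with plain threads. This is a sketch under stated assumptions: Worker, its active flag and stop_all are hypothetical stand-ins, not carrot's Consumer or ConsumerSet.

```python
import threading
import time
from typing import List


class Worker(threading.Thread):
    """Hypothetical stand-in for a consumer thread."""

    def __init__(self) -> None:
        super().__init__(daemon=True)
        self.active = True  # the "signal" that is set to False on shutdown
        self.processed = 0

    def run(self) -> None:
        while self.active:
            self.processed += 1  # pretend to consume one task
            time.sleep(0.01)


def stop_all(workers: List[Worker]) -> None:
    # Pass 1: signal every worker first, so none of them starts new work.
    for worker in workers:
        worker.active = False
    # Pass 2: only now wait for each worker to finish what it is doing.
    for worker in workers:
        worker.join()


workers = [Worker() for _ in range(5)]
for worker in workers:
    worker.start()
time.sleep(0.05)
stop_all(workers)
```

Had the flag been cleared and the thread joined inside a single loop, the fifth worker would have kept consuming until the first four had been joined.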
carrot.consumer.
ListHandler
(thread_name: str, level: int)¶A logging.Handler
that records each log entry to a python list object, provided that the entry is coming
from the correct thread.
Allows for task-specific logging
emit
(record: logging.LogRecord)¶Do whatever it takes to actually log the specified logging record.
This version is intended to be implemented by subclasses and so raises a NotImplementedError.
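A handler matching the ListHandler class description (record entries to a list, but only when they come from the expected thread) could look roughly like the sketch below. ThreadListHandler and its records attribute are hypothetical names used for illustration; this is not carrot's implementation.

```python
import logging
import threading
from typing import List


class ThreadListHandler(logging.Handler):
    """Collects formatted log entries emitted by one named thread."""

    def __init__(self, thread_name: str, level: int = logging.NOTSET) -> None:
        super().__init__(level)
        self.thread_name = thread_name
        self.records: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        # LogRecord.threadName is the name of the thread that made the call.
        if record.threadName == self.thread_name:
            self.records.append(self.format(record))


logger = logging.getLogger("list_handler_demo")
logger.setLevel(logging.DEBUG)
logger.propagate = False  # keep the demo output out of the root logger
handler = ThreadListHandler("worker-1", logging.DEBUG)
logger.addHandler(handler)


def log_from_thread(message: str) -> None:
    logger.info(message)


for name in ("worker-1", "worker-2"):
    thread = threading.Thread(
        target=log_from_thread, args=(f"hello from {name}",), name=name
    )
    thread.start()
    thread.join()
```

Only the entry logged from the thread named worker-1 ends up in handler.records; the entry from worker-2 is ignored by this handler.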
carrot.consumer.
LoggingTask
(task: Callable, logger: logging.Logger, thread_name: str, *args, **kwargs)¶Turns a function into a class with run()
method, and attaches a ListHandler
logging handler
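The idea of wrapping a function in an object with a run() method and capturing its log output can be sketched as follows. TaskRunner and CollectingHandler are hypothetical names, and the thread filtering that ListHandler performs is left out to keep the example short; carrot's LoggingTask may be structured differently.

```python
import logging
from typing import Any, Callable, List


class CollectingHandler(logging.Handler):
    """Hypothetical handler that stores each log message in a list."""

    def __init__(self) -> None:
        super().__init__()
        self.lines: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(record.getMessage())


class TaskRunner:
    """Wraps a callable so it can be run later with its logs captured."""

    def __init__(self, task: Callable[..., Any], logger: logging.Logger,
                 *args: Any, **kwargs: Any) -> None:
        self.task = task
        self.logger = logger
        self.args = args
        self.kwargs = kwargs
        self.handler = CollectingHandler()

    def run(self) -> Any:
        # Attach the handler only while the task runs, then detach it.
        self.logger.addHandler(self.handler)
        try:
            return self.task(*self.args, **self.kwargs)
        finally:
            self.logger.removeHandler(self.handler)


demo_logger = logging.getLogger("task_runner_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False


def add(a: int, b: int) -> int:
    demo_logger.info("adding %s and %s", a, b)
    return a + b


runner = TaskRunner(add, demo_logger, 2, 3)
result = runner.run()
```

After run() returns, runner.handler.lines holds the messages the task logged, which is the kind of task-specific output that can then be stored alongside the task result.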