This module provides a backend API for creating Consumers and Consumer Sets
carrot.consumer.
Consumer
(host, queue, logger, name, durable=True, queue_arguments=None, exchange_arguments=None)¶An individual Consumer object. This class is run on a detached thread and watches a specific RabbitMQ queue for
messages, and consumes them when they appear. Multiple Consumers can be linked to the same queue using a
ConsumerSet
object.
close_connection
()¶This method closes the connection to RabbitMQ.
connect
()¶Connects to the broker
Return type: | pika.SelectConnection |
---|
fail
(log, err)¶This function is called if there is any kind of error with the .consume() function
Parameters: |
|
---|
The exception message is logged, and the MessageLog is updated with the result
get_message_log
(properties, body)¶Finds and returns the carrot.models.MessageLog
object associated with a RabbitMQ message
By default, carrot finds this retrieving the MessageLog UUID from the RabbitMQ message properties.message_id attribute.
This method can be extended by custom consumers. For example, if you are attempting to consume from a RabbitMQ queue containing messages that do not come from your Carrot instance, you may want to extend this method to create, instead of get, a MessageLog object
Parameters: |
|
---|---|
Return type: | class:carrot.models.MessageLog or None |
In order to avoid different consumers picking up the same message, MessageLogs are only .. note:
This method does not use self.get_task_type as the intention is to get the MessageLog object before the
**consume** method tries to do anything else. This means that if any later part of the process fails,
the traceback and exception information can be stored to the MessageLog object for easier debugging.
Warning
If this method fails to find a matching MessageLog object, then the RabbitMQ message will be rejected. Depending on the configuration of your RabbitMQ queue, this may cause a loss of data. If you are implementing a custom consumer, then you should use dead letter exchange to preserve your message content
get_task_type
(properties, body)¶Identifies the task type, by looking up the attribute self.task_type in the message properties
Parameters: |
|
---|---|
Returns: | The task type as a string, e.g. myapp.mymodule.mytask |
on_bind
(*args)¶Invoked when the queue has been successfully bound to the exchange
Parameters are provided by Pika but not required by Carrot
on_cancel
(*args)¶Invoked when the channel cancel is completed.
Parameters provided by Pika but not required by Carrot
on_channel_closed
(channel, reply_code, reply_text)¶Called when the channel is closed. Raises a warning and closes the connection
Parameters are provided by Pika but not required by Carrot
on_channel_open
(channel)¶This function is invoked when the channel is established. It adds a callback in case of channel closure, and establishes the exchange
Parameters: | channel (pika.channel.Channel) – The channel object |
---|
on_connection_closed
(*args)¶Callback that gets called when the connection is closed. Checks for the self.shutdown_requested parameter first, which is used to idenfity whether the shutdown has been requested by the user or not. If not, carrot attempts to reconnect
All arguments sent to this callback come from Pika but are not required by Carrot
on_connection_open
(connection)¶Callback that gets called when the connection is opened. Adds callback in case of a closed connection, and establishes the connection channel
Parameters: | connection (pika.SelectConnection) – Sent by default by pika but not used by carrot |
---|
on_consumer_cancelled
(method_frame)¶Invoked by pika when RabbitMQ sends a Basic.Cancel for a consumer receiving messages.
Parameters: | method_frame (pika.frame.Method) – The Basic.Cancel frame |
---|
on_exchange_declare
(*args)¶Invoked when the exchange has been successfully established
Parameters are provided by Pika but not required by Carrot
on_message
(channel, method_frame, properties, body)¶The process that takes a single message from RabbitMQ, converts it into a python executable and runs it,
logging the output back to the assoicated carrot.models.MessageLog
Parameters: |
|
---|
on_queue_declare
(*args)¶Invoked when the queue has been successfully declared
Parameters are provided by Pika but not required by Carrot
reconnect
()¶Reconnect to the broker in case of accidental disconnection
run
()¶Process starts here
start_consuming
()¶The main consumer process. Attaches a callback to be invoked whenever there is a new message added to the queue
stop
()¶Cleanly exit the Consumer
stop_consuming
()¶Stops the consumer and cancels the channel
carrot.consumer.
ConsumerSet
(host, queue, logger, concurrency=1, name='consumer', consumer_class='carrot.consumer.Consumer')¶Creates and starts a number of Consumer
objects. All consumers must belong to the same queue
Parameters: |
|
---|
start_consuming
()¶Creates a thread for each concurrency level, e.g. if concurrency is set to 5, 5 threads are created.
A Consumer
is attached to each thread and is started
stop_consuming
()¶Stops all running threads. Loops through the threads twice - firstly, to set the signal to False on all threads, secondly to wait for them all to finish
If a single loop was used here, the latter threads could still consume new tasks while the parent process waited for the earlier threads to finish. The second loop allows for quicker consumer stoppage and stops all consumers from consuming new tasks from the moment the signal is received
carrot.consumer.
ListHandler
(thread_name, level)¶A logging.Handler
that records each log entry to a python list object, provided that the entry is coming
from the correct thread.
Allows for task-specific logging
emit
(record)¶Do whatever it takes to actually log the specified logging record.
This version is intended to be implemented by subclasses and so raises a NotImplementedError.
carrot.consumer.
LoggingTask
(task, logger, thread_name, *args, **kwargs)¶Turns a function into a class with run()
method, and attaches a ListHandler
logging handler