Be careful using this function with non-zero amqp.timeout (you may check at AMQPConnection::getTimeout), because it looks like timeout value says how long to wait for a new message from broker before die in way like
Fatal error: Uncaught exception 'AMQPConnectionException' with message 'Resource temporarily unavailable' in /path/to/your/file.php:12
Stack trace:
#0 /path/to/your/file.php(12): AMQPQueue->consume(Object(Closure), 128)
#1 {main}
thrown in /path/to/your/file.php on line 12
As for notes about blocking, system resources greediness and so and so, you can investigate how it works by looking in amqp_queue.c for read_message_from_channel C function declaration and PHP_METHOD(amqp_queue_class, consume) method declaration. For me it works perfectly without any uncommon resources usage or I/O performance degradation under the load of 10k 64b message per second with delivery time for less than 0.001 sec.
OS: FreeBSD *** 8.2-RELEASE FreeBSD 8.2-RELEASE #0: Sat Mar ****** 2011 root@*****:**** amd64
PHP: PHP Version => 5.3.10, Suhosin Patch 0.9.10, Zend Engine v2.3.0
php AMQP extnsion:
amqp
Version => 1.0.9
Revision => $Revision: 327551 $
Compiled => Dec 2* 2012 @ *****
AMQP protocol version => 0-9-1
librabbitmq version => 0.2.0
Directive => Local Value => Master Value
amqp.auto_ack => 0 => 0
amqp.host => localhost => localhost
amqp.login => guest => guest
amqp.password => guest => guest
amqp.port => 5672 => 5672
amqp.prefetch_count => 3 => 3
amqp.timeout => 0 => 0
amqp.vhost => / => /
AMQP broker: RabbitMQ 3.0.1, Erlang R14B04
Definitely, such loop will block main thread, but due to single-thread PHP nature it's completely normal behavior. To exit this consumption loop your callback function or method (i prefer to use closures, btw) should return FALSE.
The benefit of this function is that you don't have manually iteration for all messages, and what is more important, if there is no unprocessed messages in queue it will wait for such for you.
So you have just to run you consumer (one or many) and optionally time to time check whether they still alive just for reason if you are not sure about callbacks or memory-limit-critical stuff
AMQPQueue::consume
(PECL amqp >= Unknown)
AMQPQueue::consume — Consomme les messages depuis la file d'attente
Description
Fonction bloquante qui récupère le prochain message depuis la file d'attente, le rendant ainsi disponible et le passe à la fonction de rappel.
Liste de paramètres
-
callback -
Une fonction de rappel qui recevra le message récupéré. La fonction doit accepter au minimum un paramètre, un objet AMQPEnvelope, et un second paramètre optionnel AMQPQueue représentant la file d'attente depuis laquelle le message est récupéré.
La méthode AMQPQueue::consume() ne retournera pas le thread exécutant la demande au script PHP tant que la fonction de rappel ne retournera pas
FALSE. -
flags -
Un masque contenant n'importe quel drapeau
AMQP_NOACK.
Erreurs / Exceptions
Emets une exception AMQPChannelException si le canal n'est pas ouvert.
Emets une exception AMQPConnectionException si la connexion au broker a été perdue.
Valeurs de retour
Exemples
Exemple #1 Exemple avec AMQPQueue::consume()
<?php
/* Création d'une connexion utilisant toutes les informations d'authentification par défaut : */
$connection = new AMQPConnection();
$connection->connect();
$channel = new AMQPChannel($connection);
/* création d'un objet de file d'attente */
$queue = new AMQPQueue($channel);
// Déclaration de la file d'attente
$queue->declare('myqueue');
$i = 0;
function processMessage($envelope, $queue) {
global $i;
echo "Message $i: " . $envelope->getBody() . "\n";
$i++;
if ($i > 10) {
// Bail after 10 messages
return false;
}
}
// Récupère les messages
$queue->consume("processMessage");
?>
To consume ALL messages stored DURABLE exchanges, you will need to set channel's prefetch size parameter to 0:
<?php $channel->setPrefetchCount(0); ?>
Be careful using consume() function on AMQP. It will catch all Exception and fall down in infinite loop (message will not be marked as readed and reput in queue)
Using AMQP_consume, against a RabbitMQ server, actually stuffs memory. It will work in a loop, or on a constant recall, so long as your exchange/queue and messages are set to durable. However, it will alo make the system unusable within a couple of minutes.
Using get(), all is fine. I think this may be a bugette in teh PHP access code... ff to take a look.
you shouldn't use AMQPQueue::consume() if you have to get _all_ incoming messages. you'll get only "max" number of messages and the queue will be destroyed then.
for my amqp.so v0.2.2 this weird behavior is true.
use AMQPQueue::get() and use "count" param instead.
