My producer sits in a loop and sends up some system properties every second. The problem is that I am only seeing the consumer read every 2nd message, it's as though every 2nd message is not being read. For example, my producer prints out this (timestamp, cpu pct used, RAM used):
2014 - 08 - 16 14: 36: 17.576000 - 0700, 16.0, 8050806784 2014 - 08 - 16 14: 36: 18.578000 - 0700, 15.5, 8064458752 2014 - 08 - 16 14: 36: 19.579000 - 0700, 15.0, 8075313152 2014 - 08 - 16 14: 36: 20.580000 - 0700, 12.1, 8074121216 2014 - 08 - 16 14: 36: 21.581000 - 0700, 16.0, 8077778944 2014 - 08 - 16 14: 36: 22.582000 - 0700, 14.2, 8075038720
but my consumer is printing out this:
Received '2014-08-16 14:36:17.576000 -0700,16.0,8050806784'
Received '2014-08-16 14:36:19.579000 -0700,15.0,8075313152'
Received '2014-08-16 14:36:21.581000 -0700,16.0,8077778944'
The code for the producer is:
import pika
import psutil
import time
import datetime
from dateutil.tz
import tzlocal
import logging
logging.getLogger('pika').setLevel(logging.DEBUG)
connection = pika.BlockingConnection(pika.ConnectionParameters(
host = '54.191.161.213'))
channel = connection.channel()
channel.queue_declare(queue = 'ems.data')
while True:
now = datetime.datetime.now(tzlocal())
timestamp = now.strftime('%Y-%m-%d %H:%M:%S.%f %z')
msg = "%s,%.1f,%d" % (timestamp, psutil.cpu_percent(), psutil.virtual_memory().used)
channel.basic_publish(exchange = '',
routing_key = 'ems.data',
body = msg)
print msg
time.sleep(1)
connection.close()
By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.,This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn't look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.,It's a common mistake to miss the basic_ack. It's an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won't be able to release any unacked messages.,In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message had been received, processed and that RabbitMQ is free to delete it.
We will slightly modify the send.py code from our previous example, to allow arbitrary messages to be sent from the command line. This program will schedule tasks to our work queue, so let's name it new_task.py:
import sys
message = ' '.join(sys.argv[1: ]) or "Hello World!"
channel.basic_publish(exchange = '',
routing_key = 'hello',
body = message)
print(" [x] Sent %r" % message)
Our old receive.py script also requires some changes: it needs to fake a second of work for every dot in the message body. It will pop messages from the queue and perform the task, so let's call it worker.py:
import time
def callback(ch, method, properties, body):
print(" [x] Received %r" % body.decode())
time.sleep(body.count(b '.'))
print(" [x] Done")
You need three consoles open. Two will run the worker.py script. These consoles will be our two consumers - C1 and C2.
# shell 1 python worker.py # => [ * ] Waiting for messages.To exit press CTRL + C
In the third one we'll publish new tasks. Once you've started the consumers you can publish a few messages:
# shell 3 python new_task.py First message. python new_task.py Second message.. python new_task.py Third message... python new_task.py Fourth message.... python new_task.py Fifth message.....
Let's see what is delivered to our workers:
# shell 1 python worker.py # => [ * ] Waiting for messages.To exit press CTRL + C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....'
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
On Windows, drop the sudo:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
Producer client is created and connected to local RabbitMQ instance. From now on it can communicate with RabbitMQ freely., Producer client is created and connected to local RabbitMQ instance. From now on it can communicate with RabbitMQ freely. ,Both consumer and producer are created and connected to the same RabbitMQ server, residing on localhost, Both consumer and producer are created and connected to the same RabbitMQ server, residing on localhost
puka can be quickly installed using pip
– a Python package manager.
pip install puka
pip is not always bundled with Linux distributions. On Debian based distributions (including Ubuntu) it can be easily installed using:
apt - get install python - pip
On RHEL based, like CentOS:
yum install python - setuptools easy_install pip
and paste the script contents:
import puka # declare send and receive clients, both connecting to the same server on local machine producer = puka.Client("amqp://localhost/") consumer = puka.Client("amqp://localhost/") # connect sending party send_promise = producer.connect() producer.wait(send_promise) # connect receiving party receive_promise = consumer.connect() consumer.wait(receive_promise) # declare queue(queue must exist before it is being used - otherwise messages sent to that queue will be discarded) send_promise = producer.queue_declare(queue = 'rabbit') producer.wait(send_promise) # send message to the queue named rabbit send_promise = producer.basic_publish(exchange = '', routing_key = 'rabbit', body = 'Droplet test!') producer.wait(send_promise) print "Message sent!" # start waiting for messages, also those sent before(!), on the queue named rabbit receive_promise = consumer.basic_consume(queue = 'rabbit', no_ack = True) print "Starting receiving!" while True: received_message = consumer.wait(receive_promise) print "GOT: %r" % (received_message['body'], ) break
Running the script should print the message that was sent by the script to the RabbitMQ queue, since the test program receives the message immediately afterwards. The output should look like:
root @rabbitmq: ~# python rabbit_test.py Message sent! Starting receiving! GOT: 'Droplet test!' root @rabbitmq: ~#
In this part of the tutorial we’ll write two small programs in Python; a producer (sender) that sends a single message, and a consumer (receiver) that receives messages and prints them out. It’s a “Hello World” of messaging.,Receiving messages from the queue is more complex. It works by subscribing a callback function to a queue. Whenever we receive a message, this callback function is called by the Pika library. In our case this function will print on the screen the contents of the message.,Our first program send.py will send a single message to the queue. The first thing we need to do is to establish a connection with RabbitMQ server.,Before exiting the program we need to make sure the network buffers were flushed and our message was actually delivered to RabbitMQ. We can do it by gently closing the connection.
RabbitMQ speaks multiple protocols. This tutorial uses AMQP 0-9-1, which is an open,
general-purpose protocol for messaging. There are a number of clients for RabbitMQ
in many different languages. In this tutorial
series we’re going to use Pika 1.0.0,
which is the Python client recommended
by the RabbitMQ team. To install it you can use the
pip
package management tool:
python - m pip install pika--upgrade
RabbitMQ speaks multiple protocols. This tutorial uses AMQP 0-9-1, which is an open,
general-purpose protocol for messaging. There are a number of clients for RabbitMQ
in many different languages. In this tutorial
series we’re going to use Pika 1.0.0,
which is the Python client recommended
by the RabbitMQ team. To install it you can use the
pip
package management tool:
python - m pip install pika--upgrade
Our first program send.py
will send a single message to the queue.
The first thing we need to do is to establish a connection with
RabbitMQ server.
#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) channel = connection.channel()
Next, before sending we need to make sure the recipient queue exists. If we send a message to non-existing location, RabbitMQ will just drop the message. Let’s create a hello queue to which the message will be delivered:
channel.queue_declare(queue = 'hello')
Before exiting the program we need to make sure the network buffers were flushed and our message was actually delivered to RabbitMQ. We can do it by gently closing the connection.
connection.close()
The next step, just like before, is to make sure that the queue
exists. Creating a queue using queue_declare
is idempotent ‒ we
can run the command as many times as we like, and only one will be
created.
channel.queue_declare(queue = 'hello')
You may wish to see what queues RabbitMQ has and how many
messages are in them. You can do it (as a privileged user) using the rabbitmqctl
tool:
sudo rabbitmqctl list_queues
On Windows, omit the sudo:
rabbitmqctl.bat list_queues