python rabbitmq - consumer only seeing every second message

My producer sits in a loop and sends some system properties every second. The problem is that the consumer only reads every second message, as though every other message is being skipped. For example, my producer prints this (timestamp, CPU percent used, RAM used):

2014-08-16 14:36:17.576000 -0700,16.0,8050806784
2014-08-16 14:36:18.578000 -0700,15.5,8064458752
2014-08-16 14:36:19.579000 -0700,15.0,8075313152
2014-08-16 14:36:20.580000 -0700,12.1,8074121216
2014-08-16 14:36:21.581000 -0700,16.0,8077778944
2014-08-16 14:36:22.582000 -0700,14.2,8075038720

but my consumer is printing out this:

Received '2014-08-16 14:36:17.576000 -0700,16.0,8050806784'
Received '2014-08-16 14:36:19.579000 -0700,15.0,8075313152'
Received '2014-08-16 14:36:21.581000 -0700,16.0,8077778944'

The code for the producer is:

import pika
import psutil
import time
import datetime
from dateutil.tz import tzlocal
import logging
logging.getLogger('pika').setLevel(logging.DEBUG)

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='54.191.161.213'))
channel = connection.channel()

channel.queue_declare(queue='ems.data')

while True:
    # Build one "timestamp,cpu,ram" sample and publish it via the default exchange
    now = datetime.datetime.now(tzlocal())
    timestamp = now.strftime('%Y-%m-%d %H:%M:%S.%f %z')
    msg = "%s,%.1f,%d" % (timestamp, psutil.cpu_percent(), psutil.virtual_memory().used)
    channel.basic_publish(exchange='',
                          routing_key='ems.data',
                          body=msg)
    print(msg)
    time.sleep(1)
connection.close()

Suggestion : 2

By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.

This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn't look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.

It's a common mistake to miss the basic_ack. It's an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won't be able to release any unacked messages.

In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back by the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it.
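The round-robin rule would explain the symptom in the question if a second consumer (for example, an earlier run of the consumer script that was never stopped) is still attached to the ems.data queue. The following is a small pure-Python simulation of that dispatch rule, not RabbitMQ code; the function name round_robin and the consumer labels C1 and C2 are made up for illustration.

```python
from itertools import cycle


def round_robin(messages, consumers):
    """Assign each message to the next consumer in turn, as a simplified
    model of how RabbitMQ spreads deliveries across a queue's consumers."""
    delivered = {name: [] for name in consumers}
    for message, name in zip(messages, cycle(consumers)):
        delivered[name].append(message)
    return delivered


# Six one-second samples, like the producer output in the question.
messages = ["14:36:17", "14:36:18", "14:36:19", "14:36:20", "14:36:21", "14:36:22"]

# Two consumers on the same queue: each one sees every second message.
result = round_robin(messages, ["C1", "C2"])
print(result["C1"])  # ['14:36:17', '14:36:19', '14:36:21']
print(result["C2"])  # ['14:36:18', '14:36:20', '14:36:22']
```

With a single consumer in the list, that consumer receives all six messages, which is the behaviour the asker expected.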

We will slightly modify the send.py code from our previous example, to allow arbitrary messages to be sent from the command line. This program will schedule tasks to our work queue, so let's name it new_task.py:

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print(" [x] Sent %r" % message)

Our old receive.py script also requires some changes: it needs to fake a second of work for every dot in the message body. It will pop messages from the queue and perform the task, so let's call it worker.py:

import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body.decode())
    time.sleep(body.count(b'.'))
    print(" [x] Done")
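The callback above never acknowledges a message. Below is a sketch of how worker.py might look with manual acknowledgments, assuming Pika 1.x. The queue name 'hello' is taken from new_task.py; the run_worker function and the localhost address are assumptions for illustration.

```python
import time


def callback(ch, method, properties, body):
    """Simulate one second of work per dot, then acknowledge the message."""
    print(" [x] Received %r" % body.decode())
    time.sleep(body.count(b"."))
    print(" [x] Done")
    # Without this ack RabbitMQ keeps the message as unacknowledged and
    # redelivers it once the worker's channel or connection closes.
    ch.basic_ack(delivery_tag=method.delivery_tag)


def run_worker():
    # Imported here so callback() can be exercised without Pika or a broker.
    import pika

    # Assumption: a RabbitMQ broker is reachable on localhost.
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="hello")
    # auto_ack defaults to False in Pika 1.x, so the explicit ack is needed.
    channel.basic_consume(queue="hello", on_message_callback=callback)
    print(" [*] Waiting for messages. To exit press CTRL+C")
    channel.start_consuming()
```

Calling run_worker() in each of the two consoles starts a consumer; the callback itself only needs an object with a basic_ack method, which makes it easy to test in isolation.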

You need three consoles open. Two will run the worker.py script. These consoles will be our two consumers - C1 and C2.

# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C

In the third one we'll publish new tasks. Once you've started the consumers you can publish a few messages:

# shell 3
python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....

Let's see what is delivered to our workers:

# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'

To debug a missed acknowledgment, use rabbitmqctl to print the messages_unacknowledged field:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

On Windows, drop the sudo:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
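A similar check can be made from Python. This sketch assumes Pika 1.x and relies on the reply to a passive queue_declare, which carries message_count (ready messages) and consumer_count; it does not report unacknowledged messages. The helper names queue_stats and print_queue_stats and the localhost address are assumptions; ems.data is the queue from the question. A consumer count above 1 would be consistent with a consumer seeing only every second message.

```python
def queue_stats(channel, queue):
    """Return (ready_messages, consumer_count) for an existing queue.

    A passive declare neither creates nor modifies the queue; the broker
    only reports its current state (and closes the channel if the queue
    does not exist).
    """
    frame = channel.queue_declare(queue=queue, passive=True)
    return frame.method.message_count, frame.method.consumer_count


def print_queue_stats(queue="ems.data"):
    # Imported here so queue_stats() can be tested without Pika or a broker.
    import pika

    # Assumption: a RabbitMQ broker is reachable on localhost.
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    ready, consumers = queue_stats(connection.channel(), queue)
    print("ready=%d consumers=%d" % (ready, consumers))
    connection.close()
```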

Suggestion : 3

The producer client is created and connected to the local RabbitMQ instance. From now on it can communicate with RabbitMQ freely. Both consumer and producer are created and connected to the same RabbitMQ server, residing on localhost.

puka can be quickly installed using pip – a Python package manager.

pip install puka

pip is not always bundled with Linux distributions. On Debian based distributions (including Ubuntu) it can be easily installed using:

apt-get install python-pip

On RHEL-based distributions, such as CentOS:

yum install python-setuptools
easy_install pip

Create a file named rabbit_test.py and paste the script contents:

import puka

# declare send and receive clients, both connecting to the same server on local machine
producer = puka.Client("amqp://localhost/")
consumer = puka.Client("amqp://localhost/")

# connect sending party
send_promise = producer.connect()
producer.wait(send_promise)

# connect receiving party
receive_promise = consumer.connect()
consumer.wait(receive_promise)

# declare queue (the queue must exist before it is used, otherwise messages sent to it will be discarded)
send_promise = producer.queue_declare(queue='rabbit')
producer.wait(send_promise)

# send message to the queue named rabbit
send_promise = producer.basic_publish(exchange='', routing_key='rabbit', body='Droplet test!')
producer.wait(send_promise)

print("Message sent!")

# start waiting for messages, also those sent before (!), on the queue named rabbit
receive_promise = consumer.basic_consume(queue='rabbit', no_ack=True)

print("Starting receiving!")

# wait for a single message, print its body, then stop
while True:
    received_message = consumer.wait(receive_promise)
    print("GOT: %r" % (received_message['body'],))
    break

Running the script should print the message that was sent by the script to the RabbitMQ queue, since the test program receives the message immediately afterwards. The output should look like:

root@rabbitmq:~# python rabbit_test.py
Message sent!
Starting receiving!
GOT: 'Droplet test!'
root@rabbitmq:~#

Suggestion : 4

In this part of the tutorial we’ll write two small programs in Python: a producer (sender) that sends a single message, and a consumer (receiver) that receives messages and prints them out. It’s a “Hello World” of messaging.

Receiving messages from the queue is more complex. It works by subscribing a callback function to a queue. Whenever we receive a message, this callback function is called by the Pika library. In our case this function will print on the screen the contents of the message.

RabbitMQ speaks multiple protocols. This tutorial uses AMQP 0-9-1, which is an open, general-purpose protocol for messaging. There are a number of clients for RabbitMQ in many different languages. In this tutorial series we’re going to use Pika 1.0.0, which is the Python client recommended by the RabbitMQ team. To install it you can use the pip package management tool:

python -m pip install pika --upgrade

Our first program send.py will send a single message to the queue. The first thing we need to do is to establish a connection with RabbitMQ server.

#!/usr/bin/env python

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

Next, before sending we need to make sure the recipient queue exists. If we send a message to a non-existing location, RabbitMQ will just drop the message. Let’s create a hello queue to which the message will be delivered:

channel.queue_declare(queue='hello')

Before exiting the program we need to make sure the network buffers were flushed and our message was actually delivered to RabbitMQ. We can do it by gently closing the connection.

connection.close()
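The excerpt skips the publishing step between declaring the queue and closing the connection. Put together, send.py might look roughly like the sketch below, assuming Pika 1.x and a broker on localhost. The function names send_hello and send are made up for illustration; the message goes through the default exchange (''), which routes to the queue named by routing_key.

```python
def send_hello(channel, body="Hello World!"):
    """Declare the 'hello' queue and publish one message to it."""
    channel.queue_declare(queue="hello")
    # The default exchange ('') delivers to the queue named in routing_key.
    channel.basic_publish(exchange="", routing_key="hello", body=body)
    return body


def send():
    # Imported here so send_hello() can be tested without Pika or a broker.
    import pika

    # Assumption: a RabbitMQ broker is reachable on localhost.
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    sent = send_hello(connection.channel())
    print(" [x] Sent %r" % sent)
    # Closing the connection flushes network buffers before the program exits.
    connection.close()
```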

In the receiving program, the next step, just like before, is to make sure that the queue exists. Creating a queue using queue_declare is idempotent: we can run the command as many times as we like, and only one queue will be created.

channel.queue_declare(queue='hello')
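Subscribing a callback to the queue, as described earlier for the consumer, could then be sketched like this. It assumes Pika 1.x, where basic_consume takes on_message_callback and auto_ack; the names on_message, subscribe and receive and the localhost address are assumptions for illustration.

```python
def on_message(ch, method, properties, body):
    """Called by Pika for every delivery; prints the message body."""
    print(" [x] Received %r" % body)


def subscribe(channel, queue="hello"):
    """Make sure the queue exists, then register on_message as its consumer."""
    channel.queue_declare(queue=queue)
    # auto_ack=True tells RabbitMQ to treat a message as handled on delivery.
    channel.basic_consume(queue=queue, on_message_callback=on_message, auto_ack=True)


def receive():
    # Imported here so subscribe() can be tested without Pika or a broker.
    import pika

    # Assumption: a RabbitMQ broker is reachable on localhost.
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    subscribe(channel)
    print(" [*] Waiting for messages. To exit press CTRL+C")
    channel.start_consuming()
```

Every process that runs receive() against the same queue becomes another consumer, so messages are shared between them in round-robin order.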

You may wish to see what queues RabbitMQ has and how many messages are in them. You can do it (as a privileged user) using the rabbitmqctl tool:

sudo rabbitmqctl list_queues

On Windows, omit the sudo:

rabbitmqctl.bat list_queues