lite python amqp server implementation for dev and mock? [closed]


I ended up going with a Docker-based solution for my integration tests that automatically starts and kills a RabbitMQ container. You need to install the Docker Python library (pip install docker) and, of course, have a Docker daemon running on your machine. I was already using Docker for other things so it wasn't a biggie for my setup; YMMV. After that, basically I do:

import docker

client = docker.from_env()
c = client.containers.run(
    'rabbitmq:alpine',                       # use Alpine Linux build (smaller)
    auto_remove=True,                        # Remove automatically when stopped
    detach=True,                             # Run in daemon mode  
    ports={'5672/tcp' : ('127.0.0.1', None)} # Bind to a random localhost port
)

container = client.containers.get(c.id)      # Re-fetch container for port
port = container.attrs['NetworkSettings']['Ports']['5672/tcp'][0]['HostPort']

# ... Do any set up of the RabbitMQ instance needed on (127.0.0.1:<port>)

# ... Run tests against (127.0.0.1:<port>)

container.kill()   # Faster than 'stop'. Will also delete it so no need to be nice
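
One practical wrinkle with the snippet above: the container is returned as soon as it starts, but the broker inside usually needs a few more seconds before it accepts AMQP connections. A minimal sketch of one way to handle that is a small retry helper wrapped around the first connection attempt. The name `retry` and its parameters are my own for illustration; they are not part of the Docker or pika libraries.

```python
import time


def retry(action, attempts=30, delay=1.0, exceptions=(Exception,)):
    """Call action() until it succeeds or the attempts run out.

    Hypothetical helper for illustration. Returns whatever action()
    returns; re-raises the last error once every attempt has failed.
    """
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except exceptions:
            if attempt == attempts:
                raise  # out of attempts: surface the real error
            time.sleep(delay)  # give the broker time to finish booting
```

Assuming pika is the client, usage could look like `connection = retry(lambda: pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=int(port))), exceptions=(pika.exceptions.AMQPConnectionError,))`. Retrying the real AMQP connection tends to be more reliable than only checking that the TCP port is open, since Docker's port forwarding can accept connections before RabbitMQ itself is ready.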

Suggestion : 2

Celery is not the answer. I don't want a mocking client, I want a server. Like a mini Python RabbitMQ.

I don't need a full implementation nor something robust, just something that is easy to install and run for dev. A PyPI package would be great.

I'm not aware of any AMQP broker implemented in Python. And I am not aware of a 'lite' implementation in general; I think that implementing an AMQP broker is sufficiently complex that those that try it either aim to be close to one of the versions of the AMQP specification, or don't bother at all. :)

I also don't quite see how running a broker presents the same problems as running a test web server for your web application.

Suggestion : 3

Last updated: 2019-09-03

Full code

# example_publisher.py
import logging
import os

import pika

logging.basicConfig()

# Parse CLOUDAMQP_URL (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost/%2f')
params = pika.URLParameters(url)
params.socket_timeout = 5

connection = pika.BlockingConnection(params)  # Connect to CloudAMQP
channel = connection.channel()  # start a channel
channel.queue_declare(queue='pdfprocess')  # Declare a queue

# send a message
channel.basic_publish(exchange='', routing_key='pdfprocess', body='User information')
print("[x] Message sent to consumer")
connection.close()

# example_consumer.py
import os
import time

import pika


def pdf_process_function(msg):
    print(" PDF processing")
    print(" [x] Received " + str(msg))

    time.sleep(5)  # delays for 5 seconds
    print(" PDF processing finished")


# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()  # start a channel
channel.queue_declare(queue='pdfprocess')  # Declare a queue


# create a function which is called on incoming messages
def callback(ch, method, properties, body):
    pdf_process_function(body)


# set up subscription on the queue
channel.basic_consume(queue='pdfprocess', on_message_callback=callback, auto_ack=True)

# start consuming (blocks)
channel.start_consuming()
connection.close()
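
The fallback URLs in the code above end in `/%2f`, which can look like a typo. In an AMQP URL the path segment names the virtual host, and `%2f` is the percent-encoded form of `/`, the default virtual host on a stock RabbitMQ install. The sketch below uses only the standard library to show how such a URL breaks down; `describe_amqp_url` is a hypothetical helper written for illustration, not something pika provides.

```python
from urllib.parse import unquote, urlparse


def describe_amqp_url(url):
    """Split an AMQP URL into its parts (illustrative helper only)."""
    parts = urlparse(url)
    return {
        'host': parts.hostname,
        'port': parts.port or 5672,  # 5672 is the standard AMQP port
        'username': parts.username,
        # Drop the leading "/" of the path, then decode %2f -> "/"
        'virtual_host': unquote(parts.path[1:]) or '/',
    }


print(describe_amqp_url('amqp://guest:guest@localhost/%2f'))
# {'host': 'localhost', 'port': 5672, 'username': 'guest', 'virtual_host': '/'}
```

So both fallback URLs point at the same place: the `guest` user on the default virtual host of a broker on localhost, with or without the explicit `:5672`.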

Tutorial source code - Publisher

# example_publisher.py
import pika, os, logging

# Parse CLOUDAMQP_URL (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost/%2f')
params = pika.URLParameters(url)
params.socket_timeout = 5

connection = pika.BlockingConnection(params) # Connect to CloudAMQP

Start a channel

channel = connection.channel()

Declare a queue

channel.queue_declare(queue = 'pdfprocess') # Declare a queue