I ended up going with a Docker-based solution for my integration tests that automatically starts and kills a RabbitMQ container. You need to install the Docker Python library and (of course) have a Docker daemon running on your machine. I was already using Docker for other things, so it wasn't a big deal for my setup; YMMV. After that, basically I do:
import docker
client = docker.from_env()
c = client.containers.run(
    'rabbitmq:alpine',  # Use the Alpine Linux build (smaller)
    auto_remove=True,   # Remove automatically when stopped
    detach=True,        # Run in daemon mode
    ports={'5672/tcp': ('127.0.0.1', None)},  # Bind to a random localhost port
)
container = client.containers.get(c.id) # Re-fetch container for port
port = container.attrs['NetworkSettings']['Ports']['5672/tcp'][0]['HostPort']
# ... Do any set up of the RabbitMQ instance needed on (127.0.0.1:<port>)
# ... Run tests against (127.0.0.1:<port>)
container.kill() # Faster than 'stop'. Will also delete it so no need to be nice
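One thing the snippet above does not show: the broker inside the container usually needs a few seconds before it accepts AMQP connections, so tests that start right away may fail to connect. A minimal sketch of one way to handle that, assuming it is acceptable to simply retry the connection until it succeeds. `wait_until_ready` is a hypothetical helper name, not part of the Docker library or of any AMQP client:

```python
import time


def wait_until_ready(connect, timeout=30.0, interval=0.5):
    """Call connect() repeatedly until it succeeds or `timeout` seconds pass.

    `connect` is any zero-argument callable that raises while the broker
    is still starting and returns a connection object once it is up.
    (Hypothetical helper for illustration.)
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            return connect()
        except Exception as exc:
            if time.monotonic() >= deadline:
                raise TimeoutError(
                    f"broker not ready after {timeout} seconds"
                ) from exc
            time.sleep(interval)
```

With pika as the client, usage might look like `connection = wait_until_ready(lambda: pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1', port=int(port))))`, where `port` is the value read from the container attributes above. Retrying the real connection is deliberate: with Docker's port mapping, a bare TCP connect to the host port can succeed before the broker itself is listening.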
Celery is not the answer. I don't want a client, I want a server. Like a mini Python RabbitMQ.

I don't need a full implementation nor something robust, just something that is easy to install and run for dev. PyPI package would be great.

I'm not aware of any AMQP broker implemented in Python. And I am not aware of a 'lite' implementation in general; I think that implementing an AMQP broker is sufficiently complex that those that try it either aim to be close to one of the versions of the AMQP specification, or don't bother at all. :)

I also don't quite see how running a broker presents the same problems as running a test web server for your web application.
Last updated: 2019-09-03
Full code
# example_publisher.py
import pika, os, logging
logging.basicConfig()

# Parse CLOUDAMQP_URL (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost/%2f')
params = pika.URLParameters(url)
params.socket_timeout = 5

connection = pika.BlockingConnection(params)  # Connect to CloudAMQP
channel = connection.channel()  # start a channel
channel.queue_declare(queue = 'pdfprocess')  # Declare a queue

# send a message
channel.basic_publish(exchange = '', routing_key = 'pdfprocess', body = 'User information')
print("[x] Message sent to consumer")
connection.close()
# example_consumer.py
import pika, os, time

def pdf_process_function(msg):
    print(" PDF processing")
    print(" [x] Received " + str(msg))
    time.sleep(5)  # delays for 5 seconds
    print(" PDF processing finished")

# Access the CLOUDAMQP_URL environment variable and parse it (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost:5672/%2f')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()  # start a channel
channel.queue_declare(queue = 'pdfprocess')  # Declare a queue

# create a function which is called on incoming messages
def callback(ch, method, properties, body):
    pdf_process_function(body)

# set up subscription on the queue
channel.basic_consume('pdfprocess', callback, auto_ack = True)

# start consuming (blocks)
channel.start_consuming()
connection.close()
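The callback handed to `basic_consume` is an ordinary function taking `(ch, method, properties, body)`, so the processing logic can be exercised on its own before a broker is involved. A rough sketch, assuming the channel, method and properties arguments are not used by the callback (as in the consumer listing) and can therefore be passed as `None`. The `received` list is a hypothetical stand-in for the real PDF processing:

```python
received = []  # hypothetical stand-in for the real processing side effects


def pdf_process_function(msg):
    # pika delivers the message body as bytes, so decode before using it as text
    text = msg.decode("utf-8")
    received.append(text)
    return text


def callback(ch, method, properties, body):
    pdf_process_function(body)


# Call the callback directly with placeholder arguments, no broker needed
callback(None, None, None, b"User information")
print(received)
```

This only checks the function's own logic; whether messages actually arrive still has to be verified against a running broker.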
Tutorial source code - Publisher
# example_publisher.py
import pika, os, logging

# Parse CLOUDAMQP_URL (fallback to localhost)
url = os.environ.get('CLOUDAMQP_URL', 'amqp://guest:guest@localhost/%2f')
params = pika.URLParameters(url)
params.socket_timeout = 5
connection = pika.BlockingConnection(params)  # Connect to CloudAMQP
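The fallback URL ends in `%2f`, which is the URL-encoded form of `/`, the name of RabbitMQ's default virtual host. To make the pieces of such a URL visible, here is a small sketch using only the standard library. It is an illustration of the URL layout, not how `pika.URLParameters` is implemented, and `describe_amqp_url` is a hypothetical helper name:

```python
from urllib.parse import urlparse, unquote


def describe_amqp_url(url):
    """Split an amqp:// URL into its parts (illustrative helper only)."""
    parts = urlparse(url)
    return {
        "host": parts.hostname,
        "port": parts.port or 5672,        # 5672 is the default AMQP port
        "user": parts.username,
        "password": parts.password,
        "vhost": unquote(parts.path[1:]),  # "%2f" decodes to "/"
    }


info = describe_amqp_url("amqp://guest:guest@localhost/%2f")
print(info)
```

For the fallback URL this yields host `localhost`, user and password `guest`, and vhost `/`.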
Start a channel
channel = connection.channel()
Declare a queue
channel.queue_declare(queue = 'pdfprocess') # Declare a queue