To install Boto3, you may use the following command:
# Install Boto3 with pip (no version constraint specified).
pip install boto3
If your project requires a specific version of Boto3 or has compatibility concerns, you can pin or bound the version by adding a version constraint when installing:
# Install exactly Boto3 version 1.0.0.
# The specifier is quoted so the shell does not treat < or > as redirection.
pip install "boto3==1.0.0"

# Make sure Boto3 is no older than version 1.15.0.
pip install "boto3>=1.15.0"

# Avoid versions of Boto3 newer than version 1.15.3.
pip install "boto3<=1.15.3"
Once you run these commands, you will see output similar to the following:
2021-05-01 17:17:17,735: INFO: Standard Queue hevo-data-standard-queue created. Queue URL - https://queue.amazonaws.com/967745397581/hevo-data-standard-queue
Here’s the request syntax to fetch the SQS Queue URL:
# Request syntax for fetching an SQS queue URL; each 'string' is a placeholder value.
response = client.get_queue_url(
# Name of the queue whose URL is requested.
QueueName = 'string',
# Presumably the AWS account ID of the queue's owner — confirm against the Boto3 docs.
QueueOwnerAWSAccountId = 'string'
)
Output:
https://us-east-1.queue.amazonaws.com/xxxx/hevo-data-new-queue
Replaying messages from a dead letter queue. Currently, if an error occurs while processing a message, the whole consumer will crash. Instead, we'd like messages to be retried 3 times and then end up in a dead letter queue. You can read more about dead letter queues in the AWS docs. By default, SQS makes messages invisible for 30 seconds after they have been read. If you have a redrive policy set up with a maximum receive count of 3, SQS will automatically put your message on the dead letter queue after 3 unsuccessful reads. Therefore, we can handle errors by simply not deleting the message if an exception is thrown. Let's start with a basic SQS consumer:
import boto3

# Handle to the queue this consumer reads from.
sqs = boto3.resource("sqs")
queue = sqs.get_queue_by_name(QueueName="your-queue-name")


def process_message(message_body):
    """Handle the body of a single SQS message.

    Args:
        message_body: The body of the received message.
    """
    print(f"processing message: {message_body}")
    # do what you want with the message here


if __name__ == "__main__":
    # Poll forever; every message is deleted right after it has been processed.
    while True:
        messages = queue.receive_messages()
        for message in messages:
            process_message(message.body)
            message.delete()
...
if __name__ == "__main__":
    while True:
        messages = queue.receive_messages()
        for message in messages:
            try:
                process_message(message.body)
            except Exception as e:
                # Skip the delete on failure so the message stays on the queue and
                # is delivered again (and eventually moved to the dead letter queue
                # when a redrive policy is configured).
                print(f"exception while processing message: {repr(e)}")
                continue

            # Only successfully processed messages are removed from the queue.
            message.delete()
...
from signal import signal, SIGINT, SIGTERM

...


class SignalHandler:
    """Remember whether SIGINT or SIGTERM has been received.

    Creating an instance registers its handler for both signals. A polling loop
    can then check ``received_signal`` between iterations and stop cleanly
    instead of being interrupted mid-message.
    """

    def __init__(self):
        # Set to True by the handler once either signal arrives.
        self.received_signal = False
        signal(SIGINT, self._signal_handler)
        signal(SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Record the signal; invoked by the interpreter with (signum, frame)."""
        # The first parameter is named ``signum`` so it does not shadow the
        # imported ``signal`` function.
        print(f"handling signal {signum}, exiting gracefully")
        self.received_signal = True
...
if __name__ == "__main__":
    signal_handler = SignalHandler()
    # Keep polling until SIGINT/SIGTERM has been recorded by the handler; the flag
    # is only checked between iterations, so the current batch is not interrupted.
    while not signal_handler.received_signal:
        messages = queue.receive_messages()
        ...  # rest of the loop body is elided in the original snippet
...
if __name__ == "__main__":
    signal_handler = SignalHandler()
    while not signal_handler.received_signal:
        # Ask for up to 10 messages per call, with WaitTimeSeconds set to 1.
        messages = dlq.receive_messages(
            MaxNumberOfMessages=10,
            WaitTimeSeconds=1,
        )
        ...  # rest of the loop body is elided in the original snippet
...
from datadog import statsd

...

# Handles to the main queue and to the queue named "dead-letter-queue".
queue = sqs.get_queue_by_name(QueueName="your-queue-name")
dlq = sqs.get_queue_by_name(QueueName="dead-letter-queue")
...
def send_queue_metrics(sqs_queue):
    """Report the approximate message count of *sqs_queue* as a statsd gauge.

    The gauge is named ``sqs.queue.message_count`` and tagged with the queue
    name, which is taken from the last segment of the queue ARN.

    Args:
        sqs_queue: Queue object exposing ``load()`` and an ``attributes`` mapping
            with ``QueueArn`` and ``ApproximateNumberOfMessages`` entries.
    """
    # Reload the queue attributes before reading them.
    sqs_queue.load()
    queue_name: str = sqs_queue.attributes["QueueArn"].split(":")[-1]
    # SQS returns attribute values as strings, so convert before reporting.
    queue_length: int = int(sqs_queue.attributes["ApproximateNumberOfMessages"])
    statsd.gauge("sqs.queue.message_count", queue_length, tags=[f"queue:{queue_name}"])
...
if __name__ == "__main__":
    signal_handler = SignalHandler()
    while not signal_handler.received_signal:
        # Publish the message counts of both queues on every iteration.
        send_queue_metrics(queue)
        send_queue_metrics(dlq)
        messages = dlq.receive_messages(
            MaxNumberOfMessages=10,
            WaitTimeSeconds=1,
        )
        ...  # rest of the loop body is elided in the original snippet
...
def wait(seconds: int):
    """Build a decorator that runs the wrapped function at most once per interval.

    The timer starts when the function is decorated, so calls made within the
    first *seconds* after decoration are skipped as well.

    Args:
        seconds: Minimum time that must elapse since the last run (or since
            decoration) before the wrapped function is executed again.

    Returns:
        A decorator. The decorated function returns the wrapped function's
        result when it runs, and ``None`` when the call is skipped.
    """
    import functools  # local import: the file's import block is not visible here

    def decorator(fun):
        last_run = time.monotonic()

        @functools.wraps(fun)  # keep the wrapped function's name and docstring
        def new_fun(*args, **kwargs):
            nonlocal last_run
            now = time.monotonic()
            # Compare against the same reading that is stored as last_run.
            if now - last_run > seconds:
                last_run = now
                return fun(*args, **kwargs)
            return None

        return new_fun

    return decorator
...
# Throttle metric reporting via the wait decorator, configured with 15 seconds.
@wait(seconds=15)
def send_queue_metrics(sqs_queue):
    ...  # function body is elided in the original snippet
Packages
# Install the sqs-consumer package and record it in package.json.
npm install sqs-consumer --save
// Minimal sqs-consumer setup: create a consumer for one queue, log errors, start it.
const Consumer = require('sqs-consumer').Consumer;

const app = Consumer.create({
  queueUrl: 'https://sqs.eu-west-1.amazonaws.com/account-id/queue-name',
  async handleMessage(message) {
    // do some work with `message`
  }
});

// Both error events are handled the same way: print the error message.
for (const eventName of ['error', 'processing_error']) {
  app.on(eventName, (err) => {
    console.error(err.message);
  });
}

app.start();
// Consumer that passes its own SQS client, configured with a keep-alive HTTPS agent.
const Consumer = require('sqs-consumer').Consumer;
const AWS = require('aws-sdk');
const https = require('https');

const app = Consumer.create({
  queueUrl: 'https://sqs.eu-west-1.amazonaws.com/account-id/queue-name',
  async handleMessage(message) {
    // do some work with `message`
  },
  sqs: new AWS.SQS({
    httpOptions: {
      // keepAlive is enabled on the agent used for the client's requests.
      agent: new https.Agent({ keepAlive: true })
    }
  })
});

// Both error events are handled the same way: print the error message.
for (const eventName of ['error', 'processing_error']) {
  app.on(eventName, (err) => {
    console.error(err.message);
  });
}

app.start();
# Replace each ... with your own value; there must be no spaces around "=".
export AWS_SECRET_ACCESS_KEY=...
export AWS_ACCESS_KEY_ID=...
// Consumer whose region and credentials are set in code through AWS.config.update.
const Consumer = require('sqs-consumer').Consumer;
const AWS = require('aws-sdk');

// The '...' values are placeholders for the real credentials.
AWS.config.update({
  region: 'eu-west-1',
  accessKeyId: '...',
  secretAccessKey: '...'
});

const app = Consumer.create({
  queueUrl: 'https://sqs.eu-west-1.amazonaws.com/account-id/queue-name',
  async handleMessage(message) {
    // ...
  },
  sqs: new AWS.SQS()
});

// All three error events are handled the same way: print the error message.
for (const eventName of ['error', 'processing_error', 'timeout_error']) {
  app.on(eventName, (err) => {
    console.error(err.message);
  });
}

app.start();