In this article, we are going to learn how to filter and delete specific messages in an SQS queue before they are read by the consumer.
Sometimes we accidentally push the wrong messages to SQS, or for some other reason we want to stop messages in transit before the consumer reads them.
We are going to discuss the steps we can perform to make sure specific messages are removed before the consumer reads them.
Steps to filter and delete the messages in SQS
These are the steps I recommend to stop messages in transit before they can be consumed by the worker process:
- Stop the consumer, or point it to another dummy SQS queue for the time being.
- Configure and increase the delivery delay on the affected SQS queue.
- Browse the messages using our SQSCLI product and check the format of the message body.
- Use the Python InFlightSQS script below to read and remove specific messages while keeping the other messages in the queue.
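The delivery delay step could also be scripted. The sketch below assumes a boto3 SQS client (`boto3.client("sqs")`), whose `get_queue_url` and `set_queue_attributes` calls are standard; the helper names `build_delay_attributes` and `set_delivery_delay` are hypothetical and not part of InFlightSQS. SQS accepts a per-queue `DelaySeconds` between 0 and 900 seconds, and on standard queues a changed delay only applies to messages sent after the change.

```python
MAX_DELAY_SECONDS = 900  # SQS caps the per-queue delivery delay at 15 minutes


def build_delay_attributes(delay_seconds: int) -> dict:
    """Build the Attributes payload for set_queue_attributes (hypothetical helper)."""
    if not 0 <= delay_seconds <= MAX_DELAY_SECONDS:
        raise ValueError(f"DelaySeconds must be between 0 and {MAX_DELAY_SECONDS}")
    # SQS expects every attribute value as a string
    return {"DelaySeconds": str(delay_seconds)}


def set_delivery_delay(sqs_client, queue_name: str, delay_seconds: int) -> str:
    """Raise the delivery delay on a queue.

    sqs_client is assumed to be boto3.client("sqs"); returns the queue URL.
    """
    queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
    sqs_client.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes=build_delay_attributes(delay_seconds),
    )
    return queue_url
```

For example, `set_delivery_delay(boto3.client("sqs"), "mysqsqueue", 900)`. Remember to set the delay back to its original value once the cleanup is done.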
Boto Python Script to Filter and Delete Messages
The script takes four command line arguments:
- --threads (or -t): the number of threads, between 1 and 10
- --search (or -s): the search string to look for in each message
- --queue (or -q): the name of the queue to browse and process
- --delete (or -d): the string value True or False, which controls whether matching messages are deleted or only logged
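A common pitfall with a True/False flag like --delete is declaring it with argparse's `type=bool`: `bool("False")` is `True`, so any non-empty value would enable deletion. A minimal sketch of a safer declaration follows; `str2bool` and `build_parser` are illustrative names, not necessarily what the repository uses.

```python
import argparse


def str2bool(value: str) -> bool:
    """Convert 'True'/'False' style strings into a real boolean."""
    lowered = value.strip().lower()
    if lowered in ("true", "yes", "1"):
        return True
    if lowered in ("false", "no", "0"):
        return False
    raise argparse.ArgumentTypeError("expected True or False, got %r" % value)


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="InFlightSQS.py")
    parser.add_argument("-t", "--threads", type=int, required=True,
                        help="number of worker threads (1 to 10)")
    parser.add_argument("-s", "--search", type=str, required=True,
                        help="search string (regular expression) to look for in each message")
    parser.add_argument("-q", "--queue", type=str, required=True,
                        help="name of the SQS queue")
    # type=str2bool instead of type=bool, so "-d False" really means False
    parser.add_argument("-d", "--delete", type=str2bool, required=True,
                        help="True to delete matching messages, False to only log them")
    return parser
```

With this parser, `-d False` yields `args.delete is False`, and an unexpected value such as `-d maybe` is rejected with a usage error instead of silently enabling deletion.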
The script starts as many threads as you pass with -t (or --threads) at invocation. The count can range from 1 to 10 (you can change this limit in the source code).
Each worker thread acts as an instance of InFlightSQS and consumes messages from SQS independently.
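The fan-out can be pictured with the standard `threading` module. In this sketch, `run_workers` and the `worker` callable are hypothetical: in the real script each worker would create its own boto3 resource and poll the queue, while here any function that takes a worker name will do. The 1 to 10 bound mirrors the limit described above.

```python
import threading
from typing import Callable, List

MIN_THREADS, MAX_THREADS = 1, 10  # same bounds as the -t/--threads option


def run_workers(num_threads: int, worker: Callable[[str], None]) -> List[str]:
    """Start workers named worker-0 .. worker-N-1 and wait for all of them."""
    if not MIN_THREADS <= num_threads <= MAX_THREADS:
        raise ValueError(f"threads must be between {MIN_THREADS} and {MAX_THREADS}")
    names = [f"worker-{i}" for i in range(num_threads)]
    threads = [threading.Thread(target=worker, args=(name,), name=name) for name in names]
    # Start every thread first so the workers run concurrently
    for t in threads:
        t.start()
    # Then block until each worker has finished
    for t in threads:
        t.join()
    return names
```

Starting all threads before joining any of them is what lets the workers poll SQS in parallel; calling `join()` inside the start loop would run them one after another.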
Each consumed message is checked against the search string, which is treated as a regular expression. If it matches, the message body is written to a file named worker-<worker-no>.json,
and the message is deleted only if you have set the delete flag to True.
If a message does not match, it is simply ignored, and it becomes visible to the consumer again after the queue's configured visibility timeout.
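The per-message decision can be separated from the AWS calls, which makes it easy to try a search pattern against sample bodies before touching a real queue. This sketch assumes message objects that expose `.body` and `.delete()`, as boto3's `sqs.Message` resource does; `process_batch` is a hypothetical helper, and the search string is treated as a regular expression via `re.search`.

```python
import re
from typing import Iterable


def process_batch(messages: Iterable, pattern: str, worker_name: str, delete: bool) -> int:
    """Log matching messages to <worker_name>.json and optionally delete them.

    Non-matching messages are left untouched, so SQS makes them visible again
    once their visibility timeout expires. Returns the number of matches.
    """
    regex = re.compile(pattern)
    matched = 0
    for message in messages:
        if not regex.search(message.body):
            continue  # ignored: it returns to the queue after the visibility timeout
        # Append one message body per line to the worker's file
        with open(worker_name + ".json", "a") as outfile:
            outfile.write(message.body + "\n")
        if delete:
            message.delete()  # removes the message from the queue for good
        matched += 1
    return matched
```

Running it first with `delete=False` gives a dry run: the matches are recorded, and nothing is removed from the queue.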
Note: We recommend checking the visibility timeout configured on the SQS queue before running this script.
The default value is 30 seconds. For delayed queues and queues with a long visibility timeout, you might have to wait a little longer for the timeout to elapse before your actual consumer can read the messages.
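To check those settings before a run, you might query the queue attributes. The sketch assumes a boto3 SQS client, where `get_queue_url` and `get_queue_attributes` are standard calls; the helper name `queue_timing` is hypothetical. `VisibilityTimeout` is how long a received but undeleted message stays hidden, and `DelaySeconds` is the delivery delay applied to newly sent messages.

```python
def queue_timing(sqs_client, queue_name: str) -> dict:
    """Return the queue's visibility timeout and delivery delay in seconds (hypothetical helper)."""
    queue_url = sqs_client.get_queue_url(QueueName=queue_name)["QueueUrl"]
    response = sqs_client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=["VisibilityTimeout", "DelaySeconds"],
    )
    attributes = response["Attributes"]
    # SQS returns attribute values as strings, so convert them to integers
    return {
        "visibility_timeout": int(attributes["VisibilityTimeout"]),
        "delay_seconds": int(attributes["DelaySeconds"]),
    }
```

For example, `queue_timing(boto3.client("sqs"), "mysqsqueue")` tells you roughly how long non-matching messages will stay hidden from the real consumer after the script has read them.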
Where to download and how to use
You can download the InFlightSQS source code from the following GitHub repository:
https://github.com/AKSarav/InFlightSQS
You can clone the project, make your own changes, and submit a pull request if you want to share a feature with the wider audience.
git clone https://github.com/AKSarav/InFlightSQS.git
How it handles authentication
The script assumes that AWS credentials are available from the local environment, either through the IAM role (instance profile) attached to the machine or through an AWS CLI profile.
Make sure your AWS CLI environment is set up correctly if you run the script locally, or that the right IAM instance profile is attached if you run it from an EC2 instance.
InFlightSQS (Boto3) uses the default environment variables and the profiles in the local environment.
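Because boto3 picks up whichever credentials it finds first, it can be worth confirming the resolved identity before running with deletion enabled. A minimal sketch, assuming a `boto3.Session` (optionally created with `profile_name=`); STS `get_caller_identity` requires no IAM permissions and returns the account and ARN. The `whoami` name is hypothetical.

```python
def whoami(session) -> str:
    """Describe the AWS identity and region a boto3 session resolved (hypothetical helper)."""
    identity = session.client("sts").get_caller_identity()
    # region_name is None when no default region is configured
    region = session.region_name or "no default region"
    return f"{identity['Arn']} (account {identity['Account']}, {region})"
```

For example, `print(whoami(boto3.Session()))`, or `boto3.Session(profile_name="myprofile")` where `myprofile` is a placeholder for your own CLI profile name.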
Usage
usage: inFlightSQS.py [-h] [-t THREADS] [-s SEARCH] [-q QUEUE] [-d DELETE]
Examples
Here are some examples of InFlightSQS with the delete option set to True and False:
# Search for string "stringTOsearch" in SQS Queue "mysqsqueue" and delete the messages
python InFlightSQS.py \
  -t 10 \
  -s stringTOsearch \
  -q mysqsqueue \
  -d True

# Search for string "stringTOsearch" in SQS Queue "mysqsqueue" and do not delete the messages
python InFlightSQS.py \
  -t 10 \
  -s stringTOsearch \
  -q mysqsqueue \
  -d False
Installation
Install the dependencies specified in the requirements.txt file:
pip3 install -r requirements.txt
Run the script with the following values updated:
- Thread count
- String to search
- Queue name
- Delete flag (True | False)
python InFlightSQS.py -t 10 -s stringTOsearch -q mysqsqueue -d False
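After a dry run with -d False, the matches land in the worker-<worker-no>.json files, one message body per line. A small sketch to count what each worker captured before you rerun with -d True; `summarize_matches` is a hypothetical helper, and it assumes the files are in the given directory and that message bodies contain no embedded newlines.

```python
import os
import re

WORKER_FILE = re.compile(r"worker-\d+\.json")  # skips e.g. worker-0-normal.json


def summarize_matches(directory: str = ".") -> dict:
    """Count captured messages per worker file (one body per line is assumed)."""
    counts = {}
    for name in sorted(os.listdir(directory)):
        if WORKER_FILE.fullmatch(name):
            with open(os.path.join(directory, name)) as infile:
                # Count non-blank lines, i.e. captured message bodies
                counts[name] = sum(1 for line in infile if line.strip())
    return counts
```

If the totals look right, the same command with -d True removes those messages from the queue.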
Source code for quick reference
Here is the source code for quick reference, but please check the GitHub repository for the latest version.
import argparse
import re
import sys
import threading
import time

import boto3


def str2bool(value):
    # argparse's type=bool would turn any non-empty string (even "False") into True,
    # so the delete flag is parsed explicitly
    if value.strip().lower() in ("true", "yes", "1"):
        return True
    if value.strip().lower() in ("false", "no", "0"):
        return False
    raise argparse.ArgumentTypeError("Delete flag should be either True or False")


def receivemessage(workername):
    print("Starting thread: " + workername)
    sqs = boto3.resource('sqs')
    SQSQ = sqs.get_queue_by_name(QueueName=QNAME)
    no_of_messages = SQSQ.attributes['ApproximateNumberOfMessages']
    while int(no_of_messages) > 0:
        try:
            messages = SQSQ.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=20)
        except Exception as e:
            print(e)
            print("Failed to receive message from primary queue")
            return  # exit(1) inside a thread only stops this worker, so return instead
        print("Number of messages in Queue: " + no_of_messages)
        for message in messages:
            msgbody = message.body
            if re.search(SEARCH_STRING, msgbody):
                # Keep a record of the matching message in <workername>.json
                with open(workername + '.json', 'a') as outfile:
                    outfile.write(msgbody)
                    outfile.write("\n")
                # Mark the message as deleted if delete flag is set
                if DELETE:
                    message.delete()
            else:
                # Not matching: leave it alone so it becomes visible again after the visibility timeout
                # print("Message does not contain " + SEARCH_STRING + " ignoring..")
                # uncomment the following block if you want to keep records of the messages that are not matching
                # with open(workername + '-normal.json', 'a') as outfile:
                #     outfile.write(msgbody)
                #     outfile.write("\n")
                pass
        # The Queue resource caches its attributes; reload() fetches a fresh message count
        SQSQ.reload()
        no_of_messages = SQSQ.attributes['ApproximateNumberOfMessages']


if __name__ == '__main__':
    # argparse - get number of threads, search string, queue name and delete flag
    parser = argparse.ArgumentParser()
    parser.add_argument("-t", "--threads", help="Number of threads to spawn", type=int)
    parser.add_argument("-s", "--search", help="Search string to look for in the message body", type=str)
    parser.add_argument("-q", "--queue", help="Name of the queue to read from", type=str)
    parser.add_argument("-d", "--delete", help="Delete the message from the queue after reading (True or False)", type=str2bool)
    args = parser.parse_args()

    usage = "Usage: python InFlightSQS.py -t <number of threads> -s <search string> -q <queue name> -d <delete message from queue>"

    # Check if the inputs are provided or print usage
    # (args.delete can legitimately be False, so compare it with None)
    if not args.threads or not args.search or not args.queue or args.delete is None:
        print(usage)
        sys.exit(1)

    # Print the inputs
    print("Number of threads: " + str(args.threads))
    print("Search string: " + args.search)
    print("Queue name: " + args.queue)
    print("Delete flag: " + str(args.delete))

    # Check the inputs for valid values
    if args.threads < 1 or args.threads > 10:
        print("Number of threads should be between 1 and 10")
        sys.exit(1)

    SEARCH_STRING = args.search
    NUM_THREADS = args.threads
    QNAME = args.queue
    DELETE = args.delete

    # Create threads as per the number of threads specified
    workers = []
    for i in range(NUM_THREADS):
        t = threading.Thread(target=receivemessage, args=("worker-" + str(i),))
        t.start()
        workers.append(t)
        time.sleep(1)

    # Wait for all threads to finish
    for t in workers:
        t.join()
Conclusion
I hope this script helps you find and delete specific messages that were accidentally sent to an SQS queue, removing them in flight before the actual consumer reads them.
If you have any feedback or ideas to make this better, please let me know in the comments section.
Cheers
Sarav AK
Follow me on LinkedIn. Follow DevopsJunction on Facebook or Twitter. For more practical videos and tutorials, subscribe to our channel.