I'm using SQS as the broker for Celery:
tasks.py

from celery import Celery

app = Celery(broker='sqs://x:x@localhost:9324', include=('tasks',))
app.conf.task_routes = {'tasks.*': {'queue': 'my_queue'}}

@app.task
def my_task(arg):
    print(f'Received: {arg}')
I want to connect to SQS with boto3 from a remote machine and send a message to run the task in Celery:
client.py

import base64
import json
from uuid import uuid4

import boto3

if __name__ == '__main__':
    client = boto3.resource('sqs',
                            endpoint_url='http://localhost:9324',
                            region_name='chess-land',
                            aws_secret_access_key='x',
                            aws_access_key_id='x',
                            use_ssl=False)
    queue = client.get_queue_by_name(QueueName='my_queue')
    body = {
        'task': 'tasks.my_task',
        'id': str(uuid4()),
        'args': [1]
    }
    message = base64.b64encode(json.dumps(body).encode('utf-8')).decode()
    response = queue.send_message(MessageBody=message)
I get this output in the worker:
[2019-07-13 03:29:07,135: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: body: '{"task": "tasks.my_task", "id": "d8b9db91-e11d-4163-abab-e8dc5c45f15c", "args": [1]}' (88b)
{content_type:None content_encoding:None
delivery_info:{'sqs_message': {'MessageId': '667c5f2b-4a41-418f-8ae2-70000513ffd6', 'ReceiptHandle': '667c5f2b-4a41-418f-8ae2-70000513ffd6#eea1d655-895a-4aec-bf6b-0a7897030787', 'MD5OfBody': '46b788bc78e76a09615b5ed0d92e04d4', 'Body': 'eyJ0YXNrIjogInRhc2tzLmNoZWNrX2JhdGNoIiwgImlkIjogImQ4YjlkYjkxLWUxMWQtNDE2My1hYmFiLWU4ZGM1YzQ1ZjE1YyIsICJhcmdzIjogWzFdfQ=='}, 'sqs_queue': 'http://sqs:9324/queue/my_queue'} headers={}}
I'm trying to set content_type and content_encoding so that the message matches the format defined in Celery Message Protocol v1. Those values have to be sent outside the body, in a separate parameter called properties. How can I do this with boto3?
Versions:
Python 3.7
Celery 4.3.0
boto3 1.9.186
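For reference, here is a minimal sketch of the message shape that seems to be required. It assumes (this is my reading of kombu's virtual transport, not something confirmed in this thread) that the worker expects the SQS MessageBody to be a base64-encoded JSON envelope whose keys are body, content-type, content-encoding, headers and properties, with the Protocol v1 task body base64-encoded inside it. The helper name build_celery_sqs_body and the values under properties are illustrative assumptions and should be checked against the installed kombu version.

```python
import base64
import json
from uuid import uuid4


def _b64_json(obj):
    """Serialize obj to JSON and return it base64-encoded as text."""
    return base64.b64encode(json.dumps(obj).encode('utf-8')).decode('ascii')


def build_celery_sqs_body(task_name, args=None, kwargs=None, queue='my_queue'):
    """Hypothetical helper: wrap a Protocol v1 task body in a kombu-style envelope."""
    task_id = str(uuid4())
    # Celery Message Protocol v1 body, as in client.py.
    task_body = {
        'task': task_name,
        'id': task_id,
        'args': list(args or []),
        'kwargs': dict(kwargs or {}),
    }
    # Assumed envelope layout; content type and encoding live here,
    # next to the body rather than inside it.
    envelope = {
        'body': _b64_json(task_body),
        'content-encoding': 'utf-8',
        'content-type': 'application/json',
        'headers': {},
        'properties': {
            'body_encoding': 'base64',
            'correlation_id': task_id,
            'delivery_info': {'exchange': '', 'routing_key': queue},
            'delivery_mode': 2,
            'delivery_tag': str(uuid4()),
        },
    }
    return _b64_json(envelope)
```

If this assumption holds, the returned string would be passed as queue.send_message(MessageBody=...) in place of the bare base64-encoded task body.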
@chesstrian - Thank you for your post. You can accomplish this with boto3 by adding a custom header that contains the parameters you want to add to the API call. Here is sample code showing how to register an event handler on an API call:
import boto3

def inject_header(params, **kwargs):
    # Adds a header to the outgoing HTTP request for SendMessage.
    params['headers']['content_encoding'] = 'value'

client = boto3.client('sqs')
client.meta.events.register('before-call.sqs.SendMessage', inject_header)
response = client.send_message(
    QueueUrl='https://sqs.us-west-2.amazonaws.com/412759918066/my_queue',
    MessageBody='testing for custom headers')
Hope it helps and please let me know if you have any questions.
This issue has been automatically closed because there has been no response to our request for more information from the original author. With only the information that is currently in the issue, we don't have enough information to take action. Please reach out if you have or find the answers we need so that we can investigate further.
@swetashre - I tried the same thing, but it didn't fix the issue. What should I be looking for to make sure I have it right?
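One thing that may be worth checking, as a rough sketch: decode the Body that the worker logs and see whether the JSON carries the envelope keys at its top level. The worker log earlier in this thread shows content_type:None, which suggests the worker reads these values from the message body itself. The key list and the helper name missing_envelope_keys below are assumptions based on my understanding of kombu, not something confirmed here.

```python
import base64
import json

# Keys a kombu-style envelope is assumed to carry at the top level.
ENVELOPE_KEYS = ('body', 'content-type', 'content-encoding', 'properties')


def missing_envelope_keys(sqs_body):
    """Decode a base64 SQS Body and list the envelope keys it lacks."""
    payload = json.loads(base64.b64decode(sqs_body))
    return [key for key in ENVELOPE_KEYS if key not in payload]


# A bare task body like the one sent by client.py.
bare = base64.b64encode(
    json.dumps({'task': 'tasks.my_task', 'id': 'abc', 'args': [1]}).encode('utf-8')
).decode('ascii')
print(missing_envelope_keys(bare))
```

An empty list would mean the envelope keys are present; any key reported as missing is a candidate explanation for the "Received and deleted unknown message" warning.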