Boto3: Support for Celery Message Protocol

Created on 13 Jul 2019  路  3Comments  路  Source: boto/boto3

I'm using SQS as broker for Celery Framework:

  • tasks.py
from celery import Celery

app = Celery(broker='sqs://x:x@localhost:9324', include=('tasks',))
app.conf.task_routes = {'tasks.*': {'queue': 'my_queue'}}


@app.task
def my_task(arg):
    print(f'Received: {arg}')

I want to connect to SQS with boto3 from a remote machine and send a message to run the task in Celery:

  • client.py
import base64
import json
from uuid import uuid4

import boto3

if __name__ == '__main__':
    client = boto3.resource('sqs',
                            endpoint_url='http://localhost:9324',
                            region_name='chess-land',
                            aws_secret_access_key='x',
                            aws_access_key_id='x',
                            use_ssl=False)
    queue = client.get_queue_by_name(QueueName='my_queue')

    body = {
        'task': 'tasks.my_task',
        'id': str(uuid4()),
        'args': [1]
    }

    message = base64.b64encode(json.dumps(body).encode('utf-8')).decode()

    response = queue.send_message(MessageBody=message)

I get this output in the worker:

[2019-07-13 03:29:07,135: WARNING/MainProcess] Received and deleted unknown message.  Wrong destination?!?

The full contents of the message body was: body: '{"task": "tasks.my_task", "id": "d8b9db91-e11d-4163-abab-e8dc5c45f15c", "args": [1]}' (88b)
 {content_type:None content_encoding:None
  delivery_info:{'sqs_message': {'MessageId': '667c5f2b-4a41-418f-8ae2-70000513ffd6', 'ReceiptHandle': '667c5f2b-4a41-418f-8ae2-70000513ffd6#eea1d655-895a-4aec-bf6b-0a7897030787', 'MD5OfBody': '46b788bc78e76a09615b5ed0d92e04d4', 'Body': 'eyJ0YXNrIjogInRhc2tzLmNoZWNrX2JhdGNoIiwgImlkIjogImQ4YjlkYjkxLWUxMWQtNDE2My1hYmFiLWU4ZGM1YzQ1ZjE1YyIsICJhcmdzIjogWzFdfQ=='}, 'sqs_queue': 'http://sqs:9324/queue/my_queue'} headers={}}

I'm trying to set content_type and content_encoding to have the format defined in Celery Message Protocol v1, those values require to be sent in a different parameter than body called properties. How is this possible with boto3.?

Versions:

Python 3.7
Celery 4.3.0
boto3 1.9.186
closing-soon sqs

All 3 comments

@chesstrian - Thank you for your post. You can accomplish this with boto3 by adding custom header which will contain all the required parameter you want to add to the API call. Here is a sample code of how to add a event to an api call:

import boto3

def inject_header(params,**kwargs):
         params['headers']['content_encoding']='value'

client = boto3.client('sqs')
client.meta.events.register('before-call.sqs.SendMessage',inject_header)

response = client.send_message(QueueUrl='https://sqs.us-west-2.amazonaws.com/412759918066/my_queue',MessageBody='testing for custom headers')

Hope it helps and please let me know if you have any questions.

This issue has been automatically closed because there has been no response to our request for more information from the original author. With only the information that is currently in the issue, we don't have enough information to take action. Please reach out if you have or find the answers we need so that we can investigate further.

@swetashre - I tried the same thing, but it didn't fix the issue. What should I be looking for to make sure I have it right?

Was this page helpful?
0 / 5 - 0 ratings