Boto3: Reusing S3 Connection in Threads

Created on 5 Apr 2018 · 5 comments · Source: boto/boto3

I am attempting to upload files to S3 using concurrent.futures.ThreadPoolExecutor in AWS Lambda. This is a sample of my code:

from concurrent import futures

import boto3

MY_BUCKET = "my-bucket"
s3 = boto3.resource("s3")

# placeholder: list of (local path, S3 key) pairs built elsewhere
files = [...]


def my_lambda(event, context):
    def upload_to_s3(file, key):
        s3.Bucket(MY_BUCKET).upload_file(file, key)

    with futures.ThreadPoolExecutor(max_workers=1000) as executor:
        todo = []
        for f, key in files:
            todo.append(executor.submit(upload_to_s3, f, key))

        results = []
        for future in futures.as_completed(todo):
            results.append(future.result())

It seems all the files do get uploaded to S3, but there doesn't seem to be any time improvement over a sequential upload. I am also getting these logs in CloudWatch:

Connection pool is full, discarding connection: s3-us-west-2.amazonaws.com

I did some research, and according to this article it is not possible to reuse the connection to S3 across threads, since boto3 isn't thread-safe.

I would like to know whether this is indeed correct, and whether there is a solution to this.

Possibly related: #1128

Label: closing-soon

Most helpful comment

@JordonPhillips Is there some documentation that describes boto3 S3 client thread safety?

I can only find this, and it isn't explicit about thread safety: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html?highlight=multithreading#multithreading-multiprocessing

This link refers to resources rather than clients. Each resource has a meta attribute which holds a boto3 client under the hood. It sounds as though the resource holds additional state that makes it thread-unsafe, while the client itself might be thread-safe. @JordonPhillips Can you please elaborate?

All 5 comments

boto3 S3 clients are thread-safe; that article is referring to boto 2. If you're running out of connections, you can increase the pool size with max_pool_connections. You could also try using s3transfer, which can handle all of that for you. It isn't fully GA yet, so you would need to make sure to pin your minor version. You can see examples of how we use it to upload / download single files in boto3. While we currently don't expose an interface for uploading multiple files in this way, it shouldn't be hard to extrapolate.
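For reference, a minimal sketch of raising the pool size on a shared client via botocore's Config; the bucket name is a placeholder and 50 is an arbitrary value sized to the thread count:

import boto3
from botocore.config import Config

# One client shared across all threads; raising the urllib3 pool size avoids
# the "Connection pool is full, discarding connection" warnings.
s3 = boto3.client("s3", config=Config(max_pool_connections=50))

def upload(path, key):
    s3.upload_file(path, "my-bucket", key)  # "my-bucket" is a placeholder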

This issue has been automatically closed because there has been no response to our request for more information from the original author. With only the information that is currently in the issue, we don't have enough information to take action. Please reach out if you have or find the answers we need so that we can investigate further.

@JordonPhillips Is there some documentation that describes boto3 S3 client thread safety?

I can only find this, and it isn't explicit about thread safety: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html?highlight=multithreading#multithreading-multiprocessing

This link refers to resources rather than clients. Each resource has a meta attribute which holds a boto3 client under the hood. It sounds as though the resource holds additional state that makes it thread-unsafe, while the client itself might be thread-safe. @JordonPhillips Can you please elaborate?
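For what it's worth, the multithreading guide linked above recommends creating a separate session and resource per thread rather than sharing a resource across threads. A minimal sketch of that pattern using threading.local (the bucket and key names are placeholders):

import threading

import boto3

thread_local = threading.local()

def get_s3():
    # Resources are not thread-safe, so lazily create one per thread.
    if not hasattr(thread_local, "s3"):
        thread_local.s3 = boto3.session.Session().resource("s3")
    return thread_local.s3

def download(key):
    # "my-bucket" is a placeholder bucket name
    return get_s3().Object("my-bucket", key).get()["Body"].read()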

You can successfully create multiple threads that share the same session and client, and thereby download from an S3 bucket concurrently without problems.

This is how I do it:

import concurrent.futures
import json

import boto3

# set up one session and client, shared by all threads
sess = boto3.session.Session()
client = sess.client("s3")

files = ["path-to-file.json", "path-to-file2.json"]

def download_from_s3(file_path):
    obj = client.get_object(Bucket="<your-bucket>", Key=file_path)
    return json.loads(obj["Body"].read())

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = list(executor.map(download_from_s3, files))

Creating a session for each thread in my case results in a big slowdown, whereas with this approach I am seeing up to a 7x performance improvement compared to sequential downloads.
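Related, for large individual files: boto3's managed transfer layer (built on the s3transfer package mentioned above) can parallelize a single upload internally via TransferConfig. A sketch with arbitrary threshold/concurrency values and placeholder file/bucket names:

import boto3
from boto3.s3.transfer import TransferConfig

s3 = boto3.client("s3")

# Split one large file into multipart chunks and upload them on up to
# 10 internal threads.
config = TransferConfig(multipart_threshold=8 * 1024 * 1024, max_concurrency=10)
s3.upload_file("big-file.bin", "my-bucket", "big-file.bin", Config=config)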
