Aws-sdk-java: Multipart upload from InputStream (without supplying content-length) uses putObject().

Created on 30 Jul 2015 · 17 Comments · Source: aws/aws-sdk-java

I am calling com.amazonaws.services.s3.transfer.TransferManager.upload(String, String, InputStream, ObjectMetadata) and expected it to upload the stream in chunks, but the API tried to upload it in one chunk.

After digging into the code a bit, I found that this is the problematic call order:
com.amazonaws.services.s3.transfer.internal.UploadCallable.call()
com.amazonaws.services.s3.transfer.internal.UploadCallable.isMultipartUpload()
com.amazonaws.services.s3.transfer.internal.TransferManagerUtils.shouldUseMultipartUpload(PutObjectRequest, TransferManagerConfiguration)
com.amazonaws.services.s3.transfer.internal.TransferManagerUtils.getContentLength(PutObjectRequest)

getContentLength returns -1 if the input is a stream (and the content-length wasn't supplied).
shouldUseMultipartUpload returns true only if contentLength > configuration.getMultipartUploadThreshold(). Since -1 is never larger than that, it doesn't use a multipart upload (and in my case it later fails because my stream is too big for the API to buffer).
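The effect of the -1 sentinel can be reproduced in isolation. The following is a simplified model of the comparison described above, not the SDK's actual source; the class name, the constant, and the 16 MiB threshold are assumptions chosen for illustration.

```java
/** Simplified model of the decision described in this issue (not the SDK source). */
final class MultipartDecision {
    /** Sentinel meaning "length unknown", mirroring the -1 mentioned above. */
    static final long UNKNOWN_LENGTH = -1L;

    /** Multipart is chosen only when the known length exceeds the threshold. */
    static boolean shouldUseMultipartUpload(long contentLength, long multipartUploadThreshold) {
        return contentLength > multipartUploadThreshold;
    }

    public static void main(String[] args) {
        long threshold = 16L * 1024 * 1024; // hypothetical threshold of 16 MiB

        // Unknown length: -1 can never exceed a positive threshold.
        System.out.println(shouldUseMultipartUpload(UNKNOWN_LENGTH, threshold)); // false

        // Known 64 MiB length: exceeds the threshold.
        System.out.println(shouldUseMultipartUpload(64L * 1024 * 1024, threshold)); // true
    }
}
```

Because the sentinel is negative, no positive threshold setting can push a stream of unknown length onto the multipart path.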

feature-request

All 17 comments

This is the expected behavior if you don't provide a content-length, at least for the moment. If you do know the content length, providing it up front will allow the TransferManager to be significantly more efficient.

S3 requires a content-length header to be specified on each putObject/uploadPart request - if the length isn't known up front (and the stream doesn't support mark/reset), the SDK attempts to buffer the stream into memory to find out how large it is. This obviously only works for relatively small streams - for anything large enough that it can't be buffered in memory, you need to provide a content-length up front. The TransferManager assumes that if you haven't provided a content-length, the stream is small enough that doing a single-part upload will be more efficient than a multi-part upload.

Theoretically we could add a configuration option to force a multi-part upload even when the content-length is unknown, using the configured minimum part size and hoping for the best? You'd still have to buffer 5MB at a time to calculate the content-length for each individual part (more if you want to go over ~50GB total, since S3 allows a maximum of 10K parts per MPU), but at least you wouldn't have to buffer the entire thing if you legitimately don't know how large it's going to be.
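The "~50GB" figure follows from the two limits mentioned above (5MB per part, 10K parts). A minimal sketch of that arithmetic, assuming 5 MiB as the exact minimum part size; the class and method names are hypothetical:

```java
/** Arithmetic behind the part-size limits quoted above (illustrative names). */
final class PartSizing {
    static final long MIN_PART_SIZE = 5L * 1024 * 1024; // 5 MiB, assumed exact value of "5MB"
    static final int MAX_PARTS = 10_000;                // "10K parts per MPU"

    /** Largest object that fits in MAX_PARTS parts of the given size. */
    static long maxObjectSize(long partSize) {
        return partSize * MAX_PARTS;
    }

    /** Smallest part size that lets an object of the expected size fit in MAX_PARTS parts. */
    static long requiredPartSize(long expectedTotalBytes) {
        long needed = (expectedTotalBytes + MAX_PARTS - 1) / MAX_PARTS; // ceiling division
        return Math.max(MIN_PART_SIZE, needed);
    }

    public static void main(String[] args) {
        // 5 MiB * 10,000 parts = 52,428,800,000 bytes, i.e. roughly 50 GB.
        System.out.println(maxObjectSize(MIN_PART_SIZE));

        // A stream expected to reach 100 GiB needs parts of about 10.2 MiB each.
        System.out.println(requiredPartSize(100L * 1024 * 1024 * 1024));
    }
}
```

When the total size is genuinely unknown, the part size has to be picked from an upper-bound guess, since it cannot be raised for parts that were already uploaded.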

I'll add this to our backlog (please chime in here if this would be useful to you so we can prioritize appropriately), or I'd be happy to take a look at a pull request.

Hello David and thank you for the quick and detailed response.

I am trying to write to S3 from a ZipInputStream and the ZipEntry.getSize() may return -1.

Buffering the file in memory is not an option since it is quite large and I get an OutOfMemoryError.

Writing it to disk is probably what I'll do, but I would have preferred not to.

Seems to me like a valid and common use case.

Yeah, buffering to disk is the best option in the short term. On the plus side, the TransferManager can upload multiple parts in parallel when you give it a file, as opposed to having to read/upload from the ZipInputStream serially - may even end up being faster.

I've written a library that automatically chunks data from a stream into parts so that you can avoid both running out of memory and writing to disk: https://github.com/alexmojaki/s3-stream-upload

Thank you Alex, looks nice. My company probably won't give me time to return to this feature and fix it now, but if that happens I'll try your library.

TransferManager definitely needs this feature. The size of the stream to be uploaded is often unknown, for example when the data is being generated by or queried from an external service/database.

@david-at-aws David, is there any ETA for this change?

For anyone running into this issue while using AmazonS3#putObject(PutObjectRequest), we ended up overriding this method so that if it encounters an InputStream of unknown length, it reads a fixed amount of data into a buffer (1MB, for example). If it reaches the end of the stream before filling the buffer, we just upload the buffer. If we fill the buffer before exhausting the stream, we write to a temporary file and upload that instead. The code is here in case anyone wants to do something similar. (Once TransferManager uses NIO we'll probably switch to that.)
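A minimal sketch of the buffer-then-spill idea described in this comment, using only the JDK. It is not the commenter's code: the class, the `Staged` holder, and the method names are hypothetical, and the actual upload call is left out. Either result has a known length, so it can be handed to an upload with a content-length set.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

/** Stages a stream of unknown length in memory if small, otherwise in a temp file. */
final class BufferOrSpill {
    /** Holder for the staged data: exactly one of bytes / file is non-null. */
    static final class Staged {
        final byte[] bytes;
        final Path file;

        Staged(byte[] bytes, Path file) {
            this.bytes = bytes;
            this.file = file;
        }
    }

    static Staged stage(InputStream in, int bufferSize) throws IOException {
        byte[] buffer = new byte[bufferSize];
        int filled = 0;
        while (filled < bufferSize) {
            int n = in.read(buffer, filled, bufferSize - filled);
            if (n == -1) {
                // Stream ended before the buffer filled: keep it in memory.
                return new Staged(Arrays.copyOf(buffer, filled), null);
            }
            filled += n;
        }
        // Buffer is full: spill it and the rest of the stream to a temporary file.
        Path tmp = Files.createTempFile("upload-", ".tmp");
        try (OutputStream out = Files.newOutputStream(tmp)) {
            out.write(buffer, 0, filled);
            byte[] copy = new byte[8192];
            int n;
            while ((n = in.read(copy)) != -1) {
                out.write(copy, 0, n);
            }
        }
        return new Staged(null, tmp);
    }
}
```

The caller is responsible for deleting the temporary file after the upload. A stream whose length equals the buffer size exactly also takes the file path in this sketch, which costs one small file but keeps the logic simple.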

@david-at-aws Perhaps you could reconsider implementing support for large files of unknown size in the TransferManager? It's an important feature.

Is there any progress in this direction? There are a lot of streams in the world that operate without knowing their final length: some zip implementations, serialization, encryption, transfers, etc. This particular requirement makes it impossible to simply pass a stream between endpoints without buffering it somewhere.

This approach is acceptable only for relatively small files, which could just as well be sent as a byte array to the lower-level RPC without using streams at all. For streams this is unacceptable.

+1 for buffering to a temp file (or a byte array buffer) up to the multipart threshold. Not all streams have a length known up front; in fact, this is part of the reason to use a stream and not just a file, so it's important to account for this.

For our use case this is a huge dealbreaker. Within a single execution of our task we generate a couple of files, each a couple of hundred MB in size. Lambda would be the best fit for that, but because of this missing feature it is not. We cannot buffer (size-wise) either to disk or to memory. We can either switch language (Node supports streaming to S3, but the task runs much slower), switch compute service (EC2 supports a larger disk for buffering or more memory, but is more expensive and more work to operate), or switch cloud provider (the other one at least supports getting an OutputStream in Java). I am deeply disappointed. I didn't expect such an important feature to be missing.

I hope this isn't considered spammy, but I'm gonna mention my library again (https://github.com/alexmojaki/s3-stream-upload) because (1) I think @svobol13 would find it very useful and (2) based on his comment, I think he and possibly others are not seeing my first comment lost in the middle of this thread.

Is there any progress in this? I'll be using the third-party library that @alexmojaki suggested but this seems like an important feature missing in the official AWS library.

I used the AWS multipart upload (low-level API) to upload an InputStream without Content-Length. Here is a link to my code snippet: https://gist.github.com/prameshbhattarai/dbb19eb2518ab0554e6aeb36b92ee84a#file-s3multipartupload-java
https://medium.com/@pra4mesh/uploading-inputstream-to-aws-s3-using-multipart-upload-java-add81b57964e
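The core of any such low-level approach is cutting the stream into fixed-size parts so that each part has a known length. The sketch below shows only that chunking step and is not the code from the linked gist. `PartSink` is a hypothetical callback standing in for the real per-part upload call, and all names here are assumptions.

```java
import java.io.IOException;
import java.io.InputStream;

/** Cuts a stream of unknown length into parts of known length. */
final class StreamChunker {
    /** Hypothetical stand-in for a per-part upload call; data[0..length) is the part body. */
    interface PartSink {
        void uploadPart(int partNumber, byte[] data, int length) throws IOException;
    }

    /** Reads the stream part by part and returns the number of parts handed to the sink. */
    static int uploadInParts(InputStream in, int partSize, PartSink sink) throws IOException {
        byte[] buffer = new byte[partSize];
        int partNumber = 0;
        while (true) {
            int filled = readFully(in, buffer);
            if (filled == 0) {
                break; // nothing left to send
            }
            partNumber++; // part numbers start at 1
            sink.uploadPart(partNumber, buffer, filled);
            if (filled < partSize) {
                break; // a short part means the stream is exhausted
            }
        }
        return partNumber;
    }

    /** Fills the buffer as far as the stream allows; returns the number of bytes read. */
    private static int readFully(InputStream in, byte[] buffer) throws IOException {
        int filled = 0;
        while (filled < buffer.length) {
            int n = in.read(buffer, filled, buffer.length - filled);
            if (n == -1) {
                break;
            }
            filled += n;
        }
        return filled;
    }
}
```

Only one part-sized buffer is held in memory at a time, and the sink must finish with the buffer before returning because it is reused for the next part. An empty stream produces zero parts here, so a caller would need to handle that case separately.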

@alexmojaki

How could I use your library?
I have a ByteArrayInputStream (or something similar).

ObjectMapper objectMapper = new ObjectMapper();

Map<String, Object> obj = objectMapper.readValue(is, HashMap.class);

// Execute some transformation tasks

ByteArrayOutputStream baos = new ByteArrayOutputStream();
objectMapper.writeValue(baos, obj);
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());

// How to invoke StreamTransferManager ???

@alexmojaki

I finally got it working:

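      // Use one output stream for the single source stream (zipBais). With several threads
      // reading the same input, each output stream would receive an arbitrary subset of the chunks.
      int numStreams = 1;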
      final StreamTransferManager manager = new StreamTransferManager(bucketName, keyNameJSON, s3Client)
        .numStreams(numStreams)
        .numUploadThreads(numStreams)
        .queueCapacity(numStreams)
        .partSize(5);

      manager.customiseUploadPartRequest(new UploadPartRequest().withObjectMetadata(metadata));

      final List<MultiPartOutputStream> streams = manager.getMultiPartOutputStreams();

      final int UPLOAD_PART_SIZE = 1 * Constants.MB;

      ExecutorService pool = Executors.newFixedThreadPool(numStreams);
      for (int i = 0; i < numStreams; i++) {
        final int streamIndex = i;
        pool.submit(new Runnable() {
          public void run() {
            try {
              MultiPartOutputStream outputStream = streams.get(streamIndex);
              int nRead = 0;
              byte[] data = new byte[UPLOAD_PART_SIZE];
              while ((nRead = zipBais.read(data, 0, data.length)) != -1) {
                outputStream.write(data, 0, nRead);
              }

              // The stream must be closed once all the data has been written
              outputStream.close();
            } catch (Exception e) {
              e.printStackTrace();
              // Aborts all uploads
              manager.abort(e);
            }
          }
        });
      }

      pool.shutdown();

      boolean wait = true;

      while (wait) {
        try {
          wait = !pool.awaitTermination(2, TimeUnit.SECONDS);
          log.info("wait, {}", wait);
          if (wait) {
            log.info("Awaiting completion of bulk callback threads.");
          }
        } catch (InterruptedException e) {
          log.info("Interrupted while awaiting completion of callback threads - trying again...");
        }
      }

      // Finishing off
      manager.complete();

_Thanks in Advance_
_Paco_
