You can use S3AsyncClient and provide your own AsyncResponseTransformer.
I see. Do you have plans to add an async response transformer that exposes the ByteBuffer publisher?
I'm thinking about something like this:
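import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;

// Hands the raw ByteBuffer publisher to the caller instead of aggregating the body.
// PublisherWrapper is a custom holder for (publisher, response); it is not an SDK type.
public class ByteBufferSdkPublisherAsyncResponseTransformer<T extends SdkResponse>
        implements AsyncResponseTransformer<T, PublisherWrapper<T, ByteBuffer>>
{
    private volatile CompletableFuture<SdkPublisher<ByteBuffer>> cf;
    private volatile T response;

    @Override
    public CompletableFuture<PublisherWrapper<T, ByteBuffer>> prepare()
    {
        // prepare() can run once per attempt (retries included), so start with a fresh future.
        cf = new CompletableFuture<>();
        // Reads the response field when the publisher arrives; relies on onResponse() running before onStream().
        return cf.thenApply(publisher -> new PublisherWrapper<>(publisher, response));
    }

    @Override
    public void onResponse(T response)
    {
        this.response = response;
    }

    @Override
    public void onStream(SdkPublisher<ByteBuffer> publisher)
    {
        // Pass the publisher on; the body is not read here.
        cf.complete(publisher);
    }

    @Override
    public void exceptionOccurred(Throwable error)
    {
        cf.completeExceptionally(error);
    }
}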
That said, wrapping the ByteBuffer publisher in a CompletableFuture is not very nice, to say the least. Instead, I would imagine a method (getObjectByteBufferStream) in the S3AsyncClient class that could return SdkPublisher<ByteBuffer>.
What do you think?
Hmm, can you tell us a bit more about your use case? We might consider adding it if it's a common one.
Sure.
I have a text file in S3 that contains a separate record on each line, like this:
useful information 1
useful information 2
useful information 3
...
The file is quite big, so I would rather not load it all into memory. Instead, I would like to process it reactively, line by line, with backpressure.
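For this line-by-line use case, a subscriber can do the line buffering itself. The code below is a minimal sketch, not an SDK API. `LineSubscriber` is a hypothetical name, and the sketch assumes UTF-8 text with `\n` or `\r\n` line endings.

It is written against the JDK's `java.util.concurrent.Flow` so it runs without the SDK. `SdkPublisher` is an `org.reactivestreams.Publisher`, whose interfaces have the same methods. Using it with the SDK would mean swapping the types or adapting with `org.reactivestreams.FlowAdapters`, which is an untested assumption here.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.function.Consumer;

// Hypothetical helper, not an SDK class: consumes a Publisher<ByteBuffer> one chunk at a
// time and passes each complete UTF-8 line (without its "\n" or "\r\n") to a callback.
final class LineSubscriber implements Flow.Subscriber<ByteBuffer> {
    private final Consumer<String> onLine;
    private final CompletableFuture<Void> done = new CompletableFuture<>();
    // A decoder from newDecoder() reports malformed input instead of replacing it.
    private final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
    private final StringBuilder partialLine = new StringBuilder(); // text after the last '\n'
    private ByteBuffer leftover = ByteBuffer.allocate(0); // bytes of a character split across chunks
    private Flow.Subscription subscription;

    LineSubscriber(Consumer<String> onLine) { this.onLine = onLine; }

    /** Completes after the last line is delivered, or exceptionally on failure. */
    CompletableFuture<Void> done() { return done; }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1); // demand counts onNext calls (chunks), not bytes
    }

    @Override
    public void onNext(ByteBuffer chunk) {
        if (done.isDone()) {
            return; // already failed and cancelled; ignore late chunks
        }
        ByteBuffer in = ByteBuffer.allocate(leftover.remaining() + chunk.remaining());
        in.put(leftover).put(chunk).flip();
        // UTF-8 never decodes to more chars than it has bytes, so this buffer is large enough.
        CharBuffer chars = CharBuffer.allocate(in.remaining());
        CoderResult result = decoder.decode(in, chars, false);
        if (result.isError()) {
            fail(new CharacterCodingException());
            return;
        }
        leftover = in.slice(); // an incomplete trailing character waits for the next chunk
        chars.flip();
        try {
            while (chars.hasRemaining()) {
                char c = chars.get();
                if (c == '\n') {
                    emitLine();
                } else {
                    partialLine.append(c);
                }
            }
        } catch (RuntimeException e) { // a failing callback must not escape onNext
            fail(e);
            return;
        }
        subscription.request(1); // every line of this chunk is handled: ask for one more
    }

    @Override
    public void onError(Throwable t) { done.completeExceptionally(t); }

    @Override
    public void onComplete() {
        if (leftover.hasRemaining()) {
            done.completeExceptionally(new CharacterCodingException()); // ended mid-character
            return;
        }
        if (partialLine.length() > 0) {
            emitLine(); // last line had no trailing '\n'
        }
        done.complete(null);
    }

    private void emitLine() {
        int len = partialLine.length();
        if (len > 0 && partialLine.charAt(len - 1) == '\r') {
            partialLine.setLength(len - 1); // treat "\r\n" as a single line break
        }
        onLine.accept(partialLine.toString());
        partialLine.setLength(0);
    }

    private void fail(Throwable t) {
        subscription.cancel();
        done.completeExceptionally(t);
    }
}
```

Backpressure comes from calling `request(1)` only at the end of `onNext`. While the line callback is busy, no further chunk is requested, so memory use is bounded by one chunk plus one partial line rather than by the file size.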
I'm struggling with this too. I have code that performs some S3 operations that I want to run asynchronously, but for some of them the result needs to be handed off to a library such as Commons CSV, which expects a Reader. I have written an AsyncResponseTransformer implementation that uses a LinkedBlockingQueue internally to present an InputStream interface, but I'm struggling with how to use the API correctly.
Specifically, when subscribing and requesting more, what is the unit of the Subscription#request call? Is it bytes? Is it calls to #onNext? Is there any way to know or control the size of the chunks passed to #onNext? I haven't been able to figure this out.
For me, it would obviously be much easier if the SDK provided an implementation that gave me an InputStream or Reader from an asynchronous call or, as @zoewangg is requesting, a Publisher that managed buffering and produced lines or custom-sized chunks.
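On the unit question: in Reactive Streams, and in the JDK's `java.util.concurrent.Flow` that mirrors it, `Subscription.request(n)` asks for `n` more `onNext` signals, not `n` bytes. With a `ByteBuffer` publisher, each signal carries one buffer whose size the publisher decides, so `request` cannot control chunk size.

Below is a minimal sketch of the queue-backed InputStream described above, under that model. `PublisherInputStream` is a hypothetical name. It targets `Flow` so it runs standalone. Plugging it into the SDK would mean switching to the `org.reactivestreams` interfaces, which is an untested assumption here.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical bridge, not an SDK class: exposes a Publisher<ByteBuffer> as a blocking InputStream.
// It requests one ByteBuffer at a time, so at most one chunk waits in the queue.
final class PublisherInputStream extends InputStream implements Flow.Subscriber<ByteBuffer> {
    private static final Object COMPLETE = new Object(); // marks onComplete in the queue
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private volatile Flow.Subscription subscription;
    private ByteBuffer current = ByteBuffer.allocate(0);
    private boolean finished;

    @Override public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1); // demand is one onNext signal, whatever the buffer size
    }
    @Override public void onNext(ByteBuffer chunk) { queue.add(chunk); }
    @Override public void onError(Throwable t) { queue.add(t); }
    @Override public void onComplete() { queue.add(COMPLETE); }

    @Override public int read() throws IOException {
        byte[] one = new byte[1];
        return read(one, 0, 1) == -1 ? -1 : one[0] & 0xFF;
    }

    @Override public int read(byte[] b, int off, int len) throws IOException {
        if (len == 0) return 0;
        while (!current.hasRemaining()) {
            if (finished) return -1;
            Object next;
            try {
                next = queue.take(); // blocks the reading thread until the publisher signals
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for data", e);
            }
            if (next == COMPLETE) {
                finished = true;
                return -1;
            }
            if (next instanceof Throwable) {
                finished = true;
                throw new IOException((Throwable) next);
            }
            current = (ByteBuffer) next;
            subscription.request(1); // the queue is empty again: allow the next chunk
        }
        int n = Math.min(len, current.remaining());
        current.get(b, off, n);
        return n;
    }

    @Override public void close() {
        if (!finished && subscription != null) subscription.cancel(); // stop the download early
        finished = true;
    }
}
```

The stream can then be wrapped in an `InputStreamReader` and passed to a Reader-based parser such as Commons CSV. Because `read` blocks, it should be consumed on a thread you can afford to block, most likely not a thread the SDK uses to deliver data.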
Does the current version of the SDK support returning the content of an S3 object reactively, as a Publisher of lines, similar to, for example, Athena pagination?
I was able to achieve line-by-line streaming using the code below:
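// s3Client: a synchronous S3Client instance (variable name assumed)
ResponseInputStream<GetObjectResponse> responseInputStream = s3Client.getObject(GetObjectRequest.builder()
        .bucket(s3Entity.getBucket().getName())
        .key(s3Entity.getObject().getKey())
        .build(), ResponseTransformer.toInputStream());
int linesRead = 0;
// try-with-resources closes the reader and, with it, the underlying response stream
try (BufferedReader reader =
        new BufferedReader(new InputStreamReader(responseInputStream, StandardCharsets.UTF_8))) {
    while (reader.readLine() != null) {
        linesRead++;
    }
} catch (IOException e) {
    logger.log("IO Exception: " + e);
}
logger.log("Lines read: " + linesRead);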
This is not async, though. I stumbled onto this issue while looking for a better way to parse S3 files. More details here: https://github.com/aws/aws-sdk-java-v2/issues/1358
We have this use case as well and would be happy to see an implementation in the SDK.
@zoewangg Any update on when the feature described by @martin-tarjanyi will be added?
Not at this time, but it's in our backlog. A PR is much easier to prioritize, if someone wanted to submit one for this feature.