Can someone please provide an example of limiting the number of pulling pubsub threads with the new API so we can start testing?
In the old API it was possible to limit the number of concurrent threads that pulled messages with the following code:
PubSub.MessageConsumer consumer = pubsub.pullAsync(
subscription,
messageProcessor,
PubSub.PullOption.maxQueuedCallbacks(maxQueuedCallbacks),
PubSub.PullOption.executorFactory(executorFactory)
);
where maxQueuedCallbacks was an integer amount of msgs to pull and
executorFactory was provided from this
/**
* Create an ExecutorFactory instance for use with a single
* {@link PubSub.MessageConsumer}.
*
* <p>The number of threads in the executor determines how many messages can be processed at one
* time.</p>
*
* @see PubSub.PullOption#executorFactory(ExecutorFactory)
*/
private static ExecutorFactory<ExecutorService> createExecutor(
final int threads,
final String threadNameFormat) {
final ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat(threadNameFormat)
.build();
final ExecutorService executorService = Executors.newFixedThreadPool(threads, threadFactory);
return new ExecutorFactory<ExecutorService>() {
@Override
public ExecutorService get() {
return executorService;
}
@Override
public void release(final ExecutorService executor) {
executorService.shutdownNow();
}
};
}
There is not a way to do this yet.
ok I see.
I take it back, it is possible to do this. Example:
// InstantiatingExecutorProvider will create a new executor for each thing it is passed to;
// Here, Publisher and Subscriber will each get their own executor.
ExecutorProvider executorProvider =
InstantiatingExecutorProvider.newBuilder()
.setExecutorThreadCount(1)
.build();
Publisher publisher =
Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();
Subscriber subscriber =
Subscriber.newBuilder(topicName).setExecutorProvider(executorProvider).build();
(Note - this comment has been edited since its original version)
I gave the wrong code in my last comment - it was setting the executor provider for the channel, not for the publishing/subscribing. I have fixed the code.
Thanks!
This sounds like https://github.com/GoogleCloudPlatform/google-cloud-java/issues/1683
@garrettjonesgoogle
When trying the above code using 0.12.0-alpha Publisher.newBuilderdoes't exist.
Do I need another version or is the example wrong?
The example is now out of date - we renamed newBuilder to defaultBuilder (since it returns a prepopulated Builder). New code sample:
// InstantiatingExecutorProvider will create a new executor for each thing it is passed to;
// Here, Publisher and Subscriber will each get their own executor.
ExecutorProvider executorProvider =
InstantiatingExecutorProvider.newBuilder()
.setExecutorThreadCount(1)
.build();
Publisher publisher =
Publisher.defaultBuilder(topicName).setExecutorProvider(executorProvider).build();
Subscriber subscriber =
Subscriber.defaultBuilder(topicName).setExecutorProvider(executorProvider).build();
thanks!
I have now scratched my head long enough to admit failure...
@garrettjonesgoogle what is wrong with my testcode below (it exits after around 10-20 seconds without receiving any messages)?
static void setUp() throws IOException {
ExecutorProvider executorProvider =
InstantiatingExecutorProvider.newBuilder()
.setExecutorThreadCount(1)
.build();
String projectId = "gcp-project";
String subscriptionId = "test-subscription";
SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
// Instantiate an asynchronous message receiver
MessageReceiver receiver =
new MessageReceiver() {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
System.out.println("Got message:" + message.toString());
consumer.ack();
}
};
Subscriber subscriber =
Subscriber.defaultBuilder(subscriptionName, receiver)
.setExecutorProvider(executorProvider).build();
subscriber.addListener(new Subscriber.Listener() {
public void failed(Subscriber.State from, Throwable failure) {
System.out.println(failure);
}
}, executorProvider.getExecutor());
subscriber.startAsync();
}
@pongad could you take a look?
@pongad ^^
@fonzy2013 Sorry I didn't see this. Which version are you using? Subscriber previously had a bug where it might deadlock an executor. I think this is what happened here.
The bug was fixed in #1915. The newest version (0.17.1-alpha) should definitely have this fix in.
I can give it a whirl. I was using the version which was latest at the time.
@pongad, it still didn't work. I am using version 0.17.2-alpha.
My complete code is pasted below for convinience:
package com.spotify;
import com.google.api.gax.grpc.ExecutorProvider;
import com.google.api.gax.grpc.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.spi.v1.AckReplyConsumer;
import com.google.cloud.pubsub.spi.v1.MessageReceiver;
import com.google.cloud.pubsub.spi.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.SubscriptionName;
import java.io.IOException;
/**
* Hello world!
* pubsub.subscribe.project = "build-artifact-archiver"
* pubsub.subscribe.topic = "tc-development-build-events"
* pubsub.subscribe.subscription = "tc-development-build-events-subscription"
*/
public class App {
public static void main( String[] args ) throws Exception {
System.out.println( "Hello World!" );
App.startSubscribe( "build-artifact-archiver", "tc-development-build-events-subscription");
}
public static void startSubscribe(final String projectName,
final String subscriptionName) {
SubscriptionName subscription = SubscriptionName.create(projectName, subscriptionName);
ExecutorProvider executorProvider =
InstantiatingExecutorProvider.newBuilder()
.setExecutorThreadCount(1)
.build();
MessageReceiver receiver =
new MessageReceiver() {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
ByteString data = message.getData();
System.out.println(data);
consumer.ack();
}
};
Subscriber subscriber = null;
try {
subscriber = Subscriber.defaultBuilder(subscription, receiver).setExecutorProvider(executorProvider).build();
subscriber.addListener(
new Subscriber.Listener() {
@Override
public void failed(Subscriber.State from, Throwable failure) {
// Handle failure. This is called when the Subscriber encountered a fatal error
// and is shutting down.
System.err.println(failure);
}
},
MoreExecutors.directExecutor());
subscriber.startAsync().awaitRunning();
Thread.sleep(6000000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (subscriber != null) {
//subscriber.stopAsync();
}
}
}
}
I experienced the same issue and added #2041
I guess #2041 causes the above code sample to not work.
@pongad Is this the case?
@fonzy2013 I hope that #2049 will at least alleviate the problem. We should be publishing a new version soon. If you'd like, you could build from master and try.
I adapted your code here for testing, and things seem to work on my machine.
As part of #2049, the subscriber will take a while to fully spin up, so awaitRunning can take a while to return. You should start consuming messages pretty quickly though.
If it's still deadlocking, could you please share a stack trace? I usually use jstack to take stack snapshot of a running java process.
Since #2041 is resolved, I'll resolve this for now. Please reopen if the problem persists.
Hey,
I am now creating an async pull client that subscribe for a subscription, I am using
compile group: 'com.google.cloud', name: 'google-cloud-pubsub', version: '0.28.0-beta'
Using the following code:
public void startStreaming() {
Subscriber subscriber = null;
try {
SubscriptionName subscription = SubscriptionName.of("topic-test",
"subscription-id-1234");
ExecutorProvider executorProvider =
InstantiatingExecutorProvider.newBuilder()
.setExecutorThreadCount(1)
.build();
MessageReceiver receiver =
new MessageReceiver() {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
try {
String msg = message.getData().toStringUtf8();
System.out.println(msg);
} catch (Exception e) {
throw new RuntimeException("Failed to pull messages from topic");
}
consumer.ack();
}
};
subscriber = Subscriber
.newBuilder(subscription, receiver)
.setExecutorProvider(executorProvider)
.build();
subscriber.addListener(
new Subscriber.Listener() {
@Override
public void failed(Subscriber.State from, Throwable failure) {
}
},
MoreExecutors.directExecutor());
subscriber.startAsync().awaitRunning();
// why should I put sleep here??
Thread.sleep(60000);
} catch (Exception e) {
} finally {
if (subscriber != null) {
subscriber.stopAsync().awaitTerminated();
}
}
}
It works fine for 2-3 minutes and get stuck with no more ingestion of messages, I am sure that there are messages as once I restarted the pulling client, it reads data normally.
Every time it get stuck, I saw the below log:
Nov 12, 2017 7:33:17 PM com.google.cloud.pubsub.v1.StreamingSubscriberConnection sendAckOperations
WARNING: failed to send acks
java.lang.IllegalStateException: call was cancelled
at com.google.common.base.Preconditions.checkState(Preconditions.java:444)
at io.grpc.internal.ClientCallImpl.sendMessage(ClientCallImpl.java:405)
at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
at io.grpc.ForwardingClientCall.sendMessage(ForwardingClientCall.java:52)
at io.grpc.stub.ClientCalls$CallToStreamObserverAdapter.onNext(ClientCalls.java:286)
at com.google.cloud.pubsub.v1.StreamingSubscriberConnection.sendAckOperations(StreamingSubscriberConnection.java:274)
at com.google.cloud.pubsub.v1.MessageDispatcher.processOutstandingAckOperations(MessageDispatcher.java:602)
at com.google.cloud.pubsub.v1.MessageDispatcher.access$2100(MessageDispatcher.java:56)
at com.google.cloud.pubsub.v1.MessageDispatcher$AckDeadlineAlarm.run(MessageDispatcher.java:529)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
And when don't use ExecutorProvider executorProvider I got the below log
2017-11-12 19:08:43 [grpc-default-worker-ELG-2-3] DEBUG io.grpc.netty.NettyClientHandler [id: 0xdc0e7c4d, L:/10.8.17.4:47347 - R:pubsub.googleapis.com/1.1.1.1:111] OUTBOUND WINDOW_UPDATE: streamId=3 windowSizeIncrement=1048576
2017-11-12 19:08:43 [grpc-default-worker-ELG-2-4] DEBUG io.grpc.netty.NettyClientHandler [id: 0x330498ce, L:/10.8.17.4:47346 - R:pubsub.googleapis.com/1.1.1.1:111] OUTBOUND GO_AWAY: lastStreamId=0 errorCode=0 length=0 bytes=
2017-11-12 19:08:43 [grpc-default-worker-ELG-2-3] DEBUG io.grpc.netty.NettyClientHandler [id: 0xdc0e7c4d, L:/10.8.17.4:47347 - R:pubsub.googleapis.com/1.1.1.1:111] OUTBOUND GO_AWAY: lastStreamId=0 errorCode=0 length=0 bytes=
2017-11-12 19:08:44 [threadDeathWatcher-1-1] DEBUG io.netty.buffer.PoolThreadCache Freed 3 thread-local buffer(s) from thread: threadDeathWatcher-1-1
2017-11-12 19:08:44 [threadDeathWatcher-1-1] DEBUG io.netty.buffer.PoolThreadCache Freed 1 thread-local buffer(s) from thread: threadDeathWatcher-1-1
2017-11-12 19:08:44 [grpc-default-worker-ELG-2-3] DEBUG io.netty.buffer.PoolThreadCache Freed 153 thread-local buffer(s) from thread: grpc-default-worker-ELG-2-3
2017-11-12 19:08:44 [grpc-default-worker-ELG-2-4] DEBUG io.netty.buffer.PoolThreadCache Freed 131 thread-local buffer(s) from thread: grpc-default-worker-ELG-2-4
Thank you for the report @AgoloKarimTawfik. Since this is unrelated to the original bug, I have opened separate issues for them. Let's track there.
Thanks @pongad , just to know, I've been trying lots of stuff, and one of the trials worked for me fine.
What I did is that I commented out the below lines, and working so fine, and not get stopped till now (it has been working for few hours now)
// if (subscriber != null) {
// subscriber.stopAsync().awaitTerminated();
// }
I don't think that make sense, however it is working fine, do you have any explaination for this @pongad ?
@AgoloKarimTawfik Sorry I forgot to actually answer your question!
The stopAsync method tells the subscriber to stop pulling messages, and awaitTerminated waits until the subscriber fully stops. So if you remove the line, it'd make sense that the subscriber would keep going forever.
In the example, we call these stop methods after sleeping for one minute. So our subscriber runs for about a minute before stopping, like you observed. In a real application, you wouldn't do this. For instance, if you're bring up servers to handle messages, you'd probably call the methods before shutting down the servers to turn things off a little more gracefully.
Am I making any sense?
Make much sense to me now, this is now clear enough, but I think I went through the sample documentation and was not that clear, I think if such explanation found in the sample would me very helpful.
Thanks a lot @pongad, this clear things to me.
@AgoloKarimTawfik No problem. Glad I could help. #2616 should add the explanation. If you still think the doc isn't clear, please feel free to comment on that PR. The feedback would be valuable.
@pondag Much appreciated!
Most helpful comment
@AgoloKarimTawfik Sorry I forgot to actually answer your question!
The
stopAsyncmethod tells the subscriber to stop pulling messages, andawaitTerminatedwaits until the subscriber fully stops. So if you remove the line, it'd make sense that the subscriber would keep going forever.In the example, we call these stop methods after sleeping for one minute. So our subscriber runs for about a minute before stopping, like you observed. In a real application, you wouldn't do this. For instance, if you're bring up servers to handle messages, you'd probably call the methods before shutting down the servers to turn things off a little more gracefully.
Am I making any sense?