It seems that google-cloud-pubsub doesn't currently support the gcloud emulator. At least I'm not able to configure it for Pub/Sub, even though I managed to use an emulator with google-cloud-datastore. I think it is related to https://github.com/GoogleCloudPlatform/google-cloud-datastore/issues/114.
Anyway, I'm trying to create a subscriber using a channel provider with no credentials. Here are some Kotlin snippets showing how I create a SubscriptionAdminClient:
val channelProvider = InstantiatingChannelProvider.newBuilder()
    .setEndpoint("localhost:$port")
    .setCredentialsProvider(FixedCredentialsProvider.create(NoCredentials.getInstance()))
    .build()

protected fun getSubscriptionAdminClient(): SubscriptionAdminClient {
    val adminSettings = SubscriptionAdminSettings.defaultBuilder()
        .setChannelProvider(channelProvider)
        .build()
    return SubscriptionAdminClient.create(adminSettings)
}
When I call com.google.cloud.pubsub.spi.v1.SubscriptionAdminClient#createSubscription, I get:
com.google.api.gax.grpc.ApiException: io.grpc.StatusRuntimeException: UNKNOWN
at com.google.api.gax.grpc.ExceptionTransformingCallable$ExceptionTransformingFuture.onFailure(ExceptionTransformingCallable.java:109)
at com.google.api.gax.core.ApiFutures$1.onFailure(ApiFutures.java:53)
at com.google.common.util.concurrent.Futures$4.run(Futures.java:1126)
at com.google.common.util.concurrent.MoreExecutors$DirectExecutor.execute(MoreExecutors.java:399)
at com.google.common.util.concurrent.AbstractFuture.executeListener(AbstractFuture.java:902)
at com.google.common.util.concurrent.AbstractFuture.addListener(AbstractFuture.java:636)
at com.google.common.util.concurrent.ForwardingListenableFuture.addListener(ForwardingListenableFuture.java:45)
at com.google.api.gax.core.internal.ApiFutureToListenableFuture.addListener(ApiFutureToListenableFuture.java:53)
at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1138)
at com.google.common.util.concurrent.Futures.addCallback(Futures.java:1076)
at com.google.api.gax.core.ApiFutures.addCallback(ApiFutures.java:48)
at com.google.api.gax.grpc.ExceptionTransformingCallable.futureCall(ExceptionTransformingCallable.java:65)
at com.google.api.gax.grpc.RetryingCallable$GrpcRetryCallable.call(RetryingCallable.java:139)
at com.google.api.gax.grpc.RetryingCallable.futureCall(RetryingCallable.java:84)
at com.google.api.gax.grpc.UnaryCallable.futureCall(UnaryCallable.java:219)
at com.google.api.gax.grpc.UnaryCallable.futureCall(UnaryCallable.java:230)
at com.google.api.gax.grpc.UnaryCallable.call(UnaryCallable.java:258)
at com.google.cloud.pubsub.spi.v1.SubscriptionAdminClient.createSubscription(SubscriptionAdminClient.java:339)
at com.google.cloud.pubsub.spi.v1.SubscriptionAdminClient.createSubscription(SubscriptionAdminClient.java:307)
at com.fkorotkov.gcloud.pubsub.PubSubImpl.registerSubscriber(PubSubImpl.kt:72)
Caused by: io.grpc.StatusRuntimeException: UNKNOWN
at io.grpc.Status.asRuntimeException(Status.java:540)
at io.grpc.stub.ClientCalls$UnaryStreamToFuture.onClose(ClientCalls.java:439)
at io.grpc.ClientInterceptors$CheckedForwardingClientCall.start(ClientInterceptors.java:202)
at io.grpc.ForwardingClientCall.start(ForwardingClientCall.java:47)
at com.google.api.gax.grpc.HeaderInterceptor$1.start(HeaderInterceptor.java:62)
at io.grpc.stub.ClientCalls.startCall(ClientCalls.java:270)
at io.grpc.stub.ClientCalls.asyncUnaryRequestCall(ClientCalls.java:249)
at io.grpc.stub.ClientCalls.futureUnaryCall(ClientCalls.java:186)
at com.google.api.gax.grpc.DirectCallable.futureCall(DirectCallable.java:59)
at com.google.api.gax.grpc.ExceptionTransformingCallable.futureCall(ExceptionTransformingCallable.java:62)
... 55 more
Caused by: java.lang.IllegalStateException: OAuth2Credentials instance does not support refreshing the access token. An instance with a new access token should be used, or a derived type that supports refreshing.
at com.google.auth.oauth2.OAuth2Credentials.refreshAccessToken(OAuth2Credentials.java:182)
at com.google.auth.oauth2.OAuth2Credentials.refresh(OAuth2Credentials.java:149)
at com.google.auth.oauth2.OAuth2Credentials.getRequestMetadata(OAuth2Credentials.java:135)
at io.grpc.auth.ClientAuthInterceptor.getRequestMetadata(ClientAuthInterceptor.java:148)
at io.grpc.auth.ClientAuthInterceptor.access$100(ClientAuthInterceptor.java:62)
at io.grpc.auth.ClientAuthInterceptor$1.checkedStart(ClientAuthInterceptor.java:94)
at io.grpc.ClientInterceptors$CheckedForwardingClientCall.start(ClientInterceptors.java:194)
... 62 more
As I mentioned above, I'm able to use the Datastore emulator when I run it with the --no-legacy flag and create a Datastore service like this:
DatastoreOptions.newBuilder()
    .setProjectId("unit-testing")
    .setHost("localhost:$port")
    .setCredentials(NoCredentials.getInstance())
    .build()
    .getService()
I'm using the latest available version of the libraries at the moment:
{
    "group": "com.google.cloud",
    "name": "google-cloud-datastore",
    "version": "0.13.0-beta"
},
{
    "group": "com.google.cloud",
    "name": "google-cloud-pubsub",
    "version": "0.13.0-alpha"
}
I can replicate this on my desktop; it looks like a bug.
However, this seems to work correctly:
ManagedChannel chan = ManagedChannelBuilder.forAddress("localhost", 8085).usePlaintext(true).build();
SubscriptionAdminSettings settings =
    SubscriptionAdminSettings.defaultBuilder()
        .setChannelProvider(FixedChannelProvider.create(chan))
        .build();
try (SubscriptionAdminClient subscriptionAdminClient =
        SubscriptionAdminClient.create(settings)) {
    subscriptionAdminClient.createSubscription(
        subscription, topic, PushConfig.getDefaultInstance(), 0);
}
@tcoffee-google Since you're working on auth-related stuff, could you take a look?
@pongad @tcoffee-google thank you for taking a look!
I've created a reproducible example in case you need it: https://github.com/fkorotkov/snippets/commit/a0dd999135e753b78d2b36ce11623b162e620dcc
Also interesting: if I change the pubsub version to 0.17.1-alpha, the test case simply hangs. I can see through docker ps that a container is running, and in the output of docker logs CONTAINER_ID I can see a bunch of:
[pubsub] May 01, 2017 4:38:42 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected non-HTTP/2 connection.
[pubsub] May 01, 2017 4:38:42 PM io.gapi.emulators.netty.NotFoundHandler handleRequest
[pubsub] INFO: Unknown request URI: /bad-request
[pubsub] May 01, 2017 4:38:42 PM io.gapi.emulators.grpc.GrpcServer$3 operationComplete
[pubsub] INFO: Adding handler(s) to newly registered Channel.
[pubsub] May 01, 2017 4:38:42 PM io.gapi.emulators.netty.HttpVersionRoutingHandler channelRead
[pubsub] INFO: Detected non-HTTP/2 connection.
Will play around with 0.17.1-alpha a bit more. Great speed of releases!
@fkorotkov I can also reproduce that. I suspect that's actually gRPC retrying a connection; NoCredentials is probably causing an infinite retry behavior somehow.
The fix I added above seems to solve both issues though. Is it good enough to unblock you for now?
@pongad yeah, works perfectly for me: https://github.com/fkorotkov/snippets/commit/d310bf967fb79e2e5617ede076051c3cd7e4e336. Thank you very much! Going to incorporate it into the main app (using an in-memory in-house implementation at the moment).
This can be fixed by adding setPlainText to InstantiatingChannelProvider, but it would have to call ManagedChannelBuilder::usePlaintext, which is a beta API.
@garrettjonesgoogle ?
I would prefer not to add the dependency on the beta API within our non-test code (InstantiatingChannelProvider).
I tried the workarounds in code and was able to send a message, but never received it using a pull subscription.
@rcleveng I also just got back to it, and my test case hangs when trying to publish a message, with a bunch of:
[pubsub] May 14, 2017 10:06:53 PM com.google.cloud.iam.testing.v1.shared.authorization.AuthInterceptor interceptCall
[pubsub] INFO: Authentication interceptor: Header value is null
At the moment we use an in-memory implementation of Publisher but would love to move to an official emulator to have better tests.
Actually, publishing is fine. It seems I have the same issue as @rcleveng: I'm not able to receive a message after publishing.
@fkorotkov I think your test is incorrect. In your PubSubTestingUtil.kt, you seem to be pulling messages to check whether there are messages pending, is this correct? The following sequence of events can happen:
hasMessages pulls from the same subscription, finds the message, and reports that the subscription isn't empty.
[...] hasMessages, and pubsub tries to not send duplicate messages.
hasMessages runs again. For the same reason as (5), it receives no messages and falsely reports that the subscription is empty.
[...] messagesFromSubscriber, finds it empty, and fails.
Because we have a concurrent system, writing tests for it is a little difficult. What do you want the test to check?
If you're experimenting with the client library, maybe you just want MessageReceiver to print the message to the console. If you see the message, you can be pretty sure you set things up right.
If you need automated testing, consider using BlockingQueue::poll. You can set the timeout to something improbably long like 30 seconds. The test might flake if your computer is really busy, but it should work well enough under normal circumstances.
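A minimal sketch of the BlockingQueue::poll suggestion above, assuming the receiver callback can hand each payload to a shared queue. The names ReceivedMessages, onMessage, and awaitNext are hypothetical and not part of google-cloud-pubsub; a plain thread stands in for the subscriber.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper: the receiver pushes payloads in, the test blocks on poll.
class ReceivedMessages {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Call this from the receiver callback (for example, from a MessageReceiver).
    void onMessage(String payload) {
        queue.add(payload);
    }

    // Blocks until a message arrives or the timeout elapses.
    // Returns null on timeout or if the waiting thread is interrupted.
    String awaitNext(long timeout, TimeUnit unit) {
        try {
            return queue.poll(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReceivedMessages received = new ReceivedMessages();
        // Stand-in for the subscriber delivering a message on another thread.
        Thread subscriber = new Thread(() -> received.onMessage("hello"));
        subscriber.start();
        // A generous timeout, as suggested above; the call returns as soon as a message arrives.
        String message = received.awaitNext(30, TimeUnit.SECONDS);
        System.out.println("received: " + message);
        subscriber.join();
    }
}
```

Because the test only waits on the queue and never pulls from the subscription itself, it does not compete with the subscriber for messages.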
@pongad thank you for responding! I was thinking about a similar scenario that I wanted to verify.
Let me explain what I want to achieve. In a test, I want to be able to wait for all published messages to be processed, so I can verify that all subscribers are working correctly and that I have the expected state.
At the moment I have a wrapper interface around Publisher and two implementations for production and tests. The one for production uses google-cloud-pubsub library. The one for tests is just an in-memory implementation of the same interface with a MultiMap<TopicName, Message> and one additional method to manually process all messages from the multimap.
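A rough sketch of the kind of test-only implementation described above, not the actual code from the project. It assumes string topics and payloads instead of TopicName and Message, a plain map of lists instead of a MultiMap, and hypothetical names (MessagePublisher, InMemoryPublisher, subscribe, processAll). It is single-threaded and not thread-safe.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical wrapper interface; the production implementation would delegate to the real Publisher.
interface MessagePublisher {
    void publish(String topic, String message);
}

// Hypothetical in-memory implementation for tests.
class InMemoryPublisher implements MessagePublisher {
    private final Map<String, List<String>> pending = new HashMap<>();
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    void subscribe(String topic, Consumer<String> handler) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(handler);
    }

    @Override
    public void publish(String topic, String message) {
        // Messages are only stored here; nothing is delivered until processAll is called.
        pending.computeIfAbsent(topic, t -> new ArrayList<>()).add(message);
    }

    // Test-only: drains every pending message, including messages that handlers
    // publish while draining. Returns the number of messages drained.
    int processAll() {
        int drained = 0;
        while (!pending.isEmpty()) {
            Map<String, List<String>> batch = new HashMap<>(pending);
            pending.clear();
            for (Map.Entry<String, List<String>> entry : batch.entrySet()) {
                List<Consumer<String>> handlers =
                    subscribers.getOrDefault(entry.getKey(), Collections.emptyList());
                for (String message : entry.getValue()) {
                    for (Consumer<String> handler : handlers) {
                        handler.accept(message);
                    }
                    drained++;
                }
            }
        }
        return drained;
    }

    public static void main(String[] args) {
        InMemoryPublisher publisher = new InMemoryPublisher();
        publisher.subscribe("orders", m -> System.out.println("handled: " + m));
        publisher.publish("orders", "first");
        publisher.publish("orders", "second");
        System.out.println("drained: " + publisher.processAll());
    }
}
```

A test publishes through the interface, calls processAll, and then asserts on the resulting state, which makes delivery deterministic at the cost of not exercising the real client library.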
My current implementation is working like a charm, but I want to try to reuse the production implementation in tests to test the logic I have there and bring the tests closer to production.
Thanks again for all responses. I will poke around to avoid a possible deadlock.
@fkorotkov I just now realized that my response was too accusatory. I apologize.
Our client library code is pretty complex and very probably has bugs lurking around. If you can't get it to work after the suggested workaround, can I trouble you for a stack trace? I cannot seem to reproduce this on my computer, but I'll be happy to help you troubleshoot it.
I worked around it by just waiting for all messages to be processed, since I know all the message ids (https://github.com/fkorotkov/snippets/commit/1e7f0a21c9a471ac887616673acd5e806330c467). Will try to incorporate this solution into our project.
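One way to wait on known message ids could look like the sketch below. It is not the code from the linked commit; the class ProcessedMessageTracker and its methods are hypothetical, and it assumes the test records each id returned by publish while the receiver reports each id it has finished handling.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: tracks published ids and blocks until all of them are processed.
class ProcessedMessageTracker {
    private final Set<String> expected = new HashSet<>();
    private final Set<String> processed = new HashSet<>();

    // Record an id returned by a publish call.
    synchronized void expect(String messageId) {
        expected.add(messageId);
    }

    // Call from the receiver after it has handled the message.
    synchronized void markProcessed(String messageId) {
        processed.add(messageId);
        notifyAll();
    }

    // Waits until every expected id has been processed.
    // Returns false if the timeout elapses or the thread is interrupted first.
    synchronized boolean awaitAll(long timeout, TimeUnit unit) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (!processed.containsAll(expected)) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            try {
                TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ProcessedMessageTracker tracker = new ProcessedMessageTracker();
        tracker.expect("1");
        tracker.expect("2");
        // Stand-in for the subscriber thread handling both messages.
        new Thread(() -> {
            tracker.markProcessed("1");
            tracker.markProcessed("2");
        }).start();
        System.out.println("all processed: " + tracker.awaitAll(30, TimeUnit.SECONDS));
    }
}
```

Keeping processed ids in their own set means it does not matter whether the receiver handles a message before or after the test registers its id.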
@fkorotkov Want to give you a heads-up. Our latest release (0.17.2) has a bug described in #1827. If your subscriber is deadlocking, that might be why. If you run into this problem, could you let us know in the linked issue?
We should be releasing a new version soon, which will hopefully fix this.
I'm having a similar issue to @fkorotkov and @rcleveng: I don't get any messages in my MessageReceiver. The MessageReceiver is not triggered at all.
I have tested the code both with the emulator and on Google Cloud, and I can receive messages when using Google Cloud.
I started my application with emulator settings and am manually sending a message to the topic by curl-ing the API.
I get back:
{
    "messageIds": [
        "1"
    ]
}
So I know that the message has been received on the topic at hand.
I'm using this version of pubsub:
com.google.cloud:google-cloud-pubsub:0.17.2-alpha
I have used the settings specified here from @pongad.
Any thoughts or suggestions?
@Nosfert I cannot seem to reproduce the problem you're facing. I synced up with @rcleveng offline; I think his trouble actually came from misconfiguration.
Do you have some kind of "hello pubsub" application you could share with me in a gist? I'll gladly help you troubleshoot. This problem could be caused by a deadlock; just in case I still cannot reproduce, could you also share with me a stack trace? (I usually run jstack against the running JVM after letting it sit for a couple minutes.)
@pongad
Here is a gist. I will try to get you a jstack or stack trace later today or maybe tomorrow.
https://gist.github.com/Nosfert/f967eb735efe466afd3ec86c81e1c958
I also added the curl commands that I'm running. One thing I was thinking, but have not tested, is that the emulator might not like empty data and weird attributes.
@Nosfert Your setup is pretty much the same as mine, except I use a curl command to create the subscription instead. I noticed an issue with the gist you posted, though I'm not sure if it's just a copy-paste error.
In the curl command, your topic name is "pubsub-rights.message.test".
However, in PubsubListener, the topic name is "pubsub.message.test", without the "-rights".
I'm not sure how the CreateSubscription on line 23 could succeed though...
@pongad yeah, the topic name is a copy-paste bug from when I tried to make the code a bit more anonymous. Fixed that in the gist.
Lines 23-28 try to create the subscription if it does not exist. If it does exist, they just get that subscription. The subscription is then converted to a subscriber.
@pongad do you have a simple example for pubsub + emulator in the newest version that runs?
@Nosfert Got it. Here's a gist of what I got working: https://gist.github.com/pongad/d13d5737294b34924ba381c2c9a4cf1f
I just verified it on 0.18.0-alpha
@pongad
I made some alterations to my code after your example.
And I got it to work, so thank you for your help.
I think the issue I had was that I did not specify a ChannelProvider for the Subscriber at row 59.
Thanks!
@Nosfert Ah I see! Glad I could help.
I think the above discussion resolves the issue. I'll close this, but please reopen if the problem persists.
What dependency allows you to use val channelProvider = InstantiatingChannelProvider.newBuilder()....?
The class name is now InstantiatingGrpcChannelProvider and it is present in gax-grpc.
Interesting, is there an updated example of a PubSub emulator then? Ex: https://github.com/GoogleCloudPlatform/google-cloud-java/blob/master/TESTING.md#testing-code-that-uses-pubsub seems to use old code