Quarkus: Issues with @Outgoing

Created on 17 Dec 2019  ·  16 Comments  ·  Source: quarkusio/quarkus

Describe the bug
I've developed a scenario similar to the one documented in the Kafka quickstart guide. When I use the annotations "Incoming" (Kafka message), "Outgoing" (in-memory channel) and "Broadcast" together on one method, the method is not invoked and no error is displayed.

Expected behavior
I expected the same behaviour as in the quickstart guide linked above. A Kafka message comes in, is potentially converted, and an in-memory message is posted to a channel.

Actual behavior
The method with the three annotations is not even invoked. At least the print statement in the first line is not run and I don't see any errors.

As a workaround I don't use Outgoing and Broadcast, but Emitter.send instead. This allows me to invoke another method in the same class. See code.

Unfortunately this workaround doesn't work when I try to stream this data via server-sent events in another class. See code. In this case I get a 'channel not connected' error. As a second workaround I added an 'Incoming' annotation to an empty method in the same class. See code.

To Reproduce
Steps to reproduce the behavior:
I've open sourced the version of my sample which works with the two workarounds. See the instructions for how to set it up.

To reproduce the issue replace this code ...

@Incoming("new-article-created")
public void process(String articleId) {
   System.out.println("Kafka message received: new-article-created - " + articleId);
   emitter.send(articleId);
}

... with this code:

@Incoming("new-article-created")
@Outgoing("my-data-stream")
@Broadcast
public String process(String articleId) {
   System.out.println("Kafka message received: new-article-created - " + articleId);
   return articleId;
}

The method will no longer be invoked.

Configuration
Incoming Kafka message - see here

Outgoing Kafka message (from other service) - see here

Environment (please complete the following information):

  • Output of java -version: 11 - see here
  • GraalVM version (if different from Java): not used
  • Quarkus version or git rev: 1.0.1.Final - see here
area/documentation area/reactive-messaging triage/out-of-date

All 16 comments

cc @cescoffier

@nheidloff, where's the configuration for your outgoing channel?

For the in-memory outgoing channel I don't have any configuration other than the name of the channel in the Incoming and Outgoing annotations. Do I need anything else? In the guide it sounds like this is not necessary for in-memory channels. I want to consume it in this code.

The configuration for the outgoing Kafka channel is in the source code of a second service only since I use the Kafka client. See code.

Is this issue related to https://groups.google.com/forum/#!topic/quarkus-dev/5xRN8Xna484?

I saw the 'Stream not yet connected' error as well, which is why I had to add the empty method with the Incoming annotation in the same class.

I suppose your method is not invoked because no one has subscribed to my-data-stream. Do you have either of these:

  1. @Incoming("my-data-stream")
  2. @Inject @Channel("my-data-stream")

In the second case, you will need to subscribe manually.

Yes, I have (2) - see here.

Since this didn't work, I also added (1) as workaround - see here.

The subscription will happen when someone connects to the sse endpoint.

[will try to reproduce it]

Thanks @cescoffier. Just tried it. The 'Stream not yet connected' error disappears as soon as someone connects to the stream. How about a warning rather than error in the log?

This doesn't solve the real issue yet. My method with 3 annotations is not invoked for some reason. Would be great if you could take a look.

Thanks!

So, I looked at the code. I want to be sure I understand correctly.

You are receiving messages from Kafka (or wherever, it's not important here).
These messages are processed and the result is broadcast to 2 places:

  1. a method with @Incoming
  2. a @Channel

Am I right?

If so, I was able to get an "excerpt" of your app working:

application.properties:

mp.messaging.incoming.new-article-created.connector=smallrye-kafka
mp.messaging.incoming.new-article-created.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

mp.messaging.outgoing.article-created.connector=smallrye-kafka
mp.messaging.outgoing.article-created.topic=new-article-created
mp.messaging.outgoing.article-created.value.serializer=org.apache.kafka.common.serialization.StringSerializer

ExampleResource.java:

package org.acme;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;

import org.jboss.resteasy.annotations.SseElementType;
import org.reactivestreams.Publisher;

import io.smallrye.reactive.messaging.annotations.Channel;

@Path("/hello")
public class ExampleResource {

    @GET
    @Produces(MediaType.TEXT_PLAIN)
    public String hello() {
        return "hello";
    }

    @Inject
    @Channel("my-data-stream") Publisher<String> newArticles;

    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS) 
    @SseElementType("text/plain") 
    public Publisher<String> stream() { 
        return newArticles;
    }
}

The listener:

package org.acme;

import org.eclipse.microprofile.reactive.messaging.Incoming;

import java.util.concurrent.TimeUnit;

import javax.enterprise.context.ApplicationScoped;

import io.reactivex.Flowable;
import io.smallrye.reactive.messaging.annotations.Broadcast;
import org.eclipse.microprofile.reactive.messaging.Outgoing;

@ApplicationScoped
public class NewArticleListener {


    @Incoming("new-article-created")
    @Outgoing("my-data-stream")
    @Broadcast
    public String processWorkaround(String articleId) {
        System.out.println("Kafka message received: new-article-created - " + articleId);
        return articleId;
    }

    @Incoming("my-data-stream")
    public void processAnotherWorkaround(String articleId) {
        System.out.println("GOT " + articleId);
    }


    @Outgoing("article-created")
    public Flowable<String> stream() {
        return Flowable.interval(10, 10, TimeUnit.SECONDS)
            .map(l -> Long.toString(l));
    }

}

I've added the outgoing stream() method to produce some Kafka messages.

Thanks @cescoffier. This works, even without "article-created":

public class NewArticleListener {

    @Incoming("new-article-created")
    @Outgoing("my-data-stream")
    @Broadcast
    public String process(String articleId) {
        System.out.println("Kafka message received: new-article-created - " + articleId);
        return articleId;
    }

    @Incoming("my-data-stream")
    public void processWorkaround(String articleId) {
    }
}

It would be even better if "processWorkaround" weren't necessary. Without this method, it doesn't work.

Oh, I finally understand your issue. I was thinking you were broadcasting to both the channel and the processWorkaround method, but that method was the workaround :-)

I don't have any issue with having only:

    @Incoming("new-article-created")
    @Outgoing("my-data-stream")
    @Broadcast
    public String processWorkaround(String articleId) {
        System.out.println("Kafka message received: new-article-created - " + articleId);
        return articleId;
    }

That is what is not working for you, right?

Aaaah. You are right. It does work, but only when the SSE stream endpoint is connected. So this tricked me twice in my scenario :). The first time because I saw the error in the logs, which should only be a warning (see above), and the second time because I didn't see any error and my method was not invoked when the incoming message arrived.

I was confused since the method ('process' in my code, 'processWorkaround' in your code above) is not even invoked without the SSE stream being connected. Since not even the print statement in the first line was executed, I assumed that meant that there was an issue with the incoming message.

Maybe it's just me, but I didn't expect that behaviour. Does it make sense to mention this in the guides? It's probably in the MicroProfile specs, but I didn't find it there.

Thanks a lot, @cescoffier!

We should mention it somewhere, probably in the guide.

Streams are lazy, actually double-lazy :-)

  1. Nothing happens until someone subscribes to it (here the SSE client)
  2. Even (1) is generally not enough. The subscriber needs to request items (here this is done automatically by RESTEasy)
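
For readers who want to see the two levels of laziness in isolation, here is a minimal sketch. It is not the SmallRye or Quarkus implementation: it uses the JDK's own java.util.concurrent.Flow interfaces instead of the Reactive Streams types used in this thread, and the names LazyStreamDemo, channel, process and ManualSubscriber are made up for illustration. The publisher is deliberately simplified (single-threaded, no error handling), so treat it as an illustration of the idea only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class LazyStreamDemo {

    /** Ordered record of what actually ran, so the laziness is observable. */
    static final List<String> events = new ArrayList<>();

    /** Hypothetical stand-in for the annotated processing method in the thread. */
    static String process(String articleId) {
        events.add("process(" + articleId + ")");
        return articleId;
    }

    /** A synchronous publisher that processes an item only when it is requested. */
    static Flow.Publisher<String> channel(List<String> articleIds) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int index = 0;
            private boolean finished = false;

            @Override
            public void request(long n) {
                // Emit at most n items; each one is processed only now, on demand.
                for (long i = 0; i < n && !finished; i++) {
                    if (index == articleIds.size()) {
                        finished = true;
                        subscriber.onComplete();
                    } else {
                        subscriber.onNext(process(articleIds.get(index++)));
                    }
                }
            }

            @Override
            public void cancel() {
                finished = true;
            }
        });
    }

    /** A subscriber that subscribes but requests nothing until told to. */
    static class ManualSubscriber implements Flow.Subscriber<String> {
        Flow.Subscription subscription;
        final List<String> received = new ArrayList<>();

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable t) { events.add("error: " + t); }
        @Override public void onComplete() { events.add("complete"); }
    }

    public static void main(String[] args) {
        // Level 0: creating the stream runs nothing.
        Flow.Publisher<String> publisher = channel(List.of("a1", "a2", "a3"));
        System.out.println("created:    " + events);

        // Level 1: subscribing alone still runs nothing.
        ManualSubscriber subscriber = new ManualSubscriber();
        publisher.subscribe(subscriber);
        System.out.println("subscribed: " + events);

        // Level 2: only a request makes process(...) run, and only for 2 items.
        subscriber.subscription.request(2);
        System.out.println("requested:  " + events);
    }
}
```

In the scenario from this thread, the SSE client plays the role of ManualSubscriber and RESTEasy issues the request calls, which would explain why the print statement in the annotated method stayed silent until a client connected to the stream endpoint.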

Has been fixed.
