I hope this is not extremely out of topic, but I'm trying to figure out the best way to consume a Kafka test message and compare it to an expected message.
A few thoughts (that might be wrong):
@Incoming doesn't seem to fit this use caseI'm stuck with these tests at the moment. I'd be happy for some help on how to do that...
@FreifeldRoyi did you look at https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-quickstart/src/test/java/org/acme/quarkus/sample/PriceResourceTest.java?
Also in the upcoming version of SmallRye reactive messaging, we provide an in-memory connector that simplifies these tests (and about using kafka broker)
@cescoffier how to override Kafka connector with in-memory connector? I tried mp.messaging.incoming.some_topic.connector=smallrye-in-memory but I got
2020-01-28 08:20:16,107 ERROR [io.sma.rea.mes.imp.ConfiguredChannelFactory] (main) Unable to create the publisher or subscriber during initialization: java.lang.IllegalArgumentException: Unknown connector for some_topic.
at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.lambda$createPublisherBuilder$6(ConfiguredChannelFactory.java:143)
at java.util.Optional.orElseThrow(Optional.java:290)
at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.createPublisherBuilder(ConfiguredChannelFactory.java:143)
at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.lambda$register$4(ConfiguredChannelFactory.java:123)
at java.util.HashMap.forEach(HashMap.java:1289)
at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.register(ConfiguredChannelFactory.java:123)
at io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory.initialize(ConfiguredChannelFactory.java:118)
It seems there is no io/smallrye/reactive/messaging/connector/InMemoryConnector.java class in in quarkus-smallrye-reactive-messaging*.
Yes, it's not released yet.
Do you have a rough estimate when it will be released?
SmallRye reactive messaging 2.0.1 was release and I tried https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2/testing/testing.html with quarkus 1.4.1. But I get the exception:
java.lang.RuntimeException: java.lang.RuntimeException: io.quarkus.builder.BuildException: Build failure: Build failed due to errors
[error]: Build step io.quarkus.arc.deployment.ArcProcessor#validate threw an exception: javax.enterprise.inject.spi.DeploymentException: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.smallrye.reactive.messaging.connectors.InMemoryConnector and qualifiers [@Any]
- java member: de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest#connector
- declared on CLASS bean [types=[de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest, java.lang.Object], qualifiers=[@Default, @Any], target=de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest]
at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:973)
at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:240)
at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:135)
at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:358)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at io.quarkus.deployment.ExtensionLoader$2.execute(ExtensionLoader.java:931)
at io.quarkus.builder.BuildContext.run(BuildContext.java:277)
at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:2046)
at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1578)
at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1452)
at java.base/java.lang.Thread.run(Thread.java:830)
at org.jboss.threads.JBossThread.run(JBossThread.java:479)
Caused by: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.smallrye.reactive.messaging.connectors.InMemoryConnector and qualifiers [@Any]
- java member: de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest#connector
- declared on CLASS bean [types=[de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest, java.lang.Object], qualifiers=[@Default, @Any], target=de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest]
at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:486)
at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:362)
at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:232)
... 14 more
at io.quarkus.test.junit.QuarkusTestExtension.beforeEach(QuarkusTestExtension.java:255)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeEachCallbacks$1(TestMethodTestDescriptor.java:161)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeBeforeMethodsOrCallbacksUntilExceptionOccurs$5(TestMethodTestDescriptor.java:197)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeMethodsOrCallbacksUntilExceptionOccurs(TestMethodTestDescriptor.java:197)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeBeforeEachCallbacks(TestMethodTestDescriptor.java:160)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1507)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1507)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:141)
at org.eclipse.jdt.internal.junit5.runner.JUnit5TestReference.run(JUnit5TestReference.java:98)
at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:41)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:542)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:770)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:464)
at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210)
Caused by: java.lang.RuntimeException: io.quarkus.builder.BuildException: Build failure: Build failed due to errors
[error]: Build step io.quarkus.arc.deployment.ArcProcessor#validate threw an exception: javax.enterprise.inject.spi.DeploymentException: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.smallrye.reactive.messaging.connectors.InMemoryConnector and qualifiers [@Any]
- java member: de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest#connector
- declared on CLASS bean [types=[de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest, java.lang.Object], qualifiers=[@Default, @Any], target=de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest]
at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:973)
at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:240)
at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:135)
at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:358)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at io.quarkus.deployment.ExtensionLoader$2.execute(ExtensionLoader.java:931)
at io.quarkus.builder.BuildContext.run(BuildContext.java:277)
at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:2046)
at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1578)
at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1452)
at java.base/java.lang.Thread.run(Thread.java:830)
at org.jboss.threads.JBossThread.run(JBossThread.java:479)
Caused by: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.smallrye.reactive.messaging.connectors.InMemoryConnector and qualifiers [@Any]
- java member: de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest#connector
- declared on CLASS bean [types=[de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest, java.lang.Object], qualifiers=[@Default, @Any], target=de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest]
at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:486)
at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:362)
at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:232)
... 14 more
at io.quarkus.runner.bootstrap.AugmentActionImpl.runAugment(AugmentActionImpl.java:247)
at io.quarkus.runner.bootstrap.AugmentActionImpl.createInitialRuntimeApplication(AugmentActionImpl.java:130)
at io.quarkus.runner.bootstrap.AugmentActionImpl.createInitialRuntimeApplication(AugmentActionImpl.java:52)
at io.quarkus.test.junit.QuarkusTestExtension.doJavaStart(QuarkusTestExtension.java:131)
at io.quarkus.test.junit.QuarkusTestExtension.ensureStarted(QuarkusTestExtension.java:269)
at io.quarkus.test.junit.QuarkusTestExtension.beforeAll(QuarkusTestExtension.java:292)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeBeforeAllCallbacks$7(ClassBasedTestDescriptor.java:359)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeBeforeAllCallbacks(ClassBasedTestDescriptor.java:359)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:189)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:78)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:132)
... 31 more
Caused by: io.quarkus.builder.BuildException: Build failure: Build failed due to errors
[error]: Build step io.quarkus.arc.deployment.ArcProcessor#validate threw an exception: javax.enterprise.inject.spi.DeploymentException: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.smallrye.reactive.messaging.connectors.InMemoryConnector and qualifiers [@Any]
- java member: de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest#connector
- declared on CLASS bean [types=[de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest, java.lang.Object], qualifiers=[@Default, @Any], target=de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest]
at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:973)
at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:240)
at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:135)
at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:358)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at io.quarkus.deployment.ExtensionLoader$2.execute(ExtensionLoader.java:931)
at io.quarkus.builder.BuildContext.run(BuildContext.java:277)
at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:2046)
at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1578)
at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1452)
at java.base/java.lang.Thread.run(Thread.java:830)
at org.jboss.threads.JBossThread.run(JBossThread.java:479)
Caused by: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.smallrye.reactive.messaging.connectors.InMemoryConnector and qualifiers [@Any]
- java member: de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest#connector
- declared on CLASS bean [types=[de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest, java.lang.Object], qualifiers=[@Default, @Any], target=de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest]
at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:486)
at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:362)
at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:232)
... 14 more
at io.quarkus.builder.Execution.run(Execution.java:115)
at io.quarkus.builder.BuildExecutionBuilder.execute(BuildExecutionBuilder.java:79)
at io.quarkus.deployment.QuarkusAugmentor.run(QuarkusAugmentor.java:156)
at io.quarkus.runner.bootstrap.AugmentActionImpl.runAugment(AugmentActionImpl.java:245)
... 42 more
Caused by: javax.enterprise.inject.spi.DeploymentException: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.smallrye.reactive.messaging.connectors.InMemoryConnector and qualifiers [@Any]
- java member: de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest#connector
- declared on CLASS bean [types=[de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest, java.lang.Object], qualifiers=[@Default, @Any], target=de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest]
at io.quarkus.arc.processor.BeanDeployment.processErrors(BeanDeployment.java:973)
at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:240)
at io.quarkus.arc.processor.BeanProcessor.initialize(BeanProcessor.java:135)
at io.quarkus.arc.deployment.ArcProcessor.validate(ArcProcessor.java:358)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at io.quarkus.deployment.ExtensionLoader$2.execute(ExtensionLoader.java:931)
at io.quarkus.builder.BuildContext.run(BuildContext.java:277)
at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:2046)
at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1578)
at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1452)
at java.base/java.lang.Thread.run(Thread.java:830)
at org.jboss.threads.JBossThread.run(JBossThread.java:479)
Caused by: javax.enterprise.inject.UnsatisfiedResolutionException: Unsatisfied dependency for type io.smallrye.reactive.messaging.connectors.InMemoryConnector and qualifiers [@Any]
- java member: de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest#connector
- declared on CLASS bean [types=[de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest, java.lang.Object], qualifiers=[@Default, @Any], target=de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocketIntegrationTest]
at io.quarkus.arc.processor.Beans.resolveInjectionPoint(Beans.java:486)
at io.quarkus.arc.processor.BeanInfo.init(BeanInfo.java:362)
at io.quarkus.arc.processor.BeanDeployment.init(BeanDeployment.java:232)
... 14 more
You are right, some recent change broke it. I will fix it before next release.
_For the record:_ It requires the in-memory connector to be indexed.
WOOHOO!
Thnx @cescoffier
Now I get java.lang.IllegalArgumentException: Unknown channel computations when calling connector.sink("computations"); in the test.
I think the channels are configured and connected before the @BeforeAll is called and so the in-memory connector is not used.
Log output before @BeforeAll is called in a @QuarkusTest
Mai 03, 2020 11:41:52 AM org.jboss.threads.Version <clinit>
INFO: JBoss Threads version 3.1.1.Final
Mai 03, 2020 11:41:54 AM io.undertow.websockets.jsr.ServerWebSocketContainer addEndpointInternal
INFO: UT026003: Adding annotated server endpoint class de.uni_stuttgart.tik.viplab.websocket_api.ComputationWebSocket for path /computations
Mai 03, 2020 11:41:54 AM org.jboss.threads.Version <clinit>
INFO: JBoss Threads version 3.1.1.Final
Mai 03, 2020 11:41:54 AM io.smallrye.reactive.messaging.extension.MediatorManager initializeAndRun
INFO: Deployment done... start processing
Mai 03, 2020 11:41:54 AM io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory <init>
INFO: Found incoming connectors: [smallrye-amqp, smallrye-in-memory]
Mai 03, 2020 11:41:54 AM io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory <init>
INFO: Found outgoing connectors: [smallrye-amqp, smallrye-in-memory]
Mai 03, 2020 11:41:54 AM io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory initialize
INFO: Channel manager initializing...
Mai 03, 2020 11:41:54 AM io.smallrye.reactive.messaging.amqp.AmqpConnector getClient
INFO: AMQP broker configured to localhost:5672 for channel computations
Mai 03, 2020 11:41:54 AM io.smallrye.reactive.messaging.extension.MediatorManager initializeAndRun
INFO: Initializing mediators
Mai 03, 2020 11:41:54 AM io.smallrye.reactive.messaging.extension.MediatorManager weaving
INFO: Connecting mediators
Mai 03, 2020 11:41:54 AM io.smallrye.reactive.messaging.extension.MediatorManager weaving
INFO: Connecting emitter to sink computations
Mai 03, 2020 11:41:55 AM io.quarkus.runtime.Timing printStartupTime
INFO: Quarkus 1.4.1.Final started in 2.781s. Listening on: http://0.0.0.0:8081
Mai 03, 2020 11:41:55 AM io.quarkus.runtime.Timing printStartupTime
INFO: Profile test activated.
Mai 03, 2020 11:41:55 AM io.quarkus.runtime.Timing printStartupTime
INFO: Installed features: [cdi, mutiny, resteasy, scheduler, servlet, smallrye-context-propagation, smallrye-health, smallrye-metrics, smallrye-reactive-messaging, smallrye-reactive-messaging-amqp, smallrye-reactive-streams-operators, undertow-websockets, vertx]
You can see INFO: AMQP broker configured to localhost:5672 for channel computations that the channel "computations" uses the amqp connector.
Workaround:
Adding %test.mp.messaging.outgoing.computations.connector=smallrye-in-memory to the application.properties solve the problem.
It's still not integrated in Quarkus. I'm going to open the PR today.
You also need to switch the channel as indicated on https://smallrye.io/smallrye-reactive-messaging/smallrye-reactive-messaging/2/testing/testing.html.
I called InMemoryConnector.switchOutgoingChannelsToInMemory("computations"); here is the source code: https://github.com/VirtualProgrammingLab/viplab-websocket-api/blob/test-parameters/websocket-api-impl/src/test/java/de/uni_stuttgart/tik/viplab/websocket_api/ComputationWebSocketTest.java#L60
Did you try from a Quarkus Test Resource, as @BeforeAll might be called after the application start?
@cescoffier I don't know what you mean by "Quarkus Test Resource", I have a test class annotated with @QuarkusTest and a method annotated with @BeforeAll which switch to in memory connector and a method annotated with @Test. I stepped through the test with the debugger and the @BeforeAll method is called but I think to late.
@Legion2 something like https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-quickstart/src/test/java/org/acme/kafka/KafkaResource.java, but instead of starting a container, switch the channels.
@Legion2 something like:
/**
* Use the in-memory connector to avoid having to use a broker.
*/
public class FakeKafkaResource implements QuarkusTestResourceLifecycleManager {
@Override
public Map<String, String> start() {
Map<String, String> env = new HashMap<>();
Map<String, String> props1 = InMemoryConnector.switchIncomingChannelsToInMemory("prices");
Map<String, String> props2 = InMemoryConnector.switchOutgoingChannelsToInMemory("my-data-stream");
env.putAll(props1);
env.putAll(props2);
return env;
}
@Override
public void stop() {
InMemoryConnector.clear();
}
}
@cescoffier Thanks the test now works with the QuarkusTestResource.