I'm trying to spin up a Turbine Stream server and get the exception below when hitting the default Turbine Stream server endpoint, http://localhost:8989/.
Java version: 1.8.0_144
Spring Cloud version: Finchley.RC1
Eureka discovery: disabled
RabbitMQ: 3.7.4
Command to run RabbitMQ under Docker: $ docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Issue reproduction project: demo.zip
2018-05-04 16:18:03.720 INFO 3609 --- [o-eventloop-3-1] o.s.c.n.t.s.TurbineStreamConfiguration : SSE Request Received
2018-05-04 16:18:03.727 INFO 3609 --- [o-eventloop-3-1] o.s.c.n.t.s.TurbineStreamConfiguration : Starting aggregation
Exception in thread "RxComputationScheduler-1" java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AbstractMethodError
at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:77)
at io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:810)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:704)
at io.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:1056)
at io.netty.channel.AbstractChannel.write(AbstractChannel.java:290)
at io.reactivex.netty.channel.DefaultChannelWriter.writeOnChannel(DefaultChannelWriter.java:165)
at io.reactivex.netty.protocol.http.server.HttpServerResponse.writeOnChannel(HttpServerResponse.java:195)
at io.reactivex.netty.channel.DefaultChannelWriter.write(DefaultChannelWriter.java:83)
at io.reactivex.netty.channel.DefaultChannelWriter.writeAndFlush(DefaultChannelWriter.java:65)
at org.springframework.cloud.netflix.turbine.stream.TurbineStreamConfiguration.lambda$null$6(TurbineStreamConfiguration.java:106)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69)
at rx.observers.Subscribers$5.onNext(Subscribers.java:235)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
at rx.internal.operators.OnSubscribeRefCount$2.onNext(OnSubscribeRefCount.java:120)
at rx.internal.operators.OperatorPublish$PublishSubscriber.dispatch(OperatorPublish.java:585)
at rx.internal.operators.OperatorPublish$PublishSubscriber.onNext(OperatorPublish.java:283)
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52)
at rx.internal.schedulers.SchedulePeriodicHelper$1.call(SchedulePeriodicHelper.java:72)
at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
... 7 more
<?xml version="1.0" encoding="UTF-8"?>
...
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<spring-cloud.version>Finchley.RC1</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-turbine-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
...
</project>
eureka:
client:
enabled: false
management:
endpoints:
web:
exposure:
include: "*"
endpoint:
health:
show-details: always
logging:
level:
ROOT: info
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.turbine.stream.EnableTurbineStream;
@SpringBootApplication
@EnableTurbineStream
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
Source: I/O error on GET request for "http://localhost:8888/application/default": Connection refused; nested exception is java.net.ConnectException: Connection refused
2018-05-06 15:00:20.637 INFO 46083 --- [o-eventloop-3-1] o.s.c.n.t.s.TurbineStreamConfiguration : SSE Request Received
2018-05-06 15:00:20.646 INFO 46083 --- [o-eventloop-3-1] o.s.c.n.t.s.TurbineStreamConfiguration : Starting aggregation
Exception in thread "RxComputationScheduler-1" java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.AbstractMethodError
at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:77)
at io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:810)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:704)
at io.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:1056)
at io.netty.channel.AbstractChannel.write(AbstractChannel.java:290)
at io.reactivex.netty.channel.DefaultChannelWriter.writeOnChannel(DefaultChannelWriter.java:165)
at io.reactivex.netty.protocol.http.server.HttpServerResponse.writeOnChannel(HttpServerResponse.java:195)
at io.reactivex.netty.channel.DefaultChannelWriter.write(DefaultChannelWriter.java:83)
at io.reactivex.netty.channel.DefaultChannelWriter.writeAndFlush(DefaultChannelWriter.java:65)
at org.springframework.cloud.netflix.turbine.stream.TurbineStreamConfiguration.lambda$null$6(TurbineStreamConfiguration.java:106)
This looks like a jar conflict.
MacBook-Pro:turbine hanrenwei$ mvn dependency:tree|grep 0.4.9
[INFO] | | | | +- io.reactivex:rxnetty-contexts:jar:0.4.9:runtime
[INFO] | | | | \- io.reactivex:rxnetty-servo:jar:0.4.9:runtime
[INFO] | | | \- io.reactivex:rxnetty:jar:0.4.9:runtime
MacBook-Pro:turbine hanrenwei$ mvn dependency:tree|grep 0.3.18
[INFO] | | +- com.netflix.rxnetty:rx-netty:jar:0.3.18:compile
Jar conflict:
+- org.springframework.cloud:spring-cloud-starter-netflix-turbine-stream:jar:2.0.0.RC1:compile
| +- com.netflix.turbine:turbine-core:jar:2.0.0-DP.2:compile
[INFO] | | +- net.sf.jopt-simple:jopt-simple:jar:4.8:compile
[INFO] | | +- com.netflix.rxnetty:rx-netty:jar:0.3.18:compile
[INFO] | | | +- io.netty:netty-codec-http:jar:4.1.23.Final:compile
[INFO] | | | | \- io.netty:netty-codec:jar:4.1.23.Final:compile
[INFO] | | | \- io.netty:netty-transport-native-epoll:jar:4.1.23.Final:compile
[INFO] | | | +- io.netty:netty-common:jar:4.1.23.Final:compile
[INFO] | | | +- io.netty:netty-buffer:jar:4.1.23.Final:compile
[INFO] | | | +- io.netty:netty-transport-native-unix-common:jar:4.1.23.Final:compile
[INFO] | | | \- io.netty:netty-transport:jar:4.1.23.Final:compile
[INFO] | | | \- io.netty:netty-resolver:jar:4.1.23.Final:compile
[INFO] | | +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.2:compile
[INFO] | | \- org.codehaus.jackson:jackson-core-asl:jar:1.9.2:compile
[INFO] +- org.springframework.cloud:spring-cloud-starter-netflix-eureka-client:jar:2.0.0.RC1:compile
[INFO] | +- org.springframework.cloud:spring-cloud-starter-netflix-ribbon:jar:2.0.0.RC1:compile
[INFO] | | +- com.netflix.ribbon:ribbon:jar:2.2.5:compile
[INFO] | | | +- com.netflix.ribbon:ribbon-transport:jar:2.2.5:runtime
[INFO] | | | | +- io.reactivex:rxnetty-contexts:jar:0.4.9:runtime
[INFO] | | | | \- io.reactivex:rxnetty-servo:jar:0.4.9:runtime
[INFO] | | | \- io.reactivex:rxnetty:jar:0.4.9:runtime
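When two artifacts ship classes in the same packages, as rx-netty 0.3.18 and rxnetty 0.4.9 appear to here, it can help to ask the JVM which jar a class was actually loaded from. The following is a minimal sketch using only JDK calls; the class name `WhichJar` and the helper `locationOf` are made up for illustration, and the class names to inspect are passed on the command line.

```java
import java.security.CodeSource;

public class WhichJar {

    /** Returns where a class was loaded from, or a placeholder when the JVM does not report it. */
    public static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // JDK classes such as java.lang.String typically have no code source.
            return "(no code source)";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) throws ClassNotFoundException {
        ClassLoader loader = WhichJar.class.getClassLoader();
        for (String name : args) {
            // initialize=false: locate the class without running its static initializers.
            Class<?> type = Class.forName(name, false, loader);
            System.out.println(name + " -> " + locationOf(type));
        }
    }
}
```

Run it with the application's classpath and the fully qualified names of the classes that appear in the stack trace; the printed location shows which of the conflicting jars won.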
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-turbine-stream</artifactId>
<exclusions>
<exclusion>
<groupId>io.reactivex</groupId>
<artifactId>rxnetty</artifactId>
</exclusion>
</exclusions>
</dependency>
With that exclusion I got:
2018-05-06 15:24:54.521 INFO 47080 --- [on(4)-127.0.0.1] c.c.c.ConfigServicePropertySourceLocator : Fetching config from server at: http://localhost:8888
2018-05-06 15:24:54.524 WARN 47080 --- [on(4)-127.0.0.1] c.c.c.ConfigServicePropertySourceLocator : Could not locate PropertySource: I/O error on GET request for "http://localhost:8888/application/default": Connection refused; nested exception is java.net.ConnectException: Connection refused
2018-05-06 15:24:57.634 WARN 47080 --- [o-eventloop-3-1] i.n.c.AbstractChannelHandlerContext : An exception '{}' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
java.lang.NoSuchMethodError: rx.internal.operators.NotificationLite.instance()Lrx/internal/operators/NotificationLite;
at io.reactivex.netty.protocol.http.UnicastContentSubject$State$BufferedObserver.<init>(UnicastContentSubject.java:243) ~[rx-netty-0.3.18.jar:na]
at io.reactivex.netty.protocol.http.UnicastContentSubject$State$BufferedObserver.<init>(UnicastContentSubject.java:241) ~[rx-netty-0.3.18.jar:na]
at io.reactivex.netty.protocol.http.UnicastContentSubject$State.<init>(UnicastContentSubject.java:197) ~[rx-netty-0.3.18.jar:na]
at io.reactivex.netty.protocol.http.UnicastContentSubject.create(UnicastContentSubject.java:132) ~[rx-netty-0.3.18.jar:na]
at io.reactivex.netty.protocol.http.UnicastContentSubject.create(UnicastContentSubject.java:122) ~[rx-netty-0.3.18.jar:na]
at io.reactivex.netty.protocol.http.UnicastContentSubject.create(UnicastContentSubject.java:117) ~[rx-netty-0.3.18.jar:na]
at io.reactivex.netty.protocol.http.server.ServerRequestResponseC
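A NoSuchMethodError generally means the calling jar was compiled against a different version of a class than the one present at runtime. As a rough check, the sketch below probes by reflection whether a public method exists on the runtime classpath. `MethodProbe` and `hasPublicMethod` are hypothetical names; the class and method probed in `main` are the ones named in the error above.

```java
public class MethodProbe {

    /** Returns true if the named class is loadable and exposes a public method with this signature. */
    public static boolean hasPublicMethod(String className, String methodName,
                                          Class<?>... parameterTypes) {
        try {
            // initialize=false: inspect the class without running its static initializers.
            Class<?> type = Class.forName(className, false, MethodProbe.class.getClassLoader());
            type.getMethod(methodName, parameterTypes);
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class and method names taken from the NoSuchMethodError in the log above.
        System.out.println(
                hasPublicMethod("rx.internal.operators.NotificationLite", "instance"));
    }
}
```

If this prints false when run with the application's classpath, the RxJava jar in use does not provide the method that rx-netty 0.3.18 expects.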
Here is the issue I found:
mvn dependency:tree|grep netty
[INFO] | | | | | +- io.reactivex:rxnetty-contexts:jar:0.4.9:runtime
[INFO] | | | | | \- io.reactivex:rxnetty-servo:jar:0.4.9:runtime
[INFO] +- io.reactivex:rxnetty:jar:0.4.20:runtime
[INFO] | +- io.netty:netty-codec-http:jar:4.1.23.Final:compile
[INFO] | | \- io.netty:netty-codec:jar:4.1.23.Final:compile
[INFO] | +- io.netty:netty-handler:jar:4.1.23.Final:compile
[INFO] | | +- io.netty:netty-buffer:jar:4.1.23.Final:compile
[INFO] | | \- io.netty:netty-transport:jar:4.1.23.Final:compile
[INFO] | | \- io.netty:netty-resolver:jar:4.1.23.Final:compile
[INFO] | +- io.netty:netty-transport-native-epoll:jar:4.1.23.Final:compile
[INFO] | | +- io.netty:netty-common:jar:4.1.23.Final:compile
[INFO] | | \- io.netty:netty-transport-native-unix-common:jar:4.1.23.Final:compile
[INFO] | | \- io.projectreactor.ipc:reactor-netty:jar:0.7.6.RELEASE:compile
[INFO] | | \- io.netty:netty-handler-proxy:jar:4.1.23.Final:compile
[INFO] | | \- io.netty:netty-codec-socks:jar:4.1.23.Final:compile
Netty is at 4.1.23.Final.
io.reactivex:rxnetty is at 0.4.9.
The exception is thrown at:
at org.springframework.cloud.netflix.turbine.stream.TurbineStreamConfiguration.lambda$null$6(TurbineStreamConfiguration.java:106)
Looking at the code:
/**
* @author Spencer Gibb
* @author Daniel Lavoie
*/
@Configuration
@EnableConfigurationProperties(TurbineStreamProperties.class)
public class TurbineStreamConfiguration implements SmartLifecycle {
private static final Log log = LogFactory.getLog(TurbineStreamConfiguration.class);
private AtomicBoolean running = new AtomicBoolean(false);
@Autowired
private TurbineStreamProperties properties;
private int turbinePort;
@Bean
public HasFeatures Feature() {
return HasFeatures.namedFeature("Turbine (Stream)",
TurbineStreamProperties.class);
}
@Bean
public PublishSubject<Map<String, Object>> hystrixSubject() {
return PublishSubject.create();
}
@Bean
@SuppressWarnings("deprecation")
public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
// multicast so multiple concurrent subscribers get the same stream
Observable<Map<String, Object>> publishedStreams = StreamAggregator
.aggregateGroupedStreams(hystrixSubject().groupBy(
data -> InstanceKey.create((String) data.get("instanceId"))))
.doOnUnsubscribe(() -> log.info("Unsubscribing aggregation."))
.doOnSubscribe(() -> log.info("Starting aggregation")).flatMap(o -> o)
.publish().refCount();
Observable<Map<String, Object>> ping = Observable.timer(1, 10, TimeUnit.SECONDS)
.map(count -> Collections.singletonMap("type", (Object) "Ping")).publish()
.refCount();
Observable<Map<String, Object>> output = Observable.merge(publishedStreams, ping);
this.turbinePort = this.properties.getPort();
if (this.turbinePort <= 0) {
this.turbinePort = SocketUtils.findAvailableTcpPort(40000);
}
HttpServer<ByteBuf, ServerSentEvent> httpServer = RxNetty
.createHttpServer(this.turbinePort, (request, response) -> {
log.info("SSE Request Received");
response.getHeaders().setHeader("Content-Type", "text/event-stream");
return output.doOnUnsubscribe(
() -> log.info("Unsubscribing RxNetty server connection"))
.flatMap(data -> response.writeAndFlush(new ServerSentEvent(
null,
Unpooled.copiedBuffer("message",
StandardCharsets.UTF_8),
Unpooled.copiedBuffer(JsonUtility.mapToJson(data)+"\n",
StandardCharsets.UTF_8))));
}, serveSseConfigurator());
return httpServer;
}
and
/**
*
* @author Nitesh Kant
*/
public class ServerSentEvent implements ByteBufHolder {
private static final Logger logger = LoggerFactory.getLogger(ServerSentEvent.class);
private static Charset sseEncodingCharset;
static {
try {
sseEncodingCharset = Charset.forName("UTF-8");
} catch (Exception e) {
logger.error("UTF-8 charset not available. Since SSE only contains UTF-8 data, we can not read SSE data.");
sseEncodingCharset = null;
}
}
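Root cause:
The ServerSentEvent class is loaded from io.reactivex:rxnetty:0.4.9, but the ByteBufHolder interface it implements is loaded from Netty 4.1.23.Final.
The ByteBufHolder methods are different in Netty 4.1.23.Final, and the ServerSentEvent in io.reactivex:rxnetty:0.4.9 does not implement all of them. Calling one of the missing methods (the stack trace fails inside ReferenceCountUtil.touch) raises java.lang.AbstractMethodError.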
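One way to confirm this kind of mismatch is to list, by reflection, the interface methods that still resolve to an abstract declaration on the implementing class. The sketch below is illustrative only: `Holder`, `Partial` and `Complete` are hypothetical stand-ins (not the real Netty or RxNetty types), and `Partial` is declared abstract purely so the example compiles, mimicking a class built against an older interface.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

public class MissingMethods {

    /** Hypothetical stand-in for an interface that gained touch() in a newer release. */
    public interface Holder {
        Object content();
        Holder touch();
    }

    /** Stand-in for an implementation built against the older interface. */
    public abstract static class Partial implements Holder {
        @Override public Object content() { return "data"; }
    }

    /** Stand-in for an implementation that matches the newer interface. */
    public static class Complete extends Partial {
        @Override public Holder touch() { return this; }
    }

    /** Lists methods of iface that impl does not provide a concrete body for. */
    public static List<String> unimplemented(Class<?> impl, Class<?> iface) {
        List<String> missing = new ArrayList<>();
        for (Method declared : iface.getMethods()) {
            try {
                Method resolved = impl.getMethod(declared.getName(), declared.getParameterTypes());
                // Still abstract means lookup fell through to the interface declaration.
                if (Modifier.isAbstract(resolved.getModifiers())) {
                    missing.add(declared.toString());
                }
            } catch (NoSuchMethodException e) {
                missing.add(declared.toString());
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        System.out.println(unimplemented(Partial.class, Holder.class));
        System.out.println(unimplemented(Complete.class, Holder.class));
    }
}
```

To apply the same check to the real classes, load the ServerSentEvent class and io.netty.buffer.ByteBufHolder with Class.forName on the application's classpath and pass them to `unimplemented`; any method it reports would trigger AbstractMethodError when called.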
How to fix it at the pom level:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-turbine-stream</artifactId>
<exclusions>
<exclusion>
<groupId>com.netflix.rxnetty</groupId>
<artifactId>rx-netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.reactivex</groupId>
<artifactId>rxnetty</artifactId>
<version>0.4.20</version>
<scope>runtime</scope>
</dependency>
@spencergibb please check.
@MadeInChina good job. It works for me. Thanks!
Thanks @MadeInChina - it works.
Thanks @MadeInChina. I lost several days trying to find the solution.
Good job.