Spring-cloud-netflix: Turbine Stream Server Error - Fatal Exception thrown on Scheduler.Worker thread ("RxComputationScheduler-1")

Created on 4 May 2018  ·  9 Comments  ·  Source: spring-cloud/spring-cloud-netflix

I am trying to spin up a Turbine Stream server and get the exception shown below when hitting the default Turbine Stream server endpoint, http://localhost:8989/.

Env:

java version "1.8.0_144"
Spring Cloud Version : Finchley.RC1
Eureka discovery Disabled
Rabbit 3.7.4
command to run rabbit under docker : $ docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
issue reproduction project : demo.zip

Exception :

2018-05-04 16:18:03.720  INFO 3609 --- [o-eventloop-3-1] o.s.c.n.t.s.TurbineStreamConfiguration   : SSE Request Received
2018-05-04 16:18:03.727  INFO 3609 --- [o-eventloop-3-1] o.s.c.n.t.s.TurbineStreamConfiguration   : Starting aggregation
Exception in thread "RxComputationScheduler-1" java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:59)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AbstractMethodError
    at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:77)
    at io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:810)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:704)
    at io.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:1056)
    at io.netty.channel.AbstractChannel.write(AbstractChannel.java:290)
    at io.reactivex.netty.channel.DefaultChannelWriter.writeOnChannel(DefaultChannelWriter.java:165)
    at io.reactivex.netty.protocol.http.server.HttpServerResponse.writeOnChannel(HttpServerResponse.java:195)
    at io.reactivex.netty.channel.DefaultChannelWriter.write(DefaultChannelWriter.java:83)
    at io.reactivex.netty.channel.DefaultChannelWriter.writeAndFlush(DefaultChannelWriter.java:65)
    at org.springframework.cloud.netflix.turbine.stream.TurbineStreamConfiguration.lambda$null$6(TurbineStreamConfiguration.java:106)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69)
    at rx.observers.Subscribers$5.onNext(Subscribers.java:235)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:395)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:355)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:846)
    at rx.internal.operators.OnSubscribeRefCount$2.onNext(OnSubscribeRefCount.java:120)
    at rx.internal.operators.OperatorPublish$PublishSubscriber.dispatch(OperatorPublish.java:585)
    at rx.internal.operators.OperatorPublish$PublishSubscriber.onNext(OperatorPublish.java:283)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
    at rx.internal.operators.OnSubscribeTimerPeriodically$1.call(OnSubscribeTimerPeriodically.java:52)
    at rx.internal.schedulers.SchedulePeriodicHelper$1.call(SchedulePeriodicHelper.java:72)
    at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    ... 7 more

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
...

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>Finchley.RC1</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-turbine-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>
...
</project>

application.yml

eureka:
  client:
    enabled: false
management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    health:
      show-details: always
logging:
  level:
    ROOT: info

App.java

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.turbine.stream.EnableTurbineStream;

/**
 * Entry point of the demo application. The class is annotated with both
 * {@code @EnableTurbineStream} and {@code @SpringBootApplication}.
 */
@EnableTurbineStream
@SpringBootApplication
public class DemoApplication {

    /**
     * Hands control to {@code SpringApplication.run}, forwarding the
     * command-line arguments unchanged.
     *
     * @param args command-line arguments passed to the JVM
     */
    public static void main(final String[] args) {
        final Class<DemoApplication> primarySource = DemoApplication.class;
        SpringApplication.run(primarySource, args);
    }
}

Most helpful comment

How to fix this at the pom level:

<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-turbine-stream</artifactId>
      <exclusions>
        <exclusion>
          <groupId>com.netflix.rxnetty</groupId>
          <artifactId>rx-netty</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>io.reactivex</groupId>
      <artifactId>rxnetty</artifactId>
      <version>0.4.20</version>
      <scope>runtime</scope>
    </dependency>

All 9 comments

Source: I/O error on GET request for "http://localhost:8888/application/default": Connection refused; nested exception is java.net.ConnectException: Connection refused
2018-05-06 15:00:20.637  INFO 46083 --- [o-eventloop-3-1] o.s.c.n.t.s.TurbineStreamConfiguration   : SSE Request Received
2018-05-06 15:00:20.646  INFO 46083 --- [o-eventloop-3-1] o.s.c.n.t.s.TurbineStreamConfiguration   : Starting aggregation
Exception in thread "RxComputationScheduler-1" java.lang.IllegalStateException: Fatal Exception thrown on Scheduler.Worker thread.
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:59)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.AbstractMethodError
    at io.netty.util.ReferenceCountUtil.touch(ReferenceCountUtil.java:77)
    at io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:810)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:704)
    at io.netty.channel.DefaultChannelPipeline.write(DefaultChannelPipeline.java:1056)
    at io.netty.channel.AbstractChannel.write(AbstractChannel.java:290)
    at io.reactivex.netty.channel.DefaultChannelWriter.writeOnChannel(DefaultChannelWriter.java:165)
    at io.reactivex.netty.protocol.http.server.HttpServerResponse.writeOnChannel(HttpServerResponse.java:195)
    at io.reactivex.netty.channel.DefaultChannelWriter.write(DefaultChannelWriter.java:83)
    at io.reactivex.netty.channel.DefaultChannelWriter.writeAndFlush(DefaultChannelWriter.java:65)
    at org.springframework.cloud.netflix.turbine.stream.TurbineStreamConfiguration.lambda$null$6(TurbineStreamConfiguration.java:106)

This seems to be a jar conflict.

MacBook-Pro:turbine hanrenwei$ mvn dependency:tree|grep 0.4.9
[INFO] |  |  |  |  +- io.reactivex:rxnetty-contexts:jar:0.4.9:runtime
[INFO] |  |  |  |  \- io.reactivex:rxnetty-servo:jar:0.4.9:runtime
[INFO] |  |  |  \- io.reactivex:rxnetty:jar:0.4.9:runtime
MacBook-Pro:turbine hanrenwei$ mvn dependency:tree|grep 0.3.18
[INFO] |  |  +- com.netflix.rxnetty:rx-netty:jar:0.3.18:compile


jar conflict
+- org.springframework.cloud:spring-cloud-starter-netflix-turbine-stream:jar:2.0.0.RC1:compile
|  +- com.netflix.turbine:turbine-core:jar:2.0.0-DP.2:compile
[INFO] |  |  +- net.sf.jopt-simple:jopt-simple:jar:4.8:compile
[INFO] |  |  +- com.netflix.rxnetty:rx-netty:jar:0.3.18:compile
[INFO] |  |  |  +- io.netty:netty-codec-http:jar:4.1.23.Final:compile
[INFO] |  |  |  |  \- io.netty:netty-codec:jar:4.1.23.Final:compile
[INFO] |  |  |  \- io.netty:netty-transport-native-epoll:jar:4.1.23.Final:compile
[INFO] |  |  |     +- io.netty:netty-common:jar:4.1.23.Final:compile
[INFO] |  |  |     +- io.netty:netty-buffer:jar:4.1.23.Final:compile
[INFO] |  |  |     +- io.netty:netty-transport-native-unix-common:jar:4.1.23.Final:compile
[INFO] |  |  |     \- io.netty:netty-transport:jar:4.1.23.Final:compile
[INFO] |  |  |        \- io.netty:netty-resolver:jar:4.1.23.Final:compile
[INFO] |  |  +- org.codehaus.jackson:jackson-mapper-asl:jar:1.9.2:compile
[INFO] |  |  \- org.codehaus.jackson:jackson-core-asl:jar:1.9.2:compile


[INFO] +- org.springframework.cloud:spring-cloud-starter-netflix-eureka-client:jar:2.0.0.RC1:compile
[INFO] |  +- org.springframework.cloud:spring-cloud-starter-netflix-ribbon:jar:2.0.0.RC1:compile
[INFO] |  |  +- com.netflix.ribbon:ribbon:jar:2.2.5:compile
[INFO] |  |  |  +- com.netflix.ribbon:ribbon-transport:jar:2.2.5:runtime
[INFO] |  |  |  |  +- io.reactivex:rxnetty-contexts:jar:0.4.9:runtime
[INFO] |  |  |  |  \- io.reactivex:rxnetty-servo:jar:0.4.9:runtime
[INFO] |  |  |  \- io.reactivex:rxnetty:jar:0.4.9:runtime
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-turbine-stream</artifactId>
      <exclusions>
        <exclusion>
          <groupId>io.reactivex</groupId>
          <artifactId>rxnetty</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

got

2018-05-06 15:24:54.521  INFO 47080 --- [on(4)-127.0.0.1] c.c.c.ConfigServicePropertySourceLocator : Fetching config from server at: http://localhost:8888
2018-05-06 15:24:54.524  WARN 47080 --- [on(4)-127.0.0.1] c.c.c.ConfigServicePropertySourceLocator : Could not locate PropertySource: I/O error on GET request for "http://localhost:8888/application/default": Connection refused; nested exception is java.net.ConnectException: Connection refused
2018-05-06 15:24:57.634  WARN 47080 --- [o-eventloop-3-1] i.n.c.AbstractChannelHandlerContext      : An exception '{}' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:

java.lang.NoSuchMethodError: rx.internal.operators.NotificationLite.instance()Lrx/internal/operators/NotificationLite;
    at io.reactivex.netty.protocol.http.UnicastContentSubject$State$BufferedObserver.<init>(UnicastContentSubject.java:243) ~[rx-netty-0.3.18.jar:na]
    at io.reactivex.netty.protocol.http.UnicastContentSubject$State$BufferedObserver.<init>(UnicastContentSubject.java:241) ~[rx-netty-0.3.18.jar:na]
    at io.reactivex.netty.protocol.http.UnicastContentSubject$State.<init>(UnicastContentSubject.java:197) ~[rx-netty-0.3.18.jar:na]
    at io.reactivex.netty.protocol.http.UnicastContentSubject.create(UnicastContentSubject.java:132) ~[rx-netty-0.3.18.jar:na]
    at io.reactivex.netty.protocol.http.UnicastContentSubject.create(UnicastContentSubject.java:122) ~[rx-netty-0.3.18.jar:na]
    at io.reactivex.netty.protocol.http.UnicastContentSubject.create(UnicastContentSubject.java:117) ~[rx-netty-0.3.18.jar:na]
    at io.reactivex.netty.protocol.http.server.ServerRequestResponseC

Here is the issue that I found:

mvn dependency:tree|grep netty
[INFO] |  |  |  |  |  +- io.reactivex:rxnetty-contexts:jar:0.4.9:runtime
[INFO] |  |  |  |  |  \- io.reactivex:rxnetty-servo:jar:0.4.9:runtime
[INFO] +- io.reactivex:rxnetty:jar:0.4.20:runtime
[INFO] |  +- io.netty:netty-codec-http:jar:4.1.23.Final:compile
[INFO] |  |  \- io.netty:netty-codec:jar:4.1.23.Final:compile
[INFO] |  +- io.netty:netty-handler:jar:4.1.23.Final:compile
[INFO] |  |  +- io.netty:netty-buffer:jar:4.1.23.Final:compile
[INFO] |  |  \- io.netty:netty-transport:jar:4.1.23.Final:compile
[INFO] |  |     \- io.netty:netty-resolver:jar:4.1.23.Final:compile
[INFO] |  +- io.netty:netty-transport-native-epoll:jar:4.1.23.Final:compile
[INFO] |  |  +- io.netty:netty-common:jar:4.1.23.Final:compile
[INFO] |  |  \- io.netty:netty-transport-native-unix-common:jar:4.1.23.Final:compile
[INFO] |     |        \- io.projectreactor.ipc:reactor-netty:jar:0.7.6.RELEASE:compile
[INFO] |     |           \- io.netty:netty-handler-proxy:jar:4.1.23.Final:compile
[INFO] |     |              \- io.netty:netty-codec-socks:jar:4.1.23.Final:compile

netty is using 4.1.23.Final
io.reactivex:rxnetty is using 0.4.9

The exception is thrown in:

    at org.springframework.cloud.netflix.turbine.stream.TurbineStreamConfiguration.lambda$null$6(TurbineStreamConfiguration.java:106)

Going to the code to check:

/**
 * @author Spencer Gibb
 * @author Daniel Lavoie
 */
@Configuration
@EnableConfigurationProperties(TurbineStreamProperties.class)
public class TurbineStreamConfiguration implements SmartLifecycle {

    private static final Log log = LogFactory.getLog(TurbineStreamConfiguration.class);

    private AtomicBoolean running = new AtomicBoolean(false);

    @Autowired
    private TurbineStreamProperties properties;

    private int turbinePort;

    /**
     * Exposes a named feature labelled "Turbine (Stream)" that is associated
     * with {@link TurbineStreamProperties}.
     *
     * @return the descriptor produced by {@code HasFeatures.namedFeature}
     */
    @Bean
    public HasFeatures Feature() {
        final String featureName = "Turbine (Stream)";
        final HasFeatures turbineStreamFeature = HasFeatures.namedFeature(featureName,
                TurbineStreamProperties.class);
        return turbineStreamFeature;
    }

    /**
     * Creates the subject bean that carries maps of Hystrix data.
     *
     * @return a freshly created {@code PublishSubject} of {@code Map<String, Object>}
     */
    @Bean
    public PublishSubject<Map<String, Object>> hystrixSubject() {
        final PublishSubject<Map<String, Object>> subject =
                PublishSubject.<Map<String, Object>>create();
        return subject;
    }

    /**
     * Builds the RxNetty HTTP server bean that serves the aggregated stream as
     * server-sent events.
     *
     * <p>The served stream is the merge of two shared observables: the
     * aggregated data derived from {@link #hystrixSubject()} and a periodic
     * "Ping" message. Every incoming request subscribes to that merged stream
     * and writes each emitted map to the response as a {@code ServerSentEvent}.
     *
     * <p>The listening port is read from {@code TurbineStreamProperties}; when
     * it is zero or negative, a port is obtained from
     * {@code SocketUtils.findAvailableTcpPort(40000)} instead. The chosen port
     * is stored in {@code turbinePort}.
     *
     * @return the HTTP server created by {@code RxNetty.createHttpServer}
     */
    @Bean
    @SuppressWarnings("deprecation")
    public HttpServer<ByteBuf, ServerSentEvent> aggregatorServer() {
        // multicast so multiple concurrent subscribers get the same stream
        // Incoming maps are grouped by an InstanceKey built from their
        // "instanceId" entry before being handed to the StreamAggregator.
        Observable<Map<String, Object>> publishedStreams = StreamAggregator
                .aggregateGroupedStreams(hystrixSubject().groupBy(
                        data -> InstanceKey.create((String) data.get("instanceId"))))
                .doOnUnsubscribe(() -> log.info("Unsubscribing aggregation."))
                .doOnSubscribe(() -> log.info("Starting aggregation")).flatMap(o -> o)
                .publish().refCount();
        // Periodic {"type": "Ping"} message, also shared between subscribers.
        // NOTE(review): timer(1, 10, SECONDS) is presumably "first tick after
        // 1s, then every 10s" -- confirm against the RxJava version in use.
        Observable<Map<String, Object>> ping = Observable.timer(1, 10, TimeUnit.SECONDS)
                .map(count -> Collections.singletonMap("type", (Object) "Ping")).publish()
                .refCount();
        Observable<Map<String, Object>> output = Observable.merge(publishedStreams, ping);

        this.turbinePort = this.properties.getPort();

        // A non-positive configured port means "pick one for me".
        if (this.turbinePort <= 0) {
            this.turbinePort = SocketUtils.findAvailableTcpPort(40000);
        }

        HttpServer<ByteBuf, ServerSentEvent> httpServer = RxNetty
                .createHttpServer(this.turbinePort, (request, response) -> {
                    log.info("SSE Request Received");
                    response.getHeaders().setHeader("Content-Type", "text/event-stream");
                    // Each emitted map is serialized to JSON (plus a trailing
                    // newline) and written and flushed as one event.
                    // NOTE(review): the three constructor arguments are
                    // presumably (event id, event type, data) -- confirm
                    // against the ServerSentEvent class on the classpath.
                    return output.doOnUnsubscribe(
                            () -> log.info("Unsubscribing RxNetty server connection"))
                            .flatMap(data -> response.writeAndFlush(new ServerSentEvent(
                                    null,
                                    Unpooled.copiedBuffer("message",
                                            StandardCharsets.UTF_8),
                                    Unpooled.copiedBuffer(JsonUtility.mapToJson(data)+"\n",
                                            StandardCharsets.UTF_8))));
                }, serveSseConfigurator());
        return httpServer;
    }

and

 *
 * @author Nitesh Kant
 */
public class ServerSentEvent implements ByteBufHolder {

    private static final Logger logger = LoggerFactory.getLogger(ServerSentEvent.class);

    private static Charset sseEncodingCharset;

    // Resolve the UTF-8 charset once at class-load time; if the lookup fails,
    // log the problem and leave the shared charset as null.
    static {
        Charset resolvedCharset;
        try {
            resolvedCharset = Charset.forName("UTF-8");
        } catch (Exception e) {
            logger.error("UTF-8 charset not available. Since SSE only contains UTF-8 data, we can not read SSE data.");
            resolvedCharset = null;
        }
        sseEncodingCharset = resolvedCharset;
    }

Root cause:
The class ServerSentEvent comes from io.reactivex:rxnetty:0.4.9,
but the ByteBufHolder interface it implements comes from netty 4.1.23.Final.
The methods of ByteBufHolder have changed in version 4.1.23.Final, and the ServerSentEvent class in io.reactivex:rxnetty:0.4.9 does not implement all of them.

How to fix this at the pom level:

<dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-netflix-turbine-stream</artifactId>
      <exclusions>
        <exclusion>
          <groupId>com.netflix.rxnetty</groupId>
          <artifactId>rx-netty</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>io.reactivex</groupId>
      <artifactId>rxnetty</artifactId>
      <version>0.4.20</version>
      <scope>runtime</scope>
    </dependency>

@spencergibb pls check.

@MadeInChina good job. it works for me. thanks!

Thanks @MadeInChina - it works.

Thanks @MadeInChina. I lost several days trying to find the solution.

Good job.

Was this page helpful?
0 / 5 - 0 ratings