Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1225)
at wordcount.WordCountProcess.main(WordCountProcess.java:63)
Caused by: java.io.NotSerializableException: org.apache.pulsar.client.impl.DefaultBatcherBuilder
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
stream.addSink(new FlinkPulsarProducer<>(
SERVICE_URL,
OUT_TOPIC,
new AuthenticationDisabled(),
wordWithCount -> wordWithCount.toString().getBytes(),
wordWithCount -> wordWithCount.getWord()
));
env.execute("flink window count!");
Are you running the example described in examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md?
Which versions of Flink and Pulsar are you using? I cannot reproduce your issue. Could you please provide more information about your environment and the steps I can follow to reproduce it?
@yjshen
Example: https://github.com/apache/pulsar/blob/master/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java
@cnicy I've tried Flink 1.7.0, 1.7.2, and 1.8.1 with Pulsar 2.4.0 using Docker, and none of them reproduced the problem.
The current PulsarConsumerSourceWordCount looks like this:
if (null != outputTopic) {
wc.addSink(new FlinkPulsarProducer<>(
serviceUrl,
outputTopic,
new AuthenticationDisabled(),
wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
wordWithCount -> wordWithCount.word
)).setParallelism(parallelism);
} else {
// print the results with a single thread, rather than in parallel
wc.print().setParallelism(1);
}
env.execute("Pulsar Stream WordCount");
<dependencies>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-flink</artifactId>
<version>2.4.0</version>
<exclusions>
<exclusion>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>
public class PulsarConsumerSourceWordCount {
public static void main(String[] args) throws Exception {
String serviceUrl = "pulsar://brokerip:6650,brokerip:6660,brokerip:6670";
String inputTopic = "persistent://my-tenant/my-namespace/pulsar-source";
String subscription = "flink-source-subscription";
String outputTopic = "persistent://my-tenant/my-namespace/pulsar-sink";
int parallelism = 1;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(5000);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
.serviceUrl(serviceUrl)
.topic(inputTopic)
.subscriptionName(subscription);
SourceFunction<String> src = builder.build();
DataStream<String> input = env.addSource(src);
DataStream<WordWithCount> wc = input
.flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
for (String word : line.split("\\s")) {
collector.collect(new WordWithCount(word, 1));
}
})
.returns(WordWithCount.class)
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
new WordWithCount(c1.word, c1.count + c2.count));
if (null != outputTopic) {
wc.addSink(new FlinkPulsarProducer<>(
serviceUrl,
outputTopic,
new AuthenticationDisabled(),
wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
wordWithCount -> wordWithCount.word
)).setParallelism(parallelism);
} else {
// print the results with a single thread, rather than in parallel
wc.print().setParallelism(1);
}
env.execute("Pulsar Stream WordCount");
}
@AllArgsConstructor
@NoArgsConstructor
@ToString
public static class WordWithCount implements Serializable {
public String word;
public long count;
}
}
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1225)
at wordcount.PulsarConsumerSourceWordCount.main(PulsarConsumerSourceWordCount.java:65)
Caused by: java.io.NotSerializableException: org.apache.pulsar.client.impl.DefaultBatcherBuilder
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 4 more
It seems that serializing org.apache.pulsar.client.impl.conf.ProducerConfigurationData is what throws the exception:
import org.apache.flink.util.InstantiationUtil
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData
val obj = new ProducerConfigurationData
InstantiationUtil.serializeObject(obj)
This throws: Exception in thread "main" java.io.NotSerializableException: org.apache.pulsar.client.impl.DefaultBatcherBuilder
(flink 1.8.1-2.12, pulsar 2.4.0)
I ended up with a custom FlinkPulsarProducer in which ProducerConfigurationData is replaced with a simple topic name.
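The workaround described above (keeping only a topic name in the sink instead of the full producer configuration) can be sketched in plain Java. This is a minimal illustration, not the actual connector code: `TopicOnlySink` and `NonSerializableProducer` are hypothetical stand-ins, and `open()` only mirrors the role that `RichSinkFunction.open()` plays in Flink. The idea is that the serialized sink carries nothing but strings, and the non-serializable object is rebuilt after deserialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldSketch {

    // Hypothetical stand-in for a client-side object that is NOT Serializable.
    static class NonSerializableProducer {
        private final String topic;
        NonSerializableProducer(String topic) { this.topic = topic; }
        String send(String message) { return topic + " <- " + message; }
    }

    // Hypothetical stand-in for the sink: only plain strings are serialized.
    static class TopicOnlySink implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String serviceUrl;
        private final String topic;
        // transient: skipped by Java serialization and rebuilt in open().
        private transient NonSerializableProducer producer;

        TopicOnlySink(String serviceUrl, String topic) {
            this.serviceUrl = serviceUrl;
            this.topic = topic;
        }

        // Called after deserialization, before the first record is processed.
        void open() { producer = new NonSerializableProducer(topic); }

        String invoke(String value) { return producer.send(value); }
    }

    static byte[] serialize(Object obj) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws Exception {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        TopicOnlySink sink = new TopicOnlySink("pulsar://localhost:6650", "out");
        // Round-trip through Java serialization, as a job submission would.
        TopicOnlySink copy = (TopicOnlySink) deserialize(serialize(sink));
        copy.open();
        System.out.println(copy.invoke("hello")); // prints "out <- hello"
    }
}
```

The same pattern should carry over to a real sink: keep the service URL and topic as fields, mark the Pulsar producer `transient`, and create it in `open()`.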
I ran into a similar problem.
I followed https://github.com/apache/pulsar/blob/master/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md and successfully ran './bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub'. But when I add the 'output-topic' parameter, it throws an exception.
The exception information follows:
org.apache.pulsar.client.impl.DefaultBatcherBuilder@75d2da2d is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1227)
org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount.main(PulsarConsumerSourceWordCount.java:98)
Versions:
Flink 1.9.0
Pulsar 2.4.0
Java 1.8
You can try pulsar-flink 2.3.2 or 2.5.0-SNAPSHOT.
DefaultBatcherBuilder in 2.4.1 did not implement Serializable.
This is the hotfix commit: https://github.com/apache/pulsar/commit/6a67ae094b5ecfa1a4602b8f7baff9a838b44e23#diff-d4876cc56bcbd7b0fe549311203cd213
FYI, the commit that fixes this issue is this one: https://github.com/apache/pulsar/pull/5068
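To illustrate why a single non-serializable nested object breaks the whole sink, here is a small self-contained sketch. It does not reproduce the linked commit; `PlainBatcher`, `SerializableBatcher`, and `Config` are hypothetical stand-ins for the builder and the producer configuration. It only shows that a serializable holder fails as soon as one referenced object does not implement `Serializable`, and succeeds once it does.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class NestedSerializableSketch {

    // Hypothetical stand-in for a builder that does not implement Serializable.
    static class PlainBatcher { }

    // The same kind of builder, now marked Serializable.
    static class SerializableBatcher implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    // Hypothetical stand-in for a configuration object that holds the builder.
    static class Config implements Serializable {
        private static final long serialVersionUID = 1L;
        final Object batcher;
        Config(Object batcher) { this.batcher = batcher; }
    }

    // Returns null on success, or the class name reported by the exception.
    static String trySerialize(Object obj) throws IOException {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        // Fails: the nested object is not Serializable, so the holder is rejected too.
        System.out.println("plain: " + trySerialize(new Config(new PlainBatcher())));
        // Succeeds: every object in the graph is Serializable.
        System.out.println("serializable: " + trySerialize(new Config(new SerializableBatcher())));
    }
}
```

The first call reports the nested class rather than `Config`, which matches the shape of the stack traces in this thread, where the sink is rejected but the named culprit is DefaultBatcherBuilder.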
@vruc @idantony thanks for your help
Does anyone know of a workaround? I'm stuck on this exact issue.
You should be able to reproduce it with this example:
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import static java.nio.charset.StandardCharsets.UTF_8;
public class StreamingJob {
public static Tuple2<String,String> mapToTuple(String incomingMessage) throws ParseException {
JSONObject incomingObj = (JSONObject) new JSONParser().parse(incomingMessage);
JSONObject correlationIdJson = (JSONObject) incomingObj.get("correlationId");
String correlationId = "";
if(correlationIdJson != null){
correlationId = correlationIdJson.toString();
} // Put in try/catch to throw exception if correlationIdJson == null
Tuple2<String, String> msgEnvelope = new Tuple2<>(correlationId, incomingObj.toString());
return msgEnvelope;
}
private static class JsonConcatenator
implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
@Override
public Tuple2<String, String> createAccumulator() {
return new Tuple2<String, String>("","");
}
@Override
public Tuple2<String, String> add(Tuple2<String, String> value, Tuple2<String, String> accumulator) {
return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
}
@Override
public String getResult(Tuple2<String, String> accumulator) {
return "[" + accumulator.f1 + "]";
}
@Override
public Tuple2<String, String> merge(Tuple2<String, String> a, Tuple2<String, String> b) {
return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
}
}
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
String SERVICE_URL = "pulsar://localhost:6650";
String INPUT_TOPIC = "persistent://public/default/test-flink-jaeger-spanner-input";
String SUBSCRIPTION_NAME = "test-jaeger-spanner";
String OUTPUT_TOPIC = "persistent://public/default/test-flink-jaeger-spanner-output";
PulsarSourceBuilder<String> builder = PulsarSourceBuilder
.builder(new SimpleStringSchema())
.serviceUrl(SERVICE_URL)
.topic(INPUT_TOPIC)
.subscriptionName(SUBSCRIPTION_NAME);
SourceFunction<String> src = builder.build();
DataStream<String> dataStream = env.addSource(src);
DataStream<String> combinedEnvelopes = dataStream
.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String incomingMessage) throws Exception {
return mapToTuple(incomingMessage);
}
})
.keyBy(0)
.window(EventTimeSessionWindows.withGap(Time.seconds(20)))
.aggregate(new JsonConcatenator())
.returns(String.class);
combinedEnvelopes.addSink(new FlinkPulsarProducer<>(
SERVICE_URL,
OUTPUT_TOPIC,
new AuthenticationDisabled(), // probably need to fix // AuthenticationTls()
combinedData -> combinedData.toString().getBytes(UTF_8),
combinedData -> null)
);
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
}
with this POM file:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>flink-poc</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Flink Quickstart Job</name>
<url>http://www.myorganization.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.9.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<pulsar.version>2.4.0</pulsar.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencies>
<!-- https://mvnrepository.com/artifact/io.jaegertracing/jaeger-client -->
<dependency>
<groupId>io.jaegertracing</groupId>
<artifactId>jaeger-client</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-api</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-io-core</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>2.4.0</version> <!-- What's the latest stable version???-->
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
<version>2.4.0</version>
</dependency>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-flink -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-flink</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.overstock.dataeng.jaeger.spanner.StreamingJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>assemble-all</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.0.0,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<!-- This profile helps to make things run out of the box in IntelliJ -->
<!-- It adds Flink's core classes to the runtime class path. -->
<!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
<profiles>
<profile>
<id>add-dependencies-for-IDEA</id>
<activation>
<property>
<name>idea.version</name>
</property>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
This has been fixed by #5068