Pulsar: Flink pulsar sink NotSerializableException

Created on 13 Jul 2019  ·  12Comments  ·  Source: apache/pulsar


org.apache.pulsar
pulsar-flink
2.4.0

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields.
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1225)
at wordcount.WordCountProcess.main(WordCountProcess.java:63)
Caused by: java.io.NotSerializableException: org.apache.pulsar.client.impl.DefaultBatcherBuilder
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
`

    stream.addSink(new FlinkPulsarProducer<>(
            SERVICE_URL,
            OUT_TOPIC,
            new AuthenticationDisabled(),
            wordWithCount -> wordWithCount.toString().getBytes(),
            wordWithCount -> wordWithCount.getWord()
    ));
    env.execute("flink window count!");`
componenflink typbug

Most helpful comment

@vruc @idantony thanks for your help

All 12 comments


org.apache.pulsar
pulsar-flink
2.4.0

Are you running the example: examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md ?

which version of Flink and Pulsar are you using? I cannot reproduce your issue. Could you please provide more information about your environment and the execution procedure that I can follow to reproduce?

@yjshen

org.apache.pulsar
pulsar-flink
2.4.0


org.apache.flink
flink-java
1.7.0


org.apache.flink
flink-streaming-java_2.11
1.7.0


org.apache.flink
flink-clients_2.11
1.7.0

example:https://github.com/apache/pulsar/blob/master/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/PulsarConsumerSourceWordCount.java

@cnicy I've tried flink 1.7.0, 1.7.2, 1.8.1 with Pulsar 2.4.0 using docker, none of these could reproduce the problem.

And the current PulsarConsumerSourceWordCount looks like:

        if (null != outputTopic) {
            wc.addSink(new FlinkPulsarProducer<>(
                serviceUrl,
                outputTopic,
                new AuthenticationDisabled(),
                wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
                wordWithCount -> wordWithCount.word
            )).setParallelism(parallelism);
        } else {
            // print the results with a single thread, rather than in parallel
            wc.print().setParallelism(1);
        }

        env.execute("Pulsar Stream WordCount");
<dependencies>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-flink</artifactId>
            <version>2.4.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_2.11</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.7.0</version>
        </dependency>
    </dependencies>

public class PulsarConsumerSourceWordCount {
    public static void main(String[] args) throws Exception {
        String serviceUrl = "pulsar://brokerip:6650,brokerip:6660,brokerip:6670";
        String inputTopic = "persistent://my-tenant/my-namespace/pulsar-source";
        String subscription = "flink-source-subscription";
        String outputTopic = "persistent://my-tenant/my-namespace/pulsar-sink";
        int parallelism = 1;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(5000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
                .serviceUrl(serviceUrl)
                .topic(inputTopic)
                .subscriptionName(subscription);
        SourceFunction<String> src = builder.build();
        DataStream<String> input = env.addSource(src);

        DataStream<WordWithCount> wc = input
                .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
                    for (String word : line.split("\\s")) {
                        collector.collect(new WordWithCount(word, 1));
                    }
                })
                .returns(WordWithCount.class)
                .keyBy("word")
                .timeWindow(Time.seconds(5))
                .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
                        new WordWithCount(c1.word, c1.count + c2.count));

        if (null != outputTopic) {
            wc.addSink(new FlinkPulsarProducer<>(
                    serviceUrl,
                    outputTopic,
                    new AuthenticationDisabled(),
                    wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
                    wordWithCount -> wordWithCount.word
            )).setParallelism(parallelism);
        } else {
            // print the results with a single thread, rather than in parallel
            wc.print().setParallelism(1);
        }

        env.execute("Pulsar Stream WordCount");
    }

    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public static class WordWithCount implements Serializable {
        public String word;
        public long count;
    }
}

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSinkFunction is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:99)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1559)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
    at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1225)
    at wordcount.PulsarConsumerSourceWordCount.main(PulsarConsumerSourceWordCount.java:65)
Caused by: java.io.NotSerializableException: org.apache.pulsar.client.impl.DefaultBatcherBuilder
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:534)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
    ... 4 more

Seems like org.apache.pulsar.client.impl.conf.ProducerConfigurationData throws the exception :

import org.apache.flink.util.InstantiationUtil
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData

val obj = new ProducerConfigurationData
InstantiationUtil.serializeObject(obj)

throws Exception in thread "main" java.io.NotSerializableException: org.apache.pulsar.client.impl.DefaultBatcherBuilder

(flink 1.8.1-2.12, pulsar 2.4.0)

Ended with a custom FlinkPulsarProducer with ProducerConfigurationData replaced with simple topic name.

I run into similar problem.
I follow the https://github.com/apache/pulsar/blob/master/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md and I execute the './bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub' successfully. But when I add the parameter 'output-topic', it will throw exception.
Exception information followed:

org.apache.pulsar.client.impl.DefaultBatcherBuilder@75d2da2d is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71) org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1227)
org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount.main(PulsarConsumerSourceWordCount.java:98)

the version:
flink 1.9.0
pulsar 2.4.0
java 1.8

I run into similar problem.
I follow the https://github.com/apache/pulsar/blob/master/examples/flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/example/README.md and I execute the './bin/flink run -c org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount ${PULSAR_HOME}/examples/flink/target/pulsar-flink-examples.jar --service-url pulsar://localhost:6650 --input-topic test_src --subscription test_sub' successfully. But when I add the parameter 'output-topic', it will throw exception.
Exception information followed:

org.apache.pulsar.client.impl.DefaultBatcherBuilder@75d2da2d is not serializable. The object probably contains or references non serializable fields.
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71) org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1574)
org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:185)
org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1227)
org.apache.flink.streaming.connectors.pulsar.example.PulsarConsumerSourceWordCount.main(PulsarConsumerSourceWordCount.java:98)

the version:
flink 1.9.0
pulsar 2.4.0
java 1.8

You can use pulsar-flink 2.3.2 or 2.5.0-SNAPSHOT have a try.

DefaultBatcherBuilder in 2.4.1 didnot extends Serializable.

This is hotfix commit: https://github.com/apache/pulsar/commit/6a67ae094b5ecfa1a4602b8f7baff9a838b44e23#diff-d4876cc56bcbd7b0fe549311203cd213

FYI, the commit fix this issue is this one: https://github.com/apache/pulsar/pull/5068

@vruc @idantony thanks for your help

Does anyone know of a workaround? I'm stuck on this exact issue.

You should be able to reproduce it with this example:

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarProducer;
import org.apache.flink.streaming.connectors.pulsar.PulsarSourceBuilder;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import static java.nio.charset.StandardCharsets.UTF_8;

public class StreamingJob {

    public static Tuple2<String,String> mapToTuple(String incomingMessage) throws ParseException {
        JSONObject incomingObj = (JSONObject) new JSONParser().parse(incomingMessage);
        JSONObject correlationIdJson = (JSONObject) incomingObj.get("correlationId");
        String correlationId = "";
        if(correlationIdJson != null){
            correlationId = correlationIdJson.toString();
        } // Put in try/catch to throw exception if correlationIdJson == null
        Tuple2 msgEnvelope = new Tuple2(correlationId, incomingObj.toString());
        return msgEnvelope;
    }
    private static class JsonConcatenator
            implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
        @Override
        public Tuple2<String, String> createAccumulator() {
            return new Tuple2<String, String>("","");
        }

        @Override
        public Tuple2<String, String> add(Tuple2<String, String> value, Tuple2<String, String> accumulator) {
            return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
        }

        @Override
        public String getResult(Tuple2<String, String> accumulator) {
            return "[" + accumulator.f1 + "]";
        }

        @Override
        public Tuple2<String, String> merge(Tuple2<String, String> a, Tuple2<String, String> b) {
            return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String SERVICE_URL = "pulsar://localhost:6650";
        String INPUT_TOPIC = "persistent://public/default/test-flink-jaeger-spanner-input";
        String SUBSCRIPTION_NAME = "test-jaeger-spanner";
        String OUTPUT_TOPIC = "persistent://public/default/test-flink-jaeger-spanner-output";

        PulsarSourceBuilder<String> builder = PulsarSourceBuilder
                .builder(new SimpleStringSchema())
                .serviceUrl(SERVICE_URL)
                .topic(INPUT_TOPIC)
                .subscriptionName(SUBSCRIPTION_NAME);
        SourceFunction<String> src = builder.build();
        DataStream<String> dataStream = env.addSource(src);

        DataStream<String> combinedEnvelopes = dataStream
                .map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2 map(String incomingMessage) throws Exception {
                        return mapToTuple(incomingMessage);
                    }
                })
                .keyBy(0)
                .window(EventTimeSessionWindows.withGap(Time.seconds(20)))
                .aggregate(new JsonConcatenator())
                .returns(String.class);

        combinedEnvelopes.addSink(new FlinkPulsarProducer<>(
                SERVICE_URL,
                OUTPUT_TOPIC,
                new AuthenticationDisabled(), // probably need to fix //  AuthenticationTls()
                combinedData -> combinedData.toString().getBytes(UTF_8),
                combinedData -> null)
        );

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }
}

with this POM file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>flink-poc</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>
    <url>http://www.myorganization.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.9.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <pulsar.version>2.4.0</pulsar.version>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/io.jaegertracing/jaeger-client -->
        <dependency>
            <groupId>io.jaegertracing</groupId>
            <artifactId>jaeger-client</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-functions-api</artifactId>
            <version>${pulsar.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-io-core</artifactId>
            <version>${pulsar.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>2.4.0</version> <!-- What's the latest stable version???-->
        </dependency>

        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client-admin</artifactId>
            <version>2.4.0</version> 
        </dependency>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.6</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-flink -->
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-flink</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.overstock.dataeng.jaeger.spanner.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.1.1</version>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>assemble-all</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!-- This profile helps to make things run out of the box in IntelliJ -->
    <!-- Its adds Flink's core classes to the runtime class path. -->
    <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>

            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>

            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>

</project>

This has been fixed by #5068

Was this page helpful?
0 / 5 - 0 ratings