Pulsar: Function localrun quits when using partitioned topic.

Created on 7 Aug 2018 · 6 Comments · Source: apache/pulsar

First, I wrote a demo package built with Gradle:

plugins {
    id 'java'
    id "com.github.johnrengelman.shadow" version "2.0.4"
}

package com.codelipenghui.pulsar.function;

import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;

public class WordCountFunction implements Function<String, Void> {

    @Override
    public Void process(String input, Context context) throws Exception {
        context.incrCounter("test-case-total", 1);
        return null;
    }
}

I got the following output:

[root@qa-5-171 apache-pulsar-2.2.0-incubating-SNAPSHOT]# bin/pulsar-admin functions localrun --jar target/pulsar-functions-0.1.0-SNAPSHOT-jar-with-dependencies.jar --className pulsarfunctions.starter.sdk.WordCountFunction --tenant public --namespace default --name word-count-test --inputs persistent://public/default/plat.correctness.verification --output persistent://public/default/word-count-test-output
19:58:34.946 [main] INFO  org.apache.pulsar.functions.runtime.ProcessRuntimeFactory - Java instance jar location is not defined, using the location defined in system environment : /opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/instances/java-instance.jar
19:58:34.954 [main] INFO  org.apache.pulsar.functions.runtime.ProcessRuntimeFactory - Python instance file location is not defined using the location defined in system environment : /opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/instances/python-instance/python_instance_main.py
19:58:34.974 [main] INFO  org.apache.pulsar.functions.runtime.RuntimeSpawner - RuntimeSpawner starting function word-count-test - 0
19:58:34.987 [main] INFO  org.apache.pulsar.functions.runtime.ProcessRuntime - ProcessBuilder starting the process with args java -cp /opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/instances/java-instance.jar -Dpulsar.functions.java.instance.jar=/opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/instances/java-instance.jar -Dlog4j.configurationFile=java_instance_log4j2.yml -Dpulsar.log.dir=/opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/logs/functions -Dpulsar.log.file=word-count-test org.apache.pulsar.functions.runtime.JavaInstanceMain --jar target/pulsar-functions-0.1.0-SNAPSHOT-jar-with-dependencies.jar --instance_id 0 --function_id ac8f5862-34cd-4f0e-869e-39f5717a99a2 --function_version 1498f7f5-5483-42c2-b2ac-c07c5c1626ce --tenant public --namespace default --name word-count-test --function_classname pulsarfunctions.starter.sdk.WordCountFunction --auto_ack true --processing_guarantees ATLEAST_ONCE --pulsar_serviceurl http://localhost:8080/ --use_tls false --tls_allow_insecure false --hostname_verification_enabled false --max_buffered_tuples 1024 --port 16821 --source_type_classname "java.lang.String" --source_subscription_type SHARED --source_topics_serde_classname {"persistent://public/default/plat.correctness.verification":""} --sink_type_classname "java.lang.Void" --sink_topic persistent://public/default/word-count-test-output
19:58:34.995 [main] INFO  org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
19:58:41.297 [main] INFO  org.apache.pulsar.admin.cli.CmdFunctions - RuntimeSpawner quit because of
19:58:41.299 [Thread-3] INFO  org.apache.pulsar.admin.cli.CmdFunctions - Shutting down the localrun runtimeSpawner ...

Then I tried https://github.com/streamlio/pulsar-functions-java-starter.git and got the same result.

component/functions type/bug

All 6 comments

Hi,
I just tried it on Pulsar master (your logs indicate that you are on 2.2.0-incubating) and was able to start the function. On which exact Pulsar branch/head did you see this error?

The master branch as of Monday.
OK, I will retry on the master branch today.

I think the problem is that state storage is not enabled in cluster mode by default. I am updating the documentation in #2335. I will verify local run and update here.

The topic plat.correctness.verification is a partitioned topic with six partitions.
When I set the input to plat.correctness.verification, local run quits.
When I set the input to plat.correctness.verification-partition-0, local run processes messages successfully.

  • Local run succeeds with the command below:
bin/pulsar-admin functions localrun --jar target/function-starter-1.0.0-all.jar --className com.codelipenghui.pulsar.function.WordCountFunction --inputs persistent://public/default/plat.correctness.verification-partition-0 --output persistent://public/default/plat.correctness.verification.wordcount.output --name plat-correctness-verification-wordcount
  • Local run quits with the command below:
bin/pulsar-admin functions localrun --jar target/function-starter-1.0.0-all.jar --className com.codelipenghui.pulsar.function.WordCountFunction --inputs persistent://public/default/plat.correctness.verification --output persistent://public/default/plat.correctness.verification.wordcount.output --name plat-correctness-verification-wordcount

Log for the quit:

[root@qa-5-170 /opt/apache-pulsar-2.2.0-incubating-SNAPSHOT]#  bin/pulsar-admin functions localrun --jar target/function-starter-1.0.0-all.jar --className com.codelipenghui.pulsar.function.WordCountFunction --inputs persistent://public/default/plat.correctness.verification --output persistent://public/default/plat.correctness.verification.wordcount.output --name plat-correctness-verification-wordcount
14:20:24.335 [main] INFO  org.apache.pulsar.functions.runtime.ProcessRuntimeFactory - Java instance jar location is not defined, using the location defined in system environment : /opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/instances/java-instance.jar
14:20:24.345 [main] INFO  org.apache.pulsar.functions.runtime.ProcessRuntimeFactory - Python instance file location is not defined using the location defined in system environment : /opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/instances/python-instance/python_instance_main.py
14:20:24.367 [main] INFO  org.apache.pulsar.functions.runtime.RuntimeSpawner - RuntimeSpawner starting function plat-correctness-verification-wordcount - 0
14:20:24.383 [main] INFO  org.apache.pulsar.functions.runtime.ProcessRuntime - ProcessBuilder starting the process with args java -cp /opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/instances/java-instance.jar -Dpulsar.functions.java.instance.jar=/opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/instances/java-instance.jar -Dlog4j.configurationFile=java_instance_log4j2.yml -Dpulsar.log.dir=/opt/apache-pulsar-2.2.0-incubating-SNAPSHOT/logs/functions -Dpulsar.log.file=plat-correctness-verification-wordcount org.apache.pulsar.functions.runtime.JavaInstanceMain --jar target/function-starter-1.0.0-all.jar --instance_id 0 --function_id 0b81a970-2cf7-43ad-9b88-17ee169cab5b --function_version 5795d1da-3a02-43e5-a004-c4689e678688 --tenant public --namespace default --name plat-correctness-verification-wordcount --function_classname com.codelipenghui.pulsar.function.WordCountFunction --auto_ack true --processing_guarantees ATLEAST_ONCE --pulsar_serviceurl http://localhost:8080/ --use_tls false --tls_allow_insecure false --hostname_verification_enabled false --max_buffered_tuples 1024 --port 38107 --source_type_classname "java.lang.String" --source_subscription_type SHARED --source_topics_serde_classname {"persistent://public/default/plat.correctness.verification":""} --sink_type_classname "java.lang.String" --sink_topic persistent://public/default/plat.correctness.verification.wordcount.output
14:20:24.397 [main] INFO  org.apache.pulsar.functions.runtime.ProcessRuntime - Started process successfully
14:20:34.250 [main] INFO  org.apache.pulsar.admin.cli.CmdFunctions - RuntimeSpawner quit because of
14:20:34.252 [Thread-3] INFO  org.apache.pulsar.admin.cli.CmdFunctions - Shutting down the localrun runtimeSpawner ...

I had a discussion with @codelipenghui.

The original question was about stateful functions. Stateful functions shipped as a preview feature in 2.1, but they are not fully integrated in cluster mode, so I will be working on that piece in #2335.

However, there is a bigger problem with partitioned topics. There appears to be a regression in 2.1 that prevents Pulsar Functions from running with a partitioned topic as input. Because we switched to multi-topic subscriptions in 2.1, a received message reports its partition name instead of the topic name. The function runtime then cannot find a suitable serde to deserialize the message, throws a RuntimeException, and quits.
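To illustrate the failure mode, here is a minimal, self-contained sketch. It is not the actual runtime code and not necessarily what the eventual fix does. It assumes serdes are kept in a map keyed by the configured input topic (as the --source_topics_serde_classname argument in the logs suggests) and that partitions are named "<topic>-partition-<N>". The class SerdeLookupSketch, its methods, and the "default-serde" value are hypothetical names made up for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SerdeLookupSketch {
    // Assumption: partitions of a partitioned topic are named "<topic>-partition-<N>".
    private static final Pattern PARTITION_SUFFIX =
            Pattern.compile("^(.*)-partition-(\\d+)$");

    // Strip the partition suffix, if present, to recover the parent topic name.
    static String parentTopic(String topicName) {
        Matcher m = PARTITION_SUFFIX.matcher(topicName);
        return m.matches() ? m.group(1) : topicName;
    }

    // Look up the serde by the exact name first, then by the parent topic name.
    // Without the fallback, a message arriving from "...-partition-0" finds no
    // entry, which is the situation described in this issue.
    static String findSerde(Map<String, String> serdeByTopic, String messageTopic) {
        String serde = serdeByTopic.get(messageTopic);
        if (serde == null) {
            serde = serdeByTopic.get(parentTopic(messageTopic));
        }
        if (serde == null) {
            throw new IllegalStateException("No serde configured for " + messageTopic);
        }
        return serde;
    }

    public static void main(String[] args) {
        Map<String, String> serdeByTopic = new HashMap<>();
        // Hypothetical serde label; the function is configured with the parent topic only.
        serdeByTopic.put("persistent://public/default/plat.correctness.verification",
                "default-serde");

        // The message reports the partition name, not the configured topic name.
        String fromPartition =
                "persistent://public/default/plat.correctness.verification-partition-0";
        System.out.println(findSerde(serdeByTopic, fromPartition));
    }
}
```

One caveat of this suffix-based approach: a non-partitioned topic whose own name happens to end in "-partition-<N>" would also be mapped to a shorter name, which is why the exact name is tried first.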

I marked this issue as a bug and updated the issue title to be more specific. The fix should also be cherry-picked to the 2.1.1 release.

The underlying bug has been fixed in #2346. The fix is included in 2.1.1 and will be included in the 2.2.0 release.
