Pulsar: When running Pulsar in Kubernetes with the Kubernetes function runtime, function StatefulSets use the broker pod IP address to start functions

Created on 5 May 2020 · 5 Comments · Source: apache/pulsar

Describe the bug

We are running Pulsar in Kubernetes, with Kubernetes as the functions runtime. When we start a function, a StatefulSet is created. The startup command in that StatefulSet points to the broker's pod IP address in both the --admin-url and --pulsar_serviceurl parameters.

If the broker pods restart, existing functions stop working, and we have to delete and re-create each function.

We are using Pulsar Proxy in our environment. As part of our broker deployment, we set PF_pulsarServiceUrl and PF_pulsarWebServiceUrl to our proxy endpoints. However, the functions runtime still uses the broker's pod IP address.
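To confirm which endpoints a function pod was given, a small check can be run against the StatefulSet startup command (such as the one in the log further down). The following is a minimal sketch, not part of Pulsar: the helper name `find_pod_ip_endpoints` is hypothetical, and it only inspects the `--admin-url` and `--pulsar_serviceurl` flags seen in that command, reporting any whose host is a literal IP address instead of a service name.

```python
import ipaddress
import re
from urllib.parse import urlparse

# Flags in the function startup command that carry connection endpoints.
URL_FLAGS = ("--admin-url", "--pulsar_serviceurl")


def find_pod_ip_endpoints(command: str) -> dict:
    """Return {flag: url} for each URL flag whose host is a literal IP address."""
    flagged = {}
    for flag in URL_FLAGS:
        match = re.search(re.escape(flag) + r"\s+(\S+)", command)
        if not match:
            continue
        url = match.group(1)
        host = urlparse(url).hostname
        try:
            ipaddress.ip_address(host)
        except (ValueError, TypeError):
            continue  # a DNS name such as "pulsar-proxy" survives pod restarts
        flagged[flag] = url
    return flagged


# Abbreviated from the StatefulSet command shown in the broker log.
cmd = (
    "pulsar-admin --admin-url https://172.16.17.183:8443 functions download ... "
    "--pulsar_serviceurl pulsar+ssl://172.16.17.183:6651 --use_tls true"
)
print(find_pod_ip_endpoints(cmd))
```

Both flags are reported here because they point at the broker pod IP; a command using the proxy service name would produce an empty result.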

To Reproduce
Steps to reproduce the behavior:

  1. Deploy Pulsar in Kubernetes, using Kubernetes as the functions runtime.
    Set PF_pulsarServiceUrl and PF_pulsarWebServiceUrl to the Pulsar Proxy service endpoints.
    For example:
    PF_pulsarServiceUrl: "pulsar+ssl://pulsar-proxy:6651"
    PF_pulsarWebServiceUrl: "https://pulsar-proxy:8443"
  2. Create one of the example functions shipped with Pulsar, such as org.apache.pulsar.functions.api.examples.ExclamationFunction.
  3. Verify that the function works correctly.
  4. Restart Pulsar Broker pods
  5. Notice that the function does not work anymore.

Expected behavior

When Pulsar is deployed in Kubernetes with Kubernetes as the functions runtime, functions should survive broker restarts.

Screenshots

[Image: statefulset_startup_script]

Labels: component/documentation, component/k8s, triage/week-19, type/question

All 5 comments

I looked at the broker logs again after restarting all brokers as described above.

The worker does try to re-create the function with the new broker pod IP address. However, because a StatefulSet with the same name already exists, creating the new StatefulSet fails and only a WARN message is logged, so the old StatefulSet (with the old IP address) stays in place.
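The failure mode described above can be illustrated with a toy model. This is a hedged sketch, not Pulsar's `KubernetesRuntime` code: `FakeApiServer`, `submit`, and the second IP address `172.16.17.200` are all hypothetical. It only shows why a create-and-ignore-conflict strategy leaves the original spec, and therefore the original pod IP, in effect.

```python
from dataclasses import dataclass


class AlreadyExists(Exception):
    """Stand-in for the HTTP 409 Conflict the Kubernetes API returns for a duplicate name."""


@dataclass
class StatefulSet:
    name: str
    pulsar_service_url: str  # the URL baked into the startup command


class FakeApiServer:
    """Toy in-memory API server: create() refuses names that already exist."""

    def __init__(self) -> None:
        self.objects: dict[str, StatefulSet] = {}

    def create(self, sts: StatefulSet) -> None:
        if sts.name in self.objects:
            raise AlreadyExists(sts.name)
        self.objects[sts.name] = sts


def submit(api: FakeApiServer, sts: StatefulSet) -> str:
    # Modelled on the log: a conflict is only reported as a warning, and the
    # existing object (with its old command line) is left untouched.
    try:
        api.create(sts)
        return "created"
    except AlreadyExists:
        return "WARN: Statefulset already present"


NAME = "pf-public-default-first-function"
api = FakeApiServer()
first = submit(api, StatefulSet(NAME, "pulsar+ssl://172.16.17.183:6651"))
# After a broker restart the spec is rebuilt with a new pod IP
# (172.16.17.200 is a made-up example address).
second = submit(api, StatefulSet(NAME, "pulsar+ssl://172.16.17.200:6651"))
print(first, "|", second)
print(api.objects[NAME].pulsar_service_url)  # still the old pod IP
```

Pointing the URL at a stable service name avoids the problem entirely, because the spec no longer changes when a broker pod gets a new IP.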

17:37:01.420 [main] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/functions/assignments][reader-6eff432ebd] Get topic last message Id
17:37:01.430 [pulsar-client-io-51-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/functions/assignments][reader-6eff432ebd] Successfully getLastMessageId 82:0
17:37:01.431 [main] INFO org.apache.pulsar.functions.worker.FunctionAssignmentTailer - Received assignment update: instance {
  functionMetaData {
    functionDetails {
      tenant: "public"
      namespace: "default"
      name: "first-function"
      className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
      autoAck: true
      parallelism: 1
      source {
        typeClassName: "java.lang.String"
        inputSpecs {
          key: "persistent://public/default/topicAIn"
          value {
          }
        }
        cleanupSubscription: true
      }
      sink {
        topic: "persistent://public/default/topicAOut"
        typeClassName: "java.lang.String"
      }
      resources {
        cpu: 1.0
        ram: 1073741824
        disk: 10737418240
      }
      componentType: FUNCTION
      customRuntimeOptions: "{\"cosoDataVolumeClaim\":\"helm-data-vol\",\"cosoDataVolume\":\"coso-functions\"}"
    }
    packageLocation {
      packagePath: "public/default/first-function/9f17b0ae-bf92-47ae-aa9a-fad94cf6dc45-api-examples.jar"
      originalFileName: "api-examples.jar"
    }
    createTime: 1588699700863
  }
  instanceId: -1
}
workerId: "c-itomdipulsar-fw-172.16.17.183-8080"

17:37:01.664 [main] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/functions/assignments][reader-6eff432ebd] Get topic last message Id
17:37:01.676 [pulsar-client-io-51-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/functions/assignments][reader-6eff432ebd] Successfully getLastMessageId 82:0
17:37:01.683 [main] INFO org.apache.pulsar.functions.worker.FunctionRuntimeManager - infos: {}
17:37:01.684 [main] INFO org.apache.pulsar.functions.worker.FunctionActioner - public/default/first-function--1 Starting function ...
17:37:01.693 [main] INFO org.apache.pulsar.functions.runtime.RuntimeSpawner - public/default/first-function--1 RuntimeSpawner starting function
17:37:01.776 [main] INFO org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntime - Submitting the following service to k8 {"metadata":{"labels":{"component":"function","namespace":"default","name":"first-function","tenant":"public"},"name":"pf-public-default-first-function","namespace":"sandeep-sandbox"},"spec":{"clusterIP":"None","ports":[{"name":"grpc","port":9093,"protocol":"TCP"}],"selector":{"component":"function","namespace":"default","name":"first-function","tenant":"public"}}}
17:37:02.041 [main] WARN org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntime - Service already present for function public/default/first-function
17:37:02.055 [main] INFO org.apache.pulsar.functions.utils.Actions - Sucessfully completed action [ Submitting service for function public/default/first-function ]
17:37:02.196 [main] INFO org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntime - Submitting the following spec to k8 {"metadata":{"labels":{"app":"itomdipulsar","component":"function","app.kubernetes.io/managed-by":"pulsar","app.kubernetes.io/name":"itomdipulsar","release":"pulsar","namespace":"default","name":"first-function","heritage":"Helm","tire.backend":"backend","capability":"pulsar","tenant":"public","app.kubernetes.io/version":"2.5.1-97"},"name":"pf-public-default-first-function","namespace":"sandeep-sandbox"},"spec":{"podManagementPolicy":"Parallel","replicas":1,"selector":{"matchLabels":{"component":"function","namespace":"default","name":"first-function","tenant":"public"}},"serviceName":"pf-public-default-first-function","template":{"metadata":{"annotations":{"pod.boostport.com/vault-approle":"sandeep-sandbox-default","prometheus.io/port":"9094","pod.boostport.com/vault-init-container":"generate-certificates","prometheus.io/scrape":"true"},"labels":{"app":"itomdipulsar","component":"function","app.kubernetes.io/managed-by":"pulsar","app.kubernetes.io/name":"itomdipulsar","namespace":"default","name":"first-function","tenant":"public"}},"spec":{"containers":[{"image":"localhost:5000/hpeswitom/kubernetes-vault-renew:0.9.0-002","imagePullPolicy":"IfNotPresent","name":"certificate-renew","resources":{},"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File","volumeMounts":[{"mountPath":"/var/run/secrets/boostport.com","name":"vault-token"}]},{"command":["sh","-c","mkdir /pulsar/conf \u0026\u0026 cp -R /pulsar/stage-conf/* /pulsar/conf/ \u0026\u0026 source /pulsar/bin/mcf-pulsar-init.sh \u0026\u0026 /pulsar/bin/apply-config-from-env.py /pulsar/conf/client.conf \u0026\u0026 /pulsar/bin/pulsar-admin --admin-url https://172.16.17.183:8443 functions download --tenant public --namespace default --name first-function --destination-file /pulsar/api-examples.jar \u0026\u0026 SHARD_ID\u003d${POD_NAME##-} \u0026\u0026 echo 
shardId\u003d${SHARD_ID} \u0026\u0026 exec java -cp /pulsar/instances/java-instance.jar:/pulsar/instances/deps/ -Dpulsar.functions.extra.dependencies.dir\u003d/pulsar/instances/deps -Dpulsar.functions.instance.classpath\u003d/pulsar/conf:::/pulsar/lib/*: -Dlog4j.configurationFile\u003dkubernetes_instance_log4j2.xml -Dpulsar.function.log.dir\u003dlogs/functions/public/default/first-function -Dpulsar.function.log.file\u003dfirst-function-$SHARD_ID -Xmx1073741824 org.apache.pulsar.functions.instance.JavaInstanceMain --jar /pulsar/api-examples.jar --instance_id $SHARD_ID --function_id 02eef5d3-7da5-4e08-badc-7d4da40f3cd0 --function_version b4df2772-328c-413c-9874-40d5416b44de --function_details \u0027{\"tenant\":\"public\",\"namespace\":\"default\",\"name\":\"first-function\",\"className\":\"org.apache.pulsar.functions.api.examples.ExclamationFunction\",\"autoAck\":true,\"parallelism\":1,\"source\":{\"typeClassName\":\"java.lang.String\",\"inputSpecs\":{\"persistent://public/default/topicAIn\":{}},\"cleanupSubscription\":true},\"sink\":{\"topic\":\"persistent://public/default/topicAOut\",\"typeClassName\":\"java.lang.String\"},\"resources\":{\"cpu\":1.0,\"ram\":\"1073741824\",\"disk\":\"10737418240\"},\"componentType\":\"FUNCTION\",\"customRuntimeOptions\":\"{\\"cosoDataVolumeClaim\\":\\"helm-data-vol\\",\\"cosoDataVolume\\":\\"coso-functions\\"}\"}\u0027 --pulsar_serviceurl pulsar+ssl://172.16.17.183:6651 --client_auth_plugin org.apache.pulsar.client.impl.auth.AuthenticationTls --client_auth_params tlsCertFile:/var/run/secrets/boostport.com/server.crt,tlsKeyFile:/var/run/secrets/boostport.com/server.key --use_tls true --tls_allow_insecure false --hostname_verification_enabled false --tls_trust_cert_path /var/run/secrets/boostport.com/trustedCAs/RID_ca.crt --max_buffered_tuples 1024 --port 9093 --metrics_port 9094 --state_storage_serviceurl bk://itomdipulsar-bookkeeper:4181 --expected_healthcheck_interval -1 --secrets_provider 
org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider --cluster_name itomdipulsar"],"env":[{"name":"POD_NAME","valueFrom":{"fieldRef":{"fieldPath":"metadata.name"}}}],"envFrom":[{"configMapRef":{"name":"itomdipulsar-broker"}}],"image":"localhost:5000/hpeswitom/itom-pulsar-core:2.5.1-97","imagePullPolicy":"Always","name":"pulsarfunction","ports":[{"containerPort":9093,"name":"grpc"}],"resources":{"limits":{"memory":"1181116006","cpu":"1"},"requests":{"memory":"1181116006","cpu":"1"}},"volumeMounts":[{"mountPath":"/var/run/secrets/boostport.com","name":"vault-token"},{"mountPath":"first-function","name":"cosodata","subPath":"coso-functions/public/default"}]}],"dnsPolicy":"ClusterFirst","initContainers":[{"env":[{"name":"CERT_COMMON_NAME","value":"itomdipulsar-broker"}],"image":"localhost:5000/hpeswitom/kubernetes-vault-init:0.9.0-002","imagePullPolicy":"IfNotPresent","name":"generate-certificates","resources":{},"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File","volumeMounts":[{"mountPath":"/var/run/secrets/boostport.com","name":"vault-token"}]}],"terminationGracePeriodSeconds":0,"tolerations":[{"effect":"NoExecute","key":"node.kubernetes.io/not-ready","operator":"Exists","tolerationSeconds":10},{"effect":"NoExecute","key":"node.alpha.kubernetes.io/notReady","operator":"Exists","tolerationSeconds":10},{"effect":"NoExecute","key":"node.alpha.kubernetes.io/unreachable","operator":"Exists","tolerationSeconds":10}],"volumes":[{"emptyDir":{},"name":"vault-token"},{"name":"cosodata","persistentVolumeClaim":{"claimName":"helm-data-vol"}}]}}}}
17:37:02.218 [main] WARN org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntime - Statefulset already present for function public/default/first-function

@sandeep-kotagiri

I think the problem is that these settings are missing, so the functions fall back to the service URLs in the worker config. When the function worker runs inside the broker, its service URLs are set to the broker's own IP address. So when you submit a function, it ends up using the broker's service URLs.
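The fallback described in this comment can be summarized as a simple precedence rule. The sketch below is a simplified model of that explanation, not Pulsar source code; `resolve_function_service_url` is a hypothetical name, and the assumption is that a non-empty runtime-factory URL takes priority over the worker-level URL.

```python
from typing import Optional


def resolve_function_service_url(worker_url: str, runtime_factory_url: Optional[str]) -> str:
    """Return the service URL handed to a Kubernetes-launched function instance.

    Simplified model: a non-empty functionRuntimeFactoryConfigs value wins;
    otherwise the worker-level URL is used.
    """
    return runtime_factory_url or worker_url


# Worker embedded in the broker: the worker-level URL is the broker's own pod address.
broker_url = "pulsar+ssl://172.16.17.183:6651"
proxy_url = "pulsar+ssl://pulsar-proxy:6651"

print(resolve_function_service_url(broker_url, None))       # pod IP, breaks on restart
print(resolve_function_service_url(broker_url, proxy_url))  # stable proxy service name
```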

To make this work correctly, you have to set the service URLs explicitly in the Kubernetes runtime factory config, using the two settings below. For an example, see: https://github.com/streamnative/charts/blob/master/charts/pulsar/templates/broker-configmap.yaml#L75

PF_functionRuntimeFactoryConfigs_pulsarAdminUrl
PF_functionRuntimeFactoryConfigs_pulsarServiceUrl
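The variable names suggest that the part after `PF_` maps onto the worker configuration, with each further underscore starting a nested key under `functionRuntimeFactoryConfigs`. The sketch below assumes exactly that convention (it is not the Pulsar image's own script, and `pf_env_to_worker_config` is a hypothetical helper) to show how the proxy URLs from the reproduction steps would land in the worker config.

```python
def pf_env_to_worker_config(env: dict) -> dict:
    """Build a nested worker-config dict from PF_-prefixed environment variables.

    Assumed convention: the PF_ prefix is stripped and each remaining '_'
    starts a nested key, so PF_a_b=v becomes {"a": {"b": "v"}}.
    """
    config: dict = {}
    for name, value in sorted(env.items()):
        if not name.startswith("PF_"):
            continue
        *parents, leaf = name[len("PF_"):].split("_")
        node = config
        for key in parents:
            node = node.setdefault(key, {})
        node[leaf] = value
    return config


broker_env = {
    "PF_pulsarServiceUrl": "pulsar+ssl://pulsar-proxy:6651",
    "PF_pulsarWebServiceUrl": "https://pulsar-proxy:8443",
    # The two settings recommended in the comment above:
    "PF_functionRuntimeFactoryConfigs_pulsarServiceUrl": "pulsar+ssl://pulsar-proxy:6651",
    "PF_functionRuntimeFactoryConfigs_pulsarAdminUrl": "https://pulsar-proxy:8443",
    "PULSAR_MEM": "-Xmx1g",  # ignored: no PF_ prefix
}

print(pf_env_to_worker_config(broker_env))
```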

@Anonymitaet I think this is a documentation issue. We need to fix the documentation and make sure this is stated clearly.

@sijie

I tested out these settings just now. By setting PF_functionRuntimeFactoryConfigs_pulsarAdminUrl and PF_functionRuntimeFactoryConfigs_pulsarServiceUrl parameters to my proxy URLs, I am able to withstand broker restarts.

I also looked at the code just now. I had looked at this code before, and this was an oversight on my part: we do set the other Kubernetes function runtime factory configurations, but I forgot about these two.

Thank you for these pointers. We can close this issue.