Describe the bug
We are running Pulsar in Kubernetes and also use Kubernetes as the Functions Runtime. When we start a function, a StatefulSet is created. The startup command in the StatefulSet points to the Broker's pod IP address in the --admin-url and --pulsar_serviceurl parameters.
If the Broker pods restart, existing functions stop working, which forces us to delete and re-create the function.
We use Pulsar Proxy in our environment. As part of our Broker deployment, we set PF_pulsarServiceUrl and PF_pulsarWebServiceUrl to our Proxy endpoints. However, the Functions Runtime does not use these values and points to the Broker's pod IP address instead.
To Reproduce
Steps to reproduce the behavior:
Expected behavior
When Pulsar is deployed in Kubernetes with Kubernetes as the Pulsar Functions Runtime, functions should be able to withstand Broker restarts.
I looked at the broker logs again after restarting all brokers, per my instructions above.
It does look like there is an attempt to re-create the function with the correct Broker pod IP address. However, because a StatefulSet with the same name already exists, creation of the new StatefulSet fails with a WARN message.
17:37:01.420 [main] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/functions/assignments][reader-6eff432ebd] Get topic last message Id
17:37:01.430 [pulsar-client-io-51-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/functions/assignments][reader-6eff432ebd] Successfully getLastMessageId 82:0
17:37:01.431 [main] INFO org.apache.pulsar.functions.worker.FunctionAssignmentTailer - Received assignment update: instance {
  functionMetaData {
    functionDetails {
      tenant: "public"
      namespace: "default"
      name: "first-function"
      className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
      autoAck: true
      parallelism: 1
      source {
        typeClassName: "java.lang.String"
        inputSpecs {
          key: "persistent://public/default/topicAIn"
          value {
          }
        }
        cleanupSubscription: true
      }
      sink {
        topic: "persistent://public/default/topicAOut"
        typeClassName: "java.lang.String"
      }
      resources {
        cpu: 1.0
        ram: 1073741824
        disk: 10737418240
      }
      componentType: FUNCTION
      customRuntimeOptions: "{\"cosoDataVolumeClaim\":\"helm-data-vol\",\"cosoDataVolume\":\"coso-functions\"}"
    }
    packageLocation {
      packagePath: "public/default/first-function/9f17b0ae-bf92-47ae-aa9a-fad94cf6dc45-api-examples.jar"
      originalFileName: "api-examples.jar"
    }
    createTime: 1588699700863
  }
  instanceId: -1
}
workerId: "c-itomdipulsar-fw-172.16.17.183-8080"
17:37:01.664 [main] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/functions/assignments][reader-6eff432ebd] Get topic last message Id
17:37:01.676 [pulsar-client-io-51-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/functions/assignments][reader-6eff432ebd] Successfully getLastMessageId 82:0
17:37:01.683 [main] INFO org.apache.pulsar.functions.worker.FunctionRuntimeManager - infos: {}
17:37:01.684 [main] INFO org.apache.pulsar.functions.worker.FunctionActioner - public/default/first-function--1 Starting function ...
17:37:01.693 [main] INFO org.apache.pulsar.functions.runtime.RuntimeSpawner - public/default/first-function--1 RuntimeSpawner starting function
17:37:01.776 [main] INFO org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntime - Submitting the following service to k8 {"metadata":{"labels":{"component":"function","namespace":"default","name":"first-function","tenant":"public"},"name":"pf-public-default-first-function","namespace":"sandeep-sandbox"},"spec":{"clusterIP":"None","ports":[{"name":"grpc","port":9093,"protocol":"TCP"}],"selector":{"component":"function","namespace":"default","name":"first-function","tenant":"public"}}}
17:37:02.041 [main] WARN org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntime - Service already present for function public/default/first-function
17:37:02.055 [main] INFO org.apache.pulsar.functions.utils.Actions - Sucessfully completed action [ Submitting service for function public/default/first-function ]
17:37:02.196 [main] INFO org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntime - Submitting the following spec to k8 {"metadata":{"labels":{"app":"itomdipulsar","component":"function","app.kubernetes.io/managed-by":"pulsar","app.kubernetes.io/name":"itomdipulsar","release":"pulsar","namespace":"default","name":"first-function","heritage":"Helm","tire.backend":"backend","capability":"pulsar","tenant":"public","app.kubernetes.io/version":"2.5.1-97"},"name":"pf-public-default-first-function","namespace":"sandeep-sandbox"},"spec":{"podManagementPolicy":"Parallel","replicas":1,"selector":{"matchLabels":{"component":"function","namespace":"default","name":"first-function","tenant":"public"}},"serviceName":"pf-public-default-first-function","template":{"metadata":{"annotations":{"pod.boostport.com/vault-approle":"sandeep-sandbox-default","prometheus.io/port":"9094","pod.boostport.com/vault-init-container":"generate-certificates","prometheus.io/scrape":"true"},"labels":{"app":"itomdipulsar","component":"function","app.kubernetes.io/managed-by":"pulsar","app.kubernetes.io/name":"itomdipulsar","namespace":"default","name":"first-function","tenant":"public"}},"spec":{"containers":[{"image":"localhost:5000/hpeswitom/kubernetes-vault-renew:0.9.0-002","imagePullPolicy":"IfNotPresent","name":"certificate-renew","resources":{},"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File","volumeMounts":[{"mountPath":"/var/run/secrets/boostport.com","name":"vault-token"}]},{"command":["sh","-c","mkdir /pulsar/conf \u0026\u0026 cp -R /pulsar/stage-conf/* /pulsar/conf/ \u0026\u0026 source /pulsar/bin/mcf-pulsar-init.sh \u0026\u0026 /pulsar/bin/apply-config-from-env.py /pulsar/conf/client.conf \u0026\u0026 /pulsar/bin/pulsar-admin --admin-url https://172.16.17.183:8443 functions download --tenant public --namespace default --name first-function --destination-file /pulsar/api-examples.jar \u0026\u0026 SHARD_ID\u003d${POD_NAME##-} \u0026\u0026 echo 
shardId\u003d${SHARD_ID} \u0026\u0026 exec java -cp /pulsar/instances/java-instance.jar:/pulsar/instances/deps/ -Dpulsar.functions.extra.dependencies.dir\u003d/pulsar/instances/deps -Dpulsar.functions.instance.classpath\u003d/pulsar/conf:::/pulsar/lib/*: -Dlog4j.configurationFile\u003dkubernetes_instance_log4j2.xml -Dpulsar.function.log.dir\u003dlogs/functions/public/default/first-function -Dpulsar.function.log.file\u003dfirst-function-$SHARD_ID -Xmx1073741824 org.apache.pulsar.functions.instance.JavaInstanceMain --jar /pulsar/api-examples.jar --instance_id $SHARD_ID --function_id 02eef5d3-7da5-4e08-badc-7d4da40f3cd0 --function_version b4df2772-328c-413c-9874-40d5416b44de --function_details \u0027{\"tenant\":\"public\",\"namespace\":\"default\",\"name\":\"first-function\",\"className\":\"org.apache.pulsar.functions.api.examples.ExclamationFunction\",\"autoAck\":true,\"parallelism\":1,\"source\":{\"typeClassName\":\"java.lang.String\",\"inputSpecs\":{\"persistent://public/default/topicAIn\":{}},\"cleanupSubscription\":true},\"sink\":{\"topic\":\"persistent://public/default/topicAOut\",\"typeClassName\":\"java.lang.String\"},\"resources\":{\"cpu\":1.0,\"ram\":\"1073741824\",\"disk\":\"10737418240\"},\"componentType\":\"FUNCTION\",\"customRuntimeOptions\":\"{\\"cosoDataVolumeClaim\\":\\"helm-data-vol\\",\\"cosoDataVolume\\":\\"coso-functions\\"}\"}\u0027 --pulsar_serviceurl pulsar+ssl://172.16.17.183:6651 --client_auth_plugin org.apache.pulsar.client.impl.auth.AuthenticationTls --client_auth_params tlsCertFile:/var/run/secrets/boostport.com/server.crt,tlsKeyFile:/var/run/secrets/boostport.com/server.key --use_tls true --tls_allow_insecure false --hostname_verification_enabled false --tls_trust_cert_path /var/run/secrets/boostport.com/trustedCAs/RID_ca.crt --max_buffered_tuples 1024 --port 9093 --metrics_port 9094 --state_storage_serviceurl bk://itomdipulsar-bookkeeper:4181 --expected_healthcheck_interval -1 --secrets_provider 
org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider --cluster_name itomdipulsar"],"env":[{"name":"POD_NAME","valueFrom":{"fieldRef":{"fieldPath":"metadata.name"}}}],"envFrom":[{"configMapRef":{"name":"itomdipulsar-broker"}}],"image":"localhost:5000/hpeswitom/itom-pulsar-core:2.5.1-97","imagePullPolicy":"Always","name":"pulsarfunction","ports":[{"containerPort":9093,"name":"grpc"}],"resources":{"limits":{"memory":"1181116006","cpu":"1"},"requests":{"memory":"1181116006","cpu":"1"}},"volumeMounts":[{"mountPath":"/var/run/secrets/boostport.com","name":"vault-token"},{"mountPath":"first-function","name":"cosodata","subPath":"coso-functions/public/default"}]}],"dnsPolicy":"ClusterFirst","initContainers":[{"env":[{"name":"CERT_COMMON_NAME","value":"itomdipulsar-broker"}],"image":"localhost:5000/hpeswitom/kubernetes-vault-init:0.9.0-002","imagePullPolicy":"IfNotPresent","name":"generate-certificates","resources":{},"terminationMessagePath":"/dev/termination-log","terminationMessagePolicy":"File","volumeMounts":[{"mountPath":"/var/run/secrets/boostport.com","name":"vault-token"}]}],"terminationGracePeriodSeconds":0,"tolerations":[{"effect":"NoExecute","key":"node.kubernetes.io/not-ready","operator":"Exists","tolerationSeconds":10},{"effect":"NoExecute","key":"node.alpha.kubernetes.io/notReady","operator":"Exists","tolerationSeconds":10},{"effect":"NoExecute","key":"node.alpha.kubernetes.io/unreachable","operator":"Exists","tolerationSeconds":10}],"volumes":[{"emptyDir":{},"name":"vault-token"},{"name":"cosodata","persistentVolumeClaim":{"claimName":"helm-data-vol"}}]}}}}
17:37:02.218 [main] WARN org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntime - Statefulset already present for function public/default/first-function
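To check which endpoints an existing function StatefulSet was launched with, one option is to pull the --admin-url and --pulsar_serviceurl values out of its startup command and see whether the host is an IP literal. The Java sketch below is only an illustration of that check: the class and method names are hypothetical and not part of Pulsar, and the IPv4 test is a simple regex that ignores IPv6.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper, not part of Pulsar: inspects a function's startup command. */
final class FunctionUrlCheck {

    // Matches the two URL flags seen in the StatefulSet command and captures their values.
    private static final Pattern URL_FLAG =
            Pattern.compile("--(admin-url|pulsar_serviceurl)\\s+(\\S+)");

    // Loose IPv4 shape check; it does not validate octet ranges.
    private static final Pattern IPV4 = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3}");

    /** Returns flag name -> URL for every URL flag found in the command. */
    static Map<String, String> extractUrls(String command) {
        Map<String, String> urls = new LinkedHashMap<>();
        Matcher m = URL_FLAG.matcher(command);
        while (m.find()) {
            urls.put(m.group(1), m.group(2));
        }
        return urls;
    }

    /** True when the URL's host looks like an IPv4 address rather than a DNS name. */
    static boolean pointsAtIpAddress(String url) {
        String host = URI.create(url).getHost();
        return host != null && IPV4.matcher(host).matches();
    }

    public static void main(String[] args) {
        // Shortened version of the command from the log above.
        String command = "/pulsar/bin/pulsar-admin --admin-url https://172.16.17.183:8443 "
                + "functions download --tenant public --namespace default --name first-function "
                + "--pulsar_serviceurl pulsar+ssl://172.16.17.183:6651 --use_tls true";
        extractUrls(command).forEach((flag, url) ->
                System.out.println(flag + " -> " + url + " (IP literal: " + pointsAtIpAddress(url) + ")"));
    }
}
```

If either flag reports an IP literal, the function instance is tied to one specific broker pod and will lose its connection when that pod is replaced.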
@sandeep-kotagiri
I think the problem is that the Kubernetes runtime service URLs are not set, so the functions fall back to the service URLs in the worker config. When a Function Worker runs alongside a broker, those service URLs are set to the broker's IP address, so any function you submit ends up using the broker service URLs.
To make this work correctly, you have to set the service URLs in the Kubernetes runtime config using the two settings listed below. Here is an example: https://github.com/streamnative/charts/blob/master/charts/pulsar/templates/broker-configmap.yaml#L75
PF_functionRuntimeFactoryConfigs_pulsarAdminUrl
PF_functionRuntimeFactoryConfigs_pulsarServiceUrl
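As a rough illustration, the two settings could appear in the broker ConfigMap as in the fragment below. The proxy Service name itomdipulsar-proxy is an assumption made for this example; the schemes and ports are copied from the URLs in the log above and may differ in your deployment.

```yaml
# Fragment of the broker ConfigMap data section.
# "itomdipulsar-proxy" is a hypothetical proxy Service name; substitute your own.
data:
  PF_functionRuntimeFactoryConfigs_pulsarAdminUrl: "https://itomdipulsar-proxy:8443"
  PF_functionRuntimeFactoryConfigs_pulsarServiceUrl: "pulsar+ssl://itomdipulsar-proxy:6651"
```

Functions that already exist keep the URLs baked into their StatefulSet, so they would likely need to be re-created once after the change.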
@sandeep-kotagiri - For your reference, this is the code that shows how the Kubernetes runtime determines the actual service URLs to use - https://github.com/apache/pulsar/blob/772b789010267829cf3cd921db6782e0dbe59ab2/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java#L181
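The behavior described in this thread amounts to a simple fallback: use the URL from the Kubernetes runtime factory config when it is set, otherwise use the one from the worker config. The sketch below is a simplified stand-in for that logic, not the linked Pulsar source; the class name, method name, and the proxy hostname are hypothetical.

```java
/** Simplified illustration of the URL fallback; not the actual Pulsar implementation. */
final class ServiceUrlFallbackSketch {

    /**
     * Returns the runtime-factory URL when it is non-blank,
     * otherwise falls back to the worker-level URL.
     */
    static String resolve(String runtimeFactoryUrl, String workerUrl) {
        if (runtimeFactoryUrl != null && !runtimeFactoryUrl.trim().isEmpty()) {
            return runtimeFactoryUrl;
        }
        return workerUrl;
    }

    public static void main(String[] args) {
        // Worker-level URL as seen in the log: the broker pod IP.
        String workerUrl = "pulsar+ssl://172.16.17.183:6651";

        // Runtime setting missing: the function is handed the broker pod IP.
        System.out.println(resolve(null, workerUrl));

        // Runtime setting present (hypothetical proxy hostname): the proxy URL is used.
        System.out.println(resolve("pulsar+ssl://itomdipulsar-proxy:6651", workerUrl));
    }
}
```

Under this reading, setting PF_pulsarServiceUrl alone is not enough, because the worker-level value is only the fallback and is set to the broker address when the worker runs with the broker.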
@Anonymitaet I think this is a documentation issue. We need to fix the documentation so that these settings are stated clearly.
@sijie
I tested these settings just now. With the PF_functionRuntimeFactoryConfigs_pulsarAdminUrl and PF_functionRuntimeFactoryConfigs_pulsarServiceUrl parameters set to my proxy URLs, the functions withstand broker restarts.
I also looked at the code just now. I had looked at this code in the past, so this was an oversight on my end. We do set the other Kubernetes Function Runtime Factory configurations, but I completely forgot about these two.
Thank you for these pointers. We can close this issue.