I am trying to implement KEDA for Confluent Kafka. I have tested different scenarios, but the error I get is:
2020-10-12T14:39:02.788Z ERROR controllers.ScaledObject Failed to ensure HPA is correctly created for ScaledObject {"ScaledObject.Namespace": "keda", "ScaledObject.Name": "kafka-scaledobject", "error": "error getting scaler for trigger #0: error creating kafka client: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"}
github.com/go-logr/zapr.(*zapLogger).Error
/go/pkg/mod/github.com/go-logr/[email protected]/zapr.go:128
github.com/kedacore/keda/controllers.(*ScaledObjectReconciler).Reconcile
/workspace/controllers/scaledobject_controller.go:146
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler
/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:235
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem
/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:209
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).worker
/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:188
k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:155
k8s.io/apimachinery/pkg/util/wait.BackoffUntil
/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:156
k8s.io/apimachinery/pkg/util/wait.JitterUntil
/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:133
k8s.io/apimachinery/pkg/util/wait.Until
/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:90
2020-10-12T14:39:02.796Z ERROR controller Reconciler error {"reconcilerGroup": "keda.sh", "reconcilerKind": "ScaledObject", "controller": "scaledobject", "name": "kafka-scaledobject", "namespace": "keda", "error": "error getting scaler for trigger #0: error creating kafka client: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"}
github.com/go-logr/zapr.(*zapLogger).Error
/go/pkg/mod/github.com/go-logr/[email protected]/zapr.go:128
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler
/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:237
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem
/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:209
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).worker
/go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:188
k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:155
k8s.io/apimachinery/pkg/util/wait.BackoffUntil
/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:156
k8s.io/apimachinery/pkg/util/wait.JitterUntil
/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:133
k8s.io/apimachinery/pkg/util/wait.Until
/go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:90
I deployed a scaled object as follows.
Scenario 1 without TriggerAuthentication
triggers:
- type: kafka
  metadata:
    bootstrapServers: kafka.svc:9092
    consumerGroup: my-group
    topic: test-topic
    lagThreshold: '5'
Scenario 2 with TriggerAuthentication
triggers:
- type: kafka
  metadata:
    bootstrapServers: kafka.svc:9092
    consumerGroup: my-group
    topic: test-topic
    lagThreshold: '5'
  authenticationRef:
    name: keda-trigger-auth-kafka-credential

apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-secrets
  namespace: keda
data:
  sasl: "plaintext"
  username: "My confluent kafka API key"
  password: "My confluent kafka API key secret"

apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-kafka-credential
  namespace: keda
spec:
  secretTargetRef:
  - parameter: sasl
    name: keda-kafka-secrets
    key: sasl
  - parameter: username
    name: keda-kafka-secrets
    key: username
  - parameter: password
    name: keda-kafka-secrets
    key: password
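One thing that may be worth checking in the Secret above, independent of the connection error: Kubernetes expects values under `data` to be base64-encoded, whereas `stringData` accepts plain strings. A minimal sketch of the same Secret using `stringData` follows; the angle-bracket values are placeholders, not real credentials, and whether this was a factor in the reported error is an assumption.

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-secrets
  namespace: keda
# stringData takes plain-text values; the API server stores them base64-encoded under data.
stringData:
  sasl: "plaintext"
  username: "<confluent-api-key>"          # placeholder
  password: "<confluent-api-key-secret>"   # placeholder
```

The TriggerAuthentication can keep referencing the same keys, since `stringData` entries end up under the same key names in the stored Secret.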
After deploying the ScaledObject, KEDA should create the HPA. Instead, the operator logs:
ERROR controllers.ScaledObject Failed to ensure HPA is correctly created for ScaledObject {"ScaledObject.Namespace": "keda", "ScaledObject.Name": "kafka-scaledobject", "error": "error getting scaler for trigger #0: error creating kafka client: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"}
The API version of the ScaledObject and TriggerAuthentication is keda.sh/v1alpha1.
I'm not a Confluent Kafka expert, does it run inside your cluster or do they manage it? From the looks of it, the address (kafka.svc:9092) points to a local service (which is also the one from our docs), is that correct?
Our Confluent Kafka does not run on a Kubernetes cluster. We are using the Confluent Kafka bootstrap servers (pkc-xxxxx.eastus2.azure.confluent.cloud:9092). We tried replacing the address but still get the same error.
@harshik9 okay, so that's the value that should be in the bootstrapServers section.
@zroubalik, yes. We tried using that value in bootstrapServers. What should the sasl value be when connecting to Confluent Kafka?
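For reference, the KEDA 2.0 Kafka scaler documentation lists a separate `tls` authentication parameter next to `sasl`, and Confluent Cloud endpoints generally expect SASL/PLAIN over TLS. A hedged sketch of what that combination might look like is below. It reuses the resource names from the scenarios above; the placeholder credentials are hypothetical, and whether `ca`, `cert`, and `key` must also be supplied may depend on the exact KEDA version, so treat this as something to verify rather than a confirmed fix.

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-secrets
  namespace: keda
stringData:
  sasl: "plaintext"                        # SASL/PLAIN mechanism
  tls: "enable"                            # assumption: TLS is needed for the managed endpoint
  username: "<confluent-api-key>"          # placeholder
  password: "<confluent-api-key-secret>"   # placeholder
---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-kafka-credential
  namespace: keda
spec:
  secretTargetRef:
  - parameter: sasl
    name: keda-kafka-secrets
    key: sasl
  - parameter: tls
    name: keda-kafka-secrets
    key: tls
  - parameter: username
    name: keda-kafka-secrets
    key: username
  - parameter: password
    name: keda-kafka-secrets
    key: password
```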
Same issue here, running a Kafka cluster locally alongside KEDA installed on an on-premise K8s cluster. As a test, I created a Pod and was able to curl Kafka topics from inside that Pod on my K8s cluster. So the Kafka cluster appears to be reachable from inside my K8s cluster (initially I thought that was the problem), but it seems that KEDA itself cannot reach the Kafka endpoints.
Currently running Kafka without authentication, using KEDA v2 and K8s 1.18!
In the scenario you described above, is Kafka running with or without authentication?
Without authentication.
I was able to connect to Confluent Kafka with authentication by downgrading KEDA to version 1.5: https://keda.sh/docs/1.5/scalers/apache-kafka/
The authMode that I used is sasl_ssl_plain. I deployed all the files from the deploy folder here: https://github.com/kedacore/keda/releases/tag/v1.5.0
Hm, that's strange. Unfortunately, I am not able to reproduce that. I am using in-cluster Kafka (installed via Strimzi) and it is working (the same goes for our e2e tests, which are passing).
Is there anything non standard in your Kafka config?
@harshik9 for that, did you use the same ScaledObject and TriggerAuthentication configuration as mentioned above?
@zroubalik Our Confluent Kafka is on Azure. With the KEDA 2.0 beta version, the authentication parameters were sasl, username, and password. With sasl set to plaintext, we were unable to connect. After downgrading to KEDA 1.5, the authentication parameters were authMode, username, and password. The authMode that I gave was sasl_ssl_plain, and this worked for me. I used the same ScaledObject as in https://keda.sh/docs/1.5/scalers/apache-kafka/, but there was no need to provide a ca, cert, or key. The username was our Azure Confluent Kafka API key and the password was our Confluent Kafka API key secret.
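The working KEDA 1.5 setup described above could be sketched roughly as follows. This is an illustration, not the poster's actual manifests: the resource names are reused from the earlier scenarios, the credentials are placeholders, and the `keda.k8s.io/v1alpha1` API group is assumed from the KEDA 1.x releases.

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-secrets
  namespace: keda
stringData:
  authMode: "sasl_ssl_plain"               # value reported to work in this thread
  username: "<confluent-api-key>"          # placeholder
  password: "<confluent-api-key-secret>"   # placeholder
---
apiVersion: keda.k8s.io/v1alpha1           # assumption: KEDA 1.x API group
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-kafka-credential
  namespace: keda
spec:
  secretTargetRef:
  - parameter: authMode
    name: keda-kafka-secrets
    key: authMode
  - parameter: username
    name: keda-kafka-secrets
    key: username
  - parameter: password
    name: keda-kafka-secrets
    key: password
```

No `ca`, `cert`, or `key` entries are included, matching the report that they were not needed for this connection.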
I believe the issue was fixed by #1288. Please reopen if the problem still persists.