Keda: Unable to connect to confluent kafka

Created on 12 Oct 2020  路  12Comments  路  Source: kedacore/keda

I am trying to implement KEDA for Confluent Kafka. I have tested out different scenarios but the error i get is

2020-10-12T14:39:02.788Z    ERROR   controllers.ScaledObject    Failed to ensure HPA is correctly created for ScaledObject  {"ScaledObject.Namespace": "keda", "ScaledObject.Name": "kafka-scaledobject", "error": "error getting scaler for trigger #0: error creating kafka client: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"}
github.com/go-logr/zapr.(*zapLogger).Error
    /go/pkg/mod/github.com/go-logr/[email protected]/zapr.go:128
github.com/kedacore/keda/controllers.(*ScaledObjectReconciler).Reconcile
    /workspace/controllers/scaledobject_controller.go:146
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler
    /go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:235
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem
    /go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:209
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).worker
    /go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:188
k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
    /go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:155
k8s.io/apimachinery/pkg/util/wait.BackoffUntil
    /go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:156
k8s.io/apimachinery/pkg/util/wait.JitterUntil
    /go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:133
k8s.io/apimachinery/pkg/util/wait.Until
    /go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:90
2020-10-12T14:39:02.796Z    ERROR   controller  Reconciler error    {"reconcilerGroup": "keda.sh", "reconcilerKind": "ScaledObject", "controller": "scaledobject", "name": "kafka-scaledobject", "namespace": "keda", "error": "error getting scaler for trigger #0: error creating kafka client: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"}
github.com/go-logr/zapr.(*zapLogger).Error
    /go/pkg/mod/github.com/go-logr/[email protected]/zapr.go:128
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).reconcileHandler
    /go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:237
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).processNextWorkItem
    /go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:209
sigs.k8s.io/controller-runtime/pkg/internal/controller.(*Controller).worker
    /go/pkg/mod/sigs.k8s.io/[email protected]/pkg/internal/controller/controller.go:188
k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1
    /go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:155
k8s.io/apimachinery/pkg/util/wait.BackoffUntil
    /go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:156
k8s.io/apimachinery/pkg/util/wait.JitterUntil
    /go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:133
k8s.io/apimachinery/pkg/util/wait.Until
    /go/pkg/mod/k8s.io/[email protected]/pkg/util/wait/wait.go:90

I deployed a scaled object as follows.

Scenario 1 without TriggerAuthentication

triggers:
- type: kafka
  metadata:
    bootstrapServers: kafka.svc:9092
    consumerGroup: my-group
    topic: test-topic
    lagThreshold: '5'

Scenario 2 with TriggerAuthentication

triggers:
- type: kafka
  metadata:
    bootstrapServers: kafka.svc:9092
    consumerGroup: my-group
    topic: test-topic
    lagThreshold: '5'   
 authenticationRef:
      name: keda-trigger-auth-kafka-credential

apiVersion: v1
kind: Secret
metadata:
  name: keda-kafka-secrets
  namespace: keda
data:
  sasl: "plaintext"
  username: "My confluent kafka API key"
  password: "My confluent kafka API key secret"

apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-trigger-auth-kafka-credential
  namespace: keda
spec:
  secretTargetRef:
  - parameter: sasl
    name: keda-kafka-secrets
    key: sasl
  - parameter: username
    name: keda-kafka-secrets
    key: username
  - parameter: password
    name: keda-kafka-secrets
    key: password

Expected Behavior

After deploying the scaled object, it should get the hpa.

Actual Behavior

ERROR controllers.ScaledObject Failed to ensure HPA is correctly created for ScaledObject {"ScaledObject.Namespace": "keda", "ScaledObject.Name": "kafka-scaledobject", "error": "error getting scaler for trigger #0: error creating kafka client: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)"}

Steps to Reproduce the Problem

  1. Helm version is 3.3.x, Installed KEDA from https://github.com/kedacore/keda/releases/tag/v2.0.0-beta
  2. Deployed ScaledObject as mentioned above
  3. Checked the logs of KEDA operator and it errors out as mentioned above in the issue

Specifications

  • KEDA Version: 2.0.0-beta
  • Platform & Version: Azure Kubernetes Service
  • Kubernetes Version: 1.17.9
  • Scaler(s): Apache Kafka targeting Confluent Kafka

The API version on Scaledobject and TriggerAuthentication is keda.sh/v1alpha1

Scaledobjectsandtriggerauthentications.txt

bug scaler-kafka-topic

Most helpful comment

I'm not a Confluent Kafka expert, does it run inside your cluster or do they manage it? From the looks of it, the address (kafka.svc:9092) points to a local service (which is also the one from our docs), is that correct?

All 12 comments

I'm not a Confluent Kafka expert, does it run inside your cluster or do they manage it? From the looks of it, the address (kafka.svc:9092) points to a local service (which is also the one from our docs), is that correct?

Our confluent Kafka does not run on kubernetes cluster. We are using the Confluent Kafka bootstrap servers(pkc-xxxxx.eastus2.azure.confluent.cloud:9092). We did try by replacing the address but still the same error

@harshik9 okay, so that's the value that should be in bootstrapServers section.

@zroubalik, yes. We tried by using that value in the bootstrapServers. What should be the sasl value when connecting to confluent Kafka.

Same issue here running a Kafka cluster locally, alongside with Keda installed on an on-premise K8s cluster. As a test, I created a Pod and was able to curl Kakfa topics from inside that Pod on my K8s. Apparently, Kafka cluster is reachable from inside my K8s cluster (initially I thought that that was the problem) but it feels like it is Keda that cannot reach Kafka endpoints.

Currently running Kafka without authentication, using Keda v2 and K8s 1.18!

Same issue here running a Kafka cluster locally, alongside with Keda installed on an on-premise K8s cluster. As a test, I created a Pod and was able to curl Kakfa topics from inside that Pod on my K8s. Apparently, Kafka cluster is reachable from inside my K8s cluster (initially I thought that that was the problem) but it feels like it is Keda that cannot reach Kafka endpoints.

The scenario you described here above, is Kafka with or without authentication?

Same issue here running a Kafka cluster locally, alongside with Keda installed on an on-premise K8s cluster. As a test, I created a Pod and was able to curl Kakfa topics from inside that Pod on my K8s. Apparently, Kafka cluster is reachable from inside my K8s cluster (initially I thought that that was the problem) but it feels like it is Keda that cannot reach Kafka endpoints.

The scenario you described here above, is Kafka with or without authentication?

Without authentication.

I was able to connect to confluent kafka with Authentication by going down on the KEDA version to 1.5. https://keda.sh/docs/1.5/scalers/apache-kafka/
The authmode that I used is sasl_ssl_plain. I deployed all the files from here --> https://github.com/kedacore/keda/releases/tag/v1.5.0 from the deploy folder.

Hm that's strange, unfortunately I am not able to reproduce that. I am using in-cluster Kafka (instaled via Strimzi) and it is working (the same for our e2e tests that are passing).

Is there anything non standard in your Kafka config?

I was able to connect to confluent kafka with Authentication by going down on the KEDA version to 1.5. https://keda.sh/docs/1.5/scalers/apache-kafka/
The authmode that I used is sasl_ssl_plain. I deployed all the files from here --> https://github.com/kedacore/keda/releases/tag/v1.5.0 from the deploy folder.

@harshik9 for that you used the same configuration of ScaledObject and TriggerAuthentication as mentioned above?

@zroubalik Our confluent Kafka is on azure. When we used the KEDA 2.0 BETA version. The Authentication parameters were sasl, Username and password. With sasl= plaintext, we were unable to connect but when I went down on the KEDA version 1.5, I had authentication parameters as authMode, username and password. The authMode that i gave was sasl_ssl_plain and this worked for me. I used the same scaledObject as in here --> https://keda.sh/docs/1.5/scalers/apache-kafka/ . But there was no necessity of providing a ca, cert and key. Username was our azure confluent kafka API key and password was our Confluent kafka API key secret.

I belive the issue was fixed by #1288 please reopen if the problem still persists

Was this page helpful?
0 / 5 - 0 ratings