Sarama: How to configure SASL/OAUTHBEARER based producer/consumer?

Created on 20 Mar 2019  路  7Comments  路  Source: Shopify/sarama

Versions

Sarama Version: V2_2_0_0
Kafka Version: kafka_2.12-2.1.1
Go Version: go1.11

Configuration

I have setup kafka with SASL/OAUTHBEARER with unseureLogin
config/server.properties has following configuration added:

listeners=SASL_PLAINTEXT://:9093
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
sasl.enabled.mechanisms=OAUTHBEARER

config/kafka_server_jaas.conf is as below:

KafkaServer {
    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
    unsecuredLoginStringClaim_sub="admin";
};

producer.properties has below configurations

security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub="bob";
**consumer.properties has below configurations**
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub="john";

By above configurations I am able to produce and consume messages on kafka

Problem Description

Sarama configuration for consumer client are:

    consumerConfig := sarama.NewConfig()
    consumerConfig.Net.SASL.Enable = true
    consumerConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth
    consumerConfig.Consumer.Return.Errors = true
    consumerConfig.Version = sarama.MaxVersion
    client, err := sarama.NewClient([]string{"localhost:9093"}, consumerConfig)

I want to add following configuration.
consumerConfig.Net.SASL.TokenProvider =

Can you please help me to add this configuration. So, that I can consume/produce messages from kafka with unsecuredLoginStringClaim. In future, I want to make it secure as well.

Please help. Thanks in advance.

question stale

Most helpful comment

@amitgurav04 I threw together a very simple pkg which implements AccessTokenProvider using oauth2/clientcredentials pkg. I'm using Keycloak with this but as mentioned in the thread it's not tied to any particular auth provider.

Feel free to take a look - https://github.com/damiannolan/sasl

All 7 comments

TokenProvider takes an interface. So any type that has a function Token() will work.

Have a look at https://github.com/Shopify/sarama/blob/master/broker_test.go#L131 and see if that helps. If not, let me know.

@amitgurav04 This might help you get started (haven't actually tested it yet, sorry).

The example below is based upon the go-kafkacat implementation.

type UnsecuredTokenProvider struct {
    Principal string
}

func (t *UnsecuredTokenProvider) Token() (*sarama.AccessToken, error) {

    now := time.Now().Unix()

    claims := map[string]interface{}{
        "sub": t.Principal,
        "iat": now,
        "exp": now + int64(3600),
    }

    claimsJSON, err := json.Marshal(claims)
    if err != nil {
        return nil, err
    }

    header := "eyJhbGciOiJub25lIn0" // {"alg":"none"}
    jwt := header + "." + base64.RawURLEncoding.EncodeToString(claimsJSON) + "."

    return &sarama.AccessToken{Token: jwt}, nil
}

consumerConfig.Net.SASL.TokenProvider = &UnsecuredTokenProvider{Principal: "john"}

Thanks @mkaminski1988, I tried the example given. And it's working.

Now, I want to make it secure using KEYCLOAK oauth. How can I configure producer/consumer client of sarama? Please help.

Thanks in advance.

@amitgurav04 SASL/OAUTHBEARER is not tied to any particular auth platform and is meant to be flexible. Perhaps you could implement a AccessTokenProvider instance that uses an HTTP client to obtain credentials from Keycloak.

@amitgurav04 I threw together a very simple pkg which implements AccessTokenProvider using oauth2/clientcredentials pkg. I'm using Keycloak with this but as mentioned in the thread it's not tied to any particular auth provider.

Feel free to take a look - https://github.com/damiannolan/sasl

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur.
Please check if the master branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

Was this page helpful?
0 / 5 - 0 ratings

Related issues

jinleileiking picture jinleileiking  路  4Comments

damiannolan picture damiannolan  路  7Comments

chandradeepak picture chandradeepak  路  3Comments

male110 picture male110  路  6Comments

korjavin picture korjavin  路  3Comments