Ksql: KSQL prints AVRO topic as STRING

Created on 9 Jul 2018 · 19 Comments · Source: confluentinc/ksql

Robin@asgard02 ~/c/confluent-5.0.0-beta180702222458> ./bin/ksql-datagen quickstart=orders format=avro schemaRegistryUrl=http://localhost:8081

Schema exists:

Robin@asgard02 ~/c/confluent-5.0.0-beta180702222458> curl -s localhost:8081/subjects/orders_kafka_topic_avro-value/versions/latest
{"subject":"orders_kafka_topic_avro-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"KSQLDefaultSchemaName\",\"fields\":[{\"name\":\"ordertime\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"orderid\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"itemid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"orderunits\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"address\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"KSQLDefaultSchemaName_address\",\"fields\":[{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"state\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"zipcode\",\"type\":[\"null\",\"long\"],\"default\":null}]}],\"default\":null}]}"}
ksql> print 'orders_kafka_topic_avro' from beginning;
Format:STRING
7/7/18 6:22:40 PM UTC , 0 , \x00\x00\x00\x00\x01\x02\xFC\xD7\xA8\xE1\xB2W\x02\x00\x02\x10Item_374\x02F\xC1\xEA.w\x5C\x1E@\x02\x02\x0ECity_17\x02\x10State_74\x02\xC6\xC0\x0A
7/7/18 6:22:40 PM UTC , 1 , \x00\x00\x00\x00\x01\x02\xCC\xB3\xE8\xD2\xD2V\x02\x02\x02\x10Item_459\x02\xCC\xC7\x0F\xE7\xA5m\xF5?\x02\x02\x0ECity_87\x02\x10State_97\x02\xC4\xC5\x0A
7/7/18 6:22:40 PM UTC , 2 , \x00\x00\x00\x00\x01\x02\xE4\xBD\xAC\xBF\x8EW\x02\x04\x02\x10Item_764\x02x\xEAnl\xF7\x09\x0D@\x02\x02\x0ECity_38\x02\x10State_37\x02\xB4\x93\x0A
7/7/18 6:22:41 PM UTC , 3 , \x00\x00\x00\x00\x01\x02\xDE\xA5\xC0\xCB\x84X\x02\x06\x02\x10Item_542\x02\x16\xA9\x81\xF4\xC5\x9D\x14@\x02\x02\x0ECity_64\x02\x10State_84\x02\xC8\x96\x06
7/7/18 6:22:41 PM UTC , 4 , \x00\x00\x00\x00\x01\x02\xAA\xCD\xFA\xB3\x8DX\x02\x08\x02\x10Item_837\x02\xFD\xC0\xE1\xD6n\x86\x1D@\x02\x02\x0ECity_62\x02\x10State_84\x02\xD6\x8F\x07
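
The leading `\x00\x00\x00\x00\x01` on each row is consistent with the Confluent wire format: one magic byte (0) followed by a 4-byte big-endian Schema Registry ID, then the Avro body. So the data looks like Avro even though PRINT reports STRING. A minimal Python sketch of reading that header; `parse_confluent_header` is a hypothetical helper for illustration, not part of KSQL:

```python
import struct


def parse_confluent_header(raw: bytes):
    """Split a Confluent-framed message into (schema_id, avro_body)."""
    if len(raw) < 5 or raw[0] != 0:
        raise ValueError("not Confluent wire format: magic byte 0x00 missing")
    # Bytes 1-4 hold the Schema Registry ID as a big-endian unsigned int.
    (schema_id,) = struct.unpack(">I", raw[1:5])
    return schema_id, raw[5:]


# First bytes of the first row printed above.
sample = b"\x00\x00\x00\x00\x01\x02\xFC\xD7\xA8\xE1\xB2W\x02\x00"
schema_id, body = parse_confluent_header(sample)
print(schema_id)  # 1
```

The ID 1 matches the `"id":1` that Schema Registry returned for `orders_kafka_topic_avro-value` above.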

Maybe Schema Registry was unreachable, because straight afterwards I tried:

ksql> create stream orders with (kafka_topic='orders_kafka_topic_avro', value_format='avro');
 Unable to verify the AVRO schema is compatible with KSQL. Connection refused (Connection refused)

Restarted the entire stack and re-ran datagen; works fine now:

ksql> print 'orders_kafka_topic_avro' from beginning;
Format:AVRO
07/07/18 19:44:58 BST, 0, {"ordertime": 1489341619565, "orderid": 0, "itemid": "Item_705", "orderunits": 6.424277452435259, "address": {"city": "City_17", "state": "State_49", "zipcode": 78468}}
07/07/18 19:44:58 BST, 1, {"ordertime": 1508347188454, "orderid": 1, "itemid": "Item_432", "orderunits": 2.764169052800083, "address": {"city": "City_54", "state": "State_19", "zipcode": 98638}}
07/07/18 19:44:58 BST, 2, {"ordertime": 1513264604593, "orderid": 2, "itemid": "Item_884", "orderunits": 1.5032338216341627, "address": {"city": "City_49", "state": "State_21", "zipcode": 59780}}
07/07/18 19:44:59 BST, 3, {"ordertime": 1496294298548, "orderid": 3, "itemid": "Item_827", "orderunits": 8.711524279478931, "address": {"city": "City_57", "state": "State_45", "zipcode": 51950}}

Most helpful comment

@rdinkel so to your specific issue; your Schema Registry is not reachable. This can actually be seen in the CREATE STREAM that you tried to run:

Reason: Connection refused (Connection refused)

Looking at your Docker Compose, you've not specified the Schema Registry for KSQL Server, and so it will be defaulting to localhost I think. You can verify this in the KSQL server logs, which in my environment read:

[2018-12-19 21:21:00,817] INFO KafkaAvroDeserializerConfig values:
   schema.registry.url = [http://schema-registry:8081]

See here for an example of the stack with Schema Registry correctly configured.

All 19 comments

Interesting - I saw the same thing (Format: STRING) for a JSON topic a couple of days ago but forgot to log it.

@rmoff can you share the server logs from the print which returned the string format?

Unfortunately not, I don't have them anymore. Will be sure to save them if it re-occurs.

I am getting (Format: STRING) for a JSON topic created from a FileStreamSource connector.
I am reading a logback JSON-format log file for processing in Kafka.
Can I create/specify a JSON or AVRO schema for a FileStreamSource connector for this? How?

This is the output from the topic with the kafka-console-consumer:

{"schema":null,"payload":null} {"schema":{"type":"string","optional":false},"payload":"{\"@timestamp\":\"2018-09-18T15:07:43.455-04:00\",\"@version\":1,\"message\":\"Debug message\",\"logger_name\":\"jsonLogger\",\"thread_name\":\"main\",\"level\":\"DEBUG\",\"level_value\":10000,\"HOSTNAME\":\"RohitCho-mbpr15\"}"}

This is the output from topic in ksql:

ksql> print 'json-log' from beginning;
Format:STRING
9/19/18 4:09:42 PM EDT , {"schema":null,"payload":null} , {"schema":{"type":"string","optional":false},"payload":"{\x5C"@timestamp\x5C":\x5C"2018-09-18T15:07:43.455-04:00\x5C",\x5C"@version\x5C":1,\x5C"message\x5C":\x5C"Debug message\x5C",\x5C"logger_name\x5C":\x5C"jsonLogger\x5C",\x5C"thread_name\x5C":\x5C"main\x5C",\x5C"level\x5C":\x5C"DEBUG\x5C",\x5C"level_value\x5C":10000,\x5C"HOSTNAME\x5C":\x5C"RohitCho-mbpr15\x5C"}"}
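
Looking at the console-consumer output, the value appears to be a Kafka Connect JSON envelope whose `payload` is a single string: the original log line, escaped. The record is valid JSON, but the log fields sit one level down inside that string. A minimal Python sketch of the double decode, using the value exactly as printed above (the variable names are only illustrative):

```python
import json

# Message value as shown by kafka-console-consumer above.
value = (
    r'{"schema":{"type":"string","optional":false},'
    r'"payload":"{\"@timestamp\":\"2018-09-18T15:07:43.455-04:00\",'
    r'\"@version\":1,\"message\":\"Debug message\",'
    r'\"logger_name\":\"jsonLogger\",\"thread_name\":\"main\",'
    r'\"level\":\"DEBUG\",\"level_value\":10000,'
    r'\"HOSTNAME\":\"RohitCho-mbpr15\"}"}'
)

envelope = json.loads(value)                 # outer Connect envelope
print(envelope["schema"]["type"])            # string
log_event = json.loads(envelope["payload"])  # inner log line, decoded separately
print(log_event["level"], log_event["message"])  # DEBUG Debug message
```

This only shows why the topic content is a wrapped string rather than a flat JSON object. Whether a different `value.converter` on the connector (for example Kafka Connect's `StringConverter`) would write the raw line instead is not confirmed anywhere in this thread.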

Here is my connector definition:

{
"name": "json-file-src",
"config": {
"connector.class": "FileStreamSource",
"file": "/Users/rohit.chowdhary/tmp/json-log.json",
"topic": "json-log",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter":"org.apache.kafka.connect.json.JsonConverter"
}
}

Hi, wanted to see if there is any resolution to this issue?
Or if it is really an issue?
Is there a workaround?
Thanks

Hey, I also have this issue at work. It should be in AVRO format, but it shows up as a string. I was not able to fix it! Now I'm trying to get things working with JSON and not bother with the format and schema stuff. Please help.

@rdinkel can you post details of the version you're running, a sample of the actual message from the topic (using e.g. kafkacat), and what you're seeing in KSQL

Yes, of course. As soon as I get to work tomorrow morning.

Short Info about setup. It is a demo for an evaluation for a new project. 1x postgres, 1x mongodb. We want to join the CDC streams and look up data to enrich it. Currently the postgres holds a table "comments" and mongo the "customers". The following is just enough to get data from postgres to kafka and ksql.

.env

DEBEZIUM_VERSION=0.9.0.Beta1

docker-compose:

services:
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
    networks:
      app_net:
        ipv4_address: 172.18.18.05
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    networks:
      app_net:
        ipv4_address: 172.18.18.06
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    image: debezium/postgres:11-alpine
    networks:
      app_net:
        ipv4_address: 172.18.18.60
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mongodb:
    image: debezium/example-mongodb:0.9
    hostname: mongodb
    ports:
      - 27017:27017
    networks:
      app_net:
        ipv4_address: 172.18.18.65
    environment:
      - MONGODB_USER=debezium
      - MONGODB_PASSWORD=dbz
  connect:
    image: debezium/connect:${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
    networks:
      app_net:
        ipv4_address: 172.18.18.07
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
      - VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
      - INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - LOG_LEVEL=ERROR
      - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
  schema-registry:
    image: confluentinc/cp-schema-registry
    ports:
      - 8181:8181
      - 8081:8081
    networks:
      app_net:
        ipv4_address: 172.18.18.08
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081

  ksql-server:
    image: confluentinc/cp-ksql-server:5.0.1
    hostname: ksql-server
    networks:
      app_net:
        ipv4_address: 172.18.18.70
    depends_on:
      - kafka
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
      KSQL_BOOTSTRAP_SERVERS: "kafka:9092"
      KSQL_HOST_NAME: ksql-server
      KSQL_APPLICATION_ID: "cp-all-in-one"
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.0.1
    networks:
      app_net:
        ipv4_address: 172.18.18.75
    depends_on:
      - kafka
      - connect
      - ksql-server
    entrypoint: /bin/sh
    tty: true

networks:
  app_net:
    driver: bridge
    driver_opts:
      com.docker.network.enable_ipv6: "false"
    ipam:
      config:
        - subnet: 172.18.18.0/24

Postgres Connector:

{
  "name": "comments-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "postgres",
    "database.server.name": "dbserver1",
    "database.whitelist": "comments",

    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"

  }
}

add the comments table

create table comments
(
  id serial not null,
  comment VARCHAR(255),
  customer_id int default 1
);

create unique index comments_id_uindex
  on comments (id);

alter table comments
  add constraint comments_pk
    primary key (id);

Add Connector config:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @kafka_connectors/postgres_register_comments.json

Start ksql shell

docker-compose exec ksql-cli ksql http://ksql-server:8088

kafkacat command:

docker run --tty --interactive --network demo_app_net --rm confluentinc/cp-kafkacat kafkacat -b kafka:9092 -C -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\nPartition: %p\tOffset: %o\n--\n' -t dbserver1.public.comments

kafkacat output:

Key (6 bytes):  
Value (16 bytes): 
                  asdads
Partition: 0    Offset: 0
--
% Reached end of topic dbserver1.public.comments [0] at offset 1

Key (6 bytes):  
Value (33 bytes): .Hello, This is Patrick!
Partition: 0    Offset: 1
--
% Reached end of topic dbserver1.public.comments [0] at offset 2

Key (6 bytes):  
Value (32 bytes): ,Hello, This is Sponge!
Partition: 0    Offset: 2
--
% Reached end of topic dbserver1.public.comments [0] at offset 3

Key (6 bytes): 
Value (31 bytes):*Hello, This is Doggo!

Partition: 0    Offset: 3
--
% Reached end of topic dbserver1.public.comments [0] at offset 4

Key (6 bytes): 

Value (33 bytes): 
.Hello, This is Sparta!!

Partition: 0    Offset: 4
--
% Reached end of topic dbserver1.public.comments [0] at offset 5

Add Comment:

INSERT INTO comments(comment, customer_id)
VALUES ('Hello, This is Sparta!!', 5);

KSQL

ksql> PRINT 'dbserver1.public.comments' FROM BEGINNING;
Format:STRING
12/18/18 8:42:37 AM UTC ,  , \x00\x00\x00\x00\x02\x02\x02\x0Casdads\x02\x02
12/18/18 8:43:28 AM UTC ,  , \x00\x00\x00\x00\x02\x04\x02.Hello, This is Patrick!\x02\x02
12/18/18 8:45:33 AM UTC ,  , \x00\x00\x00\x00\x02\x06\x02,Hello, This is Sponge!\x02\x06
12/18/18 8:46:04 AM UTC , , \x00\x00\x00\x00\x02\x08\x02*Hello, This is Doggo!\x02\x0A
12/18/18 9:01:08 AM UTC , 
 , \x00\x00\x00\x00\x02\x0A\x02.Hello, This is Sparta!!\x02\x0A

I don't know where the dot symbol in front of the word Hello comes from.
Also, the primary key and customer_id are missing.
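
The character in front of each comment is most likely the Avro length prefix of the string, and the id and customer_id are present as the small bytes around it. Avro writes ints and string lengths as zigzag varints, so `.` (0x2E = 46) decodes to 23, which is the length of "Hello, This is Patrick!". A minimal Python sketch, assuming the value schema is a non-null int `id` followed by a nullable string `comment` and a nullable int `customer_id` (this matches what Schema Registry reports for the topic later in the thread); `read_varint` and `decode_comment` are hypothetical helpers:

```python
def read_varint(buf: bytes, pos: int):
    """Read one Avro zigzag-encoded int/long; return (value, next_pos)."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    # Undo zigzag: even raw values are non-negative, odd ones negative.
    return (raw >> 1) ^ -(raw & 1), pos


def decode_comment(body: bytes):
    """Decode id, comment, customer_id from an Avro body (5-byte header removed)."""
    pos = 0
    record_id, pos = read_varint(body, pos)
    branch, pos = read_varint(body, pos)      # union index: 0 = null, 1 = string
    comment = None
    if branch == 1:
        length, pos = read_varint(body, pos)  # the "." byte: 46 -> 23
        comment = body[pos:pos + length].decode("utf-8")
        pos += length
    branch, pos = read_varint(body, pos)      # union index: 0 = null, 1 = int
    customer_id = None
    if branch == 1:
        customer_id, pos = read_varint(body, pos)
    return {"id": record_id, "comment": comment, "customer_id": customer_id}


# Second row of the PRINT output, minus the \x00\x00\x00\x00\x02 header.
body = b"\x04\x02.Hello, This is Patrick!\x02\x02"
print(decode_comment(body))
# {'id': 2, 'comment': 'Hello, This is Patrick!', 'customer_id': 1}
```

The same reading explains the `,` (44 -> 22 characters) and `*` (42 -> 21 characters) in front of the other comments.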

@rdinkel thanks, this is great info. From the looks of things certainly it should just be Avro coming through from Connect into the topic, and thus handled fine by KSQL.
Since it's Avro, I would expect kafkacat to show 'funny' characters since it's not plain text.

What's the output from kafka-avro-console-consumer against the topic?

recently registered schema value

curl -X GET http://localhost:8081/subjects/dbserver1.public.comments-value/versions/latest

{"subject":"dbserver1.public.comments-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"dbserver1.public.comments\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"customer_id\",\"type\":[\"null\",\"int\"],\"default\":null}],\"connect.name\":\"dbserver1.public.comments.Value\"}"}
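
Note that the `schema` field in this response is itself a JSON-encoded string, so reading the field list takes two decodes. A small Python sketch using the response body exactly as returned above:

```python
import json

# Response body from the curl call above (value subject).
response = (
    r'{"subject":"dbserver1.public.comments-value","version":1,"id":2,'
    r'"schema":"{\"type\":\"record\",\"name\":\"Value\",'
    r'\"namespace\":\"dbserver1.public.comments\",'
    r'\"fields\":[{\"name\":\"id\",\"type\":\"int\"},'
    r'{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null},'
    r'{\"name\":\"customer_id\",\"type\":[\"null\",\"int\"],\"default\":null}],'
    r'\"connect.name\":\"dbserver1.public.comments.Value\"}"}'
)

outer = json.loads(response)          # first decode: the registry envelope
schema = json.loads(outer["schema"])  # second decode: the Avro schema itself

print(outer["id"])                                    # 2
print([field["name"] for field in schema["fields"]])  # ['id', 'comment', 'customer_id']
```

The schema ID 2 matches the `\x00\x00\x00\x00\x02` prefix on the rows that PRINT showed for this topic, so the messages do reference this schema.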

recently registered schema key

curl -X GET http://localhost:8081/subjects/dbserver1.public.comments-key/versions/latest

{"subject":"dbserver1.public.comments-key","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"dbserver1.public.comments\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}],\"connect.name\":\"dbserver1.public.comments.Key\"}"}
docker-compose exec schema-registry /usr/bin/kafka-avro-console-consumer \
>     --bootstrap-server kafka:9092 \
>     --from-beginning \
>     --property print.key=true \
>     --property schema.registry.url=http://schema-registry:8081 \
>     --topic dbserver1.public.comments

[2018-12-18 09:42:43,443] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2018-12-18 09:42:43,757] INFO ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [kafka:9092]
        check.crcs = true
        client.id = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = console-consumer-55265
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig)
[2018-12-18 09:42:43,831] INFO Kafka version : 2.0.1-cp1 (org.apache.kafka.common.utils.AppInfoParser)
[2018-12-18 09:42:43,831] INFO Kafka commitId : 815feb8a888d39d9 (org.apache.kafka.common.utils.AppInfoParser)
[2018-12-18 09:42:43,972] INFO Cluster ID: 6neLt4bCRnqCWUimz4O4yg (org.apache.kafka.clients.Metadata)
[2018-12-18 09:42:43,973] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] Discovered group coordinator 172.18.18.6:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-18 09:42:43,975] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-12-18 09:42:43,976] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-18 09:42:43,998] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-18 09:42:44,000] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] Setting newly assigned partitions [dbserver1.public.comments-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-12-18 09:42:44,032] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] Resetting offset for partition dbserver1.public.comments-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher)


{"id":1}        {"id":1,"comment":{"string":"Hello, This is Sparta!!"},"customer_id":{"int":5}}

@rdinkel interesting. So does this work?

CREATE STREAM COMMENTS WITH (KAFKA_TOPIC='dbserver1.public.comments', VALUE_FORMAT='AVRO');
DESCRIBE COMMENTS;
SET 'auto.offset.reset'='earliest';
SELECT * FROM COMMENTS;

It doesn't.

ksql> CREATE STREAM COMMENTS WITH (KAFKA_TOPIC='dbserver1.public.comments', VALUE_FORMAT='AVRO');

Unable to verify if the Avro schema for topic dbserver1.public.comments is compatible with KSQL.
Reason: Connection refused (Connection refused)

Connection is ok, I guess. I can still do this:

ksql> PRINT 'dbserver1.public.comments' FROM BEGINNING;
Format:STRING
12/18/18 10:10:55 AM UTC ,  , \x00\x00\x00\x00\x02\x02\x02,Hello, This is Doggo!!\x02\x0A

Probably this issue is connected to https://github.com/confluentinc/ksql/issues/1691 ? But unfortunately a restart doesn't help.

@rmoff Can you provide a working example/demo from a fresh setup, please?

I've reproduced this behaviour, and it occurs when the KSQL Server cannot reach the Schema Registry. I've logged #2293 suggesting that KSQL should warn the user if it fails to deserialise what it has identified as Avro data.

@rmoff I've literally been chasing my tail on this for entirely too long, until I read your recommendation to register schema-registry with ksql. Thank you so much, glad I stumbled upon this.

My data was also coming in and being displayed as a string, but as soon as I applied the schema.registry.url it displays as Avro, as expected.

@rmoff the link does not work anymore. Please update : https://github.com/confluentinc/demo-scene/blob/e05ca9b85497bde042bf268e1f7604f43c4bc554/cos/docker-compose.yml#L54

In summary, adding this line worked for me:

ksql-server:
  ...
  environment:
  ...
    KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
  ...

Thanks
