Robin@asgard02 ~/c/confluent-5.0.0-beta180702222458> ./bin/ksql-datagen quickstart=orders format=avro schemaRegistryUrl=http://localhost:8081
Schema exists:
Robin@asgard02 ~/c/confluent-5.0.0-beta180702222458> curl -s localhost:8081/subjects/orders_kafka_topic_avro-value/versions/latest
{"subject":"orders_kafka_topic_avro-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"KSQLDefaultSchemaName\",\"fields\":[{\"name\":\"ordertime\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"orderid\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"itemid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"orderunits\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"address\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"KSQLDefaultSchemaName_address\",\"fields\":[{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"state\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"zipcode\",\"type\":[\"null\",\"long\"],\"default\":null}]}],\"default\":null}]}"}
ksql> print 'orders_kafka_topic_avro' from beginning;
Format:STRING
7/7/18 6:22:40 PM UTC , 0 , \x00\x00\x00\x00\x01\x02\xFC\xD7\xA8\xE1\xB2W\x02\x00\x02\x10Item_374\x02F\xC1\xEA.w\x5C\x1E@\x02\x02\x0ECity_17\x02\x10State_74\x02\xC6\xC0\x0A
7/7/18 6:22:40 PM UTC , 1 , \x00\x00\x00\x00\x01\x02\xCC\xB3\xE8\xD2\xD2V\x02\x02\x02\x10Item_459\x02\xCC\xC7\x0F\xE7\xA5m\xF5?\x02\x02\x0ECity_87\x02\x10State_97\x02\xC4\xC5\x0A
7/7/18 6:22:40 PM UTC , 2 , \x00\x00\x00\x00\x01\x02\xE4\xBD\xAC\xBF\x8EW\x02\x04\x02\x10Item_764\x02x\xEAnl\xF7\x09\x0D@\x02\x02\x0ECity_38\x02\x10State_37\x02\xB4\x93\x0A
7/7/18 6:22:41 PM UTC , 3 , \x00\x00\x00\x00\x01\x02\xDE\xA5\xC0\xCB\x84X\x02\x06\x02\x10Item_542\x02\x16\xA9\x81\xF4\xC5\x9D\x14@\x02\x02\x0ECity_64\x02\x10State_84\x02\xC8\x96\x06
7/7/18 6:22:41 PM UTC , 4 , \x00\x00\x00\x00\x01\x02\xAA\xCD\xFA\xB3\x8DX\x02\x08\x02\x10Item_837\x02\xFD\xC0\xE1\xD6n\x86\x1D@\x02\x02\x0ECity_62\x02\x10State_84\x02\xD6\x8F\x07
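A minimal Python sketch, assuming the standard Schema Registry wire format: one magic byte of 0, then a 4-byte big-endian schema ID, then the Avro body. It shows that the `\x00\x00\x00\x00\x01` prefix on every row above is Avro framing for schema ID 1, the same `"id":1` the registry returned. So these rows are Avro that simply was not decoded. The function name `split_confluent_frame` is hypothetical.

```python
import struct
from typing import Tuple


def split_confluent_frame(value: bytes) -> Tuple[int, bytes]:
    """Split a Schema Registry-framed message into (schema_id, avro_body).

    Byte 0 is a magic byte (always 0), bytes 1-4 are the schema ID as a
    big-endian unsigned 32-bit integer, and the rest is the Avro-encoded record.
    """
    if len(value) < 5 or value[0] != 0:
        raise ValueError("not Schema Registry wire format (magic byte != 0)")
    (schema_id,) = struct.unpack(">I", value[1:5])
    return schema_id, value[5:]


# First bytes of offset 0 from the PRINT output above (truncated).
schema_id, body = split_confluent_frame(b"\x00\x00\x00\x00\x01\x02\xFC\xD7\xA8\xE1\xB2W")
print(schema_id)  # 1
```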
Maybe the Schema Registry (SR) was unreachable, because straight after, I tried:
ksql> create stream orders with (kafka_topic='orders_kafka_topic_avro', value_format='avro');
Unable to verify the AVRO schema is compatible with KSQL. Connection refused (Connection refused)
Restarted the entire stack and re-ran datagen; it works fine now:
ksql> print 'orders_kafka_topic_avro' from beginning;
Format:AVRO
07/07/18 19:44:58 BST, 0, {"ordertime": 1489341619565, "orderid": 0, "itemid": "Item_705", "orderunits": 6.424277452435259, "address": {"city": "City_17", "state": "State_49", "zipcode": 78468}}
07/07/18 19:44:58 BST, 1, {"ordertime": 1508347188454, "orderid": 1, "itemid": "Item_432", "orderunits": 2.764169052800083, "address": {"city": "City_54", "state": "State_19", "zipcode": 98638}}
07/07/18 19:44:58 BST, 2, {"ordertime": 1513264604593, "orderid": 2, "itemid": "Item_884", "orderunits": 1.5032338216341627, "address": {"city": "City_49", "state": "State_21", "zipcode": 59780}}
07/07/18 19:44:59 BST, 3, {"ordertime": 1496294298548, "orderid": 3, "itemid": "Item_827", "orderunits": 8.711524279478931, "address": {"city": "City_57", "state": "State_45", "zipcode": 51950}}
Interesting: I saw the same thing (Format:STRING) for a JSON topic a couple of days ago but forgot to log it.
@rmoff can you share the server logs from the print which returned the string format?
Unfortunately not, I don't have them anymore. Will be sure to save them if it re-occurs.
I am getting Format:STRING for a JSON topic created by a FileStreamSource connector.
I am reading a Logback JSON-format log file for processing in Kafka.
Can I create or specify a JSON or Avro schema for a FileStreamSource connector for this? If so, how?
This is the output from the topic with the kafka-console-consumer:
{"schema":null,"payload":null} {"schema":{"type":"string","optional":false},"payload":"{\"@timestamp\":\"2018-09-18T15:07:43.455-04:00\",\"@version\":1,\"message\":\"Debug message\",\"logger_name\":\"jsonLogger\",\"thread_name\":\"main\",\"level\":\"DEBUG\",\"level_value\":10000,\"HOSTNAME\":\"RohitCho-mbpr15\"}"}
This is the output from topic in ksql:
ksql> print 'json-log' from beginning;
Format:STRING
9/19/18 4:09:42 PM EDT , {"schema":null,"payload":null} , {"schema":{"type":"string","optional":false},"payload":"{\x5C"@timestamp\x5C":\x5C"2018-09-18T15:07:43.455-04:00\x5C",\x5C"@version\x5C":1,\x5C"message\x5C":\x5C"Debug message\x5C",\x5C"logger_name\x5C":\x5C"jsonLogger\x5C",\x5C"thread_name\x5C":\x5C"main\x5C",\x5C"level\x5C":\x5C"DEBUG\x5C",\x5C"level_value\x5C":10000,\x5C"HOSTNAME\x5C":\x5C"RohitCho-mbpr15\x5C"}"}
Here is my connector definition:
{
  "name": "json-file-src",
  "config": {
    "connector.class": "FileStreamSource",
    "file": "/Users/rohit.chowdhary/tmp/json-log.json",
    "topic": "json-log",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
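A minimal Python sketch of the envelope these converters produce, assuming `JsonConverter`'s default `schemas.enable=true` and that FileStreamSource emits each line of the file as a string value. The key is `{"schema":null,"payload":null}`, and the log fields live inside a JSON document that has been serialized into the string `payload`. A JSON stream declared over this topic would therefore see `schema` and `payload` as its top-level fields rather than `level`, `message` and so on. `unwrap_connect_json` is a hypothetical helper, not part of Kafka Connect.

```python
import json
from typing import Any


def unwrap_connect_json(raw: str) -> Any:
    """Unwrap a JsonConverter message written with schemas.enable=true.

    The envelope is {"schema": ..., "payload": ...}. When the schema type is
    "string", the payload is a string; here that string is itself JSON text.
    """
    envelope = json.loads(raw)
    payload = envelope.get("payload")
    schema = envelope.get("schema") or {}  # schema is null for the null key
    if schema.get("type") == "string" and isinstance(payload, str):
        try:
            return json.loads(payload)  # the log line happens to be JSON
        except json.JSONDecodeError:
            return payload  # a plain, non-JSON line
    return payload


key = '{"schema":null,"payload":null}'
value = r'{"schema":{"type":"string","optional":false},"payload":"{\"@timestamp\":\"2018-09-18T15:07:43.455-04:00\",\"@version\":1,\"message\":\"Debug message\",\"logger_name\":\"jsonLogger\",\"thread_name\":\"main\",\"level\":\"DEBUG\",\"level_value\":10000,\"HOSTNAME\":\"RohitCho-mbpr15\"}"}'
print(unwrap_connect_json(key))             # None
print(unwrap_connect_json(value)["level"])  # DEBUG
```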
Hi, I wanted to see if there is any resolution to this issue.
Or is it really an issue?
Is there a workaround?
Thanks
Hey, I also have this issue at work. It should be Avro format, but it shows up as a string. I was not able to fix it! For now I'm trying to get things working with JSON and not bother with the format and schema stuff. Please help.
@rdinkel can you post details of the version you're running, a sample of the actual message from the topic (using e.g. kafkacat), and what you're seeing in KSQL
Yes, of course, as soon as I get to work tomorrow morning.
Short info about the setup: it is a demo for evaluating a new project, with 1x Postgres and 1x MongoDB. We want to join the CDC streams and look up data to enrich them. Currently Postgres holds a table "comments" and MongoDB holds "customers". The following is just enough to get data from Postgres into Kafka and KSQL.
.env
DEBEZIUM_VERSION=0.9.0.Beta1
docker-compose:
services:
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
    networks:
      app_net:
        ipv4_address: 172.18.18.05
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    networks:
      app_net:
        ipv4_address: 172.18.18.06
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    image: debezium/postgres:11-alpine
    networks:
      app_net:
        ipv4_address: 172.18.18.60
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mongodb:
    image: debezium/example-mongodb:0.9
    hostname: mongodb
    ports:
      - 27017:27017
    networks:
      app_net:
        ipv4_address: 172.18.18.65
    environment:
      - MONGODB_USER=debezium
      - MONGODB_PASSWORD=dbz
  connect:
    image: debezium/connect:${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
    networks:
      app_net:
        ipv4_address: 172.18.18.07
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
      - VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
      - INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - LOG_LEVEL=ERROR
      - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
  schema-registry:
    image: confluentinc/cp-schema-registry
    ports:
      - 8181:8181
      - 8081:8081
    networks:
      app_net:
        ipv4_address: 172.18.18.08
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
  ksql-server:
    image: confluentinc/cp-ksql-server:5.0.1
    hostname: ksql-server
    networks:
      app_net:
        ipv4_address: 172.18.18.70
    depends_on:
      - kafka
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
      KSQL_BOOTSTRAP_SERVERS: "kafka:9092"
      KSQL_HOST_NAME: ksql-server
      KSQL_APPLICATION_ID: "cp-all-in-one"
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.0.1
    networks:
      app_net:
        ipv4_address: 172.18.18.75
    depends_on:
      - kafka
      - connect
      - ksql-server
    entrypoint: /bin/sh
    tty: true
networks:
  app_net:
    driver: bridge
    driver_opts:
      com.docker.network.enable_ipv6: "false"
    ipam:
      config:
        - subnet: 172.18.18.0/24
Postgres Connector:
{
  "name": "comments-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "database.server.name": "dbserver1",
    "database.whitelist": "comments",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
  }
}
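The `UnwrapFromEnvelope` transform in this config is why the value schema registered later in the thread is a flat `id`/`comment`/`customer_id` record rather than a full Debezium change event. Below is a rough Python model of what it does for create, update and snapshot-read events, assuming the usual Debezium envelope fields `before`, `after`, `source` and `op`. It is not Debezium's code and deliberately does not model deletes or tombstones; `unwrap_change_event` and the sample event are hypothetical, with the row taken from the `kafka-avro-console-consumer` output further down.

```python
from typing import Any, Dict, Optional


def unwrap_change_event(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Keep only the row state after the change ("after"), dropping the
    rest of the envelope. Only create (c), update (u) and snapshot read (r)
    operations are handled; anything else returns None in this sketch."""
    if event.get("op") in ("c", "u", "r"):
        return event.get("after")
    return None


# Hypothetical change event wrapping the row shown by kafka-avro-console-consumer.
event = {
    "before": None,
    "after": {"id": 1, "comment": "Hello, This is Sparta!!", "customer_id": 5},
    "source": {"name": "dbserver1"},
    "op": "c",
}
print(unwrap_change_event(event))
```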
Add the comments table:
create table comments
(
    id serial not null,
    comment VARCHAR(255),
    customer_id int default 1
);
create unique index comments_id_uindex
    on comments (id);
alter table comments
    add constraint comments_pk
        primary key (id);
Add Connector config:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @kafka_connectors/postgres_register_comments.json
Start the KSQL shell:
docker-compose exec ksql-cli ksql http://ksql-server:8088
kafkacat command:
docker run --tty --interactive --network demo_app_net --rm confluentinc/cp-kafkacat kafkacat -b kafka:9092 -C -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\nPartition: %p\tOffset: %o\n--\n' -t dbserver1.public.comments
kafkacat output:
Key (6 bytes):
Value (16 bytes):
asdads
Partition: 0 Offset: 0
--
% Reached end of topic dbserver1.public.comments [0] at offset 1
Key (6 bytes):
Value (33 bytes): .Hello, This is Patrick!
Partition: 0 Offset: 1
--
% Reached end of topic dbserver1.public.comments [0] at offset 2
Key (6 bytes):
Value (32 bytes): ,Hello, This is Sponge!
Partition: 0 Offset: 2
--
% Reached end of topic dbserver1.public.comments [0] at offset 3
Key (6 bytes):
Value (31 bytes):*Hello, This is Doggo!
Partition: 0 Offset: 3
--
% Reached end of topic dbserver1.public.comments [0] at offset 4
Key (6 bytes):
Value (33 bytes):
.Hello, This is Sparta!!
Partition: 0 Offset: 4
--
% Reached end of topic dbserver1.public.comments [0] at offset 5
Add Comment:
INSERT INTO comments(comment, customer_id)
VALUES ('Hello, This is Sparta!!', 5);
KSQL
ksql> PRINT 'dbserver1.public.comments' FROM BEGINNING;
Format:STRING
12/18/18 8:42:37 AM UTC , , \x00\x00\x00\x00\x02\x02\x02\x0Casdads\x02\x02
12/18/18 8:43:28 AM UTC , , \x00\x00\x00\x00\x02\x04\x02.Hello, This is Patrick!\x02\x02
12/18/18 8:45:33 AM UTC , , \x00\x00\x00\x00\x02\x06\x02,Hello, This is Sponge!\x02\x06
12/18/18 8:46:04 AM UTC , , \x00\x00\x00\x00\x02\x08\x02*Hello, This is Doggo!\x02\x0A
12/18/18 9:01:08 AM UTC ,
, \x00\x00\x00\x00\x02\x0A\x02.Hello, This is Sparta!!\x02\x0A
I don't know where the dot in front of the word "Hello" comes from.
I also seem to be missing the primary key and customer_id.
@rdinkel thanks, this is great info. From the looks of things, it should just be Avro coming through from Connect into the topic, and thus handled fine by KSQL.
Since it's Avro, I would expect kafkacat to show 'funny' characters, because Avro is a binary format rather than plain text.
What's the output from kafka-avro-console-consumer against the topic?
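The key and `customer_id` are not actually missing; they are binary. Below is a minimal Python decoder for one PRINT row, assuming the writer schema is exactly the `id`/`comment`/`customer_id` record registered under schema ID 2. It hard-codes that field order and does no registry lookup; `decode_comment` and its helpers are hypothetical names. In Avro's binary encoding, ints are zig-zag varints, a union is written as its branch index, and a string is its length followed by its bytes. So the `.` before "Hello" is byte 0x2E (46), which zig-zag decodes to 23, the length of "Hello, This is Patrick!"; the `,` and `*` in the other rows are the lengths 22 and 21.

```python
from typing import Optional, Tuple


def read_long(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode one Avro zig-zag varint (int or long) at pos; return (value, next_pos)."""
    acc = 0
    shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        acc |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: last byte of the varint
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1), pos  # undo zig-zag encoding


def read_string(buf: bytes, pos: int) -> Tuple[str, int]:
    """An Avro string is a zig-zag length followed by that many UTF-8 bytes."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


def decode_comment(value: bytes) -> dict:
    """Decode one value, assuming the writer schema is exactly
    {id: int, comment: ["null","string"], customer_id: ["null","int"]}."""
    if len(value) < 5 or value[0] != 0:
        raise ValueError("missing Schema Registry magic byte")
    schema_id = int.from_bytes(value[1:5], "big")
    pos = 5
    record_id, pos = read_long(value, pos)
    comment: Optional[str] = None
    branch, pos = read_long(value, pos)  # union index: 0 = null, 1 = string
    if branch == 1:
        comment, pos = read_string(value, pos)
    customer_id: Optional[int] = None
    branch, pos = read_long(value, pos)  # union index: 0 = null, 1 = int
    if branch == 1:
        customer_id, pos = read_long(value, pos)
    return {"schema_id": schema_id, "id": record_id,
            "comment": comment, "customer_id": customer_id}


# Bytes copied from the PRINT output row for "Patrick".
row = b"\x00\x00\x00\x00\x02\x04\x02.Hello, This is Patrick!\x02\x02"
print(decode_comment(row))
# {'schema_id': 2, 'id': 2, 'comment': 'Hello, This is Patrick!', 'customer_id': 1}
```

The same encoding would explain why the row for id 5 wraps onto a second line in both the kafkacat and PRINT output: id 5 zig-zag encodes to 0x0A, a newline byte.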
Most recently registered value schema:
curl -X GET http://localhost:8081/subjects/dbserver1.public.comments-value/versions/latest
{"subject":"dbserver1.public.comments-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"dbserver1.public.comments\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"customer_id\",\"type\":[\"null\",\"int\"],\"default\":null}],\"connect.name\":\"dbserver1.public.comments.Value\"}"}
Most recently registered key schema:
curl -X GET http://localhost:8081/subjects/dbserver1.public.comments-key/versions/latest
{"subject":"dbserver1.public.comments-key","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"dbserver1.public.comments\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}],\"connect.name\":\"dbserver1.public.comments.Key\"}"}
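The `id` values here (2 for the value subject, 1 for the key subject) are the numbers carried in bytes 1 to 4 of each message, which is why every value in the PRINT output starts with `\x00\x00\x00\x00\x02`. Note that `schema` is itself a JSON document encoded as a string. A minimal Python sketch that parses one of these responses; `describe_registry_response` is a hypothetical helper, and the samples are the two responses above.

```python
import json
from typing import List, Tuple


def describe_registry_response(body: str) -> Tuple[int, List[str]]:
    """Return (schema_id, field_names) from a .../versions/latest response.

    The "schema" member is a JSON document serialised into a string,
    so it has to be parsed a second time.
    """
    response = json.loads(body)
    schema = json.loads(response["schema"])
    return response["id"], [field["name"] for field in schema["fields"]]


value_response = r'{"subject":"dbserver1.public.comments-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"dbserver1.public.comments\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"customer_id\",\"type\":[\"null\",\"int\"],\"default\":null}],\"connect.name\":\"dbserver1.public.comments.Value\"}"}'
print(describe_registry_response(value_response))
# (2, ['id', 'comment', 'customer_id'])
```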
docker-compose exec schema-registry /usr/bin/kafka-avro-console-consumer \
  --bootstrap-server kafka:9092 \
  --from-beginning \
  --property print.key=true \
  --property schema.registry.url=http://schema-registry:8081 \
  --topic dbserver1.public.comments
[2018-12-18 09:42:43,443] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2018-12-18 09:42:43,757] INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [kafka:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = console-consumer-55265
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
(org.apache.kafka.clients.consumer.ConsumerConfig)
[2018-12-18 09:42:43,831] INFO Kafka version : 2.0.1-cp1 (org.apache.kafka.common.utils.AppInfoParser)
[2018-12-18 09:42:43,831] INFO Kafka commitId : 815feb8a888d39d9 (org.apache.kafka.common.utils.AppInfoParser)
[2018-12-18 09:42:43,972] INFO Cluster ID: 6neLt4bCRnqCWUimz4O4yg (org.apache.kafka.clients.Metadata)
[2018-12-18 09:42:43,973] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] Discovered group coordinator 172.18.18.6:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-18 09:42:43,975] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-12-18 09:42:43,976] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-18 09:42:43,998] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-18 09:42:44,000] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] Setting newly assigned partitions [dbserver1.public.comments-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-12-18 09:42:44,032] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] Resetting offset for partition dbserver1.public.comments-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher)
{"id":1} {"id":1,"comment":{"string":"Hello, This is Sparta!!"},"customer_id":{"int":5}}
@rdinkel interesting. So does this work?
CREATE STREAM COMMENTS WITH (KAFKA_TOPIC='dbserver1.public.comments', VALUE_FORMAT='AVRO');
DESCRIBE COMMENTS;
SET 'auto.offset.reset'='earliest';
SELECT * FROM COMMENTS;
It doesn't:
ksql> CREATE STREAM COMMENTS WITH (KAFKA_TOPIC='dbserver1.public.comments', VALUE_FORMAT='AVRO');
Unable to verify if the Avro schema for topic dbserver1.public.comments is compatible with KSQL.
Reason: Connection refused (Connection refused)
The connection seems OK, though; I can still do this:
ksql> PRINT 'dbserver1.public.comments' FROM BEGINNING;
Format:STRING
12/18/18 10:10:55 AM UTC , , \x00\x00\x00\x00\x02\x02\x02,Hello, This is Doggo!!\x02\x0A
Perhaps this issue is related to https://github.com/confluentinc/ksql/issues/1691? Unfortunately, a restart doesn't help.
@rmoff Can you provide a working example/demo from a fresh setup, please?
I've reproduced this behaviour, and it occurs when the KSQL Server cannot reach the Schema Registry. I've logged #2293 suggesting that KSQL should warn the user if it fails to deserialise what it has identified as Avro data.
@rdinkel so to your specific issue; your Schema Registry is not reachable. This can actually be seen in the CREATE STREAM that you tried to run:
Reason: Connection refused (Connection refused)
Looking at your Docker Compose, you've not specified the Schema Registry for KSQL Server, so it will be defaulting to localhost, I think. You can verify this in the KSQL server logs, which in my environment read:
[2018-12-19 21:21:00,817] INFO KafkaAvroDeserializerConfig values:
schema.registry.url = [http://schema-registry:8081]
See here for an example of the stack with Schema Registry correctly configured.
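One way to test this diagnosis is to query the registry from where KSQL Server runs. Inside the ksql-server container, `localhost` refers to that container itself, not to the schema-registry container, so `http://localhost:8081` would be refused while `http://schema-registry:8081` should answer. A minimal Python sketch, assuming the registry's `GET /subjects` endpoint and that Python is available wherever you run it; `schema_registry_reachable` is a hypothetical helper.

```python
import urllib.error
import urllib.request


def schema_registry_reachable(base_url: str, timeout: float = 2.0) -> bool:
    """Return True if GET {base_url}/subjects answers with HTTP 200."""
    url = base_url.rstrip("/") + "/subjects"
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except OSError:
        # URLError (including HTTPError), refused connections and socket
        # timeouts are all OSError subclasses.
        return False
```

For example, compare `schema_registry_reachable("http://localhost:8081")` with `schema_registry_reachable("http://schema-registry:8081")` from inside the ksql-server container; only the second is expected to succeed with the Compose file above.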
@rmoff, I've literally been chasing my tail on this for far too long, until I read your recommendation to point KSQL at the Schema Registry. Thank you so much; glad I stumbled upon this.
My data was also coming in and displaying as a string, but as soon as I set schema.registry.url it displayed as Avro, as expected.
@rmoff, the link does not work anymore. Please update: https://github.com/confluentinc/demo-scene/blob/e05ca9b85497bde042bf268e1f7604f43c4bc554/cos/docker-compose.yml#L54
In summary, adding this line worked for me:
ksql-server:
  ...
  environment:
    ...
    KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    ...
Thanks
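The doubled `KSQL_` prefix is deliberate. The Confluent cp-* images turn environment variables into server properties by naming convention: the image prefix is stripped, the rest is lower-cased and underscores become dots. So `KSQL_KSQL_SCHEMA_REGISTRY_URL` becomes the KSQL property `ksql.schema.registry.url`, just as `KSQL_BOOTSTRAP_SERVERS` becomes `bootstrap.servers`. A minimal Python model of that convention follows; `env_to_property` is hypothetical, the `___` to `-` and `__` to `_` escapes are included as an assumption rather than something confirmed in this thread, and the sketch ignores any variables the image may treat specially.

```python
from typing import Optional


def env_to_property(name: str, prefix: str = "KSQL_") -> Optional[str]:
    """Map a container environment variable to a server property name.

    Assumed convention: drop the prefix, lower-case the rest, then
    '___' -> '-', '__' -> '_', and every remaining '_' -> '.'.
    """
    if not name.startswith(prefix):
        return None
    key = name[len(prefix):].lower()
    # Protect the escape sequences with placeholder characters first.
    key = key.replace("___", "\x00").replace("__", "\x01")
    key = key.replace("_", ".")
    return key.replace("\x00", "-").replace("\x01", "_")


print(env_to_property("KSQL_KSQL_SCHEMA_REGISTRY_URL"))  # ksql.schema.registry.url
print(env_to_property("KSQL_BOOTSTRAP_SERVERS"))         # bootstrap.servers
```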