Ksql: Add DECIMAL support to KSQL

Created on 2 Mar 2018 · 14 Comments · Source: confluentinc/ksql

Without decimal support, KSQL cannot perform arithmetic on decimal values, which are likely to be important in financial data, metrics coming from IoT devices, and so on.

enhancement

All 14 comments

Some quick thoughts...

Decimal support would first require:

  • Connect to support decimals - KSQL internally uses the Connect schema model, and this model does not currently support decimals.
  • Schema Registry to support decimals - The Avro schema has a decimal logical type. I've not checked, but wouldn't be surprised if the SR does not have any special handling for logical types. Also, from my own experience, I've found that the Avro decimal support is a little lacking (see below).

The main issue with Avro's decimal support comes from the fact that _decimal_ is a _logical type_ in Avro, not a first class type. The logical type is a way of _interpreting_ another type. In the case of _decimal_ the underlying type is either _bytes_ or _fixed_. Scale and precision information are not stored within the serialized type, only within the schema.
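To make the point about the serialized form concrete, here is a minimal sketch in Python of how a decimal maps to bytes: the payload is just the unscaled integer in big-endian two's complement, and the scale has to be supplied from the schema on both sides. The function names `encode_decimal` and `decode_decimal` are made up for illustration and are not part of Avro, Connect, or KSQL; the sketch only handles finite values.

```python
from decimal import Decimal


def encode_decimal(value: Decimal, scale: int) -> bytes:
    """Return the big-endian two's-complement bytes of the unscaled integer."""
    sign, digits, exponent = value.as_tuple()
    shift = exponent + scale
    if shift < 0:
        raise ValueError(f"{value} has more fractional digits than scale {scale}")
    unscaled = int("".join(map(str, digits))) * 10 ** shift
    if sign:
        unscaled = -unscaled
    # One spare bit for the sign; not always the shortest encoding, but valid.
    length = (unscaled.bit_length() + 8) // 8
    return unscaled.to_bytes(length, byteorder="big", signed=True)


def decode_decimal(raw: bytes, scale: int) -> Decimal:
    """Rebuild the decimal; the scale must come from the schema, not the bytes."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(d) for d in str(abs(unscaled)))
    return Decimal((sign, digits, -scale))


raw = encode_decimal(Decimal("12.34"), scale=2)
print(raw.hex())                     # 04d2, i.e. the integer 1234
print(decode_decimal(raw, scale=2))  # 12.34
```

Nothing in `raw` records that the scale was 2, which is why the reader depends entirely on the schema it is given.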

With Avro you can write and read the data using different schemas. Avro will check that the schemas are compatible and throw an exception if they are not. It also provides compatibility-checking methods that can be used by tools like the Schema Registry. Unfortunately, as of the time of writing, Avro's schema evolution / compatibility checks do not validate logical types. The upshot is that you can write a decimal of "12.34" (schema has a scale of two) and later read it with a schema where the scale is different, say three, and Avro will happily deserialize the value as "1.234", i.e. Avro is not protecting against data corruption. This is covered by AVRO-2078 and AVRO-1721.
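The mismatch can be reproduced without Avro at all. The sketch below assumes only that the payload is the unscaled integer in big-endian two's complement; the helper `as_decimal` is a hypothetical name, and the two scales stand in for a writer schema and a reader schema that disagree.

```python
from decimal import Decimal


def as_decimal(raw: bytes, scale: int) -> Decimal:
    """Interpret the payload using whatever scale the reader's schema claims."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    # scaleb is exact here because the value is far below the context precision.
    return Decimal(unscaled).scaleb(-scale)


# Writer side: 12.34 at scale 2 is stored as the integer 1234.
payload = (1234).to_bytes(2, byteorder="big", signed=True)

written = as_decimal(payload, scale=2)  # what the writer meant
misread = as_decimal(payload, scale=3)  # what a scale-3 reader sees

print(written)  # 12.34
print(misread)  # 1.234
```

No exception is raised on the second read: the bytes are valid for either scale, so the only possible guard is a check on the schemas themselves.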

Until these Jiras are fixed, we'd need special handling of decimals in the SR to ensure correct evolution checks are in place, and potentially additional checks in Avro deserializers to again ensure write/read schema compatibility.

IMHO, not representing the scale in the actual data in some way is a bug.

If Avro were to fix the above Jiras and implement correct schema compatibility / evolution checks, it would mean that it is simply not possible to ever change the scale (and possibly the precision) of a decimal. It would be possible to implement readers that could accept different scales, e.g. read with a scale of 3 what was written with a scale of 2. But it would not be possible to _evolve_ an Avro schema that included a change of scale. Doing so will result in data corruption.
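A reader that accepts a different scale could look roughly like the sketch below. It assumes the reader knows the writer's scale (for example from the writer schema) and rescales explicitly instead of reinterpreting the bytes. `read_with_scale` is a hypothetical name, not an Avro or Connect API, and refusing any lossy conversion is a design choice made for this illustration.

```python
from decimal import Decimal, Inexact, localcontext


def read_with_scale(raw: bytes, writer_scale: int, reader_scale: int) -> Decimal:
    """Decode with the writer's scale, then rescale to the reader's scale."""
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    sign = 1 if unscaled < 0 else 0
    digits = tuple(int(d) for d in str(abs(unscaled)))
    written = Decimal((sign, digits, -writer_scale))
    with localcontext() as ctx:
        ctx.prec = 100             # room for high-precision columns
        ctx.traps[Inexact] = True  # raise instead of silently dropping digits
        return written.quantize(Decimal((0, (1,), -reader_scale)))


payload = (1234).to_bytes(2, byteorder="big", signed=True)  # 12.34 at scale 2
print(read_with_scale(payload, writer_scale=2, reader_scale=3))  # 12.340
```

Widening the scale is lossless, while narrowing it (say, reading at scale 1) raises `decimal.Inexact` here rather than rounding.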

I've created AVRO-2164 to get community feedback on potentially promoting Decimal to a first class type.

Thanks for the additional info, Andy.

One option we have, in the first iteration of this functionality, is to not support decimals when using Avro. This would reduce the dependency of the first iteration to just Kafka Connect.

@big-andy-coates Correction to your note -- Connect does support decimals, as well as other logical types; it just does so in a similar way to Avro. Decimals should translate into Avro decimals just fine (the SR doesn't have any code specifically for handling Decimal, but the Avro Converter does).

I have a Connect job that uses the JDBC Source and pulls data from Oracle. Unfortunately, many of the fields use the Oracle NUMBER "variant" type, which I wrap in the SQL of the Connect job to reduce the decimal size, but I end up with a schema like this for a column:

{"name":"MINIUM_INVOICE_AMOUNT","type":["null",{"type":"bytes","scale":6,"precision":64,"connect.version":1,"connect.parameters":{"scale":"6"},"connect.name":"org.apache.kafka.connect.data.Decimal","logicalType":"decimal"}]
When I try to create a stream, I get the error:
Could not fetch the AVRO schema from schema registry. Cannot find correct type for avro type: bytes

Is there a workaround for decimal data types as bytes with Avro? It limits the usability of KSQL with Avro if decimals are not usable.

We are experimenting with KSQL and trying to create STREAMS and TABLES on a topic, and found that all the Oracle NUMBER(14,3) columns are converted to bytes by the schema registry. I would like to create a stream and a table to run some aggregations on the NUMBER columns.
Which data type should be selected for the bytes column when creating a STREAM in KSQL? I have tried DOUBLE for the amount column, but the stream output shows null for all the values in that column. Appreciate your help.

CREATE STREAM FIN_STREAM (FIN_CODE varchar, FIN_ID varchar, FIN_STATUS varchar, FIN_AMT double ) WITH (KAFKA_TOPIC='my-topic-name ', VALUE_FORMAT='AVRO', KEY='FIN_CODE');

Schema registry data type for FIN_AMT is {"name":"FIN_AMT","type":["null",{"type":"bytes","scale":3,"precision":64,"connect.version":1,"connect.parameters":{"scale":"3"},"connect.name":"org.apache.kafka.connect.data.Decimal","logicalType":"decimal"}],"default":null}

Would this ticket also address the other logical types? Or should I open a separate ticket for that?
We make heavy use of timestamp-millis for various types of timestamps, and currently KSQL is unusable for us (together with #638). It also feels like a natural fit for modeling events (but I'm also interested if you think otherwise).

Example:

{
  "type": "record",
  "name": "some_event",
  "fields": [
    {
      "name": "event_id",
      "type": "string"
    },
    {
      "name": "event_timestamp",
      "type": {"type": "long", "logicalType": "timestamp-millis"}
    },
    {
      "name": "receiving_timestamp",
      "type": {"type": "long", "logicalType": "timestamp-millis"}
    }
  ]
}
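For context, timestamp-millis follows the same pattern as decimal: the stored value is a plain long counting milliseconds since the Unix epoch (UTC), and the logical type only tells the reader how to interpret it. A minimal Python sketch of that interpretation follows; the function name and the sample value are made up for illustration.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def from_timestamp_millis(millis: int) -> datetime:
    """Interpret a long as milliseconds since the Unix epoch, in UTC."""
    return EPOCH + timedelta(milliseconds=millis)


# Hypothetical event_timestamp value as it would sit in the record.
event_timestamp = 1519948800000
print(from_timestamp_millis(event_timestamp).isoformat())
```

A consumer that ignores the logical type still gets a usable long, which is the main practical difference from the decimal case, where ignoring it leaves opaque bytes.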

Any resolution for this error? I have a byte[] data type from Debezium CDC that is not understood by KSQL and KStreams.

Unable to verify the AVRO schema is compatible with KSQL. Unsupported Schema type: bytes

This would be a great feature. At the moment we cannot use KSQL for things where double is not sufficient. Are there plans to add it?

@sebastianvoss From what I understand, the problem lies with Avro and its support for the data type. Once Avro supports it, changes can be made for Kafka to support it.

@KieranDevvs, no, you misunderstood what @big-andy-coates wrote in his second post.

To rephrase: Avro has a decimal type, but Kafka Connect has its own type system, which has no decimal support. KSQL reuses that type system and depends on Kafka Connect to implement it first. It's also not clear to Andy what the impact on the SR is (although I cannot see an issue tbh).

Any update on this?

Should we do #3593 as part of the rollout of DECIMALs?

Update on this - KSQL will support decimals in the next release! I will close this ticket out, but please open specific bug reports/feature requests as the feature is still a little rough around the edges.
