Pulsar: [pulsar-sql] Support arrays and maps

Created on 17 Jul 2019  ·  3 Comments  ·  Source: apache/pulsar

Is your feature request related to a problem? Please describe.
Currently, pulsar-sql does not seem to support columns of type array or map. Such columns are not listed when querying show columns from pulsar."tenant/ns".topic; and they are not available in SELECT queries either. This makes it impossible to fetch arrays and maps using pulsar-sql.

Describe the solution you'd like
IMHO it would be a nice addition to support arrays and maps so that they can be read using Presto. It would be even better if they could also be queried with Presto's built-in tools.

component/python component/schemaregistry triage/week-36 type/feature

All 3 comments

The cause is the same as issue #7652:

  1. In PulsarMetadata.getColumns(), nested fields are not mapped to Presto's ParameterizedType from the TypeManager. A nested field should be exposed as a Presto ROW type (for reference, see Hive's struct type support: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types#LanguageManualTypes-ComplexTypes).
  2. SchemaHandler is hard to integrate with RecordCursor.getObject(), which is what supporting ROW, MAP, ARRAY, etc. requires.
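The type mapping that point 1 asks for can be illustrated with a small recursive function: a record becomes a Presto ROW, an array becomes array(T), a map becomes map(varchar, T) (Avro map keys are always strings), and a nullable union ["null", T] unwraps to T. This is a minimal sketch under stated assumptions, not the code in the branch: Node is a hypothetical stand-in for org.apache.avro.Schema, the primitive table is deliberately incomplete, and the output strings only mimic the column types printed by SHOW CREATE TABLE.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PrestoTypeMapping {

    /** HYPOTHETICAL simplified schema node standing in for org.apache.avro.Schema. */
    public static final class Node {
        final String kind;              // "string", "int", "long", "boolean", "double", "null", "array", "map", "record", "union"
        final List<Node> children;      // array: [items], map: [values], union: branches
        final Map<String, Node> fields; // record fields in declaration order

        private Node(String kind, List<Node> children, Map<String, Node> fields) {
            this.kind = kind;
            this.children = children;
            this.fields = new LinkedHashMap<>(fields);
        }

        public static Node primitive(String kind) { return new Node(kind, List.of(), Map.of()); }
        public static Node array(Node items) { return new Node("array", List.of(items), Map.of()); }
        public static Node map(Node values) { return new Node("map", List.of(values), Map.of()); }
        public static Node union(Node... branches) { return new Node("union", List.of(branches), Map.of()); }
        public static Node record(Map<String, Node> fields) { return new Node("record", List.of(), fields); }
    }

    /** Recursively converts a schema node into a Presto type signature string. */
    public static String toPrestoType(Node node) {
        switch (node.kind) {
            case "string":  return "varchar";
            case "int":     return "integer";
            case "long":    return "bigint";
            case "boolean": return "boolean";
            case "double":  return "double";
            case "array":
                return "array(" + toPrestoType(node.children.get(0)) + ")";
            case "map":
                // Avro map keys are always strings, so the key type is fixed to varchar.
                return "map(varchar, " + toPrestoType(node.children.get(0)) + ")";
            case "record":
                // A nested record becomes a ROW whose fields keep their declaration order.
                return node.fields.entrySet().stream()
                        .map(e -> e.getKey() + " " + toPrestoType(e.getValue()))
                        .collect(Collectors.joining(", ", "ROW(", ")"));
            case "union": {
                // Only nullable unions such as ["null", T] are handled: they unwrap to T.
                List<Node> nonNull = node.children.stream()
                        .filter(c -> !c.kind.equals("null"))
                        .collect(Collectors.toList());
                if (nonNull.size() == 1) {
                    return toPrestoType(nonNull.get(0));
                }
                throw new IllegalArgumentException("only [\"null\", T] unions are handled in this sketch");
            }
            default:
                throw new IllegalArgumentException("unsupported kind in this sketch: " + node.kind);
        }
    }
}
```

With parent declared as a nullable record of two strings, the function yields ROW(father varchar, mother varchar), the same type shown for the parent column in the SHOW CREATE TABLE output. Recursion is what makes arbitrarily nested fields work: an array of records or a map of arrays falls out of the same cases.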

So I have done some refactoring in my local branch. The main changes are:

  • PulsarMetadata is now associated with the Presto TypeManager
  • SchemaHandler is deprecated and replaced by presto-record-decoder, with a few extensions
  • The main pulsar-presto module (RecordSet, ConnectorMetadata, etc.) is decoupled from org.apache.avro.Schema and coupled with org.apache.pulsar.common.schema.SchemaInfo instead, to make it friendlier to other schema types (Protobuf, Thrift, etc.)

I have implemented and tested this in my local environment. @sijie @jerrypeng, is anyone else working on the same thing?

presto> show create table pulsar."test-tenant/test-namespace".avroata;

 CREATE TABLE pulsar."test-tenant/test-namespace".avroata (
    name varchar COMMENT '["null","string"]',
    age integer COMMENT '"int"',
    childrens array(varchar) COMMENT '["null",{"type":"array","items":"string","java-class":"java.util.List"}]',
    teachers map(varchar, varchar) COMMENT '["null",{"type":"map","values":"string"}]',
    parent ROW(father varchar, mother varchar) COMMENT '["null",{"type":"record","name":"Parent","namespace":"com.hnail.pulsar.AvroGen"
    __partition__ integer COMMENT 'The partition number which the message belongs to',
    __event_time__ timestamp(3) COMMENT 'Application defined timestamp in milliseconds of when the event occurred',
    __publish_time__ timestamp(3) COMMENT 'The timestamp in milliseconds of when event as published',
    __message_id__ varchar COMMENT 'The message ID of the message used to generate this row',
    __sequence_id__ bigint COMMENT 'The sequence ID of the message used to generate this row',
    __producer_name__ varchar COMMENT 'The name of the producer that publish the message used to generate this row',
    __key__ varchar COMMENT 'The partition key for the topic',
    __properties__ varchar COMMENT 'User defined properties'
 )
(1 row)

Query 20200826_083759_00000_neuwa, FINISHED, 1 node
Splits: 1 total, 1 done (100.00%)
9.18 [0 rows, 0B] [0 rows/s, 0B/s]

presto> select * from pulsar."test-tenant/test-namespace".avroata limit 3;
   name   | age |     childrens     |                   teachers                   |              parent              | __partition__ |
----------+-----+-------------------+----------------------------------------------+----------------------------------+---------------+
 Student1 |  23 | [zhangsan, lisi]  | {yuwen=yuwen_value, shuxue=shuxue_value}     | {father=father1, mother=mother1} |             2 |
 Student2 |  55 | [wangwu, fengliu] | {shuxue2=shuxue2_value, yuwen2=yuwen2_value} | {father=father2, mother=mother2} |             2 |
 Student1 |  23 | [zhangsan, lisi]  | {yuwen=yuwen_value, shuxue=shuxue_value}     | {father=father1, mother=mother1} |             0 |
(3 rows)

presto> select childrens[1],teachers['yuwen'] from pulsar."test-tenant/test-namespace".avroata limit 1;
  _col0   |    _col1
----------+-------------
 zhangsan | yuwen_value
(1 row)

Query 20200826_114004_00004_kz734, FINISHED, 1 node

presto> select parent.father from pulsar."test-tenant/test-namespace".avroata limit 3;
 father
---------
 father1
 father2
 father1
(3 rows)

Query 20200826_113712_00000_kz734, FINISHED, 1 node
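The last two queries rely on Presto's subscript and dereference operators. To make the correspondence with the result rows explicit, the sketch below models one decoded row as plain Java collections (List for ARRAY, Map for MAP and for ROW fields) and reproduces childrens[1], teachers['yuwen'] and parent.father. The helper names are hypothetical and this is not how the connector represents values internally (Presto passes ARRAY, MAP and ROW values around as Block objects); the point is only that array subscripts are 1-based.

```java
import java.util.List;
import java.util.Map;

public class NestedAccess {

    /** childrens[1]: Presto array subscripts are 1-based, so index 1 is the first element. */
    public static Object arrayElement(List<?> array, int oneBasedIndex) {
        if (oneBasedIndex < 1 || oneBasedIndex > array.size()) {
            throw new IndexOutOfBoundsException("array subscript out of bounds: " + oneBasedIndex);
        }
        return array.get(oneBasedIndex - 1);
    }

    /** teachers['yuwen']: look up a MAP value by key; this sketch throws if the key is absent. */
    public static Object mapValue(Map<?, ?> map, Object key) {
        if (!map.containsKey(key)) {
            throw new IllegalArgumentException("key not present in map: " + key);
        }
        return map.get(key);
    }

    /** parent.father: dereference a ROW field by name (the ROW is modeled as a Map here). */
    public static Object rowField(Map<String, ?> row, String fieldName) {
        if (!row.containsKey(fieldName)) {
            throw new IllegalArgumentException("no such ROW field: " + fieldName);
        }
        return row.get(fieldName);
    }
}
```

Applied to the Student1 row, the three helpers return zhangsan, yuwen_value and father1, matching the query results. The sketch throws on a missing index, key or field rather than returning null, simply to keep the failure visible.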

As @hnail mentioned earlier, he is working on this issue right now.

I committed my _WIP prototype code_ (https://github.com/hnail/pulsar/commit/1c67266918168d0c373a2839384362960b5afe4a), without unit tests, to my private repository instead of opening a pull request against the official master repository, because I think the change involves some structural adjustments that may go beyond what is expected.
Is there a pulsar-sql committer, or anyone familiar with pulsar-sql, who can review the code and discuss it with me? I would appreciate any suggestions or opinions. Feel free to reach me on Slack (@hnail) or comment on this issue, and I will explain the modifications in detail.


Code index:
SchemaInfo -> RowDecoder (PulsarRecordCursor): PulsarDispatchingRowDecoderFactory.create(SchemaInfo schemaInfo, Set columns)
SchemaInfo -> ColumnMetadata (PulsarMetadata): PulsarDispatchingRowDecoderFactory.extractColumnMetadata(SchemaInfo schemaInfo, PulsarColumnHandle.HandleKeyValueType handleKeyValueType)
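The create(...) entry point dispatches on the topic's SchemaInfo. The general pattern can be sketched as a registry keyed by schema type: each type maps to a factory that builds a row decoder from the schema definition, and unsupported types fail loudly instead of silently falling back to Avro. SchemaKind, RowDecoder and the String schema definition are hypothetical simplifications for illustration; Pulsar's SchemaInfo and the Presto decoder interfaces are not reproduced here.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

public class DispatchingDecoderFactory {

    /** HYPOTHETICAL schema kinds; a real implementation would key on Pulsar's schema type. */
    public enum SchemaKind { AVRO, JSON, PROTOBUF }

    /** HYPOTHETICAL decoder: turns one message payload into column name -> value. */
    public interface RowDecoder {
        Map<String, Object> decode(byte[] payload);
    }

    private final Map<SchemaKind, Function<String, RowDecoder>> factories = new EnumMap<>(SchemaKind.class);

    /** Registers a factory that builds a decoder from the schema definition text. */
    public DispatchingDecoderFactory register(SchemaKind kind, Function<String, RowDecoder> factory) {
        factories.put(kind, factory);
        return this;
    }

    /** Picks the factory registered for the schema kind; fails for kinds nobody registered. */
    public RowDecoder create(SchemaKind kind, String schemaDefinition) {
        Function<String, RowDecoder> factory = factories.get(kind);
        if (factory == null) {
            throw new UnsupportedOperationException("no decoder registered for " + kind);
        }
        return factory.apply(schemaDefinition);
    }
}
```

In this sketch, supporting a new format only means registering another factory; the code that builds column metadata or drives the record cursor never needs to know which format is behind a topic.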


The code references presto-kafka, with the goal of fixing issues #7652 and #4747 and supporting other, non-Avro schema encoders.
