Pulsar: sql can not query

Created on 27 Jul 2020 · 4 Comments · Source: apache/pulsar

  1. standalone.conf
managedLedgerMinLedgerRolloverTimeMinutes=1
managedLedgerMaxEntriesPerLedger=500
managedLedgerOffloadDriver=filesystem
fileSystemURI=hdfs://127.0.0.1:9001
fileSystemProfilePath=/usr/local/Cellar/apache-pulsar-2.6.0/conf/filesystem_offload_core_site.xml
offloadersDirectory=/usr/local/Cellar/apache-pulsar-2.6.0/offloaders
  2. bin/pulsar standalone
  3. Write some data to my-user-json-topic1
  4. bin/pulsar sql-worker run
  5. bin/pulsar sql
  6. When I execute this in presto:
presto> select count(*) from pulsar."public/default"."my-user-json-topic1";
 _col0 
-------
  3499 
(1 row)
  7. bin/pulsar-admin namespaces set-offload-threshold --size 100M public/default
  8. bin/pulsar-admin namespaces policies public/default
    This returns the policies info:
    {
    "offload_threshold" : 104857600,
    "schema_auto_update_compatibility_strategy" : "Full",
    "schema_compatibility_strategy" : "UNDEFINED",
    "is_allow_auto_update_schema" : true,
    "schema_validation_enforced" : false,
    "offload_policies" : {
    "offloadersDirectory" : "/usr/local/Cellar/apache-pulsar-2.6.0/offloaders",
    "managedLedgerOffloadDriver" : "filesystem",
    "managedLedgerOffloadMaxThreads" : 2,
    "managedLedgerOffloadPrefetchRounds" : 1,
    "managedLedgerOffloadThresholdInBytes" : 104857600,
    "s3ManagedLedgerOffloadMaxBlockSizeInBytes" : 67108864,
    "s3ManagedLedgerOffloadReadBufferSizeInBytes" : 1048576,
    "s3ManagedLedgerOffloadRoleSessionName" : "pulsar-s3-offload",
    "gcsManagedLedgerOffloadMaxBlockSizeInBytes" : 67108864,
    "gcsManagedLedgerOffloadReadBufferSizeInBytes" : 1048576,
    "fileSystemProfilePath" : "/usr/local/Cellar/apache-pulsar-2.6.0/conf/filesystem_offload_core_site.xml",
    "fileSystemURI" : "hdfs://127.0.0.1:9001",
    "s3Driver" : false,
    "gcsDriver" : false,
    "fileSystemDriver" : true
    }
    }

  9. Now I execute the query again:
presto> select count(*) from pulsar."public/default"."my-user-json-topic1";

Query 20200727_121319_00003_f75ji, RUNNING, 1 node, 17 splits
0:09 [ 0 rows, 0B] [ 0 rows/s, 0B/s] [>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>] 0%

 STAGES   ROWS  ROWS/s  BYTES  BYTES/s  QUEUED    RUN   DONE

0.........R 0 0 0B 0B 0 17 0
1.......R 0 0 0B 0B 0 0 0

and I get this error:
2020-07-27T20:13:19.275+0800 ERROR remote-task-callback-10 io.airlift.concurrent.BoundedExecutor Task failed
java.lang.IllegalArgumentException: com.facebook.presto.server.TaskUpdateRequest could not be converted to JSON
at io.airlift.json.JsonCodec.toJsonBytes(JsonCodec.java:214)
at io.airlift.http.client.JsonBodyGenerator.<init>(JsonBodyGenerator.java:32)
at io.airlift.http.client.JsonBodyGenerator.jsonBodyGenerator(JsonBodyGenerator.java:27)
at com.facebook.presto.server.remotetask.HttpRemoteTask.sendUpdate(HttpRemoteTask.java:504)
at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.databind.JsonMappingException: No serializer found for class org.apache.pulsar.common.policies.data.OffloadPolicies and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: com.facebook.presto.server.TaskUpdateRequest["sources"]->com.google.common.collect.SingletonImmutableList[0]->com.facebook.presto.TaskSource["splits"]->com.google.common.collect.RegularImmutableSet[0]->com.facebook.presto.ScheduledSplit["split"]->com.facebook.presto.metadata.Split["connectorSplit"]->org.apache.pulsar.sql.presto.PulsarSplit["offloadPolicies"])
at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:284)
at com.fasterxml.jackson.databind.SerializerProvider.mappingException(SerializerProvider.java:1110)
at com.fasterxml.jackson.databind.SerializerProvider.reportMappingProblem(SerializerProvider.java:1135)
at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:69)
at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.serialize(UnknownSerializer.java:32)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:704)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:689)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeWithType(BeanSerializerBase.java:580)
at com.facebook.presto.metadata.AbstractTypedJacksonModule$InternalTypeSerializer.serialize(AbstractTypedJacksonModule.java:115)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:704)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:689)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:704)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:689)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155)
at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serializeContents(CollectionSerializer.java:149)
at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serialize(CollectionSerializer.java:112)
at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serialize(CollectionSerializer.java:25)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:704)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:689)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155)
at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:704)
at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:689)
at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:155)
at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:292)
at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:3697)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsBytes(ObjectMapper.java:3097)
at io.airlift.json.JsonCodec.toJsonBytes(JsonCodec.java:211)
... 7 more
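
The "through reference chain" part of the `Caused by` message is the quickest way to see which field could not be serialized. A minimal Python sketch for pulling it apart, assuming a shortened copy of the message above; `reference_chain` is a hypothetical helper name, not part of Pulsar, Presto, or Jackson.

```python
import re

# Shortened copy of the "Caused by" message from the trace above.
MESSAGE = (
    'No serializer found for class org.apache.pulsar.common.policies.data.OffloadPolicies '
    'and no properties discovered to create BeanSerializer '
    '(through reference chain: com.facebook.presto.server.TaskUpdateRequest["sources"]'
    '->com.google.common.collect.SingletonImmutableList[0]'
    '->com.facebook.presto.TaskSource["splits"]'
    '->com.google.common.collect.RegularImmutableSet[0]'
    '->com.facebook.presto.ScheduledSplit["split"]'
    '->com.facebook.presto.metadata.Split["connectorSplit"]'
    '->org.apache.pulsar.sql.presto.PulsarSplit["offloadPolicies"])'
)


def reference_chain(message):
    """Split Jackson's 'through reference chain' into (class, accessor) pairs."""
    match = re.search(r"through reference chain: (.*)\)", message)
    if match is None:
        return []
    links = []
    for link in match.group(1).split("->"):
        # Each link looks like some.Class["field"] or some.Class[0].
        cls, _, accessor = link.partition("[")
        links.append((cls, accessor.rstrip("]").strip('"')))
    return links


chain = reference_chain(MESSAGE)
# The last link is the property Jackson failed on.
print(chain[-1])
```

Here the last link is the `offloadPolicies` property of `org.apache.pulsar.sql.presto.PulsarSplit`, so the failure happens while the split (which carries the namespace's offload policies) is being converted to JSON.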

Why? It seems that the namespace's policies affect the SQL query.

In addition, the data can still be offloaded to HDFS, but the offloaded data cannot be queried.
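
To check whether a namespace actually has offload policies set before running a query, the JSON printed by `pulsar-admin namespaces policies` can be inspected. A minimal Python sketch, assuming the output has been captured as a string; `offload_summary` is a hypothetical helper, and only fields that appear in the policies output above are read.

```python
import json

# Trimmed copy of the policies output shown above (only the fields used here).
POLICIES_JSON = """
{
  "offload_threshold": 104857600,
  "offload_policies": {
    "managedLedgerOffloadDriver": "filesystem",
    "managedLedgerOffloadThresholdInBytes": 104857600,
    "fileSystemURI": "hdfs://127.0.0.1:9001",
    "fileSystemDriver": true
  }
}
"""


def offload_summary(policies_text):
    """Return (driver, threshold_in_mib), or None if no offload policies are set."""
    policies = json.loads(policies_text)
    offload = policies.get("offload_policies")
    if not offload:
        return None
    driver = offload.get("managedLedgerOffloadDriver")
    threshold = offload.get("managedLedgerOffloadThresholdInBytes")
    threshold_mib = None if threshold is None else threshold / (1024 * 1024)
    return driver, threshold_mib


print(offload_summary(POLICIES_JSON))
```

A non-`None` result means the namespace carries an `offload_policies` object, which is the situation in which the error above was observed.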

release/2.6.1 triage/week-31 type/bug

All 4 comments

@Shenfeng1011 thank you for reporting this issue! @codelipenghui will take a look at this issue.

Fixed the issue in #7701, will close it

@Shenfeng1011 is this fixed for you now? I am trying to run the presto connector with offloading enabled and appear to be getting the same exception:

2020-09-17T19:49:39.093Z    ERROR   remote-task-callback-11 io.airlift.concurrent.BoundedExecutor   Task failed
java.lang.IllegalArgumentException: io.prestosql.server.TaskUpdateRequest could not be converted to JSON
    at io.airlift.json.JsonCodec.toJsonBytes(JsonCodec.java:214)
    at io.prestosql.server.remotetask.HttpRemoteTask.sendUpdate(HttpRemoteTask.java:513)
    at io.airlift.concurrent.BoundedExecutor.drainQueue(BoundedExecutor.java:78)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.apache.pulsar.common.policies.data.OffloadPolicies and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: io.prestosql.server.TaskUpdateRequest["sources"]->com.google.common.collect.SingletonImmutableList[0]->io.prestosql.execution.TaskSource["splits"]->com.google.common.collect.RegularImmutableSet[0]->io.prestosql.execution.ScheduledSplit["split"]->io.prestosql.metadata.Split["connectorSplit"]->org.apache.pulsar.sql.presto.PulsarSplit["offloadPolicies"])
    at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
    at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1191)
    at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:404)
    at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.failForEmpty(UnknownSerializer.java:71)
    at com.fasterxml.jackson.databind.ser.impl.UnknownSerializer.serialize(UnknownSerializer.java:33)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:727)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:722)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeWithType(BeanSerializerBase.java:607)
    at io.prestosql.metadata.AbstractTypedJacksonModule$InternalTypeSerializer.serialize(AbstractTypedJacksonModule.java:115)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:727)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:722)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:166)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:727)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:722)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:166)
    at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serializeContents(CollectionSerializer.java:145)
    at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serialize(CollectionSerializer.java:107)
    at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serialize(CollectionSerializer.java:25)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:727)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:722)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:166)
    at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serializeContents(IndexedListSerializer.java:119)
    at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:79)
    at com.fasterxml.jackson.databind.ser.impl.IndexedListSerializer.serialize(IndexedListSerializer.java:18)
    at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:727)
    at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:722)
    at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:166)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
    at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:400)
    at com.fasterxml.jackson.databind.ObjectWriter$Prefetch.serialize(ObjectWriter.java:1429)
    at com.fasterxml.jackson.databind.ObjectWriter._configAndWriteValue(ObjectWriter.java:1135)
    at com.fasterxml.jackson.databind.ObjectWriter.writeValueAsBytes(ObjectWriter.java:1029)
    at io.airlift.json.JsonCodec.toJsonBytes(JsonCodec.java:211)
    ... 5 more

I do have a slightly different setup, though: I am installing Presto from the prestosql v332 tarball and then installing the pulsar-presto-connector tarball that gets built in /pulsar-sql/presto-pulsar-plugin.

My presto config for the pulsar connector looks like:

connector.name=pulsar
pulsar.broker-service-url=http://my-pulsar-broker:8080
pulsar.zookeeper-uri=my-pulsar-zookeeper:2181
pulsar.max-entry-read-batch-size=100
pulsar.target-num-splits=2
pulsar.max-split-message-queue-size=10000
pulsar.max-split-entry-queue-size=1000
pulsar.managed-ledger-offload-driver=s3
pulsar.offloaders-directory=/pulsar/offloaders
pulsar.managed-ledger-offload-max-threads=2
pulsar.offloader-properties = \
 {"s3ManagedLedgerOffloadBucket": "pulsar-offload", \
 "s3ManagedLedgerOffloadServiceEndpoint": "http://my-minio:9000", \
 "s3ManagedLedgerOffloadRegion": "us-east-1"}
pulsar.namespace-delimiter-rewrite-enable=false
pulsar.rewrite-namespace-delimiter=/
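
Because `pulsar.offloader-properties` is a JSON object spread over backslash-continued lines, it may be worth confirming that it joins into valid JSON before suspecting the connector. A rough Python sketch, assuming a simplified reading of the `.properties` format (`key=value` pairs and trailing-backslash continuations only, no escape handling); `load_properties` is a hypothetical helper, not how Presto itself reads the file.

```python
import json

# The offload-related lines from the connector config above.
CONFIG = r'''
pulsar.managed-ledger-offload-driver=s3
pulsar.offloader-properties = \
 {"s3ManagedLedgerOffloadBucket": "pulsar-offload", \
 "s3ManagedLedgerOffloadServiceEndpoint": "http://my-minio:9000", \
 "s3ManagedLedgerOffloadRegion": "us-east-1"}
'''


def load_properties(text):
    """Parse key=value lines, joining lines that end with a backslash."""
    props = {}
    pending = ""
    for raw in text.splitlines():
        line = raw.strip()
        # Skip blank lines and comments unless we are inside a continuation.
        if not pending and (not line or line.startswith(("#", "!"))):
            continue
        if line.endswith("\\"):
            pending += line[:-1]
            continue
        key, _, value = (pending + line).partition("=")
        props[key.strip()] = value.strip()
        pending = ""
    return props


props = load_properties(CONFIG)
# json.loads raises ValueError if the joined value is not valid JSON.
offloader = json.loads(props["pulsar.offloader-properties"])
print(sorted(offloader))
```

If `json.loads` succeeds, the continuation lines are at least well-formed, and the exception above is unlikely to come from a typo in this property.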

This issue is resolved in 2.6.1, but I have not verified it. You can give it a try.
