HeartSaVioR edited a comment on issue #23747: [SPARK-26848][SQL] Introduce new 
option to Kafka source: offset by timestamp (starting/ending)
URL: https://github.com/apache/spark/pull/23747#issuecomment-462493022
 
 
   Just ran the query with Kafka 0.10.0.
   
   > batch query
   
   ```
   val df = spark.read.format("kafka")
     .option("kafka.bootstrap.servers", "localhost:9092")
     .option("subscribe", "spark_26848_test_v1,spark_26848_test_2_v1")
     .option("startingOffsetsByTimestamp", """{"spark_26848_test_v1": 
1549669142193, "spark_26848_test_2_v1": 1549669240965}""")
     .option("endingOffsetsByTimestamp", """{"spark_26848_test_v1": 
1549669265676, "spark_26848_test_2_v1": 1549699265676}""")
     .load().selectExpr("CAST(value AS STRING)")
   
   df.show()
   ```
   
   result:
   
   ```
   19/02/12 06:01:47 WARN KafkaOffsetReader: Error in attempt 1 getting Kafka 
offsets:
   org.apache.kafka.common.errors.UnsupportedVersionException: The broker does 
not support LIST_OFFSETS with version in range [1,4]. The supported range is 
[0,0].
   19/02/12 06:01:48 WARN KafkaOffsetReader: Error in attempt 2 getting Kafka 
offsets:
   org.apache.kafka.common.errors.UnsupportedVersionException: The broker does 
not support LIST_OFFSETS with version in range [1,4]. The supported range is 
[0,0].
   19/02/12 06:01:49 WARN KafkaOffsetReader: Error in attempt 3 getting Kafka 
offsets:
   org.apache.kafka.common.errors.UnsupportedVersionException: The broker does 
not support LIST_OFFSETS with version in range [1,4]. The supported range is 
[0,0].
   org.apache.spark.SparkException: Exception thrown in awaitResult:
     at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
     at 
org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:375)
     at 
org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchSpecificTimestampBasedOffsets(KafkaOffsetReader.scala:184)
     at 
org.apache.spark.sql.kafka010.KafkaRelation.getPartitionOffsets(KafkaRelation.scala:147)
     at 
org.apache.spark.sql.kafka010.KafkaRelation.buildScan(KafkaRelation.scala:72)
   ...
   Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: The 
broker does not support LIST_OFFSETS with version in range [1,4]. The supported 
range is [0,0].
   ```
   
   > SS query
   
   ```
   val df = spark.readStream.format("kafka")
     .option("kafka.bootstrap.servers", "localhost:9092")
     .option("subscribe", "spark_26848_test_v1,spark_26848_test_2_v1")
     .option("startingOffsetsByTimestamp", """{"spark_26848_test_v1": 
1549669142193, "spark_26848_test_2_v1": 1549669240965}""")
     .load().selectExpr("CAST(value AS STRING)")
   
   val query = df.writeStream
     .format("console")  // <-- use ConsoleSink
     .option("truncate", false)
     .option("numRows", 10)
     .queryName("rate-console")
     .start()
   
   query.awaitTermination()
   ```
   
   result:
   
   ```
   19/02/12 06:09:10 WARN KafkaOffsetReader: Error in attempt 1 getting Kafka 
offsets:
   org.apache.kafka.common.errors.UnsupportedVersionException: The broker does 
not support LIST_OFFSETS with version in range [1,4]. The supported range is 
[0,0].
   19/02/12 06:09:11 WARN KafkaOffsetReader: Error in attempt 2 getting Kafka 
offsets:
   org.apache.kafka.common.errors.UnsupportedVersionException: The broker does 
not support LIST_OFFSETS with version in range [1,4]. The supported range is 
[0,0].
   19/02/12 06:09:12 WARN KafkaOffsetReader: Error in attempt 3 getting Kafka 
offsets:
   org.apache.kafka.common.errors.UnsupportedVersionException: The broker does 
not support LIST_OFFSETS with version in range [1,4]. The supported range is 
[0,0].
   19/02/12 06:09:13 ERROR MicroBatchExecution: Query rate-console [id = 
00429123-93dd-4c65-8c74-61aa8eb11f82, runId = 
c0263eb7-b31f-4ba6-9bd6-d076e89e5803] terminated with error
   org.apache.kafka.common.errors.UnsupportedVersionException: The broker does 
not support LIST_OFFSETS with version in range [1,4]. The supported range is 
[0,0].
   org.apache.spark.sql.streaming.StreamingQueryException: Query rate-console 
[id = 00429123-93dd-4c65-8c74-61aa8eb11f82, runId = 
c0263eb7-b31f-4ba6-9bd6-d076e89e5803] terminated with exception: The broker 
does not support LIST_OFFSETS with version in range [1,4]. The supported range 
is [0,0].
     at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:307)
     at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:198)
   Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: The 
broker does not support LIST_OFFSETS with version in range [1,4]. The supported 
range is [0,0].
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to