HeartSaVioR edited a comment on issue #23747: [SPARK-26848][SQL] Introduce new option to Kafka source: offset by timestamp (starting/ending) URL: https://github.com/apache/spark/pull/23747#issuecomment-462493022 Just ran the query with Kafka 0.10.0. > batch query ``` val df = spark.read.format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("subscribe", "spark_26848_test_v1,spark_26848_test_2_v1") .option("startingOffsetsByTimestamp", """{"spark_26848_test_v1": 1549669142193, "spark_26848_test_2_v1": 1549669240965}""") .option("endingOffsetsByTimestamp", """{"spark_26848_test_v1": 1549669265676, "spark_26848_test_2_v1": 1549699265676}""") .load().selectExpr("CAST(value AS STRING)") df.show() ``` result: ``` 19/02/12 06:01:47 WARN KafkaOffsetReader: Error in attempt 1 getting Kafka offsets: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [1,4]. The supported range is [0,0]. 19/02/12 06:01:48 WARN KafkaOffsetReader: Error in attempt 2 getting Kafka offsets: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [1,4]. The supported range is [0,0]. 19/02/12 06:01:49 WARN KafkaOffsetReader: Error in attempt 3 getting Kafka offsets: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [1,4]. The supported range is [0,0]. org.apache.spark.SparkException: Exception thrown in awaitResult: at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226) at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:375) at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchSpecificTimestampBasedOffsets(KafkaOffsetReader.scala:184) at org.apache.spark.sql.kafka010.KafkaRelation.getPartitionOffsets(KafkaRelation.scala:147) at org.apache.spark.sql.kafka010.KafkaRelation.buildScan(KafkaRelation.scala:72) ... Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [1,4]. The supported range is [0,0]. ``` > SS query ``` val df = spark.readStream.format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("subscribe", "spark_26848_test_v1,spark_26848_test_2_v1") .option("startingOffsetsByTimestamp", """{"spark_26848_test_v1": 1549669142193, "spark_26848_test_2_v1": 1549669240965}""") .load().selectExpr("CAST(value AS STRING)") val query = df.writeStream .format("console") // <-- use ConsoleSink .option("truncate", false) .option("numRows", 10) .queryName("rate-console") .start() query.awaitTermination() ``` result: ``` 19/02/12 06:09:10 WARN KafkaOffsetReader: Error in attempt 1 getting Kafka offsets: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [1,4]. The supported range is [0,0]. 19/02/12 06:09:11 WARN KafkaOffsetReader: Error in attempt 2 getting Kafka offsets: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [1,4]. The supported range is [0,0]. 19/02/12 06:09:12 WARN KafkaOffsetReader: Error in attempt 3 getting Kafka offsets: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [1,4]. The supported range is [0,0]. 19/02/12 06:09:13 ERROR MicroBatchExecution: Query rate-console [id = 00429123-93dd-4c65-8c74-61aa8eb11f82, runId = c0263eb7-b31f-4ba6-9bd6-d076e89e5803] terminated with error org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [1,4]. The supported range is [0,0]. org.apache.spark.sql.streaming.StreamingQueryException: Query rate-console [id = 00429123-93dd-4c65-8c74-61aa8eb11f82, runId = c0263eb7-b31f-4ba6-9bd6-d076e89e5803] terminated with exception: The broker does not support LIST_OFFSETS with version in range [1,4]. The supported range is [0,0]. at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:307) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:198) Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [1,4]. The supported range is [0,0]. ```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
