[ https://issues.apache.org/jira/browse/SPARK-38672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jungtaek Lim resolved SPARK-38672. ---------------------------------- Resolution: Invalid > Kafka, spark data frame read fails when a message does not exist for the > timestamp in all partitions > ---------------------------------------------------------------------------------------------------- > > Key: SPARK-38672 > URL: https://issues.apache.org/jira/browse/SPARK-38672 > Project: Spark > Issue Type: Bug > Components: Build > Affects Versions: 3.2.1 > Reporter: gattu shivakumar > Priority: Major > > *Issue:* > at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at > org.apache.spark.util.UninterruptibleThreadRunner$$anon$1.run(UninterruptibleThreadRunner.scala:33) > Caused by: java.lang.AssertionError: *No offset matched from request of > topic-partition source_JSON_sgattu_primittive-5 and timestamp 1648013731709.* > at > org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$readTimestampOffsets$1(KafkaOffsetReaderConsumer.scala:280) > at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) > at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400) > at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728) > at scala.collection.TraversableLike.map(TraversableLike.scala:286) > To reproduce the issue, produce a few messages to Kafka and then try to fetch them using > the program below (run in the Scala shell) > bin/kafka-console-producer.sh --bootstrap-server > pkc-epwny.eastus.azure.confluent.cloud:9092 --topic > source_JSON_sgattu_primittive --producer.config kafkabroker.properties > {quote}{"ordertime":1497014222380,"orderid":18,"itemid":"Item_184","BinaryField":true} > {"ordertime":1497014222399,"orderid":19,"itemid":"Item_185","BinaryField":false} > {quote} > import org.apache.spark.sql.functions._ > import spark.implicits._ > 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) > val kafkaServer: String = "XXX:9092" > val topicSampleName: String = "t2" > val columns=Array("id", "value", "last", "year", "binary", "intbinary") > val df=sc.parallelize(Seq( > (1, "John", "Doe", 1986, true, 1), > (2, "Ive", "Fish", 1990, false, 0), > (4, "John", "Wayne", 1995, false, 1) > )).toDF(columns: _*) > spark.read.format("kafka").option("kafka.bootstrap.servers", > kafkaServer).option("kafka.sasl.jaas.config", > "org.apache.kafka.common.security.plain.PlainLoginModule required > username='XXX' password='XXX';") > .option("kafka.sasl.mechanism", "PLAIN") > .option("kafka.security.protocol", "SASL_SSL") > .option("kafka.bootstrap.servers", "XXX:9092") > .option("subscribe", "kafkaTopic") > .option("startingOffsetsByTimestamp", > """\{"source_JSON_sgattu_primittive":{"0":1648013731709,"5":1648013731709,"4":1648013731709,"1":1648013731709,"2":1648013731709,"3":1648013731709}}""") > .load() -- This message was sent by Atlassian Jira (v8.20.1#820001) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org