[ 
https://issues.apache.org/jira/browse/SPARK-38672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-38672.
----------------------------------
    Resolution: Invalid

> Kafka, spark data frame read fails when a message does not exist for the 
> timestamp in all partitions
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-38672
>                 URL: https://issues.apache.org/jira/browse/SPARK-38672
>             Project: Spark
>          Issue Type: Bug
>          Components: Build
>    Affects Versions: 3.2.1
>            Reporter: gattu shivakumar
>            Priority: Major
>
> *Issue:*
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at 
> org.apache.spark.util.UninterruptibleThreadRunner$$anon$1.run(UninterruptibleThreadRunner.scala:33)
> Caused by: java.lang.AssertionError: *No offset matched from request of 
> topic-partition source_JSON_sgattu_primittive-5 and timestamp 1648013731709.*
> at 
> org.apache.spark.sql.kafka010.KafkaOffsetReaderConsumer.$anonfun$readTimestampOffsets$1(KafkaOffsetReaderConsumer.scala:280)
> at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
> at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:400)
> at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:728)
> at scala.collection.TraversableLike.map(TraversableLike.scala:286)
> To reproduce the issue, produce a few messages to Kafka and then try to fetch 
> them using the program below (run in the Scala shell):
> bin/kafka-console-producer.sh --bootstrap-server 
> pkc-epwny.eastus.azure.confluent.cloud:9092 --topic 
> source_JSON_sgattu_primittive --producer.config kafkabroker.properties
> {quote}{"ordertime":1497014222380,"orderid":18,"itemid":"Item_184","BinaryField":true}
> {"ordertime":1497014222399,"orderid":19,"itemid":"Item_185","BinaryField":false}
> {quote}
> import org.apache.spark.sql.functions._
> import spark.implicits._
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> val kafkaServer: String = "XXX:9092"
> val topicSampleName: String = "t2"
> val columns=Array("id", "value", "last", "year", "binary", "intbinary")
> val df=sc.parallelize(Seq(
> (1, "John", "Doe", 1986, true, 1),
> (2, "Ive", "Fish", 1990, false, 0),
> (4, "John", "Wayne", 1995, false, 1)
> )).toDF(columns: _*)
> spark.read.format("kafka").option("kafka.bootstrap.servers", 
> kafkaServer).option("kafka.sasl.jaas.config", 
> "org.apache.kafka.common.security.plain.PlainLoginModule required 
> username='XXX' password='XXX';")
> .option("kafka.sasl.mechanism", "PLAIN")
> .option("kafka.security.protocol", "SASL_SSL")
> .option("kafka.bootstrap.servers", "XXX:9092")
> .option("subscribe", "kafkaTopic")
> .option("startingOffsetsByTimestamp", 
> """{"source_JSON_sgattu_primittive":{"0":1648013731709,"5":1648013731709,"4":1648013731709,"1":1648013731709,"2":1648013731709,"3":1648013731709}}""")
> .load()



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to