Nick Hryhoriev created SPARK-31427:
--------------------------------------

             Summary: Spark Structured Streaming reads data twice per micro-batch.
                 Key: SPARK-31427
                 URL: https://issues.apache.org/jira/browse/SPARK-31427
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.4.3
            Reporter: Nick Hryhoriev


I have a very strange issue with Spark Structured Streaming: it creates two Spark jobs for every micro-batch and, as a result, reads the data from Kafka twice. Here is a simple code snippet.

 
{code:scala}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

import scala.concurrent.duration._

object CheckHowSparkReadFromKafka {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder()
      .config(new SparkConf()
        .setAppName("simple read from kafka with repartition")
        .setMaster("local[*]")
        .set("spark.driver.host", "localhost"))
      .getOrCreate()
    val testPath = "/tmp/spark-test"
    FileSystem.get(session.sparkContext.hadoopConfiguration).delete(new Path(testPath), true)
    import session.implicits._
    val stream = session
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers",        "kafka-20002-prod:9092")
      .option("subscribe", "topic")
      .option("maxOffsetsPerTrigger", 1000)
      .option("failOnDataLoss", false)
      .option("startingOffsets", "latest")
      .load()
      .repartitionByRange($"offset") // removing this line makes the duplicate read go away
      .writeStream
      .option("path", testPath + "/data")
      .option("checkpointLocation", testPath + "/checkpoint")
      .format("parquet")
      .trigger(Trigger.ProcessingTime(10.seconds))
      .start()
    stream.processAllAvailable()
  }
}
{code}
This happens because of {{.repartitionByRange($"offset")}}: if I remove this line, all is good. With it, Spark creates two jobs per micro-batch: one with a single stage that just reads from Kafka, and a second with three stages (read -> shuffle -> write). The result of the first job is never used.

This has a significant impact on performance. Some of my Kafka topics have 1550 partitions, so reading them twice is a big deal. If I add a cache, things get better (see the sketch below), but that is not an option for me. In local mode the first job of each micro-batch takes less than 0.1 ms, except for the batch with index 0. But on YARN and Mesos clusters both jobs are fully executed, and on my topics each takes about 1.2 minutes.
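
For illustration, here is a minimal sketch of the cache workaround mentioned above, assuming {{foreachBatch}} with {{persist}} is used to materialize each micro-batch before the shuffle (it reuses {{session}} and {{testPath}} from the snippet above; the structure is illustrative, not the exact code from my job):
{code:scala}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

import scala.concurrent.duration._

val query = session
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka-20002-prod:9092")
  .option("subscribe", "topic")
  .option("maxOffsetsPerTrigger", 1000)
  .option("failOnDataLoss", false)
  .option("startingOffsets", "latest")
  .load()
  .writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // Persist so the Kafka read is materialized only once per micro-batch.
    val cached = batch.persist()
    cached
      .repartitionByRange(cached("offset")) // the shuffle now runs on cached data
      .write
      .mode("append")
      .parquet(testPath + "/data")
    cached.unpersist()
  }
  .option("checkpointLocation", testPath + "/checkpoint")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .start()
query.processAllAvailable()
{code}
With this shape the first (read-only) job disappears, at the cost of caching each micro-batch; as noted above, this helps but is not acceptable for my use case.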
