Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20554#discussion_r167807584
  
    --- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
    @@ -0,0 +1,410 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +import java.io._
    +import java.nio.charset.StandardCharsets
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.commons.io.IOUtils
    +import org.apache.kafka.common.TopicPartition
    +
    +import org.apache.spark.SparkEnv
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler.ExecutorCacheTaskLocation
    +import org.apache.spark.sql.{Row, SparkSession}
    +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
    +import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
    +import org.apache.spark.sql.sources.v2.DataSourceOptions
    +import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
    +import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
    +import org.apache.spark.sql.types.StructType
    +import org.apache.spark.util.UninterruptibleThread
    +
    +/**
    + * A [[MicroBatchReader]] that reads data from Kafka.
    + *
    + * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this 
source that contains
    + * a map of TopicPartition -> offset. Note that this offset is 1 + 
(available offset). For
    + * example if the last record in a Kafka topic "t", partition 2 is offset 
5, then
    + * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is 
done to keep it consistent
    + * with the semantics of `KafkaConsumer.position()`.
    + *
    + * Zero data loss is not guaranteed when topics are deleted. If zero data 
loss is critical, the user
    + * must make sure all messages in a topic have been processed when 
deleting a topic.
    + *
    + * There is a known issue caused by KAFKA-1894: a query using Kafka 
may fail to stop.
    + * To avoid this issue, make sure to stop the query before 
stopping the Kafka brokers,
    + * and do not use wrong broker addresses.
    + */
    +private[kafka010] class KafkaMicroBatchReader(
    +    kafkaOffsetReader: KafkaOffsetReader,
    +    executorKafkaParams: ju.Map[String, Object],
    +    options: DataSourceOptions,
    +    metadataPath: String,
    +    startingOffsets: KafkaOffsetRangeLimit,
    +    failOnDataLoss: Boolean)
    +  extends MicroBatchReader with Logging {
    +
    +  type PartitionOffsetMap = Map[TopicPartition, Long]
    +
    +  private var startPartitionOffsets: PartitionOffsetMap = _
    +  private var endPartitionOffsets: PartitionOffsetMap = _
    +
    +  private val pollTimeoutMs = options.getLong(
    +    "kafkaConsumer.pollTimeoutMs",
    +    SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
    +
    +  private val maxOffsetsPerTrigger =
    +    Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
    +
    +  /**
    +   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
    +   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
    +   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
    +   */
    +  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
    +
    +  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
    +    // Make sure initialPartitionOffsets is initialized
    +    initialPartitionOffsets
    +
    +    startPartitionOffsets = Option(start.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse(initialPartitionOffsets)
    +
    +    endPartitionOffsets = Option(end.orElse(null))
    +        .map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
    +        .getOrElse {
    +          val latestPartitionOffsets = 
kafkaOffsetReader.fetchLatestOffsets()
    +          maxOffsetsPerTrigger.map { maxOffsets =>
    +            rateLimit(maxOffsets, startPartitionOffsets, 
latestPartitionOffsets)
    +          }.getOrElse {
    +            latestPartitionOffsets
    +          }
    +        }
    +  }
    +
    +  override def createDataReaderFactories(): 
ju.List[DataReaderFactory[Row]] = {
    +    // Find the new partitions, and get their earliest offsets
    +    val newPartitions = 
endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
    +    val newPartitionOffsets = 
kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
    +    if (newPartitionOffsets.keySet != newPartitions) {
    +      // We cannot get from offsets for some partitions. It means they got 
deleted.
    +      val deletedPartitions = 
newPartitions.diff(newPartitionOffsets.keySet)
    +      reportDataLoss(
    +        s"Cannot find earliest offsets of ${deletedPartitions}. Some data 
may have been missed")
    +    }
    +    logInfo(s"Partitions added: $newPartitionOffsets")
    +    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
    +      reportDataLoss(
    +        s"Added partition $p starts from $o instead of 0. Some data may 
have been missed")
    +    }
    +
    +    // Find deleted partitions, and report data loss if required
    +    val deletedPartitions = 
startPartitionOffsets.keySet.diff(endPartitionOffsets.keySet)
    +    if (deletedPartitions.nonEmpty) {
    +      reportDataLoss(s"$deletedPartitions are gone. Some data may have 
been missed")
    +    }
    +
    +    // Use the end partitions to calculate offset ranges, so that 
partitions that have
    +    // been deleted are ignored
    +    val topicPartitions = endPartitionOffsets.keySet.filter { tp =>
    +      // Ignore partitions whose from offsets we don't know.
    +      newPartitionOffsets.contains(tp) || 
startPartitionOffsets.contains(tp)
    +    }.toSeq
    +    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
    +
    +    val sortedExecutors = getSortedExecutorList()
    +    val numExecutors = sortedExecutors.length
    +    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
    +
    +    // Calculate offset ranges
    +    val factories = topicPartitions.flatMap { tp =>
    +      val fromOffset = startPartitionOffsets.get(tp).getOrElse {
    +        newPartitionOffsets.getOrElse(
    +        tp, {
    +          // This should not happen since newPartitionOffsets contains all 
partitions not in
    +          // startPartitionOffsets
    +          throw new IllegalStateException(s"$tp doesn't have a from 
offset")
    +        })
    +      }
    +      val untilOffset = endPartitionOffsets(tp)
    +
    +      if (untilOffset >= fromOffset) {
    +        // This allows cached KafkaConsumers in the executors to be 
re-used to read the same
    +        // partition in every batch.
    +        val preferredLoc = if (numExecutors > 0) {
    +          Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
    +        } else None
    +        val range = KafkaOffsetRange(tp, fromOffset, untilOffset)
    +        Some(
    +          new KafkaMicroBatchDataReaderFactory(
    +            range, preferredLoc, executorKafkaParams, pollTimeoutMs, 
failOnDataLoss))
    +      } else {
    +        reportDataLoss(
    +          s"Partition $tp's offset was changed from " +
    +            s"$fromOffset to $untilOffset, some data may have been missed")
    +        None
    +      }
    +    }
    +    factories.map(_.asInstanceOf[DataReaderFactory[Row]]).asJava
    +  }
    +
    +  override def getStartOffset: Offset = {
    +    KafkaSourceOffset(startPartitionOffsets)
    +  }
    +
    +  override def getEndOffset: Offset = {
    +    KafkaSourceOffset(endPartitionOffsets)
    +  }
    +
    +  override def deserializeOffset(json: String): Offset = {
    +    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
    +  }
    +
    +  override def readSchema(): StructType = KafkaOffsetReader.kafkaSchema
    +
    +  override def commit(end: Offset): Unit = {}
    +
    +  override def stop(): Unit = {}
    --- End diff --
    
    This method should close the reader (i.e. `kafkaOffsetReader`).


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to