Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15102#discussion_r82030589
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---
    @@ -0,0 +1,396 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.{util => ju}
    +
    +import scala.collection.JavaConverters._
    +import scala.util.control.NonFatal
    +
    +import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
    +import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
    +import org.apache.kafka.common.TopicPartition
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.scheduler.ExecutorCacheTaskLocation
    +import org.apache.spark.sql._
    +import org.apache.spark.sql.execution.streaming._
    +import org.apache.spark.sql.kafka010.KafkaSource._
    +import org.apache.spark.sql.types._
    +import org.apache.spark.util.UninterruptibleThread
    +
    +/**
    + * A [[Source]] that uses Kafka's own [[KafkaConsumer]] API to read data from Kafka. The design
    + * for this source is as follows.
    + *
    + * - The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this source that contains
    + *   a map of TopicPartition -> offset. Note that this offset is 1 + (available offset). For
    + *   example, if the last record in a Kafka topic "t", partition 2 is at offset 5, then
    + *   KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is done to keep it
    + *   consistent with the semantics of `KafkaConsumer.position()`.
    + *
    + * - The [[ConsumerStrategy]] class defines which Kafka topics and partitions should be read
    + *   by this source. These strategies directly correspond to the different consumption options.
    + *   This class is designed to return a configured [[KafkaConsumer]] that is used by the
    + *   [[KafkaSource]] to query for the offsets. See the docs on
    + *   [[org.apache.spark.sql.kafka010.KafkaSource.ConsumerStrategy]] for more details.
    + *
    + * - The [[KafkaSource]] is written to do the following.
    + *
    + *   - As soon as the source is created, the pre-configured KafkaConsumer returned by the
    + *     [[ConsumerStrategy]] is used to query the initial offsets that this source should
    + *     start reading from. This is used to create the first batch.
    + *
    + *   - `getOffset()` uses the KafkaConsumer to query the latest available offsets, which are
    + *     returned as a [[KafkaSourceOffset]].
    + *
    + *   - `getBatch()` returns a DF that reads from the 'start offset' until the 'end offset'
    + *     for each partition. The end offset is excluded to be consistent with the semantics of
    + *     [[KafkaSourceOffset]] and `KafkaConsumer.position()`.
    + *
    + *   - The DF returned is based on [[KafkaSourceRDD]], which is constructed such that the
    + *     data from a Kafka topic + partition is consistently read by the same executors across
    + *     batches, and cached KafkaConsumers in the executors can be reused efficiently. See the
    + *     docs on [[KafkaSourceRDD]] for more details.
    + *
    + * Zero data loss is not guaranteed when topics are deleted. If zero data loss is critical,
    + * the user must make sure all messages in a topic have been processed before deleting the
    + * topic.
    + *
    + * There is a known issue caused by KAFKA-1894: a query using KafkaSource may fail to stop.
    + * To avoid this issue, make sure to stop the query before stopping the Kafka brokers, and
    + * do not use incorrect broker addresses.
    + */
    +private[kafka010] case class KafkaSource(
    +    sqlContext: SQLContext,
    +    consumerStrategy: ConsumerStrategy,
    +    executorKafkaParams: ju.Map[String, Object],
    +    sourceOptions: Map[String, String],
    +    metadataPath: String,
    +    failOnDataLoss: Boolean)
    +  extends Source with Logging {
    +
    +  private val sc = sqlContext.sparkContext
    +
    +  private val pollTimeoutMs =
    +    sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
    +
    +  private val maxOffsetFetchAttempts =
    +    sourceOptions.getOrElse("fetchOffset.numRetries", "3").toInt
    +
    +  private val offsetFetchAttemptIntervalMs =
    +    sourceOptions.getOrElse("fetchOffset.retryIntervalMs", "10").toLong
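
For context, the `fetchOffset.numRetries` and `fetchOffset.retryIntervalMs` options above feed a retry helper like `withRetriesWithoutInterrupt` (whose body is outside this hunk). A minimal sketch of such a retry loop, with hypothetical names and without the interrupt-masking the real helper needs:

```scala
import scala.util.control.NonFatal

// Try `body` up to `maxAttempts` times, sleeping `intervalMs` between
// failures; rethrow the last error if every attempt fails.
def withRetries[T](maxAttempts: Int, intervalMs: Long)(body: => T): T = {
  var attempt = 0
  var result: Option[T] = None
  var lastError: Throwable = null
  while (result.isEmpty && attempt < maxAttempts) {
    try {
      result = Some(body)
    } catch {
      case NonFatal(e) =>
        lastError = e
        attempt += 1
        if (attempt < maxAttempts) Thread.sleep(intervalMs)
    }
  }
  result.getOrElse(throw lastError)
}

// Example: an offset fetch that fails twice, then succeeds on the third attempt.
var calls = 0
val fetched = withRetries(maxAttempts = 3, intervalMs = 1L) {
  calls += 1
  if (calls < 3) sys.error("simulated transient failure")
  Map("t-0" -> 6L)
}
```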
    +
    +  /**
    +   * A KafkaConsumer used in the driver to query the latest Kafka offsets. This only queries
    +   * the offsets and never commits them.
    +   */
    +  private val consumer = consumerStrategy.createConsumer()
    +
    +  /**
    +   * initialPartitionOffsets is set lazily to make sure `KafkaConsumer.poll` is only called
    +   * in the StreamExecutionThread. Otherwise, interrupting a thread running
    +   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
    +   */
    +  private lazy val initialPartitionOffsets = {
    +    val metadataLog =
    +      new HDFSMetadataLog[KafkaSourceOffset](sqlContext.sparkSession, metadataPath)
    +    metadataLog.get(0).getOrElse {
    +      val offsets = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = false))
    +      metadataLog.add(0, offsets)
    +      logInfo(s"Initial offsets: $offsets")
    +      offsets
    +    }.partitionToOffsets
    +  }
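
A small sketch of why the `lazy val` matters here (names are hypothetical; a counter stands in for `KafkaConsumer.poll`): a `lazy val` is not evaluated when the source is constructed, only on first access, and the result is cached, so the poll happens exactly once, on whichever thread first touches the field.

```scala
// Hypothetical stand-in for the first KafkaConsumer.poll call.
var pollCount = 0
def fetchInitialOffsets(): Map[String, Long] = {
  pollCount += 1  // side effect deferred until first access
  Map("t-0" -> 6L)
}

class SourceSketch {
  // Not evaluated during construction; evaluated once, on first access.
  lazy val initialOffsets: Map[String, Long] = fetchInitialOffsets()
}

val source = new SourceSketch()          // constructing the source does not poll
val pollsAfterConstruction = pollCount
val first = source.initialOffsets        // first access triggers the fetch
val second = source.initialOffsets       // cached; no second fetch
```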
    +
    +  override def schema: StructType = KafkaSource.kafkaSchema
    +
    +  /** Returns the maximum available offset for this source. */
    +  override def getOffset: Option[Offset] = {
    +    // Make sure initialPartitionOffsets is set
    +    initialPartitionOffsets
    +
    +    val offset = KafkaSourceOffset(fetchPartitionOffsets(seekToEnd = true))
    +    logDebug(s"GetOffset: ${offset.partitionToOffsets.toSeq.map(_.toString).sorted}")
    +    Some(offset)
    +  }
    +
    +  /**
    +   * Returns the data that is between the offsets
    +   * [`start.get.partitionToOffsets`, `end.partitionToOffsets`), i.e.
    +   * `end.partitionToOffsets` is exclusive.
    +   */
    +  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    +    // Make sure initialPartitionOffsets is set
    +    initialPartitionOffsets
    +
    +    logInfo(s"GetBatch called with start = $start, end = $end")
    +    val untilPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(end)
    +    val fromPartitionOffsets = start match {
    +      case Some(prevBatchEndOffset) =>
    +        KafkaSourceOffset.getPartitionOffsets(prevBatchEndOffset)
    +      case None =>
    +        initialPartitionOffsets
    +    }
    +
    +    // Find the new partitions, and get their earliest offsets
    +    val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
    +    val newPartitionOffsets = if (newPartitions.nonEmpty) {
    +      fetchNewPartitionEarliestOffsets(newPartitions.toSeq)
    +    } else {
    +      Map.empty[TopicPartition, Long]
    +    }
    +    if (newPartitionOffsets.keySet != newPartitions) {
    +      // We cannot get the starting offsets for some partitions, which means they were deleted
    +      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
    +      reportDataLoss(
    +        s"Cannot find earliest offsets of ${deletedPartitions}. Some data may have been missed")
    +    }
    +    logInfo(s"Partitions added: $newPartitionOffsets")
    +    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
    +      reportDataLoss(
    +        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
    +    }
    +
    +    val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
    +    if (deletedPartitions.nonEmpty) {
    +      logWarning(s"$deletedPartitions are gone. Some data may have been missed")
    +    }
    +
    +    // Use the until partitions to calculate offset ranges, so that partitions that have
    +    // been deleted are ignored
    +    val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
    +      // Ignore partitions whose from offsets we don't know
    +      newPartitionOffsets.contains(tp) || fromPartitionOffsets.contains(tp)
    +    }.toSeq
    +    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
    +
    +    val sortedExecutors = getSortedExecutorList(sc)
    +    val numExecutors = sortedExecutors.length
    +    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
    +
    +    // Calculate offset ranges
    +    val offsetRanges = topicPartitions.map { tp =>
    +      val fromOffset = fromPartitionOffsets.get(tp).getOrElse {
    +        newPartitionOffsets.getOrElse(tp, {
    +          // This should not happen since newPartitionOffsets contains all partitions
    +          // not in fromPartitionOffsets
    +          throw new IllegalStateException(s"$tp doesn't have a from offset")
    +        })
    +      }
    +      val untilOffset = untilPartitionOffsets(tp)
    +      val preferredLoc = if (numExecutors > 0) {
    +        // This allows cached KafkaConsumers in the executors to be re-used to read the same
    +        // partition in every batch.
    +        Some(sortedExecutors(floorMod(tp.hashCode, numExecutors)))
    +      } else None
    +      KafkaSourceRDDOffsetRange(tp, fromOffset, untilOffset, preferredLoc)
    +    }.filter { range =>
    +      if (range.untilOffset < range.fromOffset) {
    +        reportDataLoss(s"Partition ${range.topicPartition}'s offset was changed from " +
    +          s"${range.fromOffset} to ${range.untilOffset}, some data may have been missed")
    +        false
    +      } else {
    +        true
    +      }
    +    }.toArray
    +
    +    // Create an RDD that reads from Kafka and gets the (key, value) pair as byte arrays.
    +    val rdd = new KafkaSourceRDD(
    +      sc, executorKafkaParams, offsetRanges, pollTimeoutMs).map { cr =>
    +      Row(cr.key, cr.value, cr.topic, cr.partition, cr.offset, cr.timestamp,
    +        cr.timestampType.id)
    +    }
    +
    +    logInfo("GetBatch generating RDD of offset range: " +
    +      offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
    +    sqlContext.createDataFrame(rdd, schema)
    +  }
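
The preferred-location computation in `getBatch` relies on `floorMod` rather than `%` so that negative `hashCode` values still index into the sorted executor list, giving each TopicPartition a deterministic executor across batches. A sketch, assuming `floorMod(a, b) = ((a % b) + b) % b` and with hypothetical executor names:

```scala
// floorMod is always in [0, b) for positive b, even when a is negative,
// unlike Scala's `%` which can return a negative remainder.
def floorMod(a: Int, b: Int): Int = ((a % b) + b) % b

// Hypothetical sorted executor list, as produced by getSortedExecutorList.
val sortedExecutors = Vector("host1:exec-0", "host2:exec-1", "host3:exec-2")

// The same partition hash always maps to the same executor, so the cached
// KafkaConsumer on that executor is reused batch after batch.
def preferredLoc(partitionHash: Int): String =
  sortedExecutors(floorMod(partitionHash, sortedExecutors.length))
```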
    +
    +  /** Stop this source and free any resources it has allocated. */
    +  override def stop(): Unit = synchronized {
    +    consumer.close()
    +  }
    +
    +  override def toString(): String = s"KafkaSource[$consumerStrategy]"
    +
    +  /**
    +   * Fetch the offset of each partition, either seeking to the latest offsets or using the
    +   * current offsets in the consumer.
    +   */
    +  private def fetchPartitionOffsets(
    +      seekToEnd: Boolean): Map[TopicPartition, Long] = withRetriesWithoutInterrupt {
    +    // Make sure `KafkaConsumer.poll` won't be interrupted (KAFKA-1894)
    +    assert(Thread.currentThread().isInstanceOf[StreamExecutionThread])
    +    // Poll to get the latest assigned partitions
    +    consumer.poll(0)
    +    val partitions = consumer.assignment()
    +    consumer.pause(partitions)
    +    logDebug(s"Partitions assigned to consumer: $partitions")
    +
    +    // Get the current or latest offset of each partition
    +    if (seekToEnd) {
    +      consumer.seekToEnd(partitions)
    +      logDebug("Seeked to the end")
    +    }
    +    val partitionOffsets = partitions.asScala.map(p => p -> consumer.position(p)).toMap
    +    logDebug(s"Got offsets for partitions: $partitionOffsets")
    +    partitionOffsets
    +  }
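
A worked illustration of the offset convention this method returns (values hypothetical): `position()` after `seekToEnd` is 1 + (last available offset), so end offsets are exclusive and the number of records in a range is a plain subtraction.

```scala
// If the last record in partition ("t", 2) is at offset 5, the consumer's
// position after seekToEnd is 6 -- the offset of the next record to be written.
val lastAvailableOffset = 5L
val untilOffset = lastAvailableOffset + 1  // what fetchPartitionOffsets reports

// With an exclusive end, range size needs no +1/-1 adjustment.
def recordCount(fromOffset: Long, untilOffset: Long): Long = untilOffset - fromOffset
```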
    +
    +  /**
    +   * Fetch the earliest offsets for newly discovered partitions. The return results may not contain
    --- End diff --
    
    done

