Github user jerryshao commented on a diff in the pull request:
https://github.com/apache/spark/pull/2991#discussion_r20215912
--- Diff: external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala ---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.util.Properties
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.Map
+import scala.collection.mutable
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
+import kafka.serializer.Decoder
+import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
+import org.I0Itec.zkclient.ZkClient
+
+import org.apache.spark.{SparkEnv, Logging}
+import org.apache.spark.storage.{StreamBlockId, StorageLevel}
+import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
+import org.apache.spark.util.Utils
+
+
+/**
+ * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
+ * It is turned off by default and will be enabled when
+ * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
+ * is that this receiver manages topic-partition/offset itself and updates the offset information
+ * after data is reliably stored as write-ahead log. Offsets will only be updated when data is
+ * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
+ *
+ * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
+ * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
+ * will not take effect.
+ */
+private[streaming]
+class ReliableKafkaReceiver[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel)
+ extends Receiver[(K, V)](storageLevel) with Logging {
+
+ private val groupId = kafkaParams("group.id")
+
+ private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
+
+ private def conf() = SparkEnv.get.conf
+
+ /** High level consumer to connect to Kafka. */
+ private var consumerConnector: ConsumerConnector = null
+
+ /** zkClient to connect to Zookeeper to commit the offsets. */
+ private var zkClient: ZkClient = null
+
+ /**
+ * A HashMap to manage the offset for each topic/partition, this HashMap is called in
+ * synchronized block, so mutable HashMap will not meet concurrency issue.
+ */
+ private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
+
+ /** A concurrent HashMap to store the stream block id and related offset snapshot. */
+ private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
+
+ /**
+ * Manage the BlockGenerator in receiver itself for better managing block store and offset
+ * commit.
+ */
+ private var blockGenerator: BlockGenerator = null
+
+ /** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
+ private final class OffsetCheckpointListener extends BlockGeneratorListener {
+
+ override def onGenerateBlock(blockId: StreamBlockId): Unit = {
+ // Get a snapshot of current offset map and store with related block id. Since this hook
+ // function is called in synchronized block, so we can get the snapshot without explicit lock.
+ val offsetSnapshot = topicPartitionOffsetMap.toMap
--- End diff --
Because this hook function `onGenerateBlock` is called from `updateCurrentBuffer`, which is
synchronized on the BlockGenerator object, I don't think we need to add another lock here.
While `updateCurrentBuffer` holds that lock, no data can be added to the BlockGenerator and
no offsets can be inserted into `topicPartitionOffsetMap`.
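
To make the locking argument concrete, here is a minimal, self-contained sketch of the pattern
being described. It is not the actual Spark `BlockGenerator` API; all names here
(`ToyBlockGenerator`, `ToyBlockListener`, `addDataWithOffset`) are hypothetical. The only point
it illustrates is that the data-adding path and the block-generating path synchronize on the
same monitor, so a listener invoked from `updateCurrentBuffer` can snapshot the mutable offset
map without taking another lock.

```scala
import scala.collection.mutable

// Hypothetical listener, standing in for BlockGeneratorListener.onGenerateBlock.
trait ToyBlockListener {
  def onGenerateBlock(blockId: String, offsetSnapshot: Map[String, Long]): Unit
}

// Hypothetical generator, standing in for BlockGenerator.
class ToyBlockGenerator(listener: ToyBlockListener) {
  // Stand-ins for the current buffer and topicPartitionOffsetMap.
  private var buffer = new mutable.ArrayBuffer[String]
  private val offsets = new mutable.HashMap[String, Long]

  // Receiver thread: adds a record and records its offset while holding the monitor.
  def addDataWithOffset(record: String, topicPartition: String, offset: Long): Unit =
    synchronized {
      buffer += record
      offsets(topicPartition) = offset
    }

  // Timer thread: swaps the buffer out and notifies the listener under the same
  // monitor, so onGenerateBlock can never interleave with addDataWithOffset.
  def updateCurrentBuffer(blockId: String): Unit = synchronized {
    val generatedBlock = buffer
    buffer = new mutable.ArrayBuffer[String]
    // offsets.toMap is already an atomic snapshot with respect to addDataWithOffset,
    // so no extra lock is needed inside the listener.
    listener.onGenerateBlock(blockId, offsets.toMap)
    println(s"generated block $blockId with ${generatedBlock.size} records")
  }
}
```

Because both methods lock the same object, the `toMap` snapshot taken inside
`updateCurrentBuffer` is always consistent with exactly the records that made it into the
generated block.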