Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/2991#discussion_r20247329
--- Diff:
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
---
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka
+
+import java.util.Properties
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.Map
+import scala.collection.mutable
+import scala.reflect.{classTag, ClassTag}
+
+import kafka.common.TopicAndPartition
+import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
+import kafka.serializer.Decoder
+import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
+import org.I0Itec.zkclient.ZkClient
+
+import org.apache.spark.{SparkEnv, Logging}
+import org.apache.spark.storage.{StreamBlockId, StorageLevel}
+import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
+import org.apache.spark.util.Utils
+
+
+/**
+ * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
+ * It is turned off by default and will be enabled when
+ * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
+ * is that this receiver manages topic-partition/offset itself and updates the offset information
+ * after data is reliably stored as write-ahead log. Offsets will only be updated when data is
+ * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
+ *
+ * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
+ * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
+ * will not take effect.
+ */
+private[streaming]
+class ReliableKafkaReceiver[
+ K: ClassTag,
+ V: ClassTag,
+ U <: Decoder[_]: ClassTag,
+ T <: Decoder[_]: ClassTag](
+ kafkaParams: Map[String, String],
+ topics: Map[String, Int],
+ storageLevel: StorageLevel)
+ extends Receiver[(K, V)](storageLevel) with Logging {
+
+ private val groupId = kafkaParams("group.id")
+
+ private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
+
+ private def conf() = SparkEnv.get.conf
+
+ /** High level consumer to connect to Kafka. */
+ private var consumerConnector: ConsumerConnector = null
+
+ /** zkClient to connect to Zookeeper to commit the offsets. */
+ private var zkClient: ZkClient = null
+
+ /**
+ * A HashMap to manage the offset for each topic/partition, this HashMap is called in
+ * synchronized block, so mutable HashMap will not meet concurrency issue.
+ */
+ private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null
+
+ /** A concurrent HashMap to store the stream block id and related offset snapshot. */
+ private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null
+
+ /**
+ * Manage the BlockGenerator in receiver itself for better managing block store and offset
+ * commit.
+ */
+ private var blockGenerator: BlockGenerator = null
+
+ /** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
+ private final class OffsetCheckpointListener extends BlockGeneratorListener {
+
+ override def onGenerateBlock(blockId: StreamBlockId): Unit = {
+ // Get a snapshot of current offset map and store with related block id. Since this hook
+ // function is called in synchronized block, so we can get the snapshot without explicit lock.
+ val offsetSnapshot = topicPartitionOffsetMap.toMap
--- End diff --
Aaaah, I get it, but this is soooo non-intuitive. That synchronized block is not even in this
file, which makes the logic hard to follow. Maybe we should separate the locked functionality
into two locks: the BlockGenerator's lock is used to swap out the ArrayBuffer in the
BlockGenerator, and a separate lock in the ReliableKafkaReceiver is used to update the offsets.
Even though there are two locks, that's a cleaner design because there is a clean separation of
responsibility between them - the BlockGenerator lock does not need to be concerned with the
Receiver lock, and the Receiver lock should not have to worry about locks inside the
BlockGenerator (as long as deadlock is avoided). To avoid deadlocks, the callbacks should not be
invoked from within synchronized sections in the BlockGenerator.
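
For what it's worth, here is a rough Scala sketch of what that two-lock split might look like.
This is only an illustration of the suggestion, not the actual BlockGenerator /
ReliableKafkaReceiver code; every class and member name in it (BlockGeneratorSketch,
ReceiverSketch, offsetLock, etc.) is made up for the example.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable

object TwoLockSketch {

  // Hypothetical stand-ins for the Kafka/Spark types, to keep the sketch self-contained.
  type TopicAndPartition = (String, Int)
  type StreamBlockId = Long

  /** Listener whose callbacks are invoked *outside* the generator's lock. */
  trait BlockGeneratorListenerSketch {
    def onGenerateBlock(blockId: StreamBlockId): Unit
  }

  /** Generator side: its lock only guards swapping the message buffer. */
  class BlockGeneratorSketch(listener: BlockGeneratorListenerSketch) {
    private val bufferLock = new Object
    private var currentBuffer = new mutable.ArrayBuffer[Any]

    def updateCurrentBuffer(blockId: StreamBlockId): Unit = {
      // Swap the buffer under the generator's own lock ...
      val newBlockBuffer = bufferLock.synchronized {
        val old = currentBuffer
        currentBuffer = new mutable.ArrayBuffer[Any]
        old
      }
      // ... then notify the listener after releasing it, so the callback can take the
      // receiver's lock without risking a lock-ordering deadlock.
      if (newBlockBuffer.nonEmpty) listener.onGenerateBlock(blockId)
    }
  }

  /** Receiver side: a separate lock guards only the offset bookkeeping. */
  class ReceiverSketch extends BlockGeneratorListenerSketch {
    private val offsetLock = new Object
    private val topicPartitionOffsetMap = mutable.HashMap.empty[TopicAndPartition, Long]
    private val blockOffsetMap =
      new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()

    /** Called for every consumed message; touches only the receiver's own lock. */
    def updateOffset(tp: TopicAndPartition, offset: Long): Unit =
      offsetLock.synchronized { topicPartitionOffsetMap.put(tp, offset) }

    /** Snapshot the offsets for the finished block under the receiver's lock. */
    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
      val snapshot = offsetLock.synchronized { topicPartitionOffsetMap.toMap }
      blockOffsetMap.put(blockId, snapshot)
    }
  }
}
```

With this arrangement neither lock is ever held while the other is acquired on the generator's
code path, which is what keeps the two locks independent.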