Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2991#discussion_r20210320
  
    --- Diff: 
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala
 ---
    @@ -0,0 +1,251 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.streaming.kafka
    +
    +import java.util.Properties
    +import java.util.concurrent.ConcurrentHashMap
    +
    +import scala.collection.Map
    +import scala.collection.mutable
    +import scala.reflect.{classTag, ClassTag}
    +
    +import kafka.common.TopicAndPartition
    +import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, 
KafkaStream}
    +import kafka.serializer.Decoder
    +import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, 
VerifiableProperties}
    +import org.I0Itec.zkclient.ZkClient
    +
    +import org.apache.spark.{SparkEnv, Logging}
    +import org.apache.spark.storage.{StreamBlockId, StorageLevel}
    +import org.apache.spark.streaming.receiver.{BlockGeneratorListener, 
BlockGenerator, Receiver}
    +import org.apache.spark.util.Utils
    +
    +
    +/**
    + * ReliableKafkaReceiver offers the ability to reliably store data into 
BlockManager without loss.
    + * It is turned off by default and will be enabled when
    + * spark.streaming.receiver.writeAheadLog.enable is true. The difference 
compared to KafkaReceiver
    + * is that this receiver manages topic-partition/offset itself and updates 
the offset information
    + * after data is reliably stored as write-ahead log. Offsets will only be 
updated when data is
    + * reliably stored, so the potential data loss problem of KafkaReceiver 
can be eliminated.
    + *
    + * Note: ReliableKafkaReceiver will set auto.commit.enable to false to 
turn off automatic offset
    + * commit mechanism in Kafka consumer. So setting this configuration 
manually within kafkaParams
    + * will not take effect.
    + */
    +private[streaming]
    +class ReliableKafkaReceiver[
    +  K: ClassTag,
    +  V: ClassTag,
    +  U <: Decoder[_]: ClassTag,
    +  T <: Decoder[_]: ClassTag](
    +    kafkaParams: Map[String, String],
    +    topics: Map[String, Int],
    +    storageLevel: StorageLevel)
    +    extends Receiver[(K, V)](storageLevel) with Logging {
    +
    +  private val groupId = kafkaParams("group.id")
    +
    +  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
    +
    +  private def conf() = SparkEnv.get.conf
    +
    +  /** High level consumer to connect to Kafka. */
    +  private var consumerConnector: ConsumerConnector = null
    +
    +  /** zkClient to connect to Zookeeper to commit the offsets. */
    +  private var zkClient: ZkClient = null
    +
    +  /**
    +   * A HashMap to manage the offset for each topic/partition, this HashMap 
is called in
    +   * synchronized block, so mutable HashMap will not meet concurrency 
issue.
    +   */
    +  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, 
Long] = null
    +
    +  /** A concurrent HashMap to store the stream block id and related offset 
snapshot. */
    +  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, 
Map[TopicAndPartition, Long]] = null
    +
    +  /**
    +   * Manage the BlockGenerator in receiver itself for better managing 
block store and offset
    +   * commit.
    +   */
    +  private var blockGenerator: BlockGenerator = null
    +
    +  /** Kafka offsets checkpoint listener to register into BlockGenerator 
for offsets checkpoint. */
    +  private final class OffsetCheckpointListener extends 
BlockGeneratorListener {
    +
    +    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
    +      // Get a snapshot of current offset map and store with related block 
id. Since this hook
    +      // function is called in synchronized block, so we can get the 
snapshot without explicit lock.
    +      val offsetSnapshot = topicPartitionOffsetMap.toMap
    --- End diff --
    
    There is no synchronization between on the blockGenerator here. Since the 
insertion is synchronized with the block generator, getting the map also has to 
be synchronized on the same object.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to