Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16686#discussion_r99227617
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
    @@ -71,94 +77,152 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
             .map { k => k.drop(6).toString -> parameters(k) }
             .toMap
     
    -    val deserClassName = classOf[ByteArrayDeserializer].getName
    -    // Each running query should use its own group id. Otherwise, the query may be only assigned
    -    // partial data since Kafka will assign partitions to multiple consumers having the same group
    -    // id. Hence, we should generate a unique id for each query.
    -    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
    -
    -    val startingOffsets =
    +    val startingStreamOffsets =
           caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
             case Some("latest") => LatestOffsets
             case Some("earliest") => EarliestOffsets
             case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
             case None => LatestOffsets
           }
     
    -    val kafkaParamsForDriver =
    -      ConfigUpdater("source", specifiedKafkaParams)
    -        .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
    -        .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
    -
    -        // Set to "earliest" to avoid exceptions. However, KafkaSource will fetch the initial
    -        // offsets by itself instead of counting on KafkaConsumer.
    -        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    -
    -        // So that consumers in the driver does not commit offsets unnecessarily
    -        .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    -
    -        // So that the driver does not pull too much data
    -        .set(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, new java.lang.Integer(1))
    -
    -        // If buffer config is not set, set it to reasonable value to work around
    -        // buffer issues (see KAFKA-3135)
    -        .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
    -        .build()
    -
    -    val kafkaParamsForExecutors =
    -      ConfigUpdater("executor", specifiedKafkaParams)
    -        .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
    -        .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
    -
    -        // Make sure executors do only what the driver tells them.
    -        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
    +    val kafkaOffsetReader = new KafkaOffsetReaderImpl(
    +      strategy(caseInsensitiveParams),
    +      kafkaParamsForDriver(specifiedKafkaParams),
    +      parameters,
    +      driverGroupIdPrefix = s"$uniqueGroupId-driver")
     
    -        // So that consumers in executors do not mess with any existing group id
    -        .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
    +    new KafkaSource(
    +      sqlContext,
    +      kafkaOffsetReader,
    +      kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
    +      parameters,
    +      metadataPath,
    +      startingStreamOffsets,
    +      failOnDataLoss(caseInsensitiveParams))
    +  }
     
    -        // So that consumers in executors does not commit offsets unnecessarily
    -        .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    +  /**
    +   * Returns a new base relation with the given parameters.
    +   *
    +   * @note The parameters' keywords are case insensitive and this insensitivity is enforced
    +   *       by the Map that is passed to the function.
    +   */
    +  override def createRelation(
    +    sqlContext: SQLContext,
    +    parameters: Map[String, String]): BaseRelation = {
    +    validateOptions(parameters, Batch)
    +    // Each running query should use its own group id. Otherwise, the query may be only assigned
    +    // partial data since Kafka will assign partitions to multiple consumers having the same group
    +    // id. Hence, we should generate a unique id for each query.
    +    val uniqueGroupId = s"spark-kafka-relation-${UUID.randomUUID}"
    +    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) }
    +    val specifiedKafkaParams =
    +      parameters
    +        .keySet
    +        .filter(_.toLowerCase.startsWith("kafka."))
    +        .map { k => k.drop(6).toString -> parameters(k) }
    +        .toMap
     
    -        // If buffer config is not set, set it to reasonable value to work around
    -        // buffer issues (see KAFKA-3135)
    -        .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
    -        .build()
    +    val startingRelationOffsets =
    +      caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
    +        case Some("earliest") => EarliestOffsets
    +        case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
    +        case None => EarliestOffsets
    +      }
     
    -    val strategy = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
    -      case ("assign", value) =>
    -        AssignStrategy(JsonUtils.partitions(value))
    -      case ("subscribe", value) =>
    -        SubscribeStrategy(value.split(",").map(_.trim()).filter(_.nonEmpty))
    -      case ("subscribepattern", value) =>
    -        SubscribePatternStrategy(value.trim())
    -      case _ =>
    -        // Should never reach here as we are already matching on
    -        // matched strategy names
    -        throw new IllegalArgumentException("Unknown option")
    -    }
    +    val endingRelationOffsets =
    +      caseInsensitiveParams.get(ENDING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
    +        case Some("latest") => LatestOffsets
    +        case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
    +        case None => LatestOffsets
    +      }
     
    -    val failOnDataLoss =
    -      caseInsensitiveParams.getOrElse(FAIL_ON_DATA_LOSS_OPTION_KEY, "true").toBoolean
    +    val kafkaOffsetReaderImpl = new KafkaOffsetReaderImpl(
    +      strategy(caseInsensitiveParams),
    +      kafkaParamsForDriver(specifiedKafkaParams),
    +      parameters,
    +      driverGroupIdPrefix = s"$uniqueGroupId-driver")
    +    val kafkaOffsetReader = new UninterruptibleKafkaOffsetReader(kafkaOffsetReaderImpl)
     
    -    new KafkaSource(
    +    new KafkaRelation(
           sqlContext,
    -      strategy,
    -      kafkaParamsForDriver,
    -      kafkaParamsForExecutors,
    +      kafkaOffsetReader,
    +      kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
           parameters,
    -      metadataPath,
    -      startingOffsets,
    -      failOnDataLoss,
    -      driverGroupIdPrefix = s"$uniqueGroupId-driver")
    +      failOnDataLoss(caseInsensitiveParams),
    +      startingRelationOffsets,
    +      endingRelationOffsets)
       }
     
    -  private def validateOptions(parameters: Map[String, String]): Unit = {
    +  private def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]) =
    +    ConfigUpdater("source", specifiedKafkaParams)
    +      .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
    +      .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
    +
    +      // Set to "earliest" to avoid exceptions. However, KafkaSource will fetch the initial
    +      // offsets by itself instead of counting on KafkaConsumer.
    +      .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    +
    +      // So that consumers in the driver does not commit offsets unnecessarily
    +      .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    +
    +      // So that the driver does not pull too much data
    +      .set(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, new java.lang.Integer(1))
    +
    +      // If buffer config is not set, set it to reasonable value to work around
    +      // buffer issues (see KAFKA-3135)
    +      .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
    +      .build()
    +
    +  private def kafkaParamsForExecutors(
    +    specifiedKafkaParams: Map[String, String], uniqueGroupId: String) =
    --- End diff --
    
    Also, the convention is to have each param on a separate line.
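    
    For illustration, a minimal sketch of how that declaration might look with one parameter per line (double-indented, per the usual Spark multi-line-signature style). The body here is abbreviated from the old `kafkaParamsForExecutors` block removed in this diff, so `ConfigUpdater` and `deserClassName` refer to helpers already present in this file:
    
        private def kafkaParamsForExecutors(
            specifiedKafkaParams: Map[String, String],
            uniqueGroupId: String) =
          ConfigUpdater("executor", specifiedKafkaParams)
            .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
            .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
            // Make sure executors do only what the driver tells them.
            .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
            // So that consumers in executors do not mess with any existing group id
            .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
            // So that consumers in executors do not commit offsets unnecessarily
            .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
            // If buffer config is not set, set it to a reasonable value to work around
            // buffer issues (see KAFKA-3135)
            .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
            .build()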

