Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16686#discussion_r99227617

--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---
@@ -71,94 +77,152 @@ private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
         .map { k => k.drop(6).toString -> parameters(k) }
         .toMap
 
-    val deserClassName = classOf[ByteArrayDeserializer].getName
-    // Each running query should use its own group id. Otherwise, the query may be only assigned
-    // partial data since Kafka will assign partitions to multiple consumers having the same group
-    // id. Hence, we should generate a unique id for each query.
-    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
-    val startingOffsets =
+    val startingStreamOffsets =
       caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
         case Some("latest") => LatestOffsets
         case Some("earliest") => EarliestOffsets
         case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
         case None => LatestOffsets
       }
 
-    val kafkaParamsForDriver =
-      ConfigUpdater("source", specifiedKafkaParams)
-        .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
-        .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
-
-        // Set to "earliest" to avoid exceptions. However, KafkaSource will fetch the initial
-        // offsets by itself instead of counting on KafkaConsumer.
-        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-
-        // So that consumers in the driver does not commit offsets unnecessarily
-        .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-
-        // So that the driver does not pull too much data
-        .set(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, new java.lang.Integer(1))
-
-        // If buffer config is not set, set it to reasonable value to work around
-        // buffer issues (see KAFKA-3135)
-        .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-        .build()
-
-    val kafkaParamsForExecutors =
-      ConfigUpdater("executor", specifiedKafkaParams)
-        .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
-        .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
-
-        // Make sure executors do only what the driver tells them.
-        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
+    val kafkaOffsetReader = new KafkaOffsetReaderImpl(
+      strategy(caseInsensitiveParams),
+      kafkaParamsForDriver(specifiedKafkaParams),
+      parameters,
+      driverGroupIdPrefix = s"$uniqueGroupId-driver")
 
-        // So that consumers in executors do not mess with any existing group id
-        .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
+    new KafkaSource(
+      sqlContext,
+      kafkaOffsetReader,
+      kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
+      parameters,
+      metadataPath,
+      startingStreamOffsets,
+      failOnDataLoss(caseInsensitiveParams))
+  }
 
-        // So that consumers in executors does not commit offsets unnecessarily
-        .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+  /**
+   * Returns a new base relation with the given parameters.
+   *
+   * @note The parameters' keywords are case insensitive and this insensitivity is enforced
+   * by the Map that is passed to the function.
+   */
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    validateOptions(parameters, Batch)
+    // Each running query should use its own group id. Otherwise, the query may be only assigned
+    // partial data since Kafka will assign partitions to multiple consumers having the same group
+    // id. Hence, we should generate a unique id for each query.
+    val uniqueGroupId = s"spark-kafka-relation-${UUID.randomUUID}"
+    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase, v) }
+    val specifiedKafkaParams =
+      parameters
+        .keySet
+        .filter(_.toLowerCase.startsWith("kafka."))
+        .map { k => k.drop(6).toString -> parameters(k) }
+        .toMap
 
-        // If buffer config is not set, set it to reasonable value to work around
-        // buffer issues (see KAFKA-3135)
-        .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
-        .build()
+    val startingRelationOffsets =
+      caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
+        case Some("earliest") => EarliestOffsets
+        case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
+        case None => EarliestOffsets
+      }
 
-    val strategy = caseInsensitiveParams.find(x => STRATEGY_OPTION_KEYS.contains(x._1)).get match {
-      case ("assign", value) =>
-        AssignStrategy(JsonUtils.partitions(value))
-      case ("subscribe", value) =>
-        SubscribeStrategy(value.split(",").map(_.trim()).filter(_.nonEmpty))
-      case ("subscribepattern", value) =>
-        SubscribePatternStrategy(value.trim())
-      case _ =>
-        // Should never reach here as we are already matching on
-        // matched strategy names
-        throw new IllegalArgumentException("Unknown option")
-    }
+    val endingRelationOffsets =
+      caseInsensitiveParams.get(ENDING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) match {
+        case Some("latest") => LatestOffsets
+        case Some(json) => SpecificOffsets(JsonUtils.partitionOffsets(json))
+        case None => LatestOffsets
+      }
 
-    val failOnDataLoss =
-      caseInsensitiveParams.getOrElse(FAIL_ON_DATA_LOSS_OPTION_KEY, "true").toBoolean
+    val kafkaOffsetReaderImpl = new KafkaOffsetReaderImpl(
+      strategy(caseInsensitiveParams),
+      kafkaParamsForDriver(specifiedKafkaParams),
+      parameters,
+      driverGroupIdPrefix = s"$uniqueGroupId-driver")
+    val kafkaOffsetReader = new UninterruptibleKafkaOffsetReader(kafkaOffsetReaderImpl)
 
-    new KafkaSource(
+    new KafkaRelation(
       sqlContext,
-      strategy,
-      kafkaParamsForDriver,
-      kafkaParamsForExecutors,
+      kafkaOffsetReader,
+      kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
       parameters,
-      metadataPath,
-      startingOffsets,
-      failOnDataLoss,
-      driverGroupIdPrefix = s"$uniqueGroupId-driver")
+      failOnDataLoss(caseInsensitiveParams),
+      startingRelationOffsets,
+      endingRelationOffsets)
   }
 
-  private def validateOptions(parameters: Map[String, String]): Unit = {
+  private def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]) =
+    ConfigUpdater("source", specifiedKafkaParams)
+      .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
+      .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
+
+      // Set to "earliest" to avoid exceptions. However, KafkaSource will fetch the initial
+      // offsets by itself instead of counting on KafkaConsumer.
+      .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
+      // So that consumers in the driver does not commit offsets unnecessarily
+      .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+
+      // So that the driver does not pull too much data
+      .set(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, new java.lang.Integer(1))
+
+      // If buffer config is not set, set it to reasonable value to work around
+      // buffer issues (see KAFKA-3135)
+      .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
+      .build()
+
+  private def kafkaParamsForExecutors(
+      specifiedKafkaParams: Map[String, String], uniqueGroupId: String) =
--- End diff --

Also, the convention is to put each parameter on a separate line.
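For illustration only, a version of that signature with one parameter per line might look like the sketch below. It is not the committed fix; the body is simply adapted from the executor-side settings in the removed kafkaParamsForExecutors block above, and the result type is left to be inferred because the hunk cuts off before it.

    private def kafkaParamsForExecutors(
        specifiedKafkaParams: Map[String, String],
        uniqueGroupId: String) =
      ConfigUpdater("executor", specifiedKafkaParams)
        .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
        .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
        // Make sure executors do only what the driver tells them.
        .set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
        // So that consumers in executors do not mess with any existing group id
        .set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
        // So that consumers in executors do not commit offsets unnecessarily
        .set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
        // If buffer config is not set, set it to a reasonable value to work around
        // buffer issues (see KAFKA-3135)
        .setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
        .build()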