gaborgsomogyi commented on a change in pull request #24738: [SPARK-23098][SQL] Migrate Kafka Batch source to v2. URL: https://github.com/apache/spark/pull/24738#discussion_r299029916
########## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatch.scala ##########

```diff
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Network.NETWORK_TIMEOUT
+import org.apache.spark.sql.sources.v2.reader.{Batch, InputPartition, PartitionReaderFactory}
+
+
+private[kafka010] class KafkaBatch(
```

Review comment:
You're right, there is.
Originally I considered it and made steps toward it, but I realized that only this part can be extracted cleanly:

```scala
val kafkaOffsetReader = new KafkaOffsetReader(
  strategy,
  KafkaSourceProvider.kafkaParamsForDriver(specifiedKafkaParams),
  sourceOptions,
  driverGroupIdPrefix = s"$uniqueGroupId-driver")

// Leverage the KafkaReader to obtain the relevant partition offsets
val (fromPartitionOffsets, untilPartitionOffsets) = {
  try {
    (kafkaOffsetReader.fetchPartitionOffsets(startingOffsets),
      kafkaOffsetReader.fetchPartitionOffsets(endingOffsets))
  } finally {
    kafkaOffsetReader.close()
  }
}

// Obtain topicPartitions in both from and until partition offset, ignoring
// topic partitions that were added and/or deleted between the two above calls.
if (fromPartitionOffsets.keySet != untilPartitionOffsets.keySet) {
  implicit val topicOrdering: Ordering[TopicPartition] = Ordering.by(t => t.topic())
  val fromTopics = fromPartitionOffsets.keySet.toList.sorted.mkString(",")
  val untilTopics = untilPartitionOffsets.keySet.toList.sorted.mkString(",")
  throw new IllegalStateException("different topic partitions " +
    s"for starting offsets topics[${fromTopics}] and " +
    s"ending offsets topics[${untilTopics}]")
}
```

To extract it one would need either an object function or a trait (an abstract class does not work because of `KafkaRelation`), which would:
* Increase complexity
* Decrease readability
* Not gain much

Still, if you think it's worth it, it can be done. WDYT?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: [email protected]

With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
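[Editor's note] As an illustration of the "object function" option discussed above, here is a hypothetical sketch of extracting just the offset-key-set validation step into a standalone object. The names `KafkaOffsetRangeValidator` and `validateTopicPartitions` are invented for this sketch, and `TopicPartition` below is a minimal stand-in case class for `org.apache.kafka.common.TopicPartition` so the snippet is self-contained; this is not the shape the PR actually adopted.

```scala
// Minimal stand-in for org.apache.kafka.common.TopicPartition,
// only so this sketch compiles without the kafka-clients dependency.
case class TopicPartition(topic: String, partition: Int)

// Hypothetical extraction target: both KafkaRelation and KafkaBatch could
// call this after fetching their starting and ending partition offsets.
object KafkaOffsetRangeValidator {

  // Fails fast when the starting and ending offsets cover different topic
  // partitions (e.g. partitions were added or deleted between the fetches).
  def validateTopicPartitions(
      fromPartitionOffsets: Map[TopicPartition, Long],
      untilPartitionOffsets: Map[TopicPartition, Long]): Unit = {
    if (fromPartitionOffsets.keySet != untilPartitionOffsets.keySet) {
      implicit val topicOrdering: Ordering[TopicPartition] = Ordering.by(_.topic)
      val fromTopics = fromPartitionOffsets.keySet.toList.sorted.mkString(",")
      val untilTopics = untilPartitionOffsets.keySet.toList.sorted.mkString(",")
      throw new IllegalStateException("different topic partitions " +
        s"for starting offsets topics[$fromTopics] and " +
        s"ending offsets topics[$untilTopics]")
    }
  }
}
```

This avoids a shared trait or abstract class, but it only covers the validation step; the `KafkaOffsetReader` construction and offset fetching would still be duplicated, which is the "not too much gain" trade-off the comment describes.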
