gaborgsomogyi commented on a change in pull request #29729:
URL: https://github.com/apache/spark/pull/29729#discussion_r499571685
##########
File path:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
##########
@@ -36,31 +37,40 @@ import org.apache.spark.kafka010.KafkaConfigUpdater
* All three strategies have overloaded constructors that allow you to specify
* the starting offset for a particular partition.
*/
-private[kafka010] sealed trait ConsumerStrategy {
- /** Create a [[KafkaConsumer]] and subscribe to topics according to a desired strategy */
- def createConsumer(kafkaParams: ju.Map[String, Object]): Consumer[Array[Byte], Array[Byte]]
-
- /**
- * Updates the parameters with security if needed.
- * Added a function to hide internals and reduce code duplications because all strategy uses it.
- */
- protected def setAuthenticationConfigIfNeeded(kafkaParams: ju.Map[String, Object]) =
- KafkaConfigUpdater("source", kafkaParams.asScala.toMap)
+private[kafka010] sealed trait ConsumerStrategy extends Logging {
+ /** Creates an [[org.apache.kafka.clients.admin.AdminClient]] */
+ def createAdmin(kafkaParams: ju.Map[String, Object]): Admin = {
+ val updatedKafkaParams = KafkaConfigUpdater("source", kafkaParams.asScala.toMap)
.setAuthenticationConfigIfNeeded()
.build()
+ logDebug(s"Admin params: ${KafkaRedactionUtil.redactParams(updatedKafkaParams.asScala.toSeq)}")
+ Admin.create(updatedKafkaParams)
+ }
+
+ /** Returns the assigned or subscribed [[TopicPartition]] */
+ def assignedTopicPartitions(admin: Admin): Set[TopicPartition]
+
+ protected def retrieveAllPartitions(admin: Admin, topics: Set[String]): Set[TopicPartition] = {
+ admin.describeTopics(topics.asJava).all().get().asScala.filterNot(_._2.isInternal).flatMap {
+ case (topic, topicDescription) =>
+ topicDescription.partitions().asScala.map { topicPartitionInfo =>
+ val partition = topicPartitionInfo.partition()
+ logDebug(s"Partition added: $topic:$partition")
Review comment:
That's a nice catch! Rephrasing it...
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]