dajac commented on a change in pull request #11173:
URL: https://github.com/apache/kafka/pull/11173#discussion_r762038197
##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -222,11 +240,19 @@ object GetOffsetShell {
}
/**
- * Return the partition infos. Filter them with topicPartitionFilter.
+ * Return the partition infos. Filter them with topicFilter and
topicPartitionFilter.
*/
- private def listPartitionInfos(consumer: KafkaConsumer[_, _],
topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = {
- consumer.listTopics.asScala.values.flatMap { partitions =>
- partitions.asScala.filter(topicPartitionFilter)
+ private def listPartitionInfos(client: Admin,
+ topicFilter: String => Boolean,
+ topicPartitionFilter: PartitionInfo =>
Boolean,
+ excludeInternalTopics: Boolean):
Seq[PartitionInfo] = {
+ val topics = client.listTopics(new
ListTopicsOptions().listInternal(!excludeInternalTopics)).names().get().asScala.filter(topicFilter)
Review comment:
nit: This line is quite hard to read. Could we extract `new
ListTopicsOptions().listInternal(!excludeInternalTopics)` into a variable? You
can also remove the parentheses from all the getters (e.g. `names`, `get`).
##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -195,16 +205,24 @@ object GetOffsetShell {
val upperRange = group(4).map(_.toInt).getOrElse(Int.MaxValue)
(p: Int) => p >= lowerRange && p < upperRange
}
-
- tp => topicFilter.isTopicAllowed(tp.topic, excludeInternalTopics) &&
partitionFilter(tp.partition)
+ (
+ topic => topicFilter.isTopicAllowed(topic, excludeInternalTopics),
+ tp => topicFilter.isTopicAllowed(tp.topic, excludeInternalTopics) &&
partitionFilter(tp.partition)
+ )
Review comment:
The code is getting a bit complicated with all those filter functions. I
wonder if we should introduce an abstraction for them now, something along the
lines of the following. The names are not very good, but the idea is there. What
do you think?
```scala
trait TopicPartitionFilter {
def isTopicAllowed(topic: String): Boolean
def isPartitionAllowed(partition: PartitionInfo): Boolean
}
case class UniqueTopicPartitionFilter(
topicRegex: String,
partition: Int
) extends TopicPartitionFilter {
...
}
case class RangeTopicPartitionFilter(
topicRegex: String,
lower: Int,
upper: Int
) extends TopicPartitionFilter {
...
}
case class SetTopicPartitionFilter(
topic: String,
partitionIds: Set[Int]
) extends TopicPartitionFilter {
...
}
case class CompositePartitionFilter(
filters: List[TopicPartitionFilter]
) extends TopicPartitionFilter {
...
}
```
##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -164,20 +167,27 @@ object GetOffsetShell {
}
/**
- * Creates a topic-partition filter based on a list of patterns.
+ * Creates a topic filter and a topic-partition filter based on a list of
patterns.
* Expected format:
* List: TopicPartitionPattern(, TopicPartitionPattern)*
* TopicPartitionPattern: TopicPattern(:PartitionPattern)? |
:PartitionPattern
* TopicPattern: REGEX
* PartitionPattern: NUMBER | NUMBER-(NUMBER)? | -NUMBER
*/
- def createTopicPartitionFilterWithPatternList(topicPartitions: String,
excludeInternalTopics: Boolean): PartitionInfo => Boolean = {
+ def createTopicPartitionFilterWithPatternList(topicPartitions: String,
+ excludeInternalTopics: Boolean
+ ): (String => Boolean,
PartitionInfo => Boolean) = {
Review comment:
nit: Could we format methods as follows? We tend to follow this pattern
nowadays.
```scala
def createTopicPartitionFilterWithPatternList(
topicPartitions: String,
excludeInternalTopics: Boolean
): (String => Boolean, PartitionInfo => Boolean) = {
}
```
##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -222,11 +240,19 @@ object GetOffsetShell {
}
/**
- * Return the partition infos. Filter them with topicPartitionFilter.
+ * Return the partition infos. Filter them with topicFilter and
topicPartitionFilter.
*/
- private def listPartitionInfos(consumer: KafkaConsumer[_, _],
topicPartitionFilter: PartitionInfo => Boolean): Seq[PartitionInfo] = {
- consumer.listTopics.asScala.values.flatMap { partitions =>
- partitions.asScala.filter(topicPartitionFilter)
+ private def listPartitionInfos(client: Admin,
+ topicFilter: String => Boolean,
+ topicPartitionFilter: PartitionInfo =>
Boolean,
+ excludeInternalTopics: Boolean):
Seq[PartitionInfo] = {
+ val topics = client.listTopics(new
ListTopicsOptions().listInternal(!excludeInternalTopics)).names().get().asScala.filter(topicFilter)
+
client.describeTopics(topics.asJavaCollection).allTopicNames().get().asScala.flatMap
{ case (topic, description) =>
+ description
+ .partitions()
+ .asScala
+ .map(tp => new PartitionInfo(topic, tp.partition(), tp.leader(),
tp.replicas().asScala.toArray, tp.isr().asScala.toArray))
Review comment:
Instead of converting to `PartitionInfo`, could we adapt the filters to
work with what we have here?
##########
File path: core/src/main/scala/kafka/tools/GetOffsetShell.scala
##########
@@ -123,10 +123,10 @@ object GetOffsetShell {
new Properties
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
config.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, clientId)
Review comment:
Should we use `AdminClientConfig`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]