dajac commented on a change in pull request #11953:
URL: https://github.com/apache/kafka/pull/11953#discussion_r837137526



##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -62,19 +62,22 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
 
   def resizeThreadPool(newSize: Int): Unit = {
     def migratePartitions(newSize: Int): Unit = {
+      val allRemovedPartitionsMap = mutable.Map[TopicPartition, 
InitialFetchState]()
       fetcherThreadMap.forKeyValue { (id, thread) =>
         val partitionStates = removeFetcherForPartitions(thread.partitions)
         if (id.fetcherId >= newSize)
           thread.shutdown()
-        val fetchStates = partitionStates.map { case (topicPartition, 
currentFetchState) =>
-          val initialFetchState = InitialFetchState(currentFetchState.topicId, 
thread.sourceBroker,
-            currentLeaderEpoch = currentFetchState.currentLeaderEpoch,
-            initOffset = currentFetchState.fetchOffset)
-          topicPartition -> initialFetchState
+        partitionStates.forKeyValue {
+          (topicPartition, currentFetchState) =>

Review comment:
       nit: We usually keep `(topicPartition, currentFetchState) =>` on the 
previous line.

##########
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
     verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
     verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 10) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 25)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 20) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 5)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+  def resizeAndCheckFetcherPartitionDistribution(fetcherManager: 
AbstractFetcherManager[AbstractFetcherThread], topicPartitions: 
Set[TopicPartition], fetcherNum: Int): Unit = {

Review comment:
       nit: Could we break this long line?
   
   ```
   def resizeAndCheckFetcherPartitionDistribution(
     fetcherManager: AbstractFetcherManager[AbstractFetcherThread],
     topicPartitions: Set[TopicPartition],
     fetcherNum: Int
   ): Unit = {
   ```

##########
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
     verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
     verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 10) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 25)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 20) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 5)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+  def resizeAndCheckFetcherPartitionDistribution(fetcherManager: 
AbstractFetcherManager[AbstractFetcherThread], topicPartitions: 
Set[TopicPartition], fetcherNum: Int): Unit = {
+
+    fetcherManager.addFetcherForPartitions(topicPartitions.map(tp => {
+      val brokerId = Utils.abs(tp.topic().hashCode % 5)
+      val brokerEndPoint = new BrokerEndPoint(brokerId, 
s"kafka-host-$brokerId", 9092)
+      tp -> InitialFetchState(None, brokerEndPoint, 0, 0)
+    }).toMap)
+    fetcherManager.resizeThreadPool(fetcherNum)
+    val ownedPartitions = mutable.Set.empty[TopicPartition]
+    fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, 
fetcherThread) =>
+      val fetcherId = brokerIdAndFetcherId.fetcherId
+      fetcherThread.partitions.foreach { tp =>
+        ownedPartitions += tp
+        assertEquals(fetcherManager.getFetcherId(tp), fetcherId)
+      }
+    }
+    // Verify that all partitions are owned by the fetcher threads.
+    assertEquals(topicPartitions, ownedPartitions)
+  }
+
+  def mockTopicPartition(topicNum: Int, partitionNum: Int): 
Set[TopicPartition] = {

Review comment:
       nit: Could we make this method private? How about calling it 
`makeTopicPartitions`? The `TopicPartition` are not mocked so using `mock` 
looks weird here.

##########
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
     verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
     verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 10) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 25)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 20) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 5)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+  def resizeAndCheckFetcherPartitionDistribution(fetcherManager: 
AbstractFetcherManager[AbstractFetcherThread], topicPartitions: 
Set[TopicPartition], fetcherNum: Int): Unit = {
+
+    fetcherManager.addFetcherForPartitions(topicPartitions.map(tp => {

Review comment:
       nit: `topicPartitions.map(tp => {` -> `topicPartitions.map { tp => `

##########
File path: core/src/main/scala/kafka/server/AbstractFetcherManager.scala
##########
@@ -61,7 +61,9 @@ abstract class AbstractFetcherManager[T <: 
AbstractFetcherThread](val name: Stri
   private[server] def deadThreadCount: Int = lock synchronized { 
fetcherThreadMap.values.count(_.isThreadFailed) }
 
   def resizeThreadPool(newSize: Int): Unit = {
+    // Used to replace migratePartitions and solve the fetch problem
     def migratePartitions(newSize: Int): Unit = {
+      val allRemovedPartitionsMap = mutable.Map[TopicPartition, 
InitialFetchState]()
       fetcherThreadMap.forKeyValue { (id, thread) =>
         val partitionStates = removeFetcherForPartitions(thread.partitions)

Review comment:
       @yufeiyan1220 Are you interested in doing this in your PR? We can do as 
a follow-up otherwise.

##########
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
     verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
     verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 10) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 25)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 20) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 5)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+  def resizeAndCheckFetcherPartitionDistribution(fetcherManager: 
AbstractFetcherManager[AbstractFetcherThread], topicPartitions: 
Set[TopicPartition], fetcherNum: Int): Unit = {
+
+    fetcherManager.addFetcherForPartitions(topicPartitions.map(tp => {
+      val brokerId = Utils.abs(tp.topic().hashCode % 5)

Review comment:
       nit: You can omit the parenthesis after `topic`. We usually don't put 
them for getters.

##########
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
     verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
     verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 10) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 25)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 20) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 5)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+  def resizeAndCheckFetcherPartitionDistribution(fetcherManager: 
AbstractFetcherManager[AbstractFetcherThread], topicPartitions: 
Set[TopicPartition], fetcherNum: Int): Unit = {
+
+    fetcherManager.addFetcherForPartitions(topicPartitions.map(tp => {
+      val brokerId = Utils.abs(tp.topic().hashCode % 5)
+      val brokerEndPoint = new BrokerEndPoint(brokerId, 
s"kafka-host-$brokerId", 9092)

Review comment:
       Does using different broker ids bring any value to this test? If it 
does, it might be good to also verify the broker id when we verify the fetch id.

##########
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
     verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
     verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 10) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 25)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 20) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 5)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }

Review comment:
       `testShrinkThreadPool` is almost identical to `testExpandThreadPool`. 
Would it make sense to define `testResizeThreadPool(currentSize, newSize)` and 
use it in both?

##########
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
     verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
     verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 10) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 25)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 20) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 5)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+  def resizeAndCheckFetcherPartitionDistribution(fetcherManager: 
AbstractFetcherManager[AbstractFetcherThread], topicPartitions: 
Set[TopicPartition], fetcherNum: Int): Unit = {
+
+    fetcherManager.addFetcherForPartitions(topicPartitions.map(tp => {
+      val brokerId = Utils.abs(tp.topic().hashCode % 5)
+      val brokerEndPoint = new BrokerEndPoint(brokerId, 
s"kafka-host-$brokerId", 9092)
+      tp -> InitialFetchState(None, brokerEndPoint, 0, 0)
+    }).toMap)
+    fetcherManager.resizeThreadPool(fetcherNum)
+    val ownedPartitions = mutable.Set.empty[TopicPartition]
+    fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, 
fetcherThread) =>
+      val fetcherId = brokerIdAndFetcherId.fetcherId
+      fetcherThread.partitions.foreach { tp =>
+        ownedPartitions += tp
+        assertEquals(fetcherManager.getFetcherId(tp), fetcherId)
+      }
+    }
+    // Verify that all partitions are owned by the fetcher threads.
+    assertEquals(topicPartitions, ownedPartitions)
+  }
+
+  def mockTopicPartition(topicNum: Int, partitionNum: Int): 
Set[TopicPartition] = {
+    val res = mutable.Set[TopicPartition]()
+    val topicPrefix = "topic_"
+    for (i <- 0 to topicNum) {
+      val topic = topicPrefix + i
+      for (j <- 0 to partitionNum) {
+        res += new TopicPartition(topic, j)
+      }
+    }
+    res.toSet
+  }
+
+  class TestResizeFetcherThread(sourceBroker: BrokerEndPoint)
+    extends AbstractFetcherThread("test-resize-fetcher",
+      clientId = "mock-fetcher",
+      sourceBroker,
+      new FailedPartitions,
+      fetchBackOffMs = 0,
+      brokerTopicStats = new BrokerTopicStats) {
+    override protected def processPartitionData(topicPartition: 
TopicPartition, fetchOffset: Long, partitionData: FetchData): 
Option[LogAppendInfo] = {

Review comment:
       nit: Could we add an empty line before this method?

##########
File path: 
core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
##########
@@ -210,4 +217,103 @@ class AbstractFetcherManagerTest {
     verify(fetcher).maybeUpdateTopicIds(Set(tp1), topicIds)
     verify(fetcher).maybeUpdateTopicIds(Set(tp2), topicIds)
   }
+
+  @Test
+  def testExpandThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 10) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 25)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+  @Test
+  def testShrinkThreadPool(): Unit = {
+    val topicPartitions = mockTopicPartition(10, 100)
+    val fetcherManager = new 
AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", 
"fetcher-manager", 20) {
+      override def createFetcherThread(fetcherId: Int, sourceBroker: 
BrokerEndPoint): AbstractFetcherThread = {
+        new TestResizeFetcherThread(sourceBroker)
+      }
+    }
+    try {
+      resizeAndCheckFetcherPartitionDistribution(fetcherManager, 
topicPartitions, 5)
+    } finally {
+      fetcherManager.closeAllFetchers()
+    }
+  }
+
+  def resizeAndCheckFetcherPartitionDistribution(fetcherManager: 
AbstractFetcherManager[AbstractFetcherThread], topicPartitions: 
Set[TopicPartition], fetcherNum: Int): Unit = {
+
+    fetcherManager.addFetcherForPartitions(topicPartitions.map(tp => {
+      val brokerId = Utils.abs(tp.topic().hashCode % 5)
+      val brokerEndPoint = new BrokerEndPoint(brokerId, 
s"kafka-host-$brokerId", 9092)
+      tp -> InitialFetchState(None, brokerEndPoint, 0, 0)
+    }).toMap)
+    fetcherManager.resizeThreadPool(fetcherNum)
+    val ownedPartitions = mutable.Set.empty[TopicPartition]
+    fetcherManager.fetcherThreadMap.forKeyValue { (brokerIdAndFetcherId, 
fetcherThread) =>
+      val fetcherId = brokerIdAndFetcherId.fetcherId
+      fetcherThread.partitions.foreach { tp =>
+        ownedPartitions += tp
+        assertEquals(fetcherManager.getFetcherId(tp), fetcherId)
+      }
+    }
+    // Verify that all partitions are owned by the fetcher threads.
+    assertEquals(topicPartitions, ownedPartitions)
+  }
+
+  def mockTopicPartition(topicNum: Int, partitionNum: Int): 
Set[TopicPartition] = {
+    val res = mutable.Set[TopicPartition]()
+    val topicPrefix = "topic_"
+    for (i <- 0 to topicNum) {
+      val topic = topicPrefix + i
+      for (j <- 0 to partitionNum) {
+        res += new TopicPartition(topic, j)
+      }
+    }
+    res.toSet
+  }
+
+  class TestResizeFetcherThread(sourceBroker: BrokerEndPoint)
+    extends AbstractFetcherThread("test-resize-fetcher",
+      clientId = "mock-fetcher",
+      sourceBroker,
+      new FailedPartitions,
+      fetchBackOffMs = 0,
+      brokerTopicStats = new BrokerTopicStats) {

Review comment:
       * nit: Could we also name the parameter for `test-resize-fetcher`?
   * nit: Could we align the parameters as well? We can either align all of 
them on the first one or move the first one on a new line.

##########
File path: core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
##########
@@ -20,13 +20,11 @@ package kafka.server
 import java.nio.ByteBuffer
 import java.util.Optional
 import java.util.concurrent.atomic.AtomicInteger
-
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogAppendInfo
 import kafka.message.NoCompressionCodec
 import kafka.metrics.KafkaYammerMetrics
-import kafka.server.AbstractFetcherThread.ReplicaFetch
-import kafka.server.AbstractFetcherThread.ResultWithPartitions
+import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}

Review comment:
       Could we revert changes in this file as there are not necessary for the 
patch?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to