This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 301abf0  KAFKA-9677: Fix consumer fetch with small consume bandwidth 
quotas (#8290)
301abf0 is described below

commit 301abf040b39b217cae9732520b963cd63b12d35
Author: Anna Povzner <a...@confluent.io>
AuthorDate: Sat Mar 14 13:45:50 2020 -0700

    KAFKA-9677: Fix consumer fetch with small consume bandwidth quotas (#8290)
    
    When we changed quota communication with KIP-219, fetch requests get 
throttled by returning empty response with the delay in throttle_time_ms and 
Kafka consumer retries again after the delay. With default configs, the maximum 
fetch size could be as big as 50MB (or 10MB per partition). The default broker 
config (1-second window, 10 full windows of tracked bandwidth/thread 
utilization usage) means that < 5MB/s consumer quota (per broker) may block 
consumers from being able to fetch any data.
    
    This PR ensures that consumers cannot get blocked by quota by capping 
fetchMaxBytes in KafkaApis.handleFetchRequest() to quota window * consume 
bandwidth quota. In the example of default configs (10-second quota window) and 
1MB/s consumer bandwidth quota, fetchMaxBytes would be capped to 10MB.
    
    Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>
---
 .../scala/kafka/server/ClientQuotaManager.scala    | 16 +++++++++++++
 core/src/main/scala/kafka/server/KafkaApis.scala   | 10 +++++++-
 .../integration/kafka/api/BaseQuotaTest.scala      | 28 +++++++++++++++++++---
 .../kafka/api/CustomQuotaCallbackTest.scala        |  6 +++--
 .../unit/kafka/server/ClientQuotaManagerTest.scala | 24 +++++++++++++++++++
 5 files changed, 78 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala 
b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 4526528..8316d0c 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -233,6 +233,22 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     }
   }
 
+  /**
+   * Returns maximum value (produced/consume bytes or request processing time) 
that could be recorded without guaranteed throttling.
+   * Recording any larger value will always be throttled, even if no other 
values were recorded in the quota window.
+   * This is used for deciding the maximum bytes that can be fetched at once
+   */
+  def getMaxValueInQuotaWindow(session: Session, clientId: String): Double = {
+    if (quotasEnabled) {
+      val clientSensors = getOrCreateQuotaSensors(session, clientId)
+      Option(quotaCallback.quotaLimit(clientQuotaType, 
clientSensors.metricTags.asJava))
+        .map(_.toDouble * (config.numQuotaSamples - 1) * 
config.quotaWindowSizeSeconds)
+        .getOrElse(Double.MaxValue)
+    } else {
+      Double.MaxValue
+    }
+  }
+
   def recordAndGetThrottleTimeMs(session: Session, clientId: String, value: 
Double, timeMs: Long): Int = {
     var throttleTimeMs = 0
     val clientSensors = getOrCreateQuotaSensors(session, clientId)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8a14cd7..e613ab5 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -800,7 +800,15 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    val fetchMaxBytes = Math.min(fetchRequest.maxBytes, config.fetchMaxBytes)
+    // for fetch from consumer, cap fetchMaxBytes to the maximum bytes that 
could be fetched without being throttled given
+    // no bytes were recorded in the recent quota window
+    // trying to fetch more bytes would result in a guaranteed throttling 
potentially blocking consumer progress
+    val maxQuotaWindowBytes = if (fetchRequest.isFromFollower)
+      Int.MaxValue
+    else
+      quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt
+
+    val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, 
config.fetchMaxBytes), maxQuotaWindowBytes)
     val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)
     if (interesting.isEmpty)
       processResponseCallback(Seq.empty)
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 651eef6..4d9d23e 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -15,6 +15,7 @@
 package kafka.api
 
 import java.time.Duration
+import java.util.concurrent.TimeUnit
 import java.util.{Collections, HashMap, Properties}
 
 import kafka.api.QuotaTestClients._
@@ -83,7 +84,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness {
     quotaTestClients.verifyProduceThrottle(expectThrottle = true)
 
     // Consumer should read in a bursty manner and get throttled immediately
-    quotaTestClients.consumeUntilThrottled(produced)
+    assertTrue("Should have consumed at least one record", 
quotaTestClients.consumeUntilThrottled(produced) > 0)
     quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
   }
 
@@ -107,6 +108,23 @@ abstract class BaseQuotaTest extends 
IntegrationTestHarness {
   }
 
   @Test
+  def testProducerConsumerOverrideLowerQuota(): Unit = {
+    // consumer quota is set such that consumer quota * default quota window 
(10 seconds) is less than
+    // MAX_PARTITION_FETCH_BYTES_CONFIG, so that we can test consumer ability 
to fetch in this case
+    // In this case, 250 * 10 < 4096
+    quotaTestClients.overrideQuotas(2000, 250, Int.MaxValue)
+    quotaTestClients.waitForQuotaUpdate(2000, 250, Int.MaxValue)
+
+    val numRecords = 1000
+    val produced = quotaTestClients.produceUntilThrottled(numRecords)
+    quotaTestClients.verifyProduceThrottle(expectThrottle = true)
+
+    // Consumer should be able to consume at least one record, even when 
throttled
+    assertTrue("Should have consumed at least one record", 
quotaTestClients.consumeUntilThrottled(produced) > 0)
+    quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
+  }
+
+  @Test
   def testQuotaOverrideDelete(): Unit = {
     // Override producer and consumer quotas to unlimited
     quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue)
@@ -194,19 +212,23 @@ abstract class QuotaTestClients(topic: String,
   }
 
   def consumeUntilThrottled(maxRecords: Int, waitForRequestCompletion: Boolean 
= true): Int = {
+    val timeoutMs = TimeUnit.MINUTES.toMillis(1)
+
     consumer.subscribe(Collections.singleton(topic))
     var numConsumed = 0
     var throttled = false
+    val startMs = System.currentTimeMillis
     do {
       numConsumed += consumer.poll(Duration.ofMillis(100L)).count
       val metric = throttleMetric(QuotaType.Fetch, consumerClientId)
       throttled = metric != null && metricValue(metric) > 0
-    }  while (numConsumed < maxRecords && !throttled)
+    }  while (numConsumed < maxRecords && !throttled && 
System.currentTimeMillis < startMs + timeoutMs)
 
     // If throttled, wait for the records from the last fetch to be received
     if (throttled && numConsumed < maxRecords && waitForRequestCompletion) {
       val minRecords = numConsumed + 1
-      while (numConsumed < minRecords)
+      val startMs = System.currentTimeMillis
+      while (numConsumed < minRecords && System.currentTimeMillis < startMs + 
timeoutMs)
         numConsumed += consumer.poll(Duration.ofMillis(100L)).count
     }
     numConsumed
diff --git 
a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala 
b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
index e442889..091a178 100644
--- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
+++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala
@@ -100,9 +100,11 @@ class CustomQuotaCallbackTest extends 
IntegrationTestHarness with SaslSetup {
     quotaLimitCalls.values.foreach(_.set(0))
     user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = 
false)
 
-    // ClientQuotaCallback#quotaLimit is invoked by each quota manager once 
for each new client
+    // ClientQuotaCallback#quotaLimit is invoked by each quota manager once 
per throttled produce request for each client
     assertEquals(1, quotaLimitCalls(ClientQuotaType.PRODUCE).get)
-    assertEquals(1, quotaLimitCalls(ClientQuotaType.FETCH).get)
+    // ClientQuotaCallback#quotaLimit is invoked once per each unthrottled and 
two for each throttled request
+    // since we don't know the total number of requests, we verify it was 
called at least twice (at least one throttled request)
+    assertTrue("quotaLimit must be called at least twice", 
quotaLimitCalls(ClientQuotaType.FETCH).get > 2)
     assertTrue(s"Too many quotaLimit calls $quotaLimitCalls", 
quotaLimitCalls(ClientQuotaType.REQUEST).get <= 10) // sanity check
     // Large quota updated to small quota, should throttle
     user.configureAndWaitForQuota(9000, 3000)
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index e568268..a704163 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -194,6 +194,11 @@ class ClientQuotaManagerTest {
 
   private def checkQuota(quotaManager: ClientQuotaManager, user: String, 
clientId: String, expectedBound: Long, value: Int, expectThrottle: Boolean): 
Unit = {
     assertEquals(expectedBound, quotaManager.quota(user, clientId).bound, 0.0)
+    val session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), 
InetAddress.getLocalHost)
+    val expectedMaxValueInQuotaWindow =
+      if (expectedBound < Long.MaxValue) config.quotaWindowSizeSeconds * 
(config.numQuotaSamples - 1) * expectedBound else Double.MaxValue
+    assertEquals(expectedMaxValueInQuotaWindow, 
quotaManager.getMaxValueInQuotaWindow(session, clientId), 0.01)
+
     val throttleTimeMs = maybeRecord(quotaManager, user, clientId, value * 
config.numQuotaSamples)
     if (expectThrottle)
       assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", 
throttleTimeMs > 0)
@@ -202,6 +207,25 @@ class ClientQuotaManagerTest {
   }
 
   @Test
+  def testGetMaxValueInQuotaWindowWithNonDefaultQuotaWindow(): Unit = {
+    val numFullQuotaWindows = 3   // 3 seconds window (vs. 10 seconds default)
+    val nonDefaultConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault 
= Long.MaxValue, numQuotaSamples = numFullQuotaWindows + 1)
+    val quotaManager = new ClientQuotaManager(nonDefaultConfig, metrics, 
Fetch, time, "")
+    val userSession = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
"userA"), InetAddress.getLocalHost)
+
+    try {
+      // no quota set
+      assertEquals(Double.MaxValue, 
quotaManager.getMaxValueInQuotaWindow(userSession, "client1"), 0.01)
+
+      // Set default <user> quota config
+      quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, 
Some(new Quota(10, true)))
+      assertEquals(10 * numFullQuotaWindows, 
quotaManager.getMaxValueInQuotaWindow(userSession, "client1"), 0.01)
+    } finally {
+      quotaManager.shutdown()
+    }
+  }
+
+  @Test
   def testSetAndRemoveDefaultUserQuota(): Unit = {
     // quotaTypesEnabled will be QuotaTypes.NoQuotas initially
     val quotaManager = new 
ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 
Long.MaxValue),

Reply via email to