This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 2.5 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push: new 301abf0 KAFKA-9677: Fix consumer fetch with small consume bandwidth quotas (#8290) 301abf0 is described below commit 301abf040b39b217cae9732520b963cd63b12d35 Author: Anna Povzner <a...@confluent.io> AuthorDate: Sat Mar 14 13:45:50 2020 -0700 KAFKA-9677: Fix consumer fetch with small consume bandwidth quotas (#8290) Since we changed quota communication with KIP-219, fetch requests are throttled by returning an empty response with the delay in throttle_time_ms, and the Kafka consumer retries after the delay. With default configs, the maximum fetch size could be as big as 50MB (or 10MB per partition). The default broker config (1-second window, 10 full windows of tracked bandwidth/thread utilization usage) means that a consumer quota below 5MB/s (per broker) may block consumers from being able to fetch any data, because a single 50MB fetch response already exceeds what such a quota allows over the 10-second tracked window. This PR ensures that consumers cannot get blocked by quota by capping fetchMaxBytes in KafkaApis.handleFetchRequest() to quota window * consume bandwidth quota. For example, with default configs (10-second quota window) and a 1MB/s consumer bandwidth quota, fetchMaxBytes would be capped to 10MB. 
Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com> --- .../scala/kafka/server/ClientQuotaManager.scala | 16 +++++++++++++ core/src/main/scala/kafka/server/KafkaApis.scala | 10 +++++++- .../integration/kafka/api/BaseQuotaTest.scala | 28 +++++++++++++++++++--- .../kafka/api/CustomQuotaCallbackTest.scala | 6 +++-- .../unit/kafka/server/ClientQuotaManagerTest.scala | 24 +++++++++++++++++++ 5 files changed, 78 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index 4526528..8316d0c 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -233,6 +233,22 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } + /** + * Returns maximum value (produced/consume bytes or request processing time) that could be recorded without guaranteed throttling. + * Recording any larger value will always be throttled, even if no other values were recorded in the quota window. 
+ * This is used for deciding the maximum bytes that can be fetched at once + */ + def getMaxValueInQuotaWindow(session: Session, clientId: String): Double = { + if (quotasEnabled) { + val clientSensors = getOrCreateQuotaSensors(session, clientId) + Option(quotaCallback.quotaLimit(clientQuotaType, clientSensors.metricTags.asJava)) + .map(_.toDouble * (config.numQuotaSamples - 1) * config.quotaWindowSizeSeconds) + .getOrElse(Double.MaxValue) + } else { + Double.MaxValue + } + } + def recordAndGetThrottleTimeMs(session: Session, clientId: String, value: Double, timeMs: Long): Int = { var throttleTimeMs = 0 val clientSensors = getOrCreateQuotaSensors(session, clientId) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 8a14cd7..e613ab5 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -800,7 +800,15 @@ class KafkaApis(val requestChannel: RequestChannel, } } - val fetchMaxBytes = Math.min(fetchRequest.maxBytes, config.fetchMaxBytes) + // for fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched without being throttled given + // no bytes were recorded in the recent quota window + // trying to fetch more bytes would result in a guaranteed throttling potentially blocking consumer progress + val maxQuotaWindowBytes = if (fetchRequest.isFromFollower) + Int.MaxValue + else + quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt + + val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes) val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes) if (interesting.isEmpty) processResponseCallback(Seq.empty) diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala index 651eef6..4d9d23e 100644 --- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala 
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala @@ -15,6 +15,7 @@ package kafka.api import java.time.Duration +import java.util.concurrent.TimeUnit import java.util.{Collections, HashMap, Properties} import kafka.api.QuotaTestClients._ @@ -83,7 +84,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { quotaTestClients.verifyProduceThrottle(expectThrottle = true) // Consumer should read in a bursty manner and get throttled immediately - quotaTestClients.consumeUntilThrottled(produced) + assertTrue("Should have consumed at least one record", quotaTestClients.consumeUntilThrottled(produced) > 0) quotaTestClients.verifyConsumeThrottle(expectThrottle = true) } @@ -107,6 +108,23 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { } @Test + def testProducerConsumerOverrideLowerQuota(): Unit = { + // consumer quota is set such that consumer quota * default quota window (10 seconds) is less than + // MAX_PARTITION_FETCH_BYTES_CONFIG, so that we can test consumer ability to fetch in this case + // In this case, 250 * 10 < 4096 + quotaTestClients.overrideQuotas(2000, 250, Int.MaxValue) + quotaTestClients.waitForQuotaUpdate(2000, 250, Int.MaxValue) + + val numRecords = 1000 + val produced = quotaTestClients.produceUntilThrottled(numRecords) + quotaTestClients.verifyProduceThrottle(expectThrottle = true) + + // Consumer should be able to consume at least one record, even when throttled + assertTrue("Should have consumed at least one record", quotaTestClients.consumeUntilThrottled(produced) > 0) + quotaTestClients.verifyConsumeThrottle(expectThrottle = true) + } + + @Test def testQuotaOverrideDelete(): Unit = { // Override producer and consumer quotas to unlimited quotaTestClients.overrideQuotas(Long.MaxValue, Long.MaxValue, Int.MaxValue) @@ -194,19 +212,23 @@ abstract class QuotaTestClients(topic: String, } def consumeUntilThrottled(maxRecords: Int, waitForRequestCompletion: Boolean = true): Int = { + val timeoutMs = 
TimeUnit.MINUTES.toMillis(1) + consumer.subscribe(Collections.singleton(topic)) var numConsumed = 0 var throttled = false + val startMs = System.currentTimeMillis do { numConsumed += consumer.poll(Duration.ofMillis(100L)).count val metric = throttleMetric(QuotaType.Fetch, consumerClientId) throttled = metric != null && metricValue(metric) > 0 - } while (numConsumed < maxRecords && !throttled) + } while (numConsumed < maxRecords && !throttled && System.currentTimeMillis < startMs + timeoutMs) // If throttled, wait for the records from the last fetch to be received if (throttled && numConsumed < maxRecords && waitForRequestCompletion) { val minRecords = numConsumed + 1 - while (numConsumed < minRecords) + val startMs = System.currentTimeMillis + while (numConsumed < minRecords && System.currentTimeMillis < startMs + timeoutMs) numConsumed += consumer.poll(Duration.ofMillis(100L)).count } numConsumed diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala index e442889..091a178 100644 --- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala +++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala @@ -100,9 +100,11 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { quotaLimitCalls.values.foreach(_.set(0)) user.produceConsume(expectProduceThrottle = false, expectConsumeThrottle = false) - // ClientQuotaCallback#quotaLimit is invoked by each quota manager once for each new client + // ClientQuotaCallback#quotaLimit is invoked by each quota manager once per throttled produce request for each client assertEquals(1, quotaLimitCalls(ClientQuotaType.PRODUCE).get) - assertEquals(1, quotaLimitCalls(ClientQuotaType.FETCH).get) + // ClientQuotaCallback#quotaLimit is invoked once per each unthrottled and two for each throttled request + // since we don't know the total number of requests, we verify it was 
called at least twice (at least one throttled request) + assertTrue("quotaLimit must be called at least twice", quotaLimitCalls(ClientQuotaType.FETCH).get > 2) assertTrue(s"Too many quotaLimit calls $quotaLimitCalls", quotaLimitCalls(ClientQuotaType.REQUEST).get <= 10) // sanity check // Large quota updated to small quota, should throttle user.configureAndWaitForQuota(9000, 3000) diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index e568268..a704163 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -194,6 +194,11 @@ class ClientQuotaManagerTest { private def checkQuota(quotaManager: ClientQuotaManager, user: String, clientId: String, expectedBound: Long, value: Int, expectThrottle: Boolean): Unit = { assertEquals(expectedBound, quotaManager.quota(user, clientId).bound, 0.0) + val session = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, user), InetAddress.getLocalHost) + val expectedMaxValueInQuotaWindow = + if (expectedBound < Long.MaxValue) config.quotaWindowSizeSeconds * (config.numQuotaSamples - 1) * expectedBound else Double.MaxValue + assertEquals(expectedMaxValueInQuotaWindow, quotaManager.getMaxValueInQuotaWindow(session, clientId), 0.01) + val throttleTimeMs = maybeRecord(quotaManager, user, clientId, value * config.numQuotaSamples) if (expectThrottle) assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0) @@ -202,6 +207,25 @@ class ClientQuotaManagerTest { } @Test + def testGetMaxValueInQuotaWindowWithNonDefaultQuotaWindow(): Unit = { + val numFullQuotaWindows = 3 // 3 seconds window (vs. 
10 seconds default) + val nonDefaultConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = Long.MaxValue, numQuotaSamples = numFullQuotaWindows + 1) + val quotaManager = new ClientQuotaManager(nonDefaultConfig, metrics, Fetch, time, "") + val userSession = Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "userA"), InetAddress.getLocalHost) + + try { + // no quota set + assertEquals(Double.MaxValue, quotaManager.getMaxValueInQuotaWindow(userSession, "client1"), 0.01) + + // Set default <user> quota config + quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, Some(new Quota(10, true))) + assertEquals(10 * numFullQuotaWindows, quotaManager.getMaxValueInQuotaWindow(userSession, "client1"), 0.01) + } finally { + quotaManager.shutdown() + } + } + + @Test def testSetAndRemoveDefaultUserQuota(): Unit = { // quotaTypesEnabled will be QuotaTypes.NoQuotas initially val quotaManager = new ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault = Long.MaxValue),