sven-lange-last closed pull request #3912: Fix max.poll.interval.ms setting of KafkaConsumer. URL: https://github.com/apache/incubator-openwhisk/pull/3912
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf index 86e5213b96..0c0e4e49b4 100644 --- a/common/scala/src/main/resources/application.conf +++ b/common/scala/src/main/resources/application.conf @@ -72,7 +72,12 @@ whisk { heartbeat-interval-ms = 10000 enable-auto-commit = false auto-offset-reset = earliest - max-poll-interval = 360000 + + // request-timeout-ms always needs to be larger than max-poll-interval-ms per + // https://kafka.apache.org/documentation/#upgrade_1010_notable + max-poll-interval-ms = 1800000 // 30 minutes + request-timeout-ms = 1860000 // 31 minutes + // This value controls the server-side wait time which affects polling latency. // A low value improves latency performance but it is important to not set it too low // as that will cause excessive busy-waiting. 
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala index eeec4a4a04..7111573ae1 100644 --- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala +++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala @@ -127,6 +127,8 @@ class KafkaConsumerConnector( configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++ configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaConsumer)) + verifyConfig(config, ConsumerConfig.configNames().asScala.toSet) + val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer) consumer.subscribe(Seq(topic).asJavaCollection) consumer diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala index 6843cbc1e7..b7373aa27c 100644 --- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala +++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala @@ -101,4 +101,24 @@ object KafkaConfiguration { def configMapToKafkaConfig(configMap: Map[String, String]): Map[String, String] = configMap.map { case (key, value) => configToKafkaKey(key) -> value } + + /** + * Prints a warning for each unknown configuration item and returns false if at least one item is unknown. 
+ * + * @param config the config to be checked + * @param validKeys known valid keys to configure + * @return true if all configuration keys are known, false if at least one is unknown + */ + def verifyConfig(config: Map[String, String], validKeys: Set[String])(implicit logging: Logging): Boolean = { + val passedKeys = config.keySet + val knownKeys = validKeys intersect passedKeys + val unknownKeys = passedKeys -- knownKeys + + if (unknownKeys.nonEmpty) { + logging.warn(this, s"potential misconfiguration, unknown settings: ${unknownKeys.mkString(",")}") + false + } else { + true + } + } } diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala index f82acaf85d..aea6b3c275 100644 --- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala +++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala @@ -29,6 +29,7 @@ import whisk.core.ConfigKeys import whisk.core.connector.{Message, MessageProducer} import whisk.core.entity.UUIDs +import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.concurrent.{blocking, ExecutionContext, Future, Promise} import scala.util.{Failure, Success} @@ -96,6 +97,8 @@ class KafkaProducerConnector(kafkahosts: String, id: String = UUIDs.randomUUID() configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++ configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer)) + verifyConfig(config, ProducerConfig.configNames().asScala.toSet) + new KafkaProducer(config, new StringSerializer, new StringSerializer) } diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala index 57e2e56ad2..297b1d4ade 100644 --- a/tests/src/test/scala/services/KafkaConnectorTests.scala +++ b/tests/src/test/scala/services/KafkaConnectorTests.scala 
@@ -55,6 +55,7 @@ class KafkaConnectorTests val groupid = "kafkatest" val topic = "KafkaConnectorTestTopic" val maxPollInterval = 10.seconds + System.setProperty("whisk.kafka.consumer.max-poll-interval-ms", maxPollInterval.toMillis.toString) // Need to overwrite replication factor for tests that shut down and start // Kafka instances intentionally. These tests will fail if there is more than ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services