sven-lange-last closed pull request #3912: Fix max.poll.interval.ms setting of KafkaConsumer.
URL: https://github.com/apache/incubator-openwhisk/pull/3912
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not display the original diff once a fork PR is merged):

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 86e5213b96..0c0e4e49b4 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -72,7 +72,12 @@ whisk {
             heartbeat-interval-ms = 10000
             enable-auto-commit = false
             auto-offset-reset = earliest
-            max-poll-interval = 360000
+
+            // request-timeout-ms always needs to be larger than max-poll-interval-ms per
+            // https://kafka.apache.org/documentation/#upgrade_1010_notable
+            max-poll-interval-ms = 1800000 // 30 minutes
+            request-timeout-ms = 1860000 // 31 minutes
+
             // This value controls the server-side wait time which affects polling latency.
             // A low value improves latency performance but it is important to not set it too low
             // as that will cause excessive busy-waiting.
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index eeec4a4a04..7111573ae1 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -127,6 +127,8 @@ class KafkaConsumerConnector(
       configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
       configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaConsumer))
 
+    verifyConfig(config, ConsumerConfig.configNames().asScala.toSet)
+
     val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
     consumer.subscribe(Seq(topic).asJavaCollection)
     consumer
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index 6843cbc1e7..b7373aa27c 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -101,4 +101,24 @@ object KafkaConfiguration {
   def configMapToKafkaConfig(configMap: Map[String, String]): Map[String, String] = configMap.map {
     case (key, value) => configToKafkaKey(key) -> value
   }
+
+  /**
+   * Prints a warning for each unknown configuration item and returns false if at least one item is unknown.
+   *
+   * @param config the config to be checked
+   * @param validKeys known valid keys to configure
+   * @return true if all configuration keys are known, false if at least one is unknown
+   */
+  def verifyConfig(config: Map[String, String], validKeys: Set[String])(implicit logging: Logging): Boolean = {
+    val passedKeys = config.keySet
+    val knownKeys = validKeys intersect passedKeys
+    val unknownKeys = passedKeys -- knownKeys
+
+    if (unknownKeys.nonEmpty) {
+      logging.warn(this, s"potential misconfiguration, unknown settings: ${unknownKeys.mkString(",")}")
+      false
+    } else {
+      true
+    }
+  }
 }
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index f82acaf85d..aea6b3c275 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -29,6 +29,7 @@ import whisk.core.ConfigKeys
 import whisk.core.connector.{Message, MessageProducer}
 import whisk.core.entity.UUIDs
 
+import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.concurrent.{blocking, ExecutionContext, Future, Promise}
 import scala.util.{Failure, Success}
@@ -96,6 +97,8 @@ class KafkaProducerConnector(kafkahosts: String, id: String = UUIDs.randomUUID()
       configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
       configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
 
+    verifyConfig(config, ProducerConfig.configNames().asScala.toSet)
+
     new KafkaProducer(config, new StringSerializer, new StringSerializer)
   }
 
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index 57e2e56ad2..297b1d4ade 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -55,6 +55,7 @@ class KafkaConnectorTests
   val groupid = "kafkatest"
   val topic = "KafkaConnectorTestTopic"
   val maxPollInterval = 10.seconds
+  System.setProperty("whisk.kafka.consumer.max-poll-interval-ms", maxPollInterval.toMillis.toString)
 
   // Need to overwrite replication factor for tests that shut down and start
   // Kafka instances intentionally. These tests will fail if there is more than


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to