This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 771a310 Update Kafka clients to 2.4.0, make kafka related dependencies consistent. (#4844) 771a310 is described below commit 771a310935c9daa10982f2d3e9b01f832d3d31d6 Author: Markus Thömmes <markusthoem...@me.com> AuthorDate: Tue Mar 3 07:56:40 2020 +0100 Update Kafka clients to 2.4.0, make kafka related dependencies consistent. (#4844) --- common/scala/build.gradle | 2 +- core/monitoring/user-events/build.gradle | 1 + .../openwhisk/core/monitoring/metrics/EventConsumer.scala | 13 ++++++------- settings.gradle | 2 +- tests/build.gradle | 2 +- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/common/scala/build.gradle b/common/scala/build.gradle index 9b6f098..4f72527 100644 --- a/common/scala/build.gradle +++ b/common/scala/build.gradle @@ -52,7 +52,7 @@ dependencies { compile "commons-codec:commons-codec:1.9" compile "commons-io:commons-io:2.6" compile "commons-collections:commons-collections:3.2.2" - compile "org.apache.kafka:kafka-clients:2.0.0" + compile "org.apache.kafka:kafka-clients:2.4.0" compile "org.apache.httpcomponents:httpclient:4.5.5" compile "com.fasterxml.uuid:java-uuid-generator:3.1.3" compile "com.github.ben-manes.caffeine:caffeine:2.6.2" diff --git a/core/monitoring/user-events/build.gradle b/core/monitoring/user-events/build.gradle index a657586..a458c8e 100644 --- a/core/monitoring/user-events/build.gradle +++ b/core/monitoring/user-events/build.gradle @@ -40,6 +40,7 @@ dependencies { testCompile "junit:junit:4.11" testCompile "org.scalatest:scalatest_${gradle.scala.depVersion}:3.0.8" + testCompile "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0" testCompile "com.typesafe.akka:akka-stream-kafka-testkit_${gradle.scala.depVersion}:${gradle.akka_kafka.version}" testCompile "com.typesafe.akka:akka-testkit_${gradle.scala.depVersion}:${gradle.akka.version}" testCompile "com.typesafe.akka:akka-stream-testkit_${gradle.scala.depVersion}:${gradle.akka.version}" diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala index 7b5709e..ab5023c 100644 --- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala +++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala @@ -21,12 +21,11 @@ import java.lang.management.ManagementFactory import akka.Done import akka.actor.ActorSystem -import akka.kafka.ConsumerMessage.CommittableOffsetBatch -import akka.kafka.scaladsl.Consumer +import akka.kafka.scaladsl.{Committer, Consumer} import akka.kafka.scaladsl.Consumer.DrainingControl -import akka.kafka.{ConsumerSettings, Subscriptions} +import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions} import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{Keep, Sink} +import akka.stream.scaladsl.Keep import javax.management.ObjectName import org.apache.kafka.clients.consumer.ConsumerConfig import kamon.Kamon @@ -85,6 +84,8 @@ case class EventConsumer(settings: ConsumerSettings[String, String], override def metrics(): Future[Map[MetricName, common.Metric]] = control.metrics + private val committerSettings = CommitterSettings(system).withMaxBatch(20) + //TODO Use RestartSource private val control: DrainingControl[Done] = Consumer .committableSource(updatedSettings, Subscriptions.topics(userEventTopic)) @@ -92,9 +93,7 @@ case class EventConsumer(settings: ConsumerSettings[String, String], processEvent(msg.record.value()) msg.committableOffset } - .batch(max = 20, CommittableOffsetBatch(_))(_.updated(_)) - .mapAsync(3)(_.commitScaladsl()) - .toMat(Sink.ignore)(Keep.both) + .toMat(Committer.sink(committerSettings))(Keep.both) .mapMaterializedValue(DrainingControl.apply) .run() diff --git a/settings.gradle b/settings.gradle index 69ea863..501293f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -46,7 +46,7 @@ gradle.ext.scalafmt = [ ] gradle.ext.akka = [version : '2.5.26'] -gradle.ext.akka_kafka = [version : '1.1.0'] +gradle.ext.akka_kafka = [version : '2.0.2'] gradle.ext.akka_http = [version : '10.1.11'] gradle.ext.akka_management = [version : '1.0.5'] diff --git a/tests/build.gradle b/tests/build.gradle index 396fb1f..fa9bff7 100644 --- a/tests/build.gradle +++ b/tests/build.gradle @@ -207,9 +207,9 @@ dependencies { compile "io.opentracing:opentracing-mock:0.31.0" compile "org.apache.curator:curator-test:${gradle.curator.version}" compile "com.atlassian.oai:swagger-request-validator-core:1.4.5" + compile "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0" compile "com.typesafe.akka:akka-stream-kafka-testkit_${gradle.scala.depVersion}:${gradle.akka_kafka.version}" compile "com.typesafe.akka:akka-stream-testkit_${gradle.scala.depVersion}:${gradle.akka.version}" - compile "com.typesafe.akka:akka-stream-testkit_${gradle.scala.depVersion}:${gradle.akka.version}" compile "io.fabric8:kubernetes-server-mock:${gradle.kube_client.version}" compile "com.amazonaws:aws-java-sdk-s3:1.11.295"