This is an automated email from the ASF dual-hosted git repository. japetrsn pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git
The following commit(s) were added to refs/heads/master by this push: new a13ef12 Verify trigger fire by sideeffect. (#272) a13ef12 is described below commit a13ef12ada9ddb5f18c10551939dcae74aab6894 Author: Christian Bickel <git...@cbickel.de> AuthorDate: Thu Jun 28 19:22:53 2018 +0200 Verify trigger fire by sideeffect. (#272) Until now, we verified the Kafka provider by checking with `activation list` whether the action in OpenWhisk had been executed. The problem with this call is that the activation may not be in the list in time, because it only returns the result of a CouchDB view. If there is heavy load on the database, the view computation may lag behind and not return the activation. As a side effect to verify instead, we now use the creation of a trigger. The name of the trigger will be read from the Kafka message. Everything else (like credentials, ...) is already present in the action, which is invoked anyway. To check whether the trigger exists, we look it up by its ID, so no view is involved anymore. --- tests/dat/createTriggerActions.js | 11 +++++ .../test/scala/system/health/BasicHealthTest.scala | 51 ++++++++++++---------- 2 files changed, 39 insertions(+), 23 deletions(-) diff --git a/tests/dat/createTriggerActions.js b/tests/dat/createTriggerActions.js new file mode 100644 index 0000000..58044ea --- /dev/null +++ b/tests/dat/createTriggerActions.js @@ -0,0 +1,11 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more contributor +// license agreements; and to You under the Apache License, Version 2.0. 
+ +var openwhisk = require('openwhisk'); + +function main(params) { + console.log(JSON.stringify(params)); + var name = params.messages[0].value; + var ow = openwhisk(); + return ow.triggers.create({name: name}); +} diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala index 090352c..2b99938 100644 --- a/tests/src/test/scala/system/health/BasicHealthTest.scala +++ b/tests/src/test/scala/system/health/BasicHealthTest.scala @@ -19,25 +19,20 @@ package system.health import java.util.concurrent.{TimeUnit, TimeoutException} +import com.jayway.restassured.RestAssured +import common.TestUtils.NOT_FOUND +import common._ +import org.apache.kafka.clients.producer.ProducerRecord +import org.junit.runner.RunWith +import org.scalatest.junit.JUnitRunner +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Inside, Matchers} +import spray.json.DefaultJsonProtocol._ +import spray.json._ import system.utils.KafkaUtils +import whisk.utils.retry import scala.concurrent.duration.DurationInt import scala.language.postfixOps -import org.junit.runner.RunWith -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Inside, Matchers} -import org.scalatest.junit.JUnitRunner -import common.JsHelpers -import common.TestHelpers -import common.TestUtils -import common.Wsk -import common.WskActorSystem -import common.WskProps -import common.WskTestHelpers -import spray.json._ -import spray.json.DefaultJsonProtocol._ -import com.jayway.restassured.RestAssured -import org.apache.kafka.clients.producer.ProducerRecord -import whisk.utils.retry; @RunWith(classOf[JUnitRunner]) class BasicHealthTest @@ -180,7 +175,10 @@ class BasicHealthTest }, N = 10, waitBeforeRetry = Some(1.second)) } - val defaultAction = Some(TestUtils.getTestActionFilename("hello.js")) + // This action creates a trigger if it gets executed. + // The name of the trigger will be the message that has been sent to Kafka. 
+ // We only create this trigger to verify that the action has been executed after sending the message to Kafka. + val defaultAction = Some("dat/createTriggerActions.js") val defaultActionName = s"helloKafka-$currentTime" assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) => @@ -194,9 +192,17 @@ class BasicHealthTest // key to use for the produced message val key = "TheKey" - println(s"Producing message with key: $key and value: $currentTime") + val verificationName = s"trigger-$currentTime" + + // Check that the verification trigger does not exist before the action ran. + // This will also clean up the trigger after the test. + assetHelper.withCleaner(wsk.trigger, verificationName) { (trigger, name) => + trigger.get(name, NOT_FOUND) + } + + println(s"Producing message with key: $key and value: $verificationName") val producer = kafkaUtils.createProducer() - val record = new ProducerRecord(topic, key, currentTime) + val record = new ProducerRecord(topic, key, verificationName) val future = producer.send(record) producer.flush() @@ -212,11 +218,10 @@ class BasicHealthTest case e: Exception => throw e } - retry({ - println("Polling for activations") - val activations = wsk.activation.pollFor(N = 1, Some(triggerName), retries = maxRetries) - assert(activations.nonEmpty) - }, N = 3) + // Check whether the trigger that should have been created in reaction to the Kafka message has been created. + // The trigger should have been created by the action that has been triggered by the Kafka message. + // If we cannot find it, then most probably the action did not run. + retry(wsk.trigger.get(verificationName), 60, Some(1.second)) } it should "return correct status and configuration" in withAssetCleaner(wskprops) {