This is an automated email from the ASF dual-hosted git repository.

japetrsn pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new a13ef12  Verify trigger fire by sideeffect. (#272)
a13ef12 is described below

commit a13ef12ada9ddb5f18c10551939dcae74aab6894
Author: Christian Bickel <git...@cbickel.de>
AuthorDate: Thu Jun 28 19:22:53 2018 +0200

    Verify trigger fire by sideeffect. (#272)
    
    Until now, we verified the Kafka provider by checking with `activation list` 
whether the action in OpenWhisk had been executed. The problem with this call 
is that the activation may not be in the list in time, because it only returns 
the result of a CouchDB view. If there is heavy load on the database, the 
view computation may lag behind and not return the activation.
    As the side effect, we use the creation of a trigger. The name of the 
trigger is read from the Kafka message. Everything else (credentials, ...) is 
already available in the action, which is invoked anyway.
    To check whether the trigger exists, we look it up by its ID, so no view 
is involved anymore.
---
 tests/dat/createTriggerActions.js                  | 11 +++++
 .../test/scala/system/health/BasicHealthTest.scala | 51 ++++++++++++----------
 2 files changed, 39 insertions(+), 23 deletions(-)

diff --git a/tests/dat/createTriggerActions.js 
b/tests/dat/createTriggerActions.js
new file mode 100644
index 0000000..58044ea
--- /dev/null
+++ b/tests/dat/createTriggerActions.js
@@ -0,0 +1,11 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more 
contributor
+// license agreements; and to You under the Apache License, Version 2.0.
+
+var openwhisk = require('openwhisk');
+
+function main(params) {
+    console.log(JSON.stringify(params));
+    var name = params.messages[0].value;
+    var ow = openwhisk();
+    return ow.triggers.create({name: name});
+}
diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala 
b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 090352c..2b99938 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -19,25 +19,20 @@ package system.health
 
 import java.util.concurrent.{TimeUnit, TimeoutException}
 
+import com.jayway.restassured.RestAssured
+import common.TestUtils.NOT_FOUND
+import common._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Inside, Matchers}
+import spray.json.DefaultJsonProtocol._
+import spray.json._
 import system.utils.KafkaUtils
+import whisk.utils.retry
 
 import scala.concurrent.duration.DurationInt
 import scala.language.postfixOps
-import org.junit.runner.RunWith
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Inside, Matchers}
-import org.scalatest.junit.JUnitRunner
-import common.JsHelpers
-import common.TestHelpers
-import common.TestUtils
-import common.Wsk
-import common.WskActorSystem
-import common.WskProps
-import common.WskTestHelpers
-import spray.json._
-import spray.json.DefaultJsonProtocol._
-import com.jayway.restassured.RestAssured
-import org.apache.kafka.clients.producer.ProducerRecord
-import whisk.utils.retry;
 
 @RunWith(classOf[JUnitRunner])
 class BasicHealthTest
@@ -180,7 +175,10 @@ class BasicHealthTest
           }, N = 10, waitBeforeRetry = Some(1.second))
       }
 
-      val defaultAction = Some(TestUtils.getTestActionFilename("hello.js"))
+      // This action creates a trigger if it gets executed.
+      // The name of the trigger will be the message, that has been send to 
kafka.
+      // We only create this trigger to verify, that the action has been 
executed after sending the message to kafka.
+      val defaultAction = Some("dat/createTriggerActions.js")
       val defaultActionName = s"helloKafka-$currentTime"
 
       assetHelper.withCleaner(wsk.action, defaultActionName) { (action, name) 
=>
@@ -194,9 +192,17 @@ class BasicHealthTest
       // key to use for the produced message
       val key = "TheKey"
 
-      println(s"Producing message with key: $key and value: $currentTime")
+      val verificationName = s"trigger-$currentTime"
+
+      // Check that the verification trigger does not exist before the action 
ran.
+      // This will also clean up the trigger after the test.
+      assetHelper.withCleaner(wsk.trigger, verificationName) { (trigger, name) 
=>
+        trigger.get(name, NOT_FOUND)
+      }
+
+      println(s"Producing message with key: $key and value: $verificationName")
       val producer = kafkaUtils.createProducer()
-      val record = new ProducerRecord(topic, key, currentTime)
+      val record = new ProducerRecord(topic, key, verificationName)
       val future = producer.send(record)
 
       producer.flush()
@@ -212,11 +218,10 @@ class BasicHealthTest
         case e: Exception => throw e
       }
 
-      retry({
-        println("Polling for activations")
-        val activations = wsk.activation.pollFor(N = 1, Some(triggerName), 
retries = maxRetries)
-        assert(activations.nonEmpty)
-      }, N = 3)
+      // Check if the trigger, that should have been created as reaction on 
the kafka-message, has been created.
+      // The trigger should have been created by the action, that has been 
triggered by the kafka message.
+      // If we cannot find it, the most probably the action did not run.
+      retry(wsk.trigger.get(verificationName), 60, Some(1.second))
   }
 
   it should "return correct status and configuration" in 
withAssetCleaner(wskprops) {

Reply via email to