This is an automated email from the ASF dual-hosted git repository.

japetrsn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk-package-kafka.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b28f89  Ensure test consumers exists before producing messages (#292)
1b28f89 is described below

commit 1b28f89556c9356ebf9043bdaf78f1dec90f6370
Author: James Dubee <jwdu...@us.ibm.com>
AuthorDate: Wed Nov 7 10:22:46 2018 -0500

    Ensure test consumers exists before producing messages (#292)
    
    * Ensure test consumers exists before producing messages
    
    * Refactoring
    
    * More refactoring
---
 .../test/scala/system/health/BasicHealthTest.scala | 135 +++--------------
 .../scala/system/packages/KafkaProduceTests.scala  |   7 +-
 .../system/packages/MessageHubFeedTests.scala      | 163 ++++++++-------------
 .../packages/MessageHubMultiWorkersTest.scala      |  31 +---
 .../system/packages/MessageHubProduceTests.scala   |  65 +++-----
 .../src/test/scala/system/stress/StressTest.scala  |  15 +-
 tests/src/test/scala/system/utils/KafkaUtils.scala |  81 +++++++++-
 7 files changed, 185 insertions(+), 312 deletions(-)

diff --git a/tests/src/test/scala/system/health/BasicHealthTest.scala b/tests/src/test/scala/system/health/BasicHealthTest.scala
index 2b99938..949bce6 100644
--- a/tests/src/test/scala/system/health/BasicHealthTest.scala
+++ b/tests/src/test/scala/system/health/BasicHealthTest.scala
@@ -19,7 +19,6 @@ package system.health
 
 import java.util.concurrent.{TimeUnit, TimeoutException}
 
-import com.jayway.restassured.RestAssured
 import common.TestUtils.NOT_FOUND
 import common._
 import org.apache.kafka.clients.producer.ProducerRecord
@@ -43,7 +42,8 @@ class BasicHealthTest
   with TestHelpers
   with WskTestHelpers
   with Inside
-  with JsHelpers {
+  with JsHelpers
+  with KafkaUtils {
 
   val topic = "test"
   val sessionTimeout = 10 seconds
@@ -55,125 +55,24 @@ class BasicHealthTest
   val messageHubFeed = "messageHubFeed"
   val messageHubProduce = "messageHubProduce"
   val actionName = s"$messagingPackage/$messageHubFeed"
-
-  val consumerInitTime = 10000 // ms
-
-  val kafkaUtils = new KafkaUtils
-
   val maxRetries = System.getProperty("max.retries", "60").toInt
 
   behavior of "Message Hub feed"
 
-  it should "create a new trigger" in withAssetCleaner(wskprops) {
-    val triggerName = s"newTrigger-${System.currentTimeMillis}"
-    println(s"Creating trigger $triggerName")
-
-    (wp, assetHelper) =>
-      val feedCreationResult = assetHelper.withCleaner(wsk.trigger, 
triggerName) {
-        (trigger, _) =>
-          trigger.create(triggerName, feed = 
Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
-            "user" -> kafkaUtils.getAsJson("user"),
-            "password" -> kafkaUtils.getAsJson("password"),
-            "api_key" -> kafkaUtils.getAsJson("api_key"),
-            "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-            "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
-            "topic" -> topic.toJson
-          ))
-      }
-
-      withActivation(wsk.activation, feedCreationResult, initialWait = 5 
seconds, totalWait = 60 seconds) {
-        activation =>
-          // should be successful
-          activation.response.success shouldBe true
-
-          // It takes a moment for the consumer to fully initialize.
-          println("Giving the consumer a moment to get ready")
-          Thread.sleep(consumerInitTime)
-
-          val uuid = 
activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"",
 "")
-
-          println("Checking health endpoint(s) for existence of consumer uuid")
-          // get /health endpoint(s) and ensure it contains the new uuid
-          val healthUrls = 
System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
-          healthUrls shouldNot be(empty)
-
-          retry({
-            val uuids = healthUrls.flatMap(u => {
-              val response = RestAssured.given().get(u)
-              response.statusCode() should be(200)
-              response.asString()
-                .parseJson
-                .asJsObject
-                .getFields("consumers")
-                .head
-                .convertTo[JsArray]
-                .elements
-                .flatMap(c => {
-                  c.asJsObject.fields.keySet
-                })
-            }).toList
-
-            uuids should contain(uuid)
-
-          }, N = 10, waitBeforeRetry = Some(1.second))
-      }
-  }
-
-  it should "fire a trigger when a message is posted to message hub" in 
withAssetCleaner(wskprops) {
+  it should "create a consumer and fire a trigger when a message is posted to 
messagehub" in withAssetCleaner(wskprops) {
     val currentTime = s"${System.currentTimeMillis}"
 
     (wp, assetHelper) =>
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger $triggerName")
-
-      val feedCreationResult = assetHelper.withCleaner(wsk.trigger, 
triggerName) {
-        (trigger, _) =>
-          trigger.create(triggerName, feed = Some(actionName), parameters = 
Map(
-            "user" -> kafkaUtils.getAsJson("user"),
-            "password" -> kafkaUtils.getAsJson("password"),
-            "api_key" -> kafkaUtils.getAsJson("api_key"),
-            "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-            "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
-            "topic" -> topic.toJson
-          ))
-      }
-
-      withActivation(wsk.activation, feedCreationResult, initialWait = 5 
seconds, totalWait = 60 seconds) {
-        activation =>
-          // should be successful
-          activation.response.success shouldBe true
 
-          // It takes a moment for the consumer to fully initialize.
-          println("Giving the consumer a moment to get ready")
-          Thread.sleep(consumerInitTime)
-
-          val uuid = 
activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"",
 "")
-
-          println("Checking health endpoint(s) for existence of consumer uuid")
-          // get /health endpoint(s) and ensure it contains the new uuid
-          val healthUrls = 
System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
-          healthUrls shouldNot be(empty)
-
-          retry({
-            val uuids = healthUrls.flatMap(u => {
-              val response = RestAssured.given().get(u)
-              response.statusCode() should be(200)
-              response.asString()
-                .parseJson
-                .asJsObject
-                .getFields("consumers")
-                .head
-                .convertTo[JsArray]
-                .elements
-                .flatMap(c => {
-                  c.asJsObject.fields.keySet
-                })
-            }).toList
-
-            uuids should contain(uuid)
-
-          }, N = 10, waitBeforeRetry = Some(1.second))
-      }
+      createTrigger(assetHelper, triggerName, parameters = Map(
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
+        "api_key" -> getAsJson("api_key"),
+        "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
+        "topic" -> topic.toJson
+      ))
 
       // This action creates a trigger if it gets executed.
       // The name of the trigger will be the message, that has been send to 
kafka.
@@ -201,7 +100,7 @@ class BasicHealthTest
       }
 
       println(s"Producing message with key: $key and value: $verificationName")
-      val producer = kafkaUtils.createProducer()
+      val producer = createProducer()
       val record = new ProducerRecord(topic, key, verificationName)
       val future = producer.send(record)
 
@@ -231,17 +130,17 @@ class BasicHealthTest
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
       println(s"Creating trigger $triggerName")
 
-      val username = kafkaUtils.getAsJson("user")
-      val password = kafkaUtils.getAsJson("password")
-      val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
-      val brokers = kafkaUtils.getAsJson("brokers")
+      val username = getAsJson("user")
+      val password = getAsJson("password")
+      val admin_url = getAsJson("kafka_admin_url")
+      val brokers = getAsJson("brokers")
 
       val feedCreationResult = assetHelper.withCleaner(wsk.trigger, 
triggerName) {
         (trigger, _) =>
           trigger.create(triggerName, feed = Some(actionName), parameters = 
Map(
             "user" -> username,
             "password" -> password,
-            "api_key" -> kafkaUtils.getAsJson("api_key"),
+            "api_key" -> getAsJson("api_key"),
             "kafka_admin_url" -> admin_url,
             "kafka_brokers_sasl" -> brokers,
             "topic" -> topic.toJson,
diff --git a/tests/src/test/scala/system/packages/KafkaProduceTests.scala b/tests/src/test/scala/system/packages/KafkaProduceTests.scala
index 85d5007..e3d9094 100644
--- a/tests/src/test/scala/system/packages/KafkaProduceTests.scala
+++ b/tests/src/test/scala/system/packages/KafkaProduceTests.scala
@@ -45,7 +45,8 @@ class KafkaProduceTests
     with BeforeAndAfterAll
     with TestHelpers
     with WskTestHelpers
-    with JsHelpers {
+    with JsHelpers
+    with KafkaUtils {
 
     val topic = "test"
     val sessionTimeout = 10 seconds
@@ -56,8 +57,6 @@ class KafkaProduceTests
     val actionName = "kafkaProduceAction"
     val actionFile = "../action/kafkaProduce.py"
 
-    val kafkaUtils = new KafkaUtils
-
     behavior of "Kafka Produce action"
 
     override def beforeAll() {
@@ -73,7 +72,7 @@ class KafkaProduceTests
     def testMissingParameter(missingParam : String) = {
         var fullParamsMap = Map(
             "topic" -> topic.toJson,
-            "brokers" -> kafkaUtils.getAsJson("brokers"),
+            "brokers" -> getAsJson("brokers"),
             "value" -> "This will fail".toJson)
         var missingParamsMap = fullParamsMap.filterKeys(_ != missingParam)
 
diff --git a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
index 04ed5c9..4263a1d 100644
--- a/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubFeedTests.scala
@@ -36,7 +36,6 @@ import common.WskActorSystem
 import common.WskProps
 import common.WskTestHelpers
 import ActionHelper._
-
 import common.TestUtils.NOT_FOUND
 import whisk.utils.retry
 
@@ -49,19 +48,15 @@ class MessageHubFeedTests
   with BeforeAndAfterAll
   with TestHelpers
   with WskTestHelpers
-  with JsHelpers {
+  with JsHelpers
+  with KafkaUtils {
 
   val topic = "test"
   val sessionTimeout = 10 seconds
-
   val messagingPackage = "/whisk.system/messaging"
   val messageHubFeed = "messageHubFeed"
   val messageHubProduce = "messageHubProduce"
-
   val consumerInitTime = 10000 // ms
-
-  val kafkaUtils = new KafkaUtils
-
   val maxRetries = System.getProperty("max.retries", "60").toInt
 
   implicit val wskprops = WskProps()
@@ -126,14 +121,13 @@ class MessageHubFeedTests
 
     (wp, assetHelper) =>
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger ${triggerName}")
 
       createTrigger(assetHelper, triggerName, parameters = Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
-        "api_key" -> kafkaUtils.getAsJson("api_key"),
-        "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
+        "api_key" -> getAsJson("api_key"),
+        "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
         "topic" -> topic.toJson,
         "isBinaryKey" -> false.toJson,
         "isBinaryValue" -> false.toJson))
@@ -158,20 +152,15 @@ class MessageHubFeedTests
         trigger.get(name, NOT_FOUND)
       }
 
-      // It takes a moment for the consumer to fully initialize.
-      println("Giving the consumer a moment to get ready")
-      Thread.sleep(consumerInitTime)
-
       // Rapidly produce two messages whose size are each greater than half 
the allowed payload limit.
       // This should ensure that the feed fires these as two separate triggers.
       println("Rapidly producing two large messages")
-      val producer = kafkaUtils.createProducer()
+      val producer = createProducer()
       val firstMessage = new ProducerRecord(topic, verificationName1, 
generateMessage(s"first${currentTime}", testPayloadSize))
       val secondMessage = new ProducerRecord(topic, verificationName2, 
generateMessage(s"second${currentTime}", testPayloadSize))
       producer.send(firstMessage)
       producer.send(secondMessage)
       producer.close()
-
       retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
       retry(wsk.trigger.get(verificationName2), 60, Some(1.second))
   }
@@ -185,14 +174,13 @@ class MessageHubFeedTests
 
     (wp, assetHelper) =>
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger ${triggerName}")
 
       createTrigger(assetHelper, triggerName, parameters = Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
-        "api_key" -> kafkaUtils.getAsJson("api_key"),
-        "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
+        "api_key" -> getAsJson("api_key"),
+        "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
         "topic" -> topic.toJson,
         "isBinaryKey" -> false.toJson,
         "isBinaryValue" -> false.toJson))
@@ -211,12 +199,8 @@ class MessageHubFeedTests
 
       wsk.trigger.get(verificationName, NOT_FOUND)
 
-      // It takes a moment for the consumer to fully initialize.
-      println("Giving the consumer a moment to get ready")
-      Thread.sleep(consumerInitTime)
-
       println("Producing an oversized message")
-      val producer = kafkaUtils.createProducer()
+      val producer = createProducer()
       val bigMessage = new ProducerRecord(topic, verificationName, 
generateMessage(s"${currentTime}", testPayloadSize))
       producer.send(bigMessage)
       producer.close()
@@ -229,17 +213,15 @@ class MessageHubFeedTests
 
     (wp, assetHelper) =>
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger ${triggerName}")
-
-      val username = kafkaUtils.getAsJson("user")
-      val password = kafkaUtils.getAsJson("password")
-      val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
-      val brokers = kafkaUtils.getAsJson("brokers")
+      val username = getAsJson("user")
+      val password = getAsJson("password")
+      val admin_url = getAsJson("kafka_admin_url")
+      val brokers = getAsJson("brokers")
 
       createTrigger(assetHelper, triggerName, parameters = Map(
         "user" -> username,
         "password" -> password,
-        "api_key" -> kafkaUtils.getAsJson("api_key"),
+        "api_key" -> getAsJson("api_key"),
         "kafka_admin_url" -> admin_url,
         "kafka_brokers_sasl" -> brokers,
         "topic" -> topic.toJson,
@@ -263,17 +245,15 @@ class MessageHubFeedTests
 
     (wp, assetHelper) =>
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger $triggerName")
-
-      val username = kafkaUtils.getAsJson("user")
-      val password = kafkaUtils.getAsJson("password")
-      val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
-      val brokers = kafkaUtils.getAsJson("brokers")
+      val username = getAsJson("user")
+      val password = getAsJson("password")
+      val admin_url = getAsJson("kafka_admin_url")
+      val brokers = getAsJson("brokers")
 
       createTrigger(assetHelper, triggerName, parameters = Map(
         "user" -> username,
         "password" -> password,
-        "api_key" -> kafkaUtils.getAsJson("api_key"),
+        "api_key" -> getAsJson("api_key"),
         "kafka_admin_url" -> admin_url,
         "kafka_brokers_sasl" -> brokers,
         "topic" -> topic.toJson,
@@ -299,31 +279,22 @@ class MessageHubFeedTests
 
     (wp, assetHelper) =>
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger $triggerName")
-
-      val username = kafkaUtils.getAsJson("user")
-      val password = kafkaUtils.getAsJson("password")
-      val admin_url = kafkaUtils.getAsJson("kafka_admin_url")
-      val brokers = kafkaUtils.getAsJson("brokers")
-
-      val feedCreationResult = assetHelper.withCleaner(wsk.trigger, 
triggerName) {
-        (trigger, _) =>
-          trigger.create(triggerName, feed = Some(actionName), parameters = 
Map(
-            "user" -> username,
-            "password" -> password,
-            "api_key" -> kafkaUtils.getAsJson("api_key"),
-            "kafka_admin_url" -> admin_url,
-            "kafka_brokers_sasl" -> brokers,
-            "topic" -> topic.toJson,
-            "isJSONData" -> true.toJson,
-            "isBinaryKey" -> false.toJson,
-            "isBinaryValue" -> false.toJson
-          ))
-      }
+      val username = getAsJson("user")
+      val password = getAsJson("password")
+      val admin_url = getAsJson("kafka_admin_url")
+      val brokers = getAsJson("brokers")
 
-      withActivation(wsk.activation, feedCreationResult, initialWait = 5 
seconds, totalWait = 60 seconds) {
-        _.response.success shouldBe true
-      }
+      createTrigger(assetHelper, triggerName, parameters = Map(
+        "user" -> username,
+        "password" -> password,
+        "api_key" -> getAsJson("api_key"),
+        "kafka_admin_url" -> admin_url,
+        "kafka_brokers_sasl" -> brokers,
+        "topic" -> topic.toJson,
+        "isJSONData" -> true.toJson,
+        "isBinaryKey" -> false.toJson,
+        "isBinaryValue" -> false.toJson
+      ))
 
       val readRunResult = wsk.action.invoke(actionName, parameters = Map(
         "triggerName" -> triggerName.toJson,
@@ -384,14 +355,13 @@ class MessageHubFeedTests
     (wp, assetHelper) =>
       val key = "TheKey"
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger $triggerName")
 
       createTrigger(assetHelper, triggerName, parameters = Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
-        "api_key" -> kafkaUtils.getAsJson("api_key"),
-        "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
+        "api_key" -> getAsJson("api_key"),
+        "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
         "topic" -> topic.toJson
       ))
 
@@ -411,14 +381,11 @@ class MessageHubFeedTests
         trigger.get(name, NOT_FOUND)
       }
 
-      println("Giving the consumer a moment to get ready")
-      Thread.sleep(consumerInitTime)
-
       println("Producing a message")
       withActivation(wsk.activation, 
wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
         "topic" -> topic.toJson,
         "key" -> key.toJson,
         "value" -> verificationName1.toJson
@@ -455,9 +422,9 @@ class MessageHubFeedTests
 
       println("Producing a message")
       withActivation(wsk.activation, 
wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
         "topic" -> topic.toJson,
         "key" -> key.toJson,
         "value" -> verificationName2.toJson
@@ -474,16 +441,15 @@ class MessageHubFeedTests
     (wp, assetHelper) =>
       val key = "TheKey"
       val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
-      println(s"Creating trigger $triggerName")
 
       createTrigger(assetHelper, triggerName, parameters = Map(
         "__bx_creds" -> Map(
           "messagehub" -> Map(
-            "user" -> kafkaUtils.getAsJson("user"),
-            "password" -> kafkaUtils.getAsJson("password"),
-            "api_key" -> kafkaUtils.getAsJson("api_key"),
-            "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-            "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"))).toJson,
+            "user" -> getAsJson("user"),
+            "password" -> getAsJson("password"),
+            "api_key" -> getAsJson("api_key"),
+            "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+            "kafka_brokers_sasl" -> getAsJson("brokers"))).toJson,
         "topic" -> topic.toJson
       ))
 
@@ -508,9 +474,9 @@ class MessageHubFeedTests
 
       println("Producing a message")
       withActivation(wsk.activation, 
wsk.action.invoke(s"$messagingPackage/$messageHubProduce", Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
         "topic" -> topic.toJson,
         "key" -> key.toJson,
         "value" -> verificationName1.toJson
@@ -521,19 +487,6 @@ class MessageHubFeedTests
       retry(wsk.trigger.get(verificationName1), 60, Some(1.second))
   }
 
-  def createTrigger(assetHelper: AssetCleaner, name: String, parameters: 
Map[String, spray.json.JsValue]) = {
-    val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
-      (trigger, _) =>
-        trigger.create(name, feed = 
Some(s"$messagingPackage/$messageHubFeed"), parameters = parameters)
-    }
-
-    withActivation(wsk.activation, feedCreationResult, initialWait = 5 
seconds, totalWait = 60 seconds) {
-      activation =>
-        // should be successful
-        activation.response.success shouldBe true
-    }
-  }
-
   def generateMessage(prefix: String, size: Int): String = {
     val longString = Array.fill[String](size)("0").mkString("")
     s"${prefix}${longString}"
diff --git a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
index 4170ab2..baedd0c 100644
--- a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
+++ b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
@@ -19,7 +19,6 @@ package system.packages
 import system.utils.KafkaUtils
 
 import scala.concurrent.duration.DurationInt
-import scala.language.postfixOps
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
@@ -49,7 +48,8 @@ class MessageHubMultiWorkersTest extends FlatSpec
   with TestHelpers
   with WskTestHelpers
   with JsHelpers
-  with StreamLogging {
+  with StreamLogging
+  with KafkaUtils {
 
   val topic = "test"
 
@@ -58,7 +58,6 @@ class MessageHubMultiWorkersTest extends FlatSpec
 
   val messagingPackage = "/whisk.system/messaging"
   val messageHubFeed = "messageHubFeed"
-
   val dbProtocol = WhiskProperties.getProperty("db.protocol")
   val dbHost = WhiskProperties.getProperty("db.host")
   val dbPort = WhiskProperties.getProperty("db.port").toInt
@@ -66,11 +65,8 @@ class MessageHubMultiWorkersTest extends FlatSpec
   val dbPassword = WhiskProperties.getProperty("db.password")
   val dbPrefix = WhiskProperties.getProperty(WhiskConfig.dbPrefix)
   val dbName = s"${dbPrefix}ow_kafka_triggers"
-
   val client = new ExtendedCouchDbRestClient(dbProtocol, dbHost, dbPort, 
dbUsername, dbPassword, dbName)
 
-  val kafkaUtils = new KafkaUtils
-
   behavior of "Mussage Hub Feed"
 
   ignore should "assign two triggers to same worker when only worker0 is 
available" in withAssetCleaner(wskprops) {
@@ -187,26 +183,13 @@ class MessageHubMultiWorkersTest extends FlatSpec
       })
   }
 
-  def createTrigger(assetHelper: AssetCleaner, name: String, parameters: 
Map[String, spray.json.JsValue]) = {
-    val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
-      (trigger, _) =>
-        trigger.create(name, feed = 
Some(s"$messagingPackage/$messageHubFeed"), parameters = parameters)
-    }
-
-    withActivation(wsk.activation, feedCreationResult, initialWait = 5 
seconds, totalWait = 60 seconds) {
-      activation =>
-        // should be successful
-        activation.response.success shouldBe true
-    }
-  }
-
   def constructParams(workers: List[String]) = {
     Map(
-      "user" -> kafkaUtils.getAsJson("user"),
-      "password" -> kafkaUtils.getAsJson("password"),
-      "api_key" -> kafkaUtils.getAsJson("api_key"),
-      "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-      "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+      "user" -> getAsJson("user"),
+      "password" -> getAsJson("password"),
+      "api_key" -> getAsJson("api_key"),
+      "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+      "kafka_brokers_sasl" -> getAsJson("brokers"),
       "topic" -> topic.toJson,
       "workers" -> workers.toJson
     )
diff --git a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
index 5cddefb..4ba4064 100644
--- a/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
+++ b/tests/src/test/scala/system/packages/MessageHubProduceTests.scala
@@ -52,7 +52,8 @@ class MessageHubProduceTests
     with BeforeAndAfterAll
     with TestHelpers
     with WskTestHelpers
-    with JsHelpers {
+    with JsHelpers
+    with KafkaUtils {
 
     val topic = "test"
     val sessionTimeout = 10 seconds
@@ -63,19 +64,15 @@ class MessageHubProduceTests
     val messagingPackage = "/whisk.system/messaging"
     val messageHubFeed = "messageHubFeed"
     val messageHubProduce = "messageHubProduce"
-
     val consumerInitTime = 10000 // ms
-
-    val kafkaUtils = new KafkaUtils
-
     val maxRetries = System.getProperty("max.retries", "60").toInt
 
     // these parameter values are 100% valid and should work as-is
     val validParameters = Map(
-        "user" -> kafkaUtils.getAsJson("user"),
-        "password" -> kafkaUtils.getAsJson("password"),
+        "user" -> getAsJson("user"),
+        "password" -> getAsJson("password"),
         "topic" -> topic.toJson,
-        "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+        "kafka_brokers_sasl" -> getAsJson("brokers"),
         "value" -> "Big Trouble is actually a really good Tim Allen movie. 
Seriously.".toJson)
 
     behavior of "Message Hub Produce action"
@@ -135,26 +132,14 @@ class MessageHubProduceTests
 
         (wp, assetHelper) =>
             val triggerName = s"/_/binaryValueTrigger-$currentTime"
-            println(s"Creating trigger ${triggerName}")
-
-            val feedCreationResult = assetHelper.withCleaner(wsk.trigger, 
triggerName) {
-                (trigger, _) =>
-                    trigger.create(triggerName, feed = 
Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
-                        "user" -> kafkaUtils.getAsJson("user"),
-                        "password" -> kafkaUtils.getAsJson("password"),
-                        "api_key" -> kafkaUtils.getAsJson("api_key"),
-                        "kafka_admin_url" -> 
kafkaUtils.getAsJson("kafka_admin_url"),
-                        "kafka_brokers_sasl" -> 
kafkaUtils.getAsJson("brokers"),
-                        "topic" -> topic.toJson))
-            }
 
-            withActivation(wsk.activation, feedCreationResult, initialWait = 5 
seconds, totalWait = 60 seconds) {
-                _.response.success shouldBe true
-            }
-
-            // It takes a moment for the consumer to fully initialize.
-            println("Giving the consumer a moment to get ready")
-            Thread.sleep(consumerInitTime)
+            createTrigger(assetHelper, triggerName, parameters = Map(
+                "user" -> getAsJson("user"),
+                "password" -> getAsJson("password"),
+                "api_key" -> getAsJson("api_key"),
+                "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+                "kafka_brokers_sasl" -> getAsJson("brokers"),
+                "topic" -> topic.toJson))
 
             val defaultAction = Some("dat/createTriggerActions.js")
             val defaultActionName = s"helloKafka-${currentTime}"
@@ -191,26 +176,14 @@ class MessageHubProduceTests
 
         (wp, assetHelper) =>
             val triggerName = s"/_/binaryKeyTrigger-$currentTime"
-            println(s"Creating trigger ${triggerName}")
-
-            val feedCreationResult = assetHelper.withCleaner(wsk.trigger, 
triggerName) {
-                (trigger, _) =>
-                    trigger.create(triggerName, feed = 
Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
-                        "user" -> kafkaUtils.getAsJson("user"),
-                        "password" -> kafkaUtils.getAsJson("password"),
-                        "api_key" -> kafkaUtils.getAsJson("api_key"),
-                        "kafka_admin_url" -> 
kafkaUtils.getAsJson("kafka_admin_url"),
-                        "kafka_brokers_sasl" -> 
kafkaUtils.getAsJson("brokers"),
-                        "topic" -> topic.toJson))
-            }
-
-            withActivation(wsk.activation, feedCreationResult, initialWait = 5 
seconds, totalWait = 60 seconds) {
-                _.response.success shouldBe true
-            }
 
-            // It takes a moment for the consumer to fully initialize.
-            println("Giving the consumer a moment to get ready")
-            Thread.sleep(consumerInitTime)
+            createTrigger(assetHelper, triggerName, parameters = Map(
+                "user" -> getAsJson("user"),
+                "password" -> getAsJson("password"),
+                "api_key" -> getAsJson("api_key"),
+                "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+                "kafka_brokers_sasl" -> getAsJson("brokers"),
+                "topic" -> topic.toJson))
 
             val defaultAction = Some("dat/createTriggerActionsFromKey.js")
             val defaultActionName = s"helloKafka-${currentTime}"
diff --git a/tests/src/test/scala/system/stress/StressTest.scala b/tests/src/test/scala/system/stress/StressTest.scala
index b174323..bd413e0 100644
--- a/tests/src/test/scala/system/stress/StressTest.scala
+++ b/tests/src/test/scala/system/stress/StressTest.scala
@@ -42,7 +42,8 @@ class BasicStressTest
     with Matchers
     with WskActorSystem
     with TestHelpers
-    with WskTestHelpers {
+    with WskTestHelpers
+    with KafkaUtils {
 
     val topic = "test"
     val sessionTimeout = 10 seconds
@@ -54,8 +55,6 @@ class BasicStressTest
     val messageHubFeed = "messageHubFeed"
     val messageHubProduce = "messageHubProduce"
 
-    val kafkaUtils = new KafkaUtils
-
     behavior of "Message Hub provider"
 
     it should "rapidly create and delete many triggers" in {
@@ -80,11 +79,11 @@ class BasicStressTest
             val triggerName = s"/_/dummyMessageHubTrigger-$currentTime"
             println(s"\nCreating trigger #${iterationLabel}: ${triggerName}")
             val feedCreationResult = wsk.trigger.create(triggerName, feed = Some(s"$messagingPackage/$messageHubFeed"), parameters = Map(
-                    "user" -> kafkaUtils.getAsJson("user"),
-                    "password" -> kafkaUtils.getAsJson("password"),
-                    "api_key" -> kafkaUtils.getAsJson("api_key"),
-                    "kafka_admin_url" -> kafkaUtils.getAsJson("kafka_admin_url"),
-                    "kafka_brokers_sasl" -> kafkaUtils.getAsJson("brokers"),
+                    "user" -> getAsJson("user"),
+                    "password" -> getAsJson("password"),
+                    "api_key" -> getAsJson("api_key"),
+                    "kafka_admin_url" -> getAsJson("kafka_admin_url"),
+                    "kafka_brokers_sasl" -> getAsJson("brokers"),
                     "topic" -> topic.toJson))
 
             println("Waiting for trigger create")
diff --git a/tests/src/test/scala/system/utils/KafkaUtils.scala b/tests/src/test/scala/system/utils/KafkaUtils.scala
index be17d6c..e01c715 100644
--- a/tests/src/test/scala/system/utils/KafkaUtils.scala
+++ b/tests/src/test/scala/system/utils/KafkaUtils.scala
@@ -17,24 +17,29 @@
 
 package system.utils
 
-import common.TestUtils
-
 import java.util.HashMap
 import java.util.Properties
+
+import com.jayway.restassured.RestAssured
+import com.jayway.restassured.config.{RestAssuredConfig, SSLConfig}
 import javax.security.auth.login.Configuration
 import javax.security.auth.login.AppConfigurationEntry
-
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.KafkaProducer
 
 import scala.collection.mutable.ListBuffer
-
 import spray.json.DefaultJsonProtocol._
 import spray.json._
-
+import system.packages.ActionHelper._
 import whisk.utils.JsHelpers
 
+import scala.concurrent.duration.DurationInt
+import scala.language.postfixOps
+import common.TestHelpers
+import common.TestUtils
+import common.WskTestHelpers
+import whisk.utils.retry
 
-class KafkaUtils {
+trait KafkaUtils extends TestHelpers with WskTestHelpers {
     lazy val messageHubProps = KafkaUtils.initializeMessageHub()
 
     def createProducer() : KafkaProducer[String, String] = {
@@ -52,9 +57,71 @@ class KafkaUtils {
             case key => this(key).asInstanceOf[String].toJson
         }
     }
+
+    val sslconfig = {
+        val inner = new SSLConfig().allowAllHostnames()
+        val config = inner.relaxedHTTPSValidation()
+        new RestAssuredConfig().sslConfig(config)
+    }
+
+    def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = {
+        println(s"Creating trigger $name")
+
+        val feedCreationResult = assetHelper.withCleaner(wsk.trigger, name) {
+            (trigger, _) =>
+                trigger.create(name, feed = Some(s"/whisk.system/messaging/messageHubFeed"), parameters = parameters)
+        }
+
+        withActivation(wsk.activation, feedCreationResult, initialWait = 5 seconds, totalWait = 60 seconds) {
+            activation =>
+                // should be successful
+                activation.response.success shouldBe true
+
+                // It takes a moment for the consumer to fully initialize.
+                println("Giving the consumer a moment to get ready")
+                Thread.sleep(KafkaUtils.consumerInitTime)
+
+                val uuid = activation.response.result.get.fields.get("uuid").get.toString().replaceAll("\"", "")
+                consumerExists(uuid)
+        }
+    }
+
+
+    def consumerExists(uuid: String) = {
+        println("Checking health endpoint(s) for existence of consumer uuid")
+        // get /health endpoint(s) and ensure it contains the new uuid
+        val healthUrls: Array[String] = System.getProperty("health_url").split("\\s*,\\s*").filterNot(_.isEmpty)
+        assert(healthUrls.size != 0)
+
+        retry({
+            val uuids: Array[(String, JsValue)] = healthUrls.flatMap(u => {
+                val response = RestAssured.given().config(sslconfig).get(u)
+                assert(response.statusCode() == 200)
+
+                response.asString()
+                  .parseJson
+                  .asJsObject
+                  .getFields("consumers")
+                  .head
+                  .convertTo[JsArray]
+                  .elements
+                  .flatMap(c => {
+                      val consumer = c.asJsObject.fields.head
+                      consumer match {
+                          case (u, v) if u == uuid && v.asJsObject.getFields("currentState").head == "Running".toJson => Some(consumer)
+                          case _ => None
+                      }
+                  })
+            })
+
+            assert(uuids.nonEmpty)
+        }, N = 60, waitBeforeRetry = Some(1.second))
+    }
 }
 
 object KafkaUtils {
+    val consumerInitTime = 10000 // ms
+
     def asKafkaProducerProps(props : Map[String,Object]) : Properties = {
         val requiredKeys = List("brokers",
                                 "user",

Reply via email to