dubeejw closed pull request #260: Update MessageHubMultiWorkersTest to use 
ExtendedCouchDbRestClient 
URL: https://github.com/apache/incubator-openwhisk-package-kafka/pull/260
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, GitHub does not display
its diff once the pull request has been merged, so the diff is reproduced below:

diff --git 
a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala 
b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
index f6ce723..7ea14ec 100644
--- a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
+++ b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala
@@ -20,24 +20,26 @@ import system.utils.KafkaUtils
 
 import scala.concurrent.duration.DurationInt
 import scala.language.postfixOps
-
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FlatSpec
 import org.scalatest.Matchers
 import org.scalatest.junit.JUnitRunner
-
 import common.JsHelpers
 import common.TestHelpers
 import common.StreamLogging
+import common.WhiskProperties
 import common.Wsk
 import common.WskActorSystem
 import common.WskProps
 import common.WskTestHelpers
 import spray.json.DefaultJsonProtocol._
 import spray.json.{pimpAny, _}
-import whisk.core.database.test.DatabaseScriptTestUtils
-import whisk.utils.JsHelpers
+import whisk.core.WhiskConfig
+import whisk.core.database.test.ExtendedCouchDbRestClient
+import whisk.utils.{JsHelpers, retry}
+
+import scala.concurrent.Await
 
 @RunWith(classOf[JUnitRunner])
 class MessageHubMultiWorkersTest extends FlatSpec
@@ -47,8 +49,7 @@ class MessageHubMultiWorkersTest extends FlatSpec
   with TestHelpers
   with WskTestHelpers
   with JsHelpers
-  with StreamLogging
-  with DatabaseScriptTestUtils {
+  with StreamLogging {
 
   val topic = "test"
 
@@ -57,8 +58,17 @@ class MessageHubMultiWorkersTest extends FlatSpec
 
   val messagingPackage = "/whisk.system/messaging"
   val messageHubFeed = "messageHubFeed"
+
+  val dbProtocol = WhiskProperties.getProperty("db.protocol")
+  val dbHost = WhiskProperties.getProperty("db.host")
+  val dbPort = WhiskProperties.getProperty("db.port").toInt
+  val dbUsername = WhiskProperties.getProperty("db.username")
+  val dbPassword = WhiskProperties.getProperty("db.password")
+  val dbPrefix = WhiskProperties.getProperty(WhiskConfig.dbPrefix)
   val dbName = s"${dbPrefix}ow_kafka_triggers"
 
+  val client = new ExtendedCouchDbRestClient(dbProtocol, dbHost, dbPort, 
dbUsername, dbPassword, dbName)
+
   val kafkaUtils = new KafkaUtils
 
   behavior of "Mussage Hub Feed"
@@ -76,10 +86,14 @@ class MessageHubMultiWorkersTest extends FlatSpec
       createTrigger(assetHelper, firstTrigger, parameters)
       createTrigger(assetHelper, secondTrigger, parameters)
 
-      val documents = 
getAllDocs(dbName).fields("rows").convertTo[List[JsObject]]
+      retry({
+        val result = Await.result(client.getAllDocs(includeDocs = Some(true)), 
15.seconds)
+        result should be('right)
+        val documents = 
result.right.get.fields("rows").convertTo[List[JsObject]]
 
-      validateTriggerAssignment(documents, firstTrigger, worker0)
-      validateTriggerAssignment(documents, secondTrigger, worker0)
+        validateTriggerAssignment(documents, firstTrigger, worker0)
+        validateTriggerAssignment(documents, secondTrigger, worker0)
+      })
   }
 
   it should "assign a trigger to worker0 and a trigger to worker1 when both 
workers are available" in withAssetCleaner(wskprops) {
@@ -96,10 +110,14 @@ class MessageHubMultiWorkersTest extends FlatSpec
       createTrigger(assetHelper, firstTrigger, parameters)
       createTrigger(assetHelper, secondTrigger, parameters)
 
-      val documents = 
getAllDocs(dbName).fields("rows").convertTo[List[JsObject]]
+      retry({
+        val result = Await.result(client.getAllDocs(includeDocs = Some(true)), 
15.seconds)
+        result should be('right)
+        val documents = 
result.right.get.fields("rows").convertTo[List[JsObject]]
 
-      validateTriggerAssignment(documents, firstTrigger, worker0)
-      validateTriggerAssignment(documents, secondTrigger, worker1)
+        validateTriggerAssignment(documents, firstTrigger, worker0)
+        validateTriggerAssignment(documents, secondTrigger, worker1)
+      })
   }
 
   it should "assign a trigger to worker1 when worker0 is removed and there is 
an assignment imbalance" in withAssetCleaner(wskprops) {
@@ -120,12 +138,16 @@ class MessageHubMultiWorkersTest extends FlatSpec
       createTrigger(assetHelper, thirdTrigger, parameters = 
constructParams(List(worker0, worker1)))
       createTrigger(assetHelper, fourthTrigger, parameters = 
constructParams(List(worker1)))
 
-      val documents = 
getAllDocs(dbName).fields("rows").convertTo[List[JsObject]]
+      retry({
+        val result = Await.result(client.getAllDocs(includeDocs = Some(true)), 
15.seconds)
+        result should be('right)
+        val documents = 
result.right.get.fields("rows").convertTo[List[JsObject]]
 
-      validateTriggerAssignment(documents, firstTrigger, worker1)
-      validateTriggerAssignment(documents, secondTrigger, worker1)
-      validateTriggerAssignment(documents, thirdTrigger, worker0)
-      validateTriggerAssignment(documents, fourthTrigger, worker1)
+        validateTriggerAssignment(documents, firstTrigger, worker1)
+        validateTriggerAssignment(documents, secondTrigger, worker1)
+        validateTriggerAssignment(documents, thirdTrigger, worker0)
+        validateTriggerAssignment(documents, fourthTrigger, worker1)
+      })
   }
 
   it should "balance the load accross workers when a worker is added" in 
withAssetCleaner(wskprops) {
@@ -151,14 +173,18 @@ class MessageHubMultiWorkersTest extends FlatSpec
       createTrigger(assetHelper, fifthTrigger, updatedParameters)
       createTrigger(assetHelper, sixthTrigger, updatedParameters)
 
-      val documents = 
getAllDocs(dbName).fields("rows").convertTo[List[JsObject]]
-
-      validateTriggerAssignment(documents, firstTrigger, worker0)
-      validateTriggerAssignment(documents, secondTrigger, worker0)
-      validateTriggerAssignment(documents, thirdTrigger, worker1)
-      validateTriggerAssignment(documents, fourthTrigger, worker1)
-      validateTriggerAssignment(documents, fifthTrigger, worker0)
-      validateTriggerAssignment(documents, sixthTrigger, worker1)
+      retry({
+        val result = Await.result(client.getAllDocs(includeDocs = Some(true)), 
15.seconds)
+        result should be('right)
+        val documents = 
result.right.get.fields("rows").convertTo[List[JsObject]]
+
+        validateTriggerAssignment(documents, firstTrigger, worker0)
+        validateTriggerAssignment(documents, secondTrigger, worker0)
+        validateTriggerAssignment(documents, thirdTrigger, worker1)
+        validateTriggerAssignment(documents, fourthTrigger, worker1)
+        validateTriggerAssignment(documents, fifthTrigger, worker0)
+        validateTriggerAssignment(documents, sixthTrigger, worker1)
+      })
   }
 
   def createTrigger(assetHelper: AssetCleaner, name: String, parameters: 
Map[String, spray.json.JsValue]) = {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to