dubeejw closed pull request #260: Update MessageHubMultiWorkersTest to use ExtendedCouchDbRestClient (URL: https://github.com/apache/incubator-openwhisk-package-kafka/pull/260)
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala index f6ce723..7ea14ec 100644 --- a/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala +++ b/tests/src/test/scala/system/packages/MessageHubMultiWorkersTest.scala @@ -20,24 +20,26 @@ import system.utils.KafkaUtils import scala.concurrent.duration.DurationInt import scala.language.postfixOps - import org.junit.runner.RunWith import org.scalatest.BeforeAndAfterAll import org.scalatest.FlatSpec import org.scalatest.Matchers import org.scalatest.junit.JUnitRunner - import common.JsHelpers import common.TestHelpers import common.StreamLogging +import common.WhiskProperties import common.Wsk import common.WskActorSystem import common.WskProps import common.WskTestHelpers import spray.json.DefaultJsonProtocol._ import spray.json.{pimpAny, _} -import whisk.core.database.test.DatabaseScriptTestUtils -import whisk.utils.JsHelpers +import whisk.core.WhiskConfig +import whisk.core.database.test.ExtendedCouchDbRestClient +import whisk.utils.{JsHelpers, retry} + +import scala.concurrent.Await @RunWith(classOf[JUnitRunner]) class MessageHubMultiWorkersTest extends FlatSpec @@ -47,8 +49,7 @@ class MessageHubMultiWorkersTest extends FlatSpec with TestHelpers with WskTestHelpers with JsHelpers - with StreamLogging - with DatabaseScriptTestUtils { + with StreamLogging { val topic = "test" @@ -57,8 +58,17 @@ class MessageHubMultiWorkersTest extends FlatSpec val messagingPackage = "/whisk.system/messaging" val messageHubFeed = "messageHubFeed" + + val dbProtocol = WhiskProperties.getProperty("db.protocol") + val dbHost = 
WhiskProperties.getProperty("db.host") + val dbPort = WhiskProperties.getProperty("db.port").toInt + val dbUsername = WhiskProperties.getProperty("db.username") + val dbPassword = WhiskProperties.getProperty("db.password") + val dbPrefix = WhiskProperties.getProperty(WhiskConfig.dbPrefix) val dbName = s"${dbPrefix}ow_kafka_triggers" + val client = new ExtendedCouchDbRestClient(dbProtocol, dbHost, dbPort, dbUsername, dbPassword, dbName) + val kafkaUtils = new KafkaUtils behavior of "Mussage Hub Feed" @@ -76,10 +86,14 @@ class MessageHubMultiWorkersTest extends FlatSpec createTrigger(assetHelper, firstTrigger, parameters) createTrigger(assetHelper, secondTrigger, parameters) - val documents = getAllDocs(dbName).fields("rows").convertTo[List[JsObject]] + retry({ + val result = Await.result(client.getAllDocs(includeDocs = Some(true)), 15.seconds) + result should be('right) + val documents = result.right.get.fields("rows").convertTo[List[JsObject]] - validateTriggerAssignment(documents, firstTrigger, worker0) - validateTriggerAssignment(documents, secondTrigger, worker0) + validateTriggerAssignment(documents, firstTrigger, worker0) + validateTriggerAssignment(documents, secondTrigger, worker0) + }) } it should "assign a trigger to worker0 and a trigger to worker1 when both workers are available" in withAssetCleaner(wskprops) { @@ -96,10 +110,14 @@ class MessageHubMultiWorkersTest extends FlatSpec createTrigger(assetHelper, firstTrigger, parameters) createTrigger(assetHelper, secondTrigger, parameters) - val documents = getAllDocs(dbName).fields("rows").convertTo[List[JsObject]] + retry({ + val result = Await.result(client.getAllDocs(includeDocs = Some(true)), 15.seconds) + result should be('right) + val documents = result.right.get.fields("rows").convertTo[List[JsObject]] - validateTriggerAssignment(documents, firstTrigger, worker0) - validateTriggerAssignment(documents, secondTrigger, worker1) + validateTriggerAssignment(documents, firstTrigger, worker0) + 
validateTriggerAssignment(documents, secondTrigger, worker1) + }) } it should "assign a trigger to worker1 when worker0 is removed and there is an assignment imbalance" in withAssetCleaner(wskprops) { @@ -120,12 +138,16 @@ class MessageHubMultiWorkersTest extends FlatSpec createTrigger(assetHelper, thirdTrigger, parameters = constructParams(List(worker0, worker1))) createTrigger(assetHelper, fourthTrigger, parameters = constructParams(List(worker1))) - val documents = getAllDocs(dbName).fields("rows").convertTo[List[JsObject]] + retry({ + val result = Await.result(client.getAllDocs(includeDocs = Some(true)), 15.seconds) + result should be('right) + val documents = result.right.get.fields("rows").convertTo[List[JsObject]] - validateTriggerAssignment(documents, firstTrigger, worker1) - validateTriggerAssignment(documents, secondTrigger, worker1) - validateTriggerAssignment(documents, thirdTrigger, worker0) - validateTriggerAssignment(documents, fourthTrigger, worker1) + validateTriggerAssignment(documents, firstTrigger, worker1) + validateTriggerAssignment(documents, secondTrigger, worker1) + validateTriggerAssignment(documents, thirdTrigger, worker0) + validateTriggerAssignment(documents, fourthTrigger, worker1) + }) } it should "balance the load accross workers when a worker is added" in withAssetCleaner(wskprops) { @@ -151,14 +173,18 @@ class MessageHubMultiWorkersTest extends FlatSpec createTrigger(assetHelper, fifthTrigger, updatedParameters) createTrigger(assetHelper, sixthTrigger, updatedParameters) - val documents = getAllDocs(dbName).fields("rows").convertTo[List[JsObject]] - - validateTriggerAssignment(documents, firstTrigger, worker0) - validateTriggerAssignment(documents, secondTrigger, worker0) - validateTriggerAssignment(documents, thirdTrigger, worker1) - validateTriggerAssignment(documents, fourthTrigger, worker1) - validateTriggerAssignment(documents, fifthTrigger, worker0) - validateTriggerAssignment(documents, sixthTrigger, worker1) + retry({ + val 
result = Await.result(client.getAllDocs(includeDocs = Some(true)), 15.seconds) + result should be('right) + val documents = result.right.get.fields("rows").convertTo[List[JsObject]] + + validateTriggerAssignment(documents, firstTrigger, worker0) + validateTriggerAssignment(documents, secondTrigger, worker0) + validateTriggerAssignment(documents, thirdTrigger, worker1) + validateTriggerAssignment(documents, fourthTrigger, worker1) + validateTriggerAssignment(documents, fifthTrigger, worker0) + validateTriggerAssignment(documents, sixthTrigger, worker1) + }) } def createTrigger(assetHelper: AssetCleaner, name: String, parameters: Map[String, spray.json.JsValue]) = { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services