This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 08efb58 Replace remaining usages of parallel collections with explicit concurrency. (#4843) 08efb58 is described below commit 08efb58cf5eef3475bdb41e45db448250bcf9c17 Author: Markus Thömmes <markusthoem...@me.com> AuthorDate: Thu Feb 27 16:52:11 2020 +0100 Replace remaining usages of parallel collections with explicit concurrency. (#4843) --- .../core/database/test/CacheConcurrencyTests.scala | 43 ++++++++++++---------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/test/CacheConcurrencyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/test/CacheConcurrencyTests.scala index 90c8bb7..0026df8 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/database/test/CacheConcurrencyTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/test/CacheConcurrencyTests.scala @@ -17,9 +17,9 @@ package org.apache.openwhisk.core.database.test -import scala.collection.parallel._ import scala.concurrent.duration.DurationInt -import java.util.concurrent.ForkJoinPool +import java.util.concurrent.Executors + import org.junit.runner.RunWith import org.scalatest.BeforeAndAfterEach import org.scalatest.FlatSpec @@ -32,9 +32,17 @@ import spray.json.JsString import org.apache.openwhisk.common.TransactionId import org.apache.openwhisk.utils.retry -@RunWith(classOf[JUnitRunner]) -class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSystem with BeforeAndAfterEach { +import scala.concurrent.ExecutionContext +@RunWith(classOf[JUnitRunner]) +class CacheConcurrencyTests + extends FlatSpec + with WskTestHelpers + with WskActorSystem + with BeforeAndAfterEach + with ConcurrencyHelpers { + + val timeout = 5.minutes println(s"Running tests on # proc: ${Runtime.getRuntime.availableProcessors()}") implicit private val transId = TransactionId.testing @@ -43,15 +51,14 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSy val nExternalIters = 1 val nInternalIters = 5 - val nThreads = nInternalIters * 30 + val nThreads = nInternalIters * 30 // The maximum number of tasks running in parallel at any given time + val parallelismExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(nThreads)) - val parallel = (1 to nInternalIters).par - parallel.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nThreads)) - - def run[W](phase: String)(block: String => W) = parallel.map { i => - val name = s"testy${i}" - withClue(s"$phase: failed for $name") { (name, block(name)) } - } + def run[W](phase: String)(block: String => W) = + concurrently((1 to nInternalIters), timeout) { i => + val name = s"testy${i}" + withClue(s"$phase: failed for $name") { (name, block(name)) } + }(parallelismExecutionContext) override def beforeEach() = { run("pre-test sanitize") { name => @@ -79,9 +86,7 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSy run("delete+get") { name => // run 30 operations in parallel: 15 get, 1 delete, 14 more get - val para = (1 to 30).par - para.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nThreads)) - para.map { i => + concurrently((1 to 30), timeout) { i => if (i != 16) { val rr = wsk.action.get(name, expectedExitCode = DONTCARE_EXIT) withClue(s"expecting get to either succeed or fail with not found: $rr") { @@ -91,7 +96,7 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSy } else { wsk.action.delete(name) } - } + }(parallelismExecutionContext) } // Give some time to replicate the state between the controllers @@ -117,9 +122,7 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSy run("update+get") { name => // run 30 operations in parallel: 15 get, 1 update, 14 more get - val para = (1 to 30).par - para.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nThreads)) - para.map { i => + concurrently((1 to 30), timeout) { i => if (i != 16) { val rr = wsk.action.get(name, expectedExitCode = DONTCARE_EXIT) withClue(s"expecting get to either succeed or fail with not found: $rr") { @@ -129,7 +132,7 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSy } else { wsk.action.create(name, None, parameters = Map("color" -> JsString("blue")), update = true) } - } + }(parallelismExecutionContext) } // All controllers should have the correct action