This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 08efb58  Replace remaining usages of parallel collections with explicit concurrency. (#4843)
08efb58 is described below

commit 08efb58cf5eef3475bdb41e45db448250bcf9c17
Author: Markus Thömmes <markusthoem...@me.com>
AuthorDate: Thu Feb 27 16:52:11 2020 +0100

    Replace remaining usages of parallel collections with explicit concurrency. (#4843)
---
 .../core/database/test/CacheConcurrencyTests.scala | 43 ++++++++++++----------
 1 file changed, 23 insertions(+), 20 deletions(-)

diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/test/CacheConcurrencyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/test/CacheConcurrencyTests.scala
index 90c8bb7..0026df8 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/test/CacheConcurrencyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/test/CacheConcurrencyTests.scala
@@ -17,9 +17,9 @@
 
 package org.apache.openwhisk.core.database.test
 
-import scala.collection.parallel._
 import scala.concurrent.duration.DurationInt
-import java.util.concurrent.ForkJoinPool
+import java.util.concurrent.Executors
+
 import org.junit.runner.RunWith
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.FlatSpec
@@ -32,9 +32,17 @@ import spray.json.JsString
 import org.apache.openwhisk.common.TransactionId
 import org.apache.openwhisk.utils.retry
 
-@RunWith(classOf[JUnitRunner])
-class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSystem with BeforeAndAfterEach {
+import scala.concurrent.ExecutionContext
 
+@RunWith(classOf[JUnitRunner])
+class CacheConcurrencyTests
+    extends FlatSpec
+    with WskTestHelpers
+    with WskActorSystem
+    with BeforeAndAfterEach
+    with ConcurrencyHelpers {
+
+  val timeout = 5.minutes
   println(s"Running tests on # proc: ${Runtime.getRuntime.availableProcessors()}")
 
   implicit private val transId = TransactionId.testing
@@ -43,15 +51,14 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSy
 
   val nExternalIters = 1
   val nInternalIters = 5
-  val nThreads = nInternalIters * 30
+  val nThreads = nInternalIters * 30 // The maximum number of tasks running in parallel at any given time
+  val parallelismExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(nThreads))
 
-  val parallel = (1 to nInternalIters).par
-  parallel.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nThreads))
-
-  def run[W](phase: String)(block: String => W) = parallel.map { i =>
-    val name = s"testy${i}"
-    withClue(s"$phase: failed for $name") { (name, block(name)) }
-  }
+  def run[W](phase: String)(block: String => W) =
+    concurrently((1 to nInternalIters), timeout) { i =>
+      val name = s"testy${i}"
+      withClue(s"$phase: failed for $name") { (name, block(name)) }
+    }(parallelismExecutionContext)
 
   override def beforeEach() = {
     run("pre-test sanitize") { name =>
@@ -79,9 +86,7 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSy
 
       run("delete+get") { name =>
         // run 30 operations in parallel: 15 get, 1 delete, 14 more get
-        val para = (1 to 30).par
-        para.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nThreads))
-        para.map { i =>
+        concurrently((1 to 30), timeout) { i =>
           if (i != 16) {
             val rr = wsk.action.get(name, expectedExitCode = DONTCARE_EXIT)
             withClue(s"expecting get to either succeed or fail with not found: $rr") {
@@ -91,7 +96,7 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSy
           } else {
             wsk.action.delete(name)
           }
-        }
+        }(parallelismExecutionContext)
       }
 
       // Give some time to replicate the state between the controllers
@@ -117,9 +122,7 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSy
 
       run("update+get") { name =>
         // run 30 operations in parallel: 15 get, 1 update, 14 more get
-        val para = (1 to 30).par
-        para.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(nThreads))
-        para.map { i =>
+        concurrently((1 to 30), timeout) { i =>
           if (i != 16) {
             val rr = wsk.action.get(name, expectedExitCode = DONTCARE_EXIT)
             withClue(s"expecting get to either succeed or fail with not found: $rr") {
@@ -129,7 +132,7 @@ class CacheConcurrencyTests extends FlatSpec with WskTestHelpers with WskActorSy
           } else {
             wsk.action.create(name, None, parameters = Map("color" -> JsString("blue")), update = true)
           }
-        }
+        }(parallelismExecutionContext)
       }
 
       // All controllers should have the correct action

Reply via email to