This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 2312b7d  Recreate HTTP client on Container.resume(). (#4185)
2312b7d is described below

commit 2312b7d6b913fc30bcd56643285fa29aaa18815c
Author: tysonnorris <tysonnor...@gmail.com>
AuthorDate: Mon Jan 21 02:13:48 2019 -0800

    Recreate HTTP client on Container.resume(). (#4185)
    
    reopen connections only once, during Container.resume()
---
 .../openwhisk/core/containerpool/Container.scala   | 44 +++++++++++++++-------
 .../apache/openwhisk/core/mesos/MesosTask.scala    |  4 +-
 .../containerpool/docker/DockerContainer.scala     |  5 ++-
 .../kubernetes/KubernetesContainer.scala           |  3 +-
 .../test/DockerToActivationLogStoreTests.scala     |  2 +-
 .../containerpool/test/ContainerProxyTests.scala   | 18 ++++++---
 6 files changed, 50 insertions(+), 26 deletions(-)

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
index a2e2709..354ec38 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala
@@ -62,6 +62,11 @@ object Container {
     loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool)
 }
 
+/**
+ * Abstraction for Container operations.
+ * Container manipulation (specifically suspend/resume/destroy) is NOT thread-safe and MUST be synchronized by caller.
+ * Container access (specifically run) is thread-safe (e.g. for concurrent activation processing).
+ */
 trait Container {
 
   implicit protected val as: ActorSystem
@@ -73,7 +78,11 @@ trait Container {
   /** HTTP connection to the container, will be lazily established by callContainer */
   protected var httpConnection: Option[ContainerClient] = None
 
-  /** Stops the container from consuming CPU cycles. */
+  /** maxConcurrent+timeout are cached during first init, so that resuming connections can reference */
+  protected var containerHttpMaxConcurrent: Int = 1
+  protected var containerHttpTimeout: FiniteDuration = 60.seconds
+
+  /** Stops the container from consuming CPU cycles. NOT thread-safe - caller must synchronize. */
   def suspend()(implicit transid: TransactionId): Future[Unit] = {
     //close connection first, then close connection pool
     //(testing pool recreation vs connection closing, time was similar - so using the simpler recreation approach)
@@ -82,8 +91,11 @@ trait Container {
     closeConnections(toClose)
   }
 
-  /** Dual of halt. */
-  def resume()(implicit transid: TransactionId): Future[Unit]
+  /** Dual of halt. NOT thread-safe - caller must synchronize.*/
+  def resume()(implicit transid: TransactionId): Future[Unit] = {
+    httpConnection = Some(openConnections(containerHttpTimeout, containerHttpMaxConcurrent))
+    Future.successful({})
+  }
 
   /** Obtains logs up to a given threshold from the container. Optionally waits for a sentinel to appear. */
   def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any]
@@ -101,7 +113,8 @@ trait Container {
       LoggingMarkers.INVOKER_ACTIVATION_INIT,
       s"sending initialization to $id $addr",
       logLevel = InfoLevel)
-
+    containerHttpMaxConcurrent = maxConcurrent
+    containerHttpTimeout = timeout
     val body = JsObject("value" -> initializer)
     callContainer("/init", body, timeout, maxConcurrent, retry = true)
       .andThen { // never fails
@@ -132,7 +145,7 @@ trait Container {
       }
   }
 
-  /** Runs code in the container. */
+  /** Runs code in the container. Thread-safe - caller may invoke concurrently for concurrent activation processing. */
   def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration, maxConcurrent: Int)(
     implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = {
     val actionName = environment.fields.get("action_name").map(_.convertTo[String]).getOrElse("")
@@ -185,15 +198,7 @@ trait Container {
                               retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = {
     val started = Instant.now()
     val http = httpConnection.getOrElse {
-      val conn = if (Container.config.akkaClient) {
-        new AkkaContainerClient(addr.host, addr.port, timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024)
-      } else {
-        new ApacheBlockingContainerClient(
-          s"${addr.host}:${addr.port}",
-          timeout,
-          ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
-          maxConcurrent)
-      }
+      val conn = openConnections(timeout, maxConcurrent)
       httpConnection = Some(conn)
       conn
     }
@@ -204,6 +209,17 @@ trait Container {
         RunResult(Interval(started, finished), response)
       }
   }
+  private def openConnections(timeout: FiniteDuration, maxConcurrent: Int) = {
+    if (Container.config.akkaClient) {
+      new AkkaContainerClient(addr.host, addr.port, timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024)
+    } else {
+      new ApacheBlockingContainerClient(
+        s"${addr.host}:${addr.port}",
+        timeout,
+        ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT,
+        maxConcurrent)
+    }
+  }
   private def closeConnections(toClose: Option[ContainerClient]): Future[Unit] = {
     toClose.map(_.close()).getOrElse(Future.successful(()))
   }
diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
index 373b123..968f942 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala
@@ -205,8 +205,8 @@ class MesosTask(override protected val id: ContainerId,
 
   /** Dual of halt. */
   override def resume()(implicit transid: TransactionId): Future[Unit] = {
-    // resume not supported
-    Future.successful(Unit)
+    super.resume()
+    // resume not supported (just return result from super)
   }
 
   /** Completely destroys this instance of the container. */
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
index 7ac95e4..731966b 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala
@@ -180,8 +180,9 @@ class DockerContainer(protected val id: ContainerId,
   override def suspend()(implicit transid: TransactionId): Future[Unit] = {
     super.suspend().flatMap(_ => if (useRunc) runc.pause(id) else docker.pause(id))
   }
-  def resume()(implicit transid: TransactionId): Future[Unit] =
-    if (useRunc) { runc.resume(id) } else { docker.unpause(id) }
+  override def resume()(implicit transid: TransactionId): Future[Unit] = {
+    (if (useRunc) { runc.resume(id) } else { docker.unpause(id) }).flatMap(_ => super.resume())
+  }
   override def destroy()(implicit transid: TransactionId): Future[Unit] = {
     super.destroy()
     docker.rm(id)
diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
index 4a61647..74fd292 100644
--- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -104,7 +104,8 @@ class KubernetesContainer(protected[core] val id: ContainerId,
     super.suspend().flatMap(_ => kubernetes.suspend(this))
   }
 
-  def resume()(implicit transid: TransactionId): Future[Unit] = kubernetes.resume(this)
+  override def resume()(implicit transid: TransactionId): Future[Unit] =
+    kubernetes.resume(this).flatMap(_ => super.resume())
 
   override def destroy()(implicit transid: TransactionId): Future[Unit] = {
     super.destroy()
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
index 349fc31..6f4caf2 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala
@@ -107,7 +107,7 @@ class DockerToActivationLogStoreTests extends FlatSpec with Matchers with WskAct
                                                                                
    val logging: Logging)
       extends Container {
     override def suspend()(implicit transid: TransactionId): Future[Unit] = ???
-    def resume()(implicit transid: TransactionId): Future[Unit] = ???
+    override def resume()(implicit transid: TransactionId): Future[Unit] = ???
 
     def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId) = lines
 
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
index d61cb84..0477fcc 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala
@@ -18,7 +18,6 @@
 package org.apache.openwhisk.core.containerpool.test
 
 import java.time.Instant
-
 import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition}
 import akka.actor.{ActorRef, ActorSystem, FSM}
 import akka.stream.scaladsl.Source
@@ -26,7 +25,6 @@ import akka.testkit.{ImplicitSender, TestKit}
 import akka.util.ByteString
 import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction, WhiskProperties}
 import java.util.concurrent.atomic.AtomicInteger
-
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers}
@@ -41,7 +39,7 @@ import org.apache.openwhisk.core.entity._
 import org.apache.openwhisk.core.entity.size._
 import org.apache.openwhisk.http.Messages
 import org.apache.openwhisk.core.database.UserContext
-
+import scala.concurrent.Await
 import scala.concurrent.duration._
 import scala.concurrent.{ExecutionContext, Future, Promise}
 
@@ -1079,11 +1077,19 @@ class ContainerProxyTests
     def runCount = atomicRunCount.get()
     override def suspend()(implicit transid: TransactionId): Future[Unit] = {
       suspendCount += 1
-      super.suspend()
+      val s = super.suspend()
+      Await.result(s, 5.seconds)
+      //verify that httpconn is closed
+      httpConnection should be(None)
+      s
     }
-    def resume()(implicit transid: TransactionId): Future[Unit] = {
+    override def resume()(implicit transid: TransactionId): Future[Unit] = {
       resumeCount += 1
-      Future.successful(())
+      val r = super.resume()
+      Await.result(r, 5.seconds)
+      //verify that httpconn is recreated
+      httpConnection should be('defined)
+      r
     }
     override def destroy()(implicit transid: TransactionId): Future[Unit] = {
       destroyCount += 1

Reply via email to