This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 2312b7d Recreate HTTP client on Container.resume(). (#4185) 2312b7d is described below commit 2312b7d6b913fc30bcd56643285fa29aaa18815c Author: tysonnorris <tysonnor...@gmail.com> AuthorDate: Mon Jan 21 02:13:48 2019 -0800 Recreate HTTP client on Container.resume(). (#4185) reopen connections only once, during Container.resume() --- .../openwhisk/core/containerpool/Container.scala | 44 +++++++++++++++------- .../apache/openwhisk/core/mesos/MesosTask.scala | 4 +- .../containerpool/docker/DockerContainer.scala | 5 ++- .../kubernetes/KubernetesContainer.scala | 3 +- .../test/DockerToActivationLogStoreTests.scala | 2 +- .../containerpool/test/ContainerProxyTests.scala | 18 ++++++--- 6 files changed, 50 insertions(+), 26 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala index a2e2709..354ec38 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/containerpool/Container.scala @@ -62,6 +62,11 @@ object Container { loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool) } +/** + * Abstraction for Container operations. + * Container manipulation (specifically suspend/resume/destroy) is NOT thread-safe and MUST be synchronized by caller. + * Container access (specifically run) is thread-safe (e.g. for concurrent activation processing). + */ trait Container { implicit protected val as: ActorSystem @@ -73,7 +78,11 @@ trait Container { /** HTTP connection to the container, will be lazily established by callContainer */ protected var httpConnection: Option[ContainerClient] = None - /** Stops the container from consuming CPU cycles. */ + /** maxConcurrent+timeout are cached during first init, so that resuming connections can reference */ + protected var containerHttpMaxConcurrent: Int = 1 + protected var containerHttpTimeout: FiniteDuration = 60.seconds + + /** Stops the container from consuming CPU cycles. NOT thread-safe - caller must synchronize. */ def suspend()(implicit transid: TransactionId): Future[Unit] = { //close connection first, then close connection pool //(testing pool recreation vs connection closing, time was similar - so using the simpler recreation approach) @@ -82,8 +91,11 @@ trait Container { closeConnections(toClose) } - /** Dual of halt. */ - def resume()(implicit transid: TransactionId): Future[Unit] + /** Dual of halt. NOT thread-safe - caller must synchronize.*/ + def resume()(implicit transid: TransactionId): Future[Unit] = { + httpConnection = Some(openConnections(containerHttpTimeout, containerHttpMaxConcurrent)) + Future.successful({}) + } /** Obtains logs up to a given threshold from the container. Optionally waits for a sentinel to appear. */ def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] @@ -101,7 +113,8 @@ trait Container { LoggingMarkers.INVOKER_ACTIVATION_INIT, s"sending initialization to $id $addr", logLevel = InfoLevel) - + containerHttpMaxConcurrent = maxConcurrent + containerHttpTimeout = timeout val body = JsObject("value" -> initializer) callContainer("/init", body, timeout, maxConcurrent, retry = true) .andThen { // never fails @@ -132,7 +145,7 @@ trait Container { } } - /** Runs code in the container. */ + /** Runs code in the container. Thread-safe - caller may invoke concurrently for concurrent activation processing. */ def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration, maxConcurrent: Int)( implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = { val actionName = environment.fields.get("action_name").map(_.convertTo[String]).getOrElse("") @@ -185,15 +198,7 @@ trait Container { retry: Boolean = false)(implicit transid: TransactionId): Future[RunResult] = { val started = Instant.now() val http = httpConnection.getOrElse { - val conn = if (Container.config.akkaClient) { - new AkkaContainerClient(addr.host, addr.port, timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024) - } else { - new ApacheBlockingContainerClient( - s"${addr.host}:${addr.port}", - timeout, - ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, - maxConcurrent) - } + val conn = openConnections(timeout, maxConcurrent) httpConnection = Some(conn) conn } @@ -204,6 +209,17 @@ trait Container { RunResult(Interval(started, finished), response) } } + private def openConnections(timeout: FiniteDuration, maxConcurrent: Int) = { + if (Container.config.akkaClient) { + new AkkaContainerClient(addr.host, addr.port, timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024) + } else { + new ApacheBlockingContainerClient( + s"${addr.host}:${addr.port}", + timeout, + ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, + maxConcurrent) + } + } private def closeConnections(toClose: Option[ContainerClient]): Future[Unit] = { toClose.map(_.close()).getOrElse(Future.successful(())) } diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala index 373b123..968f942 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/mesos/MesosTask.scala @@ -205,8 +205,8 @@ class MesosTask(override protected val id: ContainerId, /** Dual of halt. */ override def resume()(implicit transid: TransactionId): Future[Unit] = { - // resume not supported - Future.successful(Unit) + super.resume() + // resume not supported (just return result from super) } /** Completely destroys this instance of the container. */ diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala index 7ac95e4..731966b 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/docker/DockerContainer.scala @@ -180,8 +180,9 @@ class DockerContainer(protected val id: ContainerId, override def suspend()(implicit transid: TransactionId): Future[Unit] = { super.suspend().flatMap(_ => if (useRunc) runc.pause(id) else docker.pause(id)) } - def resume()(implicit transid: TransactionId): Future[Unit] = - if (useRunc) { runc.resume(id) } else { docker.unpause(id) } + override def resume()(implicit transid: TransactionId): Future[Unit] = { + (if (useRunc) { runc.resume(id) } else { docker.unpause(id) }).flatMap(_ => super.resume()) + } override def destroy()(implicit transid: TransactionId): Future[Unit] = { super.destroy() docker.rm(id) diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala index 4a61647..74fd292 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/kubernetes/KubernetesContainer.scala @@ -104,7 +104,8 @@ class KubernetesContainer(protected[core] val id: ContainerId, super.suspend().flatMap(_ => kubernetes.suspend(this)) } - def resume()(implicit transid: TransactionId): Future[Unit] = kubernetes.resume(this) + override def resume()(implicit transid: TransactionId): Future[Unit] = + kubernetes.resume(this).flatMap(_ => super.resume()) override def destroy()(implicit transid: TransactionId): Future[Unit] = { super.destroy() diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala index 349fc31..6f4caf2 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala @@ -107,7 +107,7 @@ class DockerToActivationLogStoreTests extends FlatSpec with Matchers with WskAct val logging: Logging) extends Container { override def suspend()(implicit transid: TransactionId): Future[Unit] = ??? - def resume()(implicit transid: TransactionId): Future[Unit] = ??? + override def resume()(implicit transid: TransactionId): Future[Unit] = ??? def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId) = lines diff --git a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala index d61cb84..0477fcc 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/containerpool/test/ContainerProxyTests.scala @@ -18,7 +18,6 @@ package org.apache.openwhisk.core.containerpool.test import java.time.Instant - import akka.actor.FSM.{CurrentState, SubscribeTransitionCallBack, Transition} import akka.actor.{ActorRef, ActorSystem, FSM} import akka.stream.scaladsl.Source @@ -26,7 +25,6 @@ import akka.testkit.{ImplicitSender, TestKit} import akka.util.ByteString import common.{LoggedFunction, StreamLogging, SynchronizedLoggedFunction, WhiskProperties} import java.util.concurrent.atomic.AtomicInteger - import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner import org.scalatest.{BeforeAndAfterAll, FlatSpecLike, Matchers} @@ -41,7 +39,7 @@ import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size._ import org.apache.openwhisk.http.Messages import org.apache.openwhisk.core.database.UserContext - +import scala.concurrent.Await import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future, Promise} @@ -1079,11 +1077,19 @@ class ContainerProxyTests def runCount = atomicRunCount.get() override def suspend()(implicit transid: TransactionId): Future[Unit] = { suspendCount += 1 - super.suspend() + val s = super.suspend() + Await.result(s, 5.seconds) + //verify that httpconn is closed + httpConnection should be(None) + s } - def resume()(implicit transid: TransactionId): Future[Unit] = { + override def resume()(implicit transid: TransactionId): Future[Unit] = { resumeCount += 1 - Future.successful(()) + val r = super.resume() + Await.result(r, 5.seconds) + //verify that httpconn is recreated + httpConnection should be('defined) + r } override def destroy()(implicit transid: TransactionId): Future[Unit] = { destroyCount += 1