This is an automated email from the ASF dual-hosted git repository. dubeejw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 94d5f3f Separate container removal from job completion. (#3132) 94d5f3f is described below commit 94d5f3f2a631ebf53d7718f8af8d6a20163fb89b Author: rodric rabbah <rod...@gmail.com> AuthorDate: Wed Jan 3 08:38:06 2018 -0500 Separate container removal from job completion. (#3132) The current handshake between the container proxy and the container pool conflates container removal and job completion. The former does not necessarily indicate there is free capacity in the pool since there are conditions under which the job is rescheduled (e.g., resume failure, container aging, container is reclaimed). --- .../whisk/core/containerpool/ContainerPool.scala | 17 +++++++++++++++++ .../whisk/core/containerpool/ContainerProxy.scala | 20 +++++++++++++++----- .../core/containerpool/test/ContainerPoolTests.scala | 14 ++++++++++++++ .../containerpool/test/ContainerProxyTests.scala | 6 +++--- 4 files changed, 49 insertions(+), 8 deletions(-) diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala index 1fa6b3a..61172dd 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala @@ -81,6 +81,10 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, def receive: Receive = { // A job to run on a container + // + // Run messages are received either via the feed or from child containers which cannot process + // their requests and send them back to the pool for rescheduling (this may happen if "docker" operations + // fail for example, or a container has aged and was destroying itself when a new request was assigned) case r: Run => val container = if (busyPool.size < maxActiveContainers) { // Schedule a job to a warm container @@ -110,6 +114,9 @@ class ContainerPool(childFactory: ActorRefFactory => 
ActorRef, freePool = freePool - actor actor ! r // forwards the run request to the container case None => + // this can also happen if createContainer fails to start a new container, or + // if a job is rescheduled but the container it was allocated to has not yet destroyed itself + // (and a new container would over commit the pool) logging.error(this, "Rescheduling Run message, too many message in the pool")(r.msg.transid) self ! r } @@ -131,8 +138,18 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, freePool = freePool - sender() busyPool.get(sender()).foreach { _ => busyPool = busyPool - sender() + // container was busy, so there is capacity to accept another job request feed ! MessageFeed.Processed } + + // This message is received for one of these reasons: + // 1. Container errored while resuming a warm container, could not process the job, and sent the job back + // 2. The container aged, is destroying itself, and was assigned a job which it had to send back + // 3. The container aged and is destroying itself + // Update the free/busy lists but no message is sent to the feed since there is no change in capacity yet + case RescheduleJob => + freePool = freePool - sender() + busyPool = busyPool - sender() } /** Creates a new container and updates state accordingly. 
*/ diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala index dd4336e..c3f89c5 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala @@ -67,7 +67,8 @@ case object Remove // Events sent by the actor case class NeedWork(data: ContainerData) case object ContainerPaused -case object ContainerRemoved +case object ContainerRemoved // when container is destroyed +case object RescheduleJob // job is sent back to parent and could not be processed because container is being destroyed /** * A proxy that wraps a Container. It is used to keep track of the lifecycle @@ -101,6 +102,7 @@ class ContainerProxy( with Stash { implicit val ec = context.system.dispatcher implicit val logging = new AkkaLogging(context.system.log) + var rescheduleJob = false // true iff actor receives a job but cannot process it because actor will destroy itself startWith(Uninitialized, NoData()) @@ -248,7 +250,9 @@ class ContainerProxy( // Sending the message to self on a failure will cause the message // to ultimately be sent back to the parent (which will retry it) // when container removal is done. - case Failure(_) => self ! job + case Failure(_) => + rescheduleJob = true + self ! 
job } .flatMap(_ => initializeAndRun(data.container, job)) .map(_ => WarmedData(data.container, job.msg.user.namespace, job.action, Instant.now)) @@ -256,8 +260,10 @@ class ContainerProxy( goto(Running) - // timeout or removing - case Event(StateTimeout | Remove, data: WarmedData) => destroyContainer(data.container) + // container is reclaimed by the pool or it has become too old + case Event(StateTimeout | Remove, data: WarmedData) => + rescheduleJob = true // to supress sending message to the pool and not double count + destroyContainer(data.container) } when(Removing) { @@ -292,7 +298,11 @@ class ContainerProxy( * @param container the container to destroy */ def destroyContainer(container: Container) = { - context.parent ! ContainerRemoved + if (!rescheduleJob) { + context.parent ! ContainerRemoved + } else { + context.parent ! RescheduleJob + } val unpause = stateName match { case Paused => container.resume()(TransactionId.invokerNanny) diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala index dcfe5e4..0b53103 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala @@ -217,6 +217,20 @@ class ContainerPoolTests containers(1).expectMsg(runMessageDifferentNamespace) } + it should "reschedule job when container is removed prematurely without sending message to feed" in within(timeout) { + val (containers, factory) = testContainers(2) + val feed = TestProbe() + + // a pool with only 1 slot + val pool = system.actorOf(ContainerPool.props(factory, 1, 1, feed.ref)) + pool ! runMessage + containers(0).expectMsg(runMessage) + containers(0).send(pool, RescheduleJob) // emulate container failure ... + containers(0).send(pool, runMessage) // ... 
causing job to be rescheduled + feed.expectNoMsg(100.millis) + containers(1).expectMsg(runMessage) // job resent to new actor + } + /* * CONTAINER PREWARMING */ diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala index ac8da0f..b278871 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala @@ -220,7 +220,7 @@ class ContainerProxyTests // Another pause causes the container to be removed timeout(machine) - expectMsg(ContainerRemoved) + expectMsg(RescheduleJob) expectMsg(Transition(machine, Paused, Removing)) awaitAssert { @@ -528,7 +528,7 @@ class ContainerProxyTests val runMessage = Run(action, message) machine ! runMessage expectMsg(Transition(machine, Paused, Running)) - expectMsg(ContainerRemoved) // The message is sent as soon as the container decides to destroy itself + expectMsg(RescheduleJob) expectMsg(Transition(machine, Running, Removing)) expectMsg(runMessage) @@ -649,7 +649,7 @@ class ContainerProxyTests machine ! Run(action, message) // State-machine shuts down nonetheless. - expectMsg(ContainerRemoved) + expectMsg(RescheduleJob) expectMsg(Transition(machine, Paused, Removing)) // Pool gets the message again. -- To stop receiving notification emails like this one, please contact commits@openwhisk.apache.org.