This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e27a81  Notify the user if a container is killed due to memory 
exhaustion. (#2827)
1e27a81 is described below

commit 1e27a81195c646239ab025c90d08684b2f7c7d74
Author: Markus Thömmes <markusthoem...@me.com>
AuthorDate: Wed Oct 4 20:00:23 2017 +0200

    Notify the user if a container is killed due to memory exhaustion. (#2827)
    
    
    If a user-container runs out of memory, the HTTP connections is abruptly 
aborted and the user has no chance to get further evidence into why her action 
failed.
    
    Docker actually provides that information so it can be checked on an abrupt 
connection termination whether the container was indeed aborted by the OOM 
killer.
---
 .../scala/whisk/core/containerpool/Container.scala |  8 +--
 .../scala/whisk/core/containerpool/HttpUtils.scala |  6 +-
 .../scala/whisk/core/entity/ActivationResult.scala | 17 ++++--
 .../src/main/scala/whisk/http/ErrorResponse.scala  |  1 +
 .../core/containerpool/docker/DockerClient.scala   | 14 +++++
 .../docker/DockerClientWithFileAccess.scala        |  5 ++
 .../containerpool/docker/DockerContainer.scala     | 66 +++++++++++++++++++---
 .../test/DockerClientWithFileAccessTests.scala     | 34 +++++++++++
 .../docker/test/DockerContainerTests.scala         | 15 +++--
 .../whisk/core/limits/ActionLimitsTests.scala      | 22 ++++++--
 10 files changed, 156 insertions(+), 32 deletions(-)

diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala 
b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
index 0cecbe6..2e913ab 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala
@@ -57,7 +57,7 @@ trait Container {
   protected implicit val ec: ExecutionContext
 
   /** HTTP connection to the container, will be lazily established by 
callContainer */
-  private var httpConnection: Option[HttpUtils] = None
+  protected var httpConnection: Option[HttpUtils] = None
 
   /** Stops the container from consuming CPU cycles. */
   def suspend()(implicit transid: TransactionId): Future[Unit]
@@ -147,10 +147,8 @@ trait Container {
    * @param timeout timeout of the request
    * @param retry whether or not to retry the request
    */
-  protected def callContainer(path: String,
-                              body: JsObject,
-                              timeout: FiniteDuration,
-                              retry: Boolean = false): Future[RunResult] = {
+  protected def callContainer(path: String, body: JsObject, timeout: 
FiniteDuration, retry: Boolean = false)(
+    implicit transid: TransactionId): Future[RunResult] = {
     val started = Instant.now()
     val http = httpConnection.getOrElse {
       val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, 1.MB)
diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala 
b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
index 2815068..e0fd37f 100644
--- a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
+++ b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala
@@ -49,7 +49,7 @@ import whisk.core.entity.size.SizeLong
  * determined why that is.
  *
  * @param hostname the host name
- * @param timeoutMsec the timeout in msecs to wait for a response
+ * @param timeout the timeout in msecs to wait for a response
  * @param maxResponse the maximum size in bytes the connection will accept
  */
 protected[core] class HttpUtils(hostname: String, timeout: FiniteDuration, 
maxResponse: ByteSize) {
@@ -68,7 +68,7 @@ protected[core] class HttpUtils(hostname: String, timeout: 
FiniteDuration, maxRe
    * @param retry whether or not to retry on connection failure
    * @return Left(Error Message) or Right(Status Code, Response as UTF-8 
String)
    */
-  def post(endpoint: String, body: JsValue, retry: Boolean): 
Either[ContainerConnectionError, ContainerResponse] = {
+  def post(endpoint: String, body: JsValue, retry: Boolean): 
Either[ContainerHttpError, ContainerResponse] = {
     val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8)
     entity.setContentType("application/json")
 
@@ -81,7 +81,7 @@ protected[core] class HttpUtils(hostname: String, timeout: 
FiniteDuration, maxRe
 
   private def execute(request: HttpRequestBase,
                       timeoutMsec: Integer,
-                      retry: Boolean): Either[ContainerConnectionError, 
ContainerResponse] = {
+                      retry: Boolean): Either[ContainerHttpError, 
ContainerResponse] = {
     Try(connection.execute(request)).map { response =>
       val containerResponse = Option(response.getEntity)
         .map { entity =>
diff --git 
a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala 
b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
index 51d195d..85a9f36 100644
--- a/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
+++ b/common/scala/src/main/scala/whisk/core/entity/ActivationResult.scala
@@ -95,10 +95,13 @@ protected[core] object ActivationResponse extends 
DefaultJsonProtocol {
   /**
    * Class of errors for invoker-container communication.
    */
-  protected[core] sealed abstract class ContainerConnectionError
-  protected[core] case class ConnectionError(t: Throwable) extends 
ContainerConnectionError
-  protected[core] case class NoResponseReceived() extends 
ContainerConnectionError
-  protected[core] case class Timeout() extends ContainerConnectionError
+  protected[core] sealed trait ContainerConnectionError
+  protected[core] sealed trait ContainerHttpError extends 
ContainerConnectionError
+  protected[core] case class ConnectionError(t: Throwable) extends 
ContainerHttpError
+  protected[core] case class NoResponseReceived() extends ContainerHttpError
+  protected[core] case class Timeout() extends ContainerHttpError
+
+  protected[core] case class MemoryExhausted() extends ContainerConnectionError
 
   /**
    * @param statusCode the container HTTP response code (e.g., 200 OK)
@@ -154,6 +157,9 @@ protected[core] object ActivationResponse extends 
DefaultJsonProtocol {
             containerError(truncatedResponse(str, length, maxlength))
         }
 
+      case Left(_: MemoryExhausted) =>
+        containerError(memoryExhausted)
+
       case Left(e) =>
         // This indicates a terminal failure in the container (it exited 
prematurely).
         containerError(abnormalInitialization)
@@ -205,6 +211,9 @@ protected[core] object ActivationResponse extends 
DefaultJsonProtocol {
             containerError(truncatedResponse(str, length, maxlength))
         }
 
+      case Left(_: MemoryExhausted) =>
+        containerError(memoryExhausted)
+
       case Left(e) =>
         // This indicates a terminal failure in the container (it exited 
prematurely).
         containerError(abnormalRun)
diff --git a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala 
b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
index 760b9f9..09c3bfe 100644
--- a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
+++ b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
@@ -112,6 +112,7 @@ object Messages {
   /** Error messages for activations. */
   val abnormalInitialization = "The action did not initialize and exited 
unexpectedly."
   val abnormalRun = "The action did not produce a valid response and exited 
unexpectedly."
+  val memoryExhausted = "The action exhausted its memory and was aborted."
   def badEntityName(value: String) = s"Parameter is not a valid value for a 
entity name: $value"
   def badNamespace(value: String) = s"Parameter is not a valid value for a 
namespace: $value"
   def badEpoch(value: String) = s"Parameter is not a valid value for epoch 
seconds: $value"
diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
 
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
index 28b5b7a..d714a9f 100644
--- 
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
+++ 
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala
@@ -20,6 +20,7 @@ package whisk.core.containerpool.docker
 import java.io.FileNotFoundException
 import java.nio.file.Files
 import java.nio.file.Paths
+
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.util.Failure
@@ -29,6 +30,7 @@ import akka.event.Logging.ErrorLevel
 import whisk.common.Logging
 import whisk.common.LoggingMarkers
 import whisk.common.TransactionId
+
 import scala.collection.concurrent.TrieMap
 import whisk.core.containerpool.ContainerId
 import whisk.core.containerpool.ContainerAddress
@@ -100,6 +102,9 @@ class DockerClient(dockerHost: Option[String] = 
None)(executionContext: Executio
       runCmd("pull", image).map(_ => ()).andThen { case _ => 
pullsInFlight.remove(image) }
     })
 
+  def isOomKilled(id: ContainerId)(implicit transid: TransactionId): 
Future[Boolean] =
+    runCmd("inspect", id.asString, "--format", 
"{{.State.OOMKilled}}").map(_.toBoolean)
+
   private def runCmd(args: String*)(implicit transid: TransactionId): 
Future[String] = {
     val cmd = dockerCmd ++ args
     val start = transid.started(this, 
LoggingMarkers.INVOKER_DOCKER_CMD(args.head), s"running ${cmd.mkString(" ")}")
@@ -175,4 +180,13 @@ trait DockerApi {
    * @return a Future completing once the pull is complete
    */
   def pull(image: String)(implicit transid: TransactionId): Future[Unit]
+
+  /**
+   * Determines whether the given container was killed due to
+   * memory constraints.
+   *
+   * @param id the id of the container to check
+   * @return a Future containing whether the container was killed or not
+   */
+  def isOomKilled(id: ContainerId)(implicit transid: TransactionId): 
Future[Boolean]
 }
diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
 
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
index 444c365..72e1eb9 100644
--- 
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
+++ 
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala
@@ -130,6 +130,11 @@ class DockerClientWithFileAccess(
     }
   }
 
+  override def isOomKilled(id: ContainerId)(implicit transid: TransactionId): 
Future[Boolean] =
+    configFileContents(containerConfigFile(id))
+      .map(_.fields("State").asJsObject.fields("OOMKilled").convertTo[Boolean])
+      .recover { case _ => false }
+
   // See extended trait for description
   def rawContainerLogs(containerId: ContainerId, fromPos: Long): 
Future[ByteBuffer] = Future {
     blocking { // Needed due to synchronous file operations
diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
 
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index a63c304..89960e3 100644
--- 
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ 
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -18,8 +18,10 @@
 package whisk.core.containerpool.docker
 
 import java.nio.charset.StandardCharsets
+import java.time.Instant
 
 import akka.actor.ActorSystem
+import spray.json._
 
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
@@ -27,11 +29,8 @@ import scala.concurrent.duration._
 import scala.util.Failure
 import whisk.common.Logging
 import whisk.common.TransactionId
-import whisk.core.containerpool.BlackboxStartupError
-import whisk.core.containerpool.Container
-import whisk.core.containerpool.ContainerId
-import whisk.core.containerpool.ContainerAddress
-import whisk.core.containerpool.WhiskContainerStartupError
+import whisk.core.containerpool._
+import whisk.core.entity.ActivationResponse.{ConnectionError, MemoryExhausted}
 import whisk.core.entity.ByteSize
 import whisk.core.entity.size._
 
@@ -132,8 +131,9 @@ class DockerContainer(protected val id: ContainerId, 
protected val addr: Contain
   /** The last read-position in the log file */
   private var logFileOffset = 0L
 
-  protected val logsRetryCount = 15
-  protected val logsRetryWait = 100.millis
+  protected val waitForLogs: FiniteDuration = 2.seconds
+  protected val waitForOomState: FiniteDuration = 2.seconds
+  protected val filePollInterval: FiniteDuration = 100.milliseconds
 
   def suspend()(implicit transid: TransactionId): Future[Unit] = runc.pause(id)
   def resume()(implicit transid: TransactionId): Future[Unit] = runc.resume(id)
@@ -143,6 +143,54 @@ class DockerContainer(protected val id: ContainerId, 
protected val addr: Contain
   }
 
   /**
+   * Was the container killed due to memory exhaustion?
+   *
+   * Retries because as all docker state-relevant operations, they won't
+   * be reflected by the respective commands immediately but will take
+   * some time to be propagated.
+   *
+   * @param retries number of retries to make
+   * @return a Future indicating a memory exhaustion situation
+   */
+  private def isOomKilled(retries: Int = (waitForOomState / 
filePollInterval).toInt)(
+    implicit transid: TransactionId): Future[Boolean] = {
+    docker.isOomKilled(id)(TransactionId.invoker).flatMap { killed =>
+      if (killed) Future.successful(true)
+      else if (retries > 0) akka.pattern.after(filePollInterval, 
as.scheduler)(isOomKilled(retries - 1))
+      else Future.successful(false)
+    }
+  }
+
+  override protected def callContainer(path: String, body: JsObject, timeout: 
FiniteDuration, retry: Boolean = false)(
+    implicit transid: TransactionId): Future[RunResult] = {
+    val started = Instant.now()
+    val http = httpConnection.getOrElse {
+      val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, 1.MB)
+      httpConnection = Some(conn)
+      conn
+    }
+    Future {
+      http.post(path, body, retry)
+    }.flatMap { response =>
+      val finished = Instant.now()
+
+      response.left
+        .map {
+          // Only check for memory exhaustion if there was a
+          // terminal connection error.
+          case error: ConnectionError =>
+            isOomKilled().map {
+              case true  => MemoryExhausted()
+              case false => error
+            }
+          case other => Future.successful(other)
+        }
+        .fold(_.map(Left(_)), right => Future.successful(Right(right)))
+        .map(res => RunResult(Interval(started, finished), res))
+    }
+  }
+
+  /**
    * Obtains the container's stdout and stderr output and converts it to our 
own JSON format.
    * At the moment, this is done by reading the internal Docker log file for 
the container.
    * Said file is written by Docker's JSON log driver and has a "well-known" 
location and name.
@@ -176,7 +224,7 @@ class DockerContainer(protected val id: ContainerId, 
protected val addr: Contain
 
           if (retries > 0 && !isComplete && !isTruncated) {
             logging.info(this, s"log cursor advanced but missing sentinel, 
trying $retries more times")
-            akka.pattern.after(logsRetryWait, as.scheduler)(readLogs(retries - 
1))
+            akka.pattern.after(filePollInterval, 
as.scheduler)(readLogs(retries - 1))
           } else {
             logFileOffset += rawLogBytes.position - rawLogBytes.arrayOffset
             Future.successful(formattedLogs)
@@ -188,7 +236,7 @@ class DockerContainer(protected val id: ContainerId, 
protected val addr: Contain
         }
     }
 
-    readLogs(logsRetryCount)
+    readLogs((waitForLogs / filePollInterval).toInt)
   }
 
 }
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
index 7c6acbf..8b3a060 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala
@@ -39,6 +39,7 @@ import org.scalatest.fixture.{FlatSpec => FixtureFlatSpec}
 
 import common.StreamLogging
 import spray.json._
+import spray.json.DefaultJsonProtocol._
 import whisk.common.TransactionId
 import whisk.core.containerpool.ContainerId
 import whisk.core.containerpool.ContainerAddress
@@ -110,6 +111,39 @@ class DockerClientWithFileAccessTestsIp extends FlatSpec 
with Matchers with Stre
   }
 }
 
+@RunWith(classOf[JUnitRunner])
+class DockerClientWithFileAccessTestsOom extends FlatSpec with Matchers with 
StreamLogging with BeforeAndAfterEach {
+  override def beforeEach = stream.reset()
+
+  implicit val transid = TransactionId.testing
+  val id = ContainerId("Id")
+
+  def await[A](f: Future[A], timeout: FiniteDuration = 500.milliseconds) = 
Await.result(f, timeout)
+
+  def dockerClient(readResult: Future[JsObject]) =
+    new DockerClientWithFileAccess()(global) {
+      override val dockerCmd = Seq("docker")
+      override def configFileContents(configFile: File) = readResult
+    }
+
+  def stateObject(oom: Boolean) = JsObject("State" -> JsObject("OOMKilled" -> 
oom.toJson))
+
+  behavior of "DockerClientWithFileAccess - isOomKilled"
+
+  it should "return the state of the container respectively" in {
+    val dcTrue = dockerClient(Future.successful(stateObject(true)))
+    await(dcTrue.isOomKilled(id)) shouldBe true
+
+    val dcFalse = dockerClient(Future.successful(stateObject(false)))
+    await(dcFalse.isOomKilled(id)) shouldBe false
+  }
+
+  it should "default to 'false' if the json structure is unparseable" in {
+    val dc = dockerClient(Future.successful(JsObject()))
+    await(dc.isOomKilled(id)) shouldBe false
+  }
+}
+
 /**
  * The file access tests use fixtures (org.scalatest.fixture.FlatSpec) in 
contrast to
  * the IP address related tests. For this reason, the file access tests are in 
a separate
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 47e67c2..960ab92 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -76,14 +76,15 @@ class DockerContainerTests
     retryCount: Int = 0)(implicit docker: DockerApiWithFileAccess, runc: 
RuncApi): DockerContainer = {
 
     new DockerContainer(id, addr) {
-      override protected def callContainer(path: String,
-                                           body: JsObject,
-                                           timeout: FiniteDuration,
-                                           retry: Boolean = false): 
Future[RunResult] = {
+      override protected def callContainer(
+        path: String,
+        body: JsObject,
+        timeout: FiniteDuration,
+        retry: Boolean = false)(implicit transid: TransactionId): 
Future[RunResult] = {
         ccRes
       }
-      override protected val logsRetryCount = retryCount
-      override protected val logsRetryWait = 0.milliseconds
+      override protected val waitForLogs = retryCount.milliseconds
+      override protected val filePollInterval = 1.millisecond
     }
   }
 
@@ -680,6 +681,8 @@ class DockerContainerTests
       Future.successful(())
     }
 
+    override def isOomKilled(id: ContainerId)(implicit transid: 
TransactionId): Future[Boolean] = ???
+
     def rawContainerLogs(containerId: ContainerId, fromPos: Long): 
Future[ByteBuffer] = {
       rawContainerLogsInvocations += ((containerId, fromPos))
       Future.successful(ByteBuffer.wrap(Array[Byte]()))
diff --git a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala 
b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
index e3e955c..4ac38f2 100644
--- a/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
+++ b/tests/src/test/scala/whisk/core/limits/ActionLimitsTests.scala
@@ -231,7 +231,7 @@ class ActionLimitsTests extends TestHelpers with 
WskTestHelpers {
       error.fields("message") shouldBe {
         JsObject(
           "code" -> "EMFILE".toJson,
-          "errno" -> -24.toJson,
+          "errno" -> (-24).toJson,
           "path" -> "/dev/zero".toJson,
           "syscall" -> "open".toJson)
       }
@@ -241,10 +241,7 @@ class ActionLimitsTests extends TestHelpers with 
WskTestHelpers {
 
       activation.logs
         .getOrElse(List())
-        .filter {
-          _.contains("ERROR: opened files = ")
-        }
-        .length shouldBe 1
+        .count(_.contains("ERROR: opened files = ")) shouldBe 1
     }
   }
 
@@ -266,4 +263,19 @@ class ActionLimitsTests extends TestHelpers with 
WskTestHelpers {
       }
     }
   }
+
+  it should "be aborted when exceeding its memory limits" in 
withAssetCleaner(wskprops) { (wp, assetHelper) =>
+    val name = "TestNodeJsMemoryExceeding"
+    assetHelper.withCleaner(wsk.action, name, confirmDelete = true) {
+      val allowedMemory = 256.megabytes
+      val actionName = TestUtils.getTestActionFilename("memoryWithGC.js")
+      (action, _) =>
+        action.create(name, Some(actionName), memory = Some(allowedMemory))
+    }
+
+    val run = wsk.action.invoke(name, Map("payload" -> 512.toJson))
+    withActivation(wsk.activation, run) {
+      _.response.result.get.fields("error") shouldBe 
Messages.memoryExhausted.toJson
+    }
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <commits@openwhisk.apache.org>'].

Reply via email to