markusthoemmes closed pull request #3219: Kubernetes ContainerFactoryProvider 
implementation
URL: https://github.com/apache/incubator-openwhisk/pull/3219
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 1a44bcee68..8e0aeede68 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -41,6 +41,7 @@ dependencies {
     }
     compile 'com.github.ben-manes.caffeine:caffeine:2.4.0'
     compile 'com.google.code.findbugs:jsr305:3.0.2'
+    compile 'io.fabric8:kubernetes-client:2.5.7'
     compile 'io.kamon:kamon-core_2.11:0.6.7'
     compile 'io.kamon:kamon-statsd_2.11:0.6.7'
 }
diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala 
b/common/scala/src/main/scala/whisk/common/Logging.scala
index cce6b913f6..37cc1e3cc6 100644
--- a/common/scala/src/main/scala/whisk/common/Logging.scala
+++ b/common/scala/src/main/scala/whisk/common/Logging.scala
@@ -263,6 +263,7 @@ object LoggingMarkers {
   def INVOKER_RUNC_CMD(cmd: String) = LogMarkerToken(invoker, s"runc.$cmd", 
start)
   def INVOKER_CONTAINER_START(containerState: String) =
     LogMarkerToken(invoker, s"container_start_${containerState}", count)
+  // Marker token for a kubectl invocation; `cmd` is interpolated into the
+  // marker name as "kubectl.<cmd>".
+  def INVOKER_KUBECTL_CMD(cmd: String) = LogMarkerToken(invoker, 
s"kubectl.$cmd", start)
 
   /*
    * General markers
diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala 
b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
index 79c6d89003..aaa6a89aa9 100644
--- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala
@@ -256,6 +256,9 @@ object ConfigKeys {
   val containerFactory = "whisk.container-factory"
   val containerArgs = s"$containerFactory.container-args"
 
+  // Config paths for Kubernetes settings; the timeouts subtree is what
+  // KubernetesClient loads into KubernetesClientTimeoutConfig by default.
+  val kubernetes = "whisk.kubernetes"
+  val kubernetesTimeouts = s"$kubernetes.timeouts"
+
   val transactions = "whisk.transactions"
   val stride = s"$transactions.stride"
 
diff --git a/core/invoker/Dockerfile b/core/invoker/Dockerfile
index da3342990a..1bc2164e45 100644
--- a/core/invoker/Dockerfile
+++ b/core/invoker/Dockerfile
@@ -1,6 +1,9 @@
 FROM scala
 
 ENV DOCKER_VERSION 1.12.0
+ENV KUBERNETES_VERSION 1.6.4
+
+RUN apk add --update openssl
 
 # Uncomment to fetch latest version of docker instead: RUN wget -qO- 
https://get.docker.com | sh
 # Install docker client
@@ -11,6 +14,11 @@ rm -f docker-${DOCKER_VERSION}.tgz && \
 chmod +x /usr/bin/docker && \
 chmod +x /usr/bin/docker-runc
 
+# Install kubernetes client
+RUN wget --no-verbose 
https://storage.googleapis.com/kubernetes-release/release/v${KUBERNETES_VERSION}/bin/linux/amd64/kubectl
 && \
+chmod +x kubectl && \
+mv kubectl /usr/bin/kubectl
+
 COPY build/distributions/invoker.tar ./
 RUN tar xf invoker.tar && \
 rm -f invoker.tar
@@ -19,4 +27,4 @@ COPY init.sh /
 RUN chmod +x init.sh
 
 EXPOSE 8080
-CMD ["init.sh", "0"]
\ No newline at end of file
+CMD ["init.sh", "0"]
diff --git a/core/invoker/src/main/resources/application.conf 
b/core/invoker/src/main/resources/application.conf
index 94620dd203..37d2f5ffa9 100644
--- a/core/invoker/src/main/resources/application.conf
+++ b/core/invoker/src/main/resources/application.conf
@@ -14,6 +14,14 @@ whisk {
     unpause: 10 seconds
   }
 
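+  # Timeouts for k8s commands. Set to "Inf" to disable timeout.
+  # Caveat: "inspect" and "logs" must stay finite, because KubernetesClient
+  # reads their length/unit and millisecond value, which an infinite
+  # duration cannot provide.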
+  kubernetes.timeouts {
+    run: 1 minute
+    rm: 1 minute
+    inspect: 1 minute
+    logs: 1 minute
+  }
+
   # Timeouts for runc commands. Set to "Inf" to disable timeout.
   runc.timeouts {
     pause: 10 seconds
@@ -27,4 +35,4 @@ whisk {
     extra-args: {}
 
   }
-}
\ No newline at end of file
+}
diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
new file mode 100644
index 0000000000..39155d1535
--- /dev/null
+++ 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.kubernetes
+
+import java.io.{FileNotFoundException, IOException}
+import java.net.SocketTimeoutException
+import java.nio.file.Files
+import java.nio.file.Paths
+import java.time.{Instant, ZoneId}
+import java.time.format.DateTimeFormatterBuilder
+
+import akka.actor.ActorSystem
+import akka.event.Logging.{ErrorLevel, InfoLevel}
+import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.model.Uri.Path
+import akka.http.scaladsl.model.Uri.Query
+import akka.stream.{Attributes, Outlet, SourceShape}
+import akka.stream.scaladsl.Source
+import akka.stream.stage._
+import akka.util.ByteString
+import pureconfig.loadConfigOrThrow
+import whisk.common.Logging
+import whisk.common.LoggingMarkers
+import whisk.common.TransactionId
+import whisk.core.ConfigKeys
+import whisk.core.containerpool.ContainerId
+import whisk.core.containerpool.ContainerAddress
+import whisk.core.containerpool.docker.ProcessRunner
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.blocking
+import scala.concurrent.duration._
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+import io.fabric8.kubernetes.client.ConfigBuilder
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import okhttp3.{Call, Callback, Request, Response}
+import okio.BufferedSource
+
+import scala.annotation.tailrec
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+/**
+ * Timeouts applied by KubernetesClient to its Kubernetes operations.
+ *
+ * @param run     limit for the `kubectl run` command
+ * @param rm      limit for the `kubectl delete` command
+ * @param inspect limit for waiting until a pod is ready
+ * @param logs    connection and request timeout of the REST client
+ */
+case class KubernetesClientTimeoutConfig(
+  run: Duration,
+  rm: Duration,
+  inspect: Duration,
+  logs: Duration)
+
+/**
+ * Kubernetes access used by the invoker.
+ *
+ * Pods are started and deleted by shelling out to the `kubectl` binary, while
+ * the pod IP lookup and log retrieval go through the fabric8 REST client.
+ *
+ * Be cautious with the ExecutionContext passed to this, as the
+ * calls to the CLI are blocking.
+ *
+ * You only need one instance (and you shouldn't get more).
+ *
+ * @param timeouts per-operation timeouts; loaded from
+ *                 `ConfigKeys.kubernetesTimeouts` when not given
+ * @param executionContext context on which the futures of this client run
+ */
+class KubernetesClient(
+  timeouts: KubernetesClientTimeoutConfig =
+    loadConfigOrThrow[KubernetesClientTimeoutConfig](
+      ConfigKeys.kubernetesTimeouts))(executionContext: ExecutionContext)(
+  implicit log: Logging,
+  as: ActorSystem)
+    extends KubernetesApi
+    with ProcessRunner {
+  implicit private val ec = executionContext
+
+  // NOTE(review): toMillis is only defined for finite durations, so an "Inf"
+  // logs timeout cannot be used here -- confirm the intended configuration.
+  implicit private val kubeRestClient = new DefaultKubernetesClient(
+    new ConfigBuilder()
+      .withConnectionTimeout(timeouts.logs.toMillis.toInt)
+      .withRequestTimeout(timeouts.logs.toMillis.toInt)
+      .build())
+
+  /**
+   * Determines how to run kubectl. Failure to find a kubectl binary implies
+   * a failure to initialize this instance of KubernetesClient.
+   *
+   * @return path of the first executable kubectl candidate
+   * @throws FileNotFoundException if no candidate is executable
+   */
+  protected def findKubectlCmd(): String = {
+    val alternatives = List("/usr/bin/kubectl", "/usr/local/bin/kubectl")
+    alternatives
+      .find(candidate => Files.isExecutable(Paths.get(candidate)))
+      .getOrElse {
+        val tried = alternatives.mkString(", ")
+        throw new FileNotFoundException(
+          s"Couldn't locate kubectl binary (tried: $tried).")
+      }
+  }
+  protected val kubectlCmd = Seq(findKubectlCmd())
+
+  /** Runs `kubectl run` for `image`; the pod name doubles as container id. */
+  def run(name: String, image: String, args: Seq[String] = Seq.empty[String])(
+    implicit transid: TransactionId): Future[ContainerId] = {
+    runCmd(Seq("run", name, s"--image=$image") ++ args, timeouts.run)
+      .map(_ => ContainerId(name))
+  }
+
+  /**
+   * Waits (blocking, bounded by the inspect timeout) until the pod is ready
+   * and yields its pod IP. Any failure is logged and reported as a generic
+   * exception that carries the original failure as its cause.
+   */
+  def inspectIPAddress(id: ContainerId)(
+    implicit transid: TransactionId): Future[ContainerAddress] = {
+    Future {
+      blocking {
+        val pod = kubeRestClient
+          .pods()
+          .withName(id.asString)
+          .waitUntilReady(timeouts.inspect.length, timeouts.inspect.unit)
+        ContainerAddress(pod.getStatus().getPodIP())
+      }
+    }.recoverWith {
+      case e =>
+        val msg = s"Failed to get IP of Pod '${id.asString}'"
+        log.error(this, s"$msg within timeout: ${e.getClass} - ${e.getMessage}")
+        // keep the root cause attached instead of discarding it
+        Future.failed(new Exception(msg, e))
+    }
+  }
+
+  /** Deletes the pod with the given id via `kubectl delete --now`. */
+  def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] =
+    runCmd(Seq("delete", "--now", "pod", id.asString), timeouts.rm)
+      .map(_ => ())
+
+  /** Deletes all pods carrying the label `key=value`. */
+  def rm(key: String, value: String)(
+    implicit transid: TransactionId): Future[Unit] =
+    runCmd(Seq("delete", "--now", "pod", "-l", s"$key=$value"), timeouts.rm)
+      .map(_ => ())
+
+  /** Streams the pod's log lines through a KubernetesRestLogSourceStage. */
+  def logs(id: ContainerId,
+           sinceTime: Option[Instant],
+           waitForSentinel: Boolean = false)(
+    implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+
+    log.debug(this, s"Reading logs of Pod '${id.asString}' via the Kubernetes REST API")
+
+    Source
+      .fromGraph(
+        new KubernetesRestLogSourceStage(id, sinceTime, waitForSentinel))
+      .log("kubernetesLogs")
+
+  }
+
+  /**
+   * Executes kubectl with `args`, bracketing the call with start and
+   * finish/failure log markers named after the first argument.
+   */
+  private def runCmd(args: Seq[String], timeout: Duration)(
+    implicit transid: TransactionId): Future[String] = {
+    val cmd = kubectlCmd ++ args
+    val start = transid.started(
+      this,
+      LoggingMarkers.INVOKER_KUBECTL_CMD(args.head),
+      s"running ${cmd.mkString(" ")} (timeout: $timeout)",
+      logLevel = InfoLevel)
+    executeProcess(cmd, timeout).andThen {
+      case Success(_) => transid.finished(this, start)
+      case Failure(t) => transid.failed(this, start, t.getMessage, ErrorLevel)
+    }
+  }
+}
+
+object KubernetesClient {
+  import java.time.temporal.ChronoField
+
+  /**
+   * Formatter and parser for UTC timestamps of the form
+   * `2018-02-06T00:00:18.419889342Z`.
+   *
+   * The sub-second part is handled as a decimal fraction of up to nine
+   * digits. The pattern letter `n` is not suitable here: it renders
+   * nano-of-second as a plain number, which drops leading zeros on output
+   * and reads a short fraction such as `.5` as five nanoseconds.
+   */
+  val K8STimestampFormat = new DateTimeFormatterBuilder()
+    .parseCaseInsensitive()
+    .appendPattern("u-MM-dd")
+    .appendLiteral('T')
+    .appendPattern("HH:mm:ss")
+    // optional ".fffffffff" with 0 to 9 digits, decimal point included
+    .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
+    .appendLiteral('Z')
+    .toFormatter()
+    .withZone(ZoneId.of("UTC"))
+
+  /** Parses `ts` into an Instant; a malformed input yields a Failure. */
+  def parseK8STimestamp(ts: String): Try[Instant] =
+    Try(Instant.from(K8STimestampFormat.parse(ts)))
+
+  /** Renders `ts` with K8STimestampFormat; formatting errors are captured. */
+  def formatK8STimestamp(ts: Instant): Try[String] =
+    Try(K8STimestampFormat.format(ts))
+}
+
+/**
+ * Operations the container layer needs from Kubernetes. Every call takes the
+ * transaction id of the request it is performed for.
+ */
+trait KubernetesApi {
+  /** Starts `image` under `name` (with optional additional `args`) and yields the container id. */
+  def run(name: String, image: String, args: Seq[String] = Seq.empty[String])(
+    implicit transid: TransactionId): Future[ContainerId]
+
+  /** Resolves the address of the container `id`. */
+  def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): 
Future[ContainerAddress]
+
+  /** Removes the container `id`. */
+  def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit]
+
+  /** Removes what is selected by the `key`/`value` pair. */
+  def rm(key: String, value: String)(implicit transid: TransactionId): 
Future[Unit]
+
+  /** Source of the log lines of `containerId`, optionally restricted by `sinceTime`. */
+  def logs(containerId: ContainerId, sinceTime: Option[Instant], 
waitForSentinel: Boolean = false)(
+    implicit transid: TransactionId): Source[TypedLogLine, Any]
+}
+
+object KubernetesRestLogSourceStage {
+
+  import KubernetesClient.{formatK8STimestamp, parseK8STimestamp}
+
+  /** Path of the pod-log REST endpoint for `containerId` in `namespace`. */
+  def constructPath(namespace: String, containerId: String): Path =
+    Path / "api" / "v1" / "namespaces" / namespace / "pods" / containerId / "log"
+
+  /**
+   * Query for the log endpoint: timestamps are always requested, and
+   * `sinceTime` is added when it is given and can be formatted.
+   * `waitForSentinel` is currently not reflected in the query.
+   */
+  def constructQuery(sinceTime: Option[Instant], waitForSentinel: Boolean): Query = {
+    val sinceTimestamp =
+      sinceTime.flatMap(time => formatK8STimestamp(time).toOption)
+
+    Query(
+      Map("timestamps" -> "true") ++
+        sinceTimestamp.map(time => "sinceTime" -> time))
+  }
+
+  /**
+   * Reads `src` to exhaustion and collects the log lines that follow
+   * `lastTimestamp`.
+   *
+   * Each line is expected to look like `<timestamp> <message>`. Lines that
+   * are empty, have no space delimiter, carry an unparseable timestamp or
+   * are not newer than the previously accepted line are skipped.
+   *
+   * @param src           response body to read from
+   * @param lastTimestamp timestamp of the last accepted line, if any
+   * @param lines         accumulator of the lines accepted so far
+   * @return all accepted lines in reading order
+   */
+  @tailrec
+  def readLines(src: BufferedSource,
+                lastTimestamp: Option[Instant],
+                lines: Seq[TypedLogLine] = Vector.empty[TypedLogLine]): Seq[TypedLogLine] = {
+
+    if (!src.exhausted()) {
+      (for {
+        line <- Option(src.readUtf8Line()) if !line.isEmpty
+        timestampDelimiter = line.indexOf(" ")
+        // without a delimiter there is no timestamp prefix to cut off
+        if timestampDelimiter >= 0
+        // Kubernetes is ignoring nanoseconds in sinceTime, so we have to
+        // filter additionally here
+        rawTimestamp = line.substring(0, timestampDelimiter)
+        timestamp <- parseK8STimestamp(rawTimestamp).toOption
+        if isRelevantLogLine(lastTimestamp, timestamp)
+        msg = line.substring(timestampDelimiter + 1)
+        // TODO - when we can distinguish stderr:
+        // https://github.com/kubernetes/kubernetes/issues/28167
+        stream = "stdout"
+      } yield {
+        TypedLogLine(timestamp, stream, msg)
+      }) match {
+        case Some(logLine) =>
+          readLines(src, Option(logLine.time), lines :+ logLine)
+        case None =>
+          // we may have skipped a line for filtering conditions only; keep
+          // going
+          readLines(src, lastTimestamp, lines)
+      }
+    } else {
+      lines
+    }
+
+  }
+
+  /** True when there is no previous timestamp or `newTimestamp` is after it. */
+  def isRelevantLogLine(lastTimestamp: Option[Instant], newTimestamp: Instant): Boolean =
+    lastTimestamp.forall(last => newTimestamp.isAfter(last))
+
+}
+
+/**
+ * Source stage that fetches the logs of a pod through the Kubernetes REST
+ * API and emits them line by line.
+ *
+ * A fetch is triggered whenever downstream pulls while no lines are buffered;
+ * the lines of a response are buffered and handed out one per pull.
+ *
+ * NOTE(review): a failed request is only logged and an empty response pushes
+ * nothing, so in both cases no element is emitted for the pending pull --
+ * confirm that consumers bound the stream themselves.
+ *
+ * @param id              pod whose logs are read
+ * @param sinceTime       only lines after this instant are requested
+ * @param waitForSentinel passed on to the query construction
+ */
+final class KubernetesRestLogSourceStage(id: ContainerId,
+                                         sinceTime: Option[Instant],
+                                         waitForSentinel: Boolean)(
+  implicit val kubeRestClient: DefaultKubernetesClient)
+    extends GraphStage[SourceShape[TypedLogLine]] { stage =>
+
+  import KubernetesRestLogSourceStage._
+
+  val out = Outlet[TypedLogLine]("K8SHttpLogging.out")
+
+  override val shape: SourceShape[TypedLogLine] = SourceShape.of(out)
+
+  override protected def initialAttributes: Attributes =
+    Attributes.name("KubernetesHttpLogSource")
+
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
+    new GraphStageLogicWithLogging(shape) { logic =>
+
+      // lines received but not yet pushed downstream
+      private val queue = mutable.Queue.empty[TypedLogLine]
+      // timestamp of the newest line seen; used as sinceTime of the next fetch
+      private var lastTimestamp = sinceTime
+
+      /** Issues an asynchronous log request; the callback delivers the result. */
+      def fetchLogs(): Unit =
+        try {
+          val path = constructPath(kubeRestClient.getNamespace, id.asString)
+          val query = constructQuery(lastTimestamp, waitForSentinel)
+
+          log.debug("* Fetching K8S HTTP Logs w/ Path: {} Query: {}", path, query)
+
+          val url = Uri(kubeRestClient.getMasterUrl.toString)
+            .withPath(path)
+            .withQuery(query)
+
+          val request = new Request.Builder().get().url(url.toString).build
+
+          kubeRestClient.getHttpClient
+            .newCall(request)
+            .enqueue(new LogFetchCallback())
+        } catch {
+          case NonFatal(e) =>
+            onFailure(e)
+            throw e
+        }
+
+      /** Logs a failed fetch; socket timeouts are only warnings. */
+      def onFailure(e: Throwable): Unit = e match {
+        case _: SocketTimeoutException =>
+          // this should only happen with follow behavior
+          log.warning("* Logging socket to Kubernetes timed out.")
+        case _ =>
+          log.error(e, "* Retrieving the logs from Kubernetes failed.")
+      }
+
+      // Hands fetched lines over to the stage: the first line is pushed
+      // right away if downstream is waiting, everything else is buffered.
+      val emitCallback: AsyncCallback[Seq[TypedLogLine]] =
+        getAsyncCallback[Seq[TypedLogLine]] {
+          case firstLine +: restOfLines if isAvailable(out) =>
+            pushLine(firstLine)
+            queue ++= restOfLines
+          case lines =>
+            queue ++= lines
+        }
+
+      class LogFetchCallback extends Callback {
+
+        override def onFailure(call: Call, e: IOException): Unit =
+          logic.onFailure(e)
+
+        override def onResponse(call: Call, response: Response): Unit =
+          try {
+            // release the response body even when reading it fails
+            val lines =
+              try readLines(response.body.source, lastTimestamp)
+              finally response.body.close()
+
+            lines.lastOption.foreach { line =>
+              lastTimestamp = Option(line.time)
+            }
+
+            emitCallback.invoke(lines)
+          } catch {
+            case NonFatal(e) =>
+              log.error(e, "* Reading Kubernetes HTTP Response failed.")
+              logic.onFailure(e)
+              throw e
+          }
+      }
+
+      def pushLine(line: TypedLogLine): Unit = {
+        log.debug("* Pushing a chunk of kubernetes logging: {}", line)
+        push(out, line)
+      }
+
+      setHandler(
+        out,
+        new OutHandler {
+          override def onPull(): Unit = {
+            // if we still have lines queued up, return those; else make a
+            // new HTTP read.
+            if (queue.nonEmpty)
+              pushLine(queue.dequeue())
+            else
+              fetchLogs()
+          }
+        })
+    }
+}
+
+/**
+ * A single log line together with its timestamp and stream name.
+ *
+ * All derived representations are computed on first use only.
+ *
+ * @param time   timestamp of the line
+ * @param stream name of the stream the line belongs to
+ * @param log    the log message itself
+ */
+protected[core] final case class TypedLogLine(time: Instant,
+                                              stream: String,
+                                              log: String) {
+  import KubernetesClient.formatK8STimestamp
+
+  /** JSON object with `time`, `stream` and `log`; an unformattable time becomes "". */
+  lazy val toJson: JsObject =
+    JsObject(
+      "time" -> formatK8STimestamp(time).getOrElse("").toJson,
+      "stream" -> stream.toJson,
+      "log" -> log.toJson)
+
+  lazy val jsonPrinted: String = toJson.compactPrint
+
+  /** Length of the compact JSON string, counted in characters. */
+  lazy val jsonSize: Int = jsonPrinted.length
+
+  /**
+   * Returns a ByteString representation of the json for this Log Line
+   */
+  lazy val toByteString = ByteString(jsonPrinted)
+
+  // uses the same fallback as toJson, so printing a line never throws
+  override def toString =
+    s"${formatK8STimestamp(time).getOrElse("")} $stream: ${log.trim}"
+}
+
+protected[core] object TypedLogLine {
+
+  import KubernetesClient.{parseK8STimestamp, K8STimestampFormat}
+
+  /**
+   * Reads an Instant from a JSON string in the Kubernetes timestamp format.
+   * Any other JSON value or an unparseable string raises a spray-json
+   * deserialization error.
+   */
+  def readInstant(json: JsValue): Instant = json match {
+    case JsString(str) =>
+      parseK8STimestamp(str) match {
+        case Success(time) =>
+          time
+        case Failure(e) =>
+          deserializationError(
+            s"Could not parse a java.time.Instant from $str (Expected in format: $K8STimestampFormat): $e",
+            e)
+      }
+    case _ =>
+      deserializationError(
+        s"Could not parse a java.time.Instant from $json (Expected in format: $K8STimestampFormat)")
+  }
+
+  /** JSON (de)serialization of log lines as `{"time", "stream", "log"}` objects. */
+  implicit val typedLogLineFormat: RootJsonFormat[TypedLogLine] =
+    new RootJsonFormat[TypedLogLine] {
+      override def write(obj: TypedLogLine): JsValue = obj.toJson
+
+      override def read(json: JsValue): TypedLogLine =
+        // getFields only returns the fields that are present, so a missing
+        // field ends up in the error branch rather than in a map lookup
+        json.asJsObject.getFields("time", "stream", "log") match {
+          case Seq(time, stream, log) =>
+            TypedLogLine(
+              readInstant(time),
+              stream.convertTo[String],
+              log.convertTo[String])
+          case _ =>
+            deserializationError(
+              s"Expected fields 'time', 'stream' and 'log' in $json")
+        }
+    }
+
+}
diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
new file mode 100644
index 0000000000..4c4ccdcc9a
--- /dev/null
+++ 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.kubernetes
+
+import java.time.Instant
+import java.util.concurrent.atomic.AtomicReference
+
+import akka.stream.StreamLimitReachedException
+import akka.stream.scaladsl.Framing.FramingException
+import akka.stream.scaladsl.Source
+import akka.util.ByteString
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import whisk.common.Logging
+import whisk.common.TransactionId
+import whisk.core.containerpool.Container
+import whisk.core.containerpool.WhiskContainerStartupError
+import whisk.core.containerpool.ContainerId
+import whisk.core.containerpool.ContainerAddress
+import whisk.core.containerpool.docker.{CompleteAfterOccurrences, 
DockerContainer, OccurrencesNotFoundException}
+import whisk.core.entity.ByteSize
+import whisk.core.entity.size._
+import whisk.http.Messages
+
+object KubernetesContainer {
+
+  /**
+   * Creates a container running in kubernetes
+   *
+   * @param transid transaction creating the container
+   * @param name name for the container; it is lower-cased, underscores become
+   *     dashes and parentheses are dropped to form the pod name
+   * @param image image to create the container from
+   * @param userProvidedImage whether the image is provided by the user
+   *     or is an OpenWhisk provided image
+   * @param memory memory limit passed to the pod
+   * @param environment environment variables to set in the container
+   * @param labels labels to set on the container
+   * @return a Future which either completes with a KubernetesContainer or
+   *     fails with a WhiskContainerStartupError
+   */
+  def create(transid: TransactionId,
+             name: String,
+             image: String,
+             userProvidedImage: Boolean = false,
+             memory: ByteSize = 256.MB,
+             environment: Map[String, String] = Map(),
+             labels: Map[String, String] = Map())(
+    implicit kubernetes: KubernetesApi,
+    ec: ExecutionContext,
+    log: Logging): Future[KubernetesContainer] = {
+    implicit val tid = transid
+
+    val podName = name.replace("_", "-").replaceAll("[()]", "").toLowerCase()
+
+    val environmentArgs = environment.flatMap {
+      case (key, value) => Seq("--env", s"$key=$value")
+    }.toSeq
+
+    // kubectl takes all labels as ONE comma separated value of -l; separate
+    // words would be read as further positional arguments
+    val labelArgs =
+      if (labels.isEmpty) Seq.empty[String]
+      else
+        Seq("-l", labels.map { case (key, value) => s"$key=$value" }.mkString(","))
+
+    val args = Seq(
+      "--generator",
+      "run-pod/v1",
+      "--restart",
+      "Always",
+      "--limits",
+      s"memory=${memory.toMB}Mi") ++ environmentArgs ++ labelArgs
+
+    for {
+      id <- kubernetes.run(podName, image, args).recoverWith {
+        case _ =>
+          Future.failed(
+            WhiskContainerStartupError(Messages.resourceProvisionError))
+      }
+      ip <- kubernetes.inspectIPAddress(id).recoverWith {
+        // remove the container immediately if inspect failed as
+        // we cannot recover that case automatically
+        case _ =>
+          kubernetes.rm(id)
+          Future.failed(
+            WhiskContainerStartupError(Messages.resourceProvisionError))
+      }
+    } yield new KubernetesContainer(id, ip)
+  }
+
+}
+
+/**
+ * Represents a container as run by kubernetes.
+ *
+ * This class contains OpenWhisk specific behavior and as such does not 
necessarily
+ * use kubernetes commands to achieve the effects needed.
+ *
+ * @constructor
+ * @param id the id of the container
+ * @param addr the ip & port of the container
+ */
+class KubernetesContainer(protected val id: ContainerId,
+                          protected val addr: ContainerAddress)(
+  implicit kubernetes: KubernetesApi,
+  protected val ec: ExecutionContext,
+  protected val logging: Logging)
+    extends Container {
+
+  /** Timestamp of the most recent log line handed out by `logs`. */
+  private val lastTimestamp = new AtomicReference[Option[Instant]](None)
+
+  /** Upper bound for the time a single `logs` stream is kept open. */
+  protected val waitForLogs: FiniteDuration = 2.seconds
+
+  /** Nothing to do under Kubernetes. */
+  def suspend()(implicit transid: TransactionId): Future[Unit] =
+    Future.successful(())
+
+  /** Nothing to do under Kubernetes. */
+  def resume()(implicit transid: TransactionId): Future[Unit] =
+    Future.successful(())
+
+  /** Runs the generic teardown, then removes the pod. */
+  override def destroy()(implicit transid: TransactionId): Future[Unit] = {
+    super.destroy()
+    kubernetes.rm(id)
+  }
+
+  private val stringSentinel = DockerContainer.ActivationSentinel.utf8String
+
+  /**
+   * Streams the serialized log lines written since the previous call.
+   *
+   * The stream is capped at `limit` bytes, completes after two sentinel
+   * lines and is cut off after `waitForLogs`. Exceeding the limit or a
+   * broken stream results in a final notice line instead of a failure.
+   */
+  def logs(limit: ByteSize, waitForSentinel: Boolean)(
+    implicit transid: TransactionId): Source[ByteString, Any] = {
+
+    // Adding + 1 since we know there's a newline byte being read
+    val weightOf: TypedLogLine => Long = line => line.jsonSize.toLong + 1
+
+    val isSentinel: TypedLogLine => Boolean = line => line.log == stringSentinel
+
+    val noticeFor: PartialFunction[Throwable, TypedLogLine] = {
+      // The limitWeighted stage has already ended the stream by failing; a
+      // truncation notice is injected as the last element instead.
+      case _: StreamLimitReachedException =>
+        TypedLogLine(Instant.now, "stderr", Messages.truncateLogs(limit))
+      // The stream has ended and data might be missing. A FramingException
+      // can also mean exceeding the limits, but the two cases cannot be told
+      // apart, so the general error message is used.
+      case _: OccurrencesNotFoundException | _: FramingException =>
+        TypedLogLine(Instant.now, "stderr", Messages.logFailure)
+    }
+
+    val since = lastTimestamp.get
+
+    kubernetes
+      .logs(id, since, waitForSentinel)
+      .limitWeighted(limit.toBytes)(weightOf)
+      .via(new CompleteAfterOccurrences(isSentinel, 2, waitForSentinel))
+      .recover(noticeFor)
+      .takeWithin(waitForLogs)
+      .map { line =>
+        lastTimestamp.set(Some(line.time))
+        line.toByteString
+      }
+  }
+
+}
diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
new file mode 100644
index 0000000000..219c8a397b
--- /dev/null
+++ 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.kubernetes
+
+import akka.actor.ActorSystem
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import whisk.common.Logging
+import whisk.common.TransactionId
+import whisk.core.containerpool.Container
+import whisk.core.containerpool.ContainerFactory
+import whisk.core.containerpool.ContainerFactoryProvider
+import whisk.core.entity.ByteSize
+import whisk.core.entity.ExecManifest.ImageName
+import whisk.core.entity.InstanceId
+import whisk.core.WhiskConfig
+
+/**
+ * Container factory that runs action containers as Kubernetes pods.
+ *
+ * @param label value of the "invoker" label put on every created pod; the
+ *     same label selects the pods removed by `cleanup`
+ * @param config the whisk configuration
+ */
+class KubernetesContainerFactory(label: String, config: WhiskConfig)(
+  implicit actorSystem: ActorSystem,
+  ec: ExecutionContext,
+  logging: Logging)
+    extends ContainerFactory {
+
+  implicit val kubernetes = new KubernetesClient()(ec)
+
+  /** Perform cleanup on init */
+  override def init(): Unit = cleanup()
+
+  /** Deletes the pods labelled for this invoker, waiting up to 30 seconds. */
+  override def cleanup() = {
+    logging.info(this, "Cleaning up function runtimes")
+    Await.ready(
+      kubernetes.rm("invoker", label)(TransactionId.invokerNanny),
+      30.seconds)
+  }
+
+  /**
+   * Creates a pod for `actionImage`. User provided images are referenced by
+   * their public name, all others are resolved against the configured
+   * registry, prefix and tag.
+   */
+  override def createContainer(tid: TransactionId,
+                               name: String,
+                               actionImage: ImageName,
+                               userProvidedImage: Boolean,
+                               memory: ByteSize)(
+    implicit config: WhiskConfig,
+    logging: Logging): Future[Container] = {
+    val image =
+      if (!userProvidedImage)
+        actionImage.localImageName(
+          config.dockerRegistry,
+          config.dockerImagePrefix,
+          Some(config.dockerImageTag))
+      else
+        actionImage.publicImageName
+
+    val podEnvironment = Map("__OW_API_HOST" -> config.wskApiHost)
+    val podLabels = Map("invoker" -> label)
+
+    KubernetesContainer.create(
+      tid,
+      name,
+      image,
+      userProvidedImage,
+      memory,
+      environment = podEnvironment,
+      labels = podLabels)
+  }
+}
+
+/** Provides the Kubernetes based ContainerFactory. */
+object KubernetesContainerFactoryProvider extends ContainerFactoryProvider {
+
+  /**
+   * Builds a factory labelled `invoker<instance number>` that runs on the
+   * dispatcher of the given actor system.
+   */
+  override def getContainerFactory(
+    actorSystem: ActorSystem,
+    logging: Logging,
+    config: WhiskConfig,
+    instance: InstanceId,
+    parameters: Map[String, Set[String]]): ContainerFactory = {
+    val invokerLabel = s"invoker${instance.toInt}"
+    new KubernetesContainerFactory(invokerLabel, config)(
+      actorSystem,
+      actorSystem.dispatcher,
+      logging)
+  }
+}
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index ea7781036b..25bc0c980c 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -48,7 +48,29 @@ import whisk.core.entity.ActivationResponse.ContainerResponse
 import whisk.core.entity.ActivationResponse.Timeout
 import whisk.core.entity.size._
 import whisk.http.Messages
-import whisk.core.entity.size._
+
+import DockerContainerTests._
+
+/** Helpers shared by the container test suites. */
+object DockerContainerTests {
+
+  /** Blocks for the outcome of `f`; a failed future rethrows its exception. */
+  def await[A](f: Future[A], timeout: FiniteDuration = 500.milliseconds) =
+    Await.result[A](f, timeout)
+
+  /** Interval that begins at EPOCH and lasts for `duration`. */
+  def intervalOf(duration: FiniteDuration) = {
+    val end = Instant.ofEpochMilli(duration.toMillis)
+    Interval(Instant.EPOCH, end)
+  }
+
+  /**
+   * Renders `log` as newline terminated JSON lines. With `appendSentinel`
+   * a stderr and a stdout sentinel line are added, stamped with the time of
+   * the last entry (or EPOCH if there is none).
+   */
+  def toRawLog(log: Seq[LogLine], appendSentinel: Boolean = true): ByteString = {
+    val entries =
+      if (!appendSentinel) log
+      else {
+        val lastTime = log.lastOption
+          .map { case LogLine(time, _, _) => time }
+          .getOrElse(Instant.EPOCH.toString)
+        def sentinelOn(stream: String) =
+          LogLine(lastTime, stream, s"${DockerContainer.ActivationSentinel.utf8String}\n")
+        log :+ sentinelOn("stderr") :+ sentinelOn("stdout")
+      }
+    val rendered = entries.map(_.toJson.compactPrint)
+    ByteString(rendered.mkString("", "\n", "\n"))
+  }
+}
 
 /**
  * Unit tests for ContainerPool schedule
@@ -69,9 +91,6 @@ class DockerContainerTests
 
   implicit val materializer: ActorMaterializer = ActorMaterializer()
 
-  /** Awaits the given future, throws the exception enclosed in Failure. */
-  def await[A](f: Future[A], timeout: FiniteDuration = 500.milliseconds) = 
Await.result[A](f, timeout)
-
   /** Reads logs into memory and awaits them */
   def awaitLogs(source: Source[ByteString, Any], timeout: FiniteDuration = 
500.milliseconds): Vector[String] =
     
Await.result(source.via(DockerToActivationLogStore.toFormattedString).runWith(Sink.seq[String]),
 timeout).toVector
@@ -100,9 +119,6 @@ class DockerContainerTests
     }
   }
 
-  /** Creates an interval starting at EPOCH with the given duration. */
-  def intervalOf(duration: FiniteDuration) = Interval(Instant.EPOCH, 
Instant.ofEpochMilli(duration.toMillis))
-
   behavior of "DockerContainer"
 
   implicit val transid = TransactionId.testing
@@ -433,18 +449,6 @@ class DockerContainerTests
   /*
    * LOGS
    */
-  def toRawLog(log: Seq[LogLine], appendSentinel: Boolean = true): ByteString 
= {
-    val appendedLog = if (appendSentinel) {
-      val lastTime = log.lastOption.map { case LogLine(time, _, _) => time 
}.getOrElse(Instant.EPOCH.toString)
-      log :+
-        LogLine(lastTime, "stderr", 
s"${DockerContainer.ActivationSentinel.utf8String}\n") :+
-        LogLine(lastTime, "stdout", 
s"${DockerContainer.ActivationSentinel.utf8String}\n")
-    } else {
-      log
-    }
-    ByteString(appendedLog.map(_.toJson.compactPrint).mkString("", "\n", "\n"))
-  }
-
   it should "read a simple log with sentinel" in {
     val expectedLogEntry = LogLine(Instant.EPOCH.toString, "stdout", "This is 
a log entry.\n")
     val rawLog = toRawLog(Seq(expectedLogEntry), appendSentinel = true)
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
new file mode 100644
index 0000000000..0b14042a20
--- /dev/null
+++ 
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.kubernetes.test
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Concat, Sink, Source}
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import org.junit.runner.RunWith
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.Matchers
+import org.scalatest.time.{Seconds, Span}
+import common.{StreamLogging, WskActorSystem}
+import okio.Buffer
+import whisk.common.LogMarker
+import whisk.common.LoggingMarkers.INVOKER_KUBECTL_CMD
+import whisk.common.TransactionId
+import whisk.core.containerpool.{ContainerAddress, ContainerId}
+import whisk.core.containerpool.kubernetes.{KubernetesApi, KubernetesClient, 
KubernetesRestLogSourceStage, TypedLogLine}
+import whisk.core.containerpool.docker.ProcessRunningException
+
+import scala.collection.mutable
+import scala.collection.immutable
+
+@RunWith(classOf[JUnitRunner])
+class KubernetesClientTests
+    extends FlatSpec
+    with Matchers
+    with StreamLogging
+    with BeforeAndAfterEach
+    with Eventually
+    with WskActorSystem {
+
+  import KubernetesClientTests._
+
+  implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+  /** Reads logs into memory and awaits them */
+  def awaitLogs(source: Source[TypedLogLine, Any], timeout: FiniteDuration = 
1000.milliseconds): Vector[TypedLogLine] =
+    Await.result(source.runWith(Sink.seq[TypedLogLine]), timeout).toVector
+
+  override def beforeEach(): Unit = stream.reset() // reset the StreamLogging stream before every test
+
+  implicit override val patienceConfig = PatienceConfig(timeout = 
scaled(Span(5, Seconds)))
+
+  implicit val transid = TransactionId.testing
+  val id = 
ContainerId("55db56ee082239428b27d3728b4dd324c09068458aad9825727d5bfc1bba6d52")
+
+  val commandTimeout = 500.milliseconds
+  def await[A](f: Future[A], timeout: FiniteDuration = commandTimeout) = 
Await.result(f, timeout)
+
+  val kubectlCommand = "kubectl"
+
+  /** Returns a KubernetesClient with a mocked result for 'executeProcess' */
+  def kubernetesClient(fixture: => Future[String]) = new 
KubernetesClient()(global) {
+    override def findKubectlCmd() = kubectlCommand
+    override def executeProcess(args: Seq[String], timeout: Duration)(implicit 
ec: ExecutionContext, as: ActorSystem) =
+      fixture
+  }
+
+  behavior of "KubernetesClient"
+
+  it should "write proper log markers on a successful command" in {
+    // a dummy string works here as we do not assert any output
+    // from the methods below
+    val stdout = "stdout"
+    val client = kubernetesClient { Future.successful(stdout) }
+
+    /** Awaits the command and checks for proper logging. */
+    def runAndVerify(f: Future[_], cmd: String, args: Seq[String]) = {
+      val result = await(f)
+
+      logLines.head should include((Seq(kubectlCommand, cmd) ++ 
args).mkString(" "))
+
+      val start = LogMarker.parse(logLines.head)
+      start.token shouldBe INVOKER_KUBECTL_CMD(cmd)
+
+      val end = LogMarker.parse(logLines.last)
+      end.token shouldBe INVOKER_KUBECTL_CMD(cmd).asFinish
+
+      stream.reset()
+      result
+    }
+
+    runAndVerify(client.rm(id), "delete", Seq("--now", "pod", id.asString))
+
+    val image = "image"
+    val name = "name"
+    val expected = Seq(name, s"--image=$image")
+    runAndVerify(client.run(name, image), "run", expected) shouldBe 
ContainerId(name)
+  }
+
+  it should "write proper log markers on a failing command" in {
+    val client = kubernetesClient { Future.failed(new RuntimeException()) }
+
+    /** Awaits the command, asserts the exception and checks for proper 
logging. */
+    def runAndVerify(f: Future[_], cmd: String) = {
+      a[RuntimeException] should be thrownBy await(f)
+
+      val start = LogMarker.parse(logLines.head)
+      start.token shouldBe INVOKER_KUBECTL_CMD(cmd)
+
+      val end = LogMarker.parse(logLines.last)
+      end.token shouldBe INVOKER_KUBECTL_CMD(cmd).asError
+
+      stream.reset()
+    }
+
+    runAndVerify(client.rm(id), "delete")
+    runAndVerify(client.run("name", "image"), "run")
+  }
+
+  it should "fail with ProcessRunningException when run returns with exit code 
!=125 or no container ID" in {
+    def runAndVerify(pre: ProcessRunningException, clue: String) = {
+      val client = kubernetesClient { Future.failed(pre) }
+      withClue(s"${clue} - exitCode = ${pre.exitCode}, stdout = 
'${pre.stdout}', stderr = '${pre.stderr}': ") {
+        the[ProcessRunningException] thrownBy await(client.run("name", 
"image")) shouldBe pre
+      }
+    }
+
+    Seq[(ProcessRunningException, String)](
+      (ProcessRunningException(126, id.asString, "Unknown command"), "Exit 
code not 125"),
+      (ProcessRunningException(125, "", "Unknown flag: --foo"), "No container 
ID"),
+      (ProcessRunningException(1, "", ""), "Exit code not 125 and no container 
ID")).foreach {
+      case (pre, clue) => runAndVerify(pre, clue)
+    }
+  }
+
+  val firstLog = """2018-02-06T00:00:18.419889342Z first activation
+                   |2018-02-06T00:00:18.419929471Z 
XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+                   |2018-02-06T00:00:18.419988733Z 
XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+                   |""".stripMargin
+  val secondLog = """2018-02-06T00:09:35.38267193Z second activation
+                    |2018-02-06T00:09:35.382990278Z 
XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+                    |2018-02-06T00:09:35.383116503Z 
XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX
+                    |""".stripMargin
+
+  /** Writes rawLog into a Buffer, runs it through KubernetesRestLogSourceStage.readLines, wraps it in a Source. */
+  private def parsedSource(rawLog: String, lastTimestamp: Option[Instant]): Source[TypedLogLine, Any] =
+    Source(
+      KubernetesRestLogSourceStage
+        .readLines(new Buffer().writeUtf8(rawLog), lastTimestamp, List.empty)
+        .to[immutable.Seq])
+
+  def firstSource(lastTimestamp: Option[Instant] = None): Source[TypedLogLine, Any] =
+    parsedSource(firstLog, lastTimestamp)
+  def secondSource(lastTimestamp: Option[Instant] = None): Source[TypedLogLine, Any] =
+    parsedSource(secondLog, lastTimestamp)
+
+  it should "return all logs when no sinceTime passed" in {
+    val client = new TestKubernetesClient {
+      override def logs(id: ContainerId, sinceTime: Option[Instant], 
waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        firstSource()
+      }
+    }
+    val logs = awaitLogs(client.logs(id, None))
+    logs should have size 3
+    logs(0) shouldBe TypedLogLine("2018-02-06T00:00:18.419889342Z", "stdout", 
"first activation")
+    logs(2) shouldBe TypedLogLine("2018-02-06T00:00:18.419988733Z", "stdout", 
"XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+  }
+
+  it should "return all logs after the one matching sinceTime" in {
+
+    val testDate: Option[Instant] = "2018-02-06T00:00:18.419988733Z"
+    val client = new TestKubernetesClient {
+      override def logs(id: ContainerId, sinceTime: Option[Instant], 
waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        Source.combine(firstSource(testDate), 
secondSource(testDate))(Concat(_))
+      }
+    }
+    val logs = awaitLogs(client.logs(id, testDate))
+    logs should have size 3
+    logs(0) shouldBe TypedLogLine("2018-02-06T00:09:35.38267193Z", "stdout", 
"second activation")
+    logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", 
"XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+  }
+
+  it should "return all logs if none match sinceTime" in {
+    val testDate: Option[Instant] = "2018-02-06T00:00:18.419988733Z"
+    val client = new TestKubernetesClient {
+      override def logs(id: ContainerId, sinceTime: Option[Instant], 
waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        secondSource(testDate)
+      }
+    }
+    val logs = awaitLogs(client.logs(id, testDate))
+    logs should have size 3
+    logs(0) shouldBe TypedLogLine("2018-02-06T00:09:35.38267193Z", "stdout", 
"second activation")
+    logs(2) shouldBe TypedLogLine("2018-02-06T00:09:35.383116503Z", "stdout", 
"XXX_THE_END_OF_A_WHISK_ACTIVATION_XXX")
+  }
+
+}
+
+object KubernetesClientTests {
+  import scala.language.implicitConversions
+
+  /** Converts a timestamp string via KubernetesClient.parseK8STimestamp; None when that does not succeed. */
+  implicit def strToDate(str: String): Option[Instant] = KubernetesClient.parseK8STimestamp(str).toOption
+
+  /** Like strToDate, but unwraps the result; throws NoSuchElementException when the string does not parse. */
+  implicit def strToInstant(str: String): Instant = strToDate(str).get
+
+  class TestKubernetesClient extends KubernetesApi { // test double: records every call, returns canned results
+    val runs = mutable.Buffer.empty[(String, String, Seq[String])] // (name, image, args) of each run call
+    val inspects = mutable.Buffer.empty[ContainerId] // ids passed to inspectIPAddress
+    val rms = mutable.Buffer.empty[ContainerId] // ids passed to rm(id)
+    val rmByLabels = mutable.Buffer.empty[(String, String)] // (key, value) pairs passed to rm(key, value)
+    val logCalls = mutable.Buffer.empty[(ContainerId, Option[Instant])] // (id, sinceTime) of each logs call
+
+    def run(name: String, image: String, args: Seq[String] = Seq.empty[String])(
+      implicit transid: TransactionId): Future[ContainerId] = {
+      runs += ((name, image, args))
+      Future.successful(ContainerId("testId"))
+    }
+
+    def inspectIPAddress(id: ContainerId)(implicit transid: TransactionId): Future[ContainerAddress] = {
+      inspects += id
+      Future.successful(ContainerAddress("testIp"))
+    }
+
+    def rm(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = {
+      rms += id
+      Future.successful(())
+    }
+
+    def rm(key: String, value: String)(implicit transid: TransactionId): Future[Unit] = {
+      rmByLabels += ((key, value))
+      Future.successful(())
+    }
+    def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean = false)(
+      implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+      logCalls += ((id, sinceTime))
+      Source(List.empty[TypedLogLine])
+    }
+  }
+}
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
new file mode 100644
index 0000000000..80dc0a3283
--- /dev/null
+++ 
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.kubernetes.test
+
+import java.io.IOException
+import java.time.{Instant, ZoneId}
+
+import akka.NotUsed
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Flow, Sink, Source}
+import akka.util.ByteString
+import common.TimingHelpers
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import scala.concurrent.Future
+import org.junit.runner.RunWith
+import org.scalamock.scalatest.MockFactory
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.Matchers
+import common.{StreamLogging, WskActorSystem}
+import spray.json._
+import whisk.common.LoggingMarkers._
+import whisk.common.LogMarker
+import whisk.common.TransactionId
+import whisk.core.containerpool._
+import whisk.core.containerpool.kubernetes._
+import whisk.core.containerpool.docker._
+import whisk.core.entity.ActivationResponse
+import whisk.core.entity.ActivationResponse.ContainerResponse
+import whisk.core.entity.ActivationResponse.Timeout
+import whisk.core.entity.size._
+import whisk.http.Messages
+import whisk.core.containerpool.docker.test.DockerContainerTests._
+
+import scala.collection.{immutable, mutable}
+
+/**
+ * Unit tests for KubernetesContainer
+ */
+@RunWith(classOf[JUnitRunner])
+class KubernetesContainerTests
+    extends FlatSpec
+    with Matchers
+    with MockFactory
+    with StreamLogging
+    with BeforeAndAfterEach
+    with WskActorSystem
+    with TimingHelpers {
+
+  import KubernetesClientTests.TestKubernetesClient
+  import KubernetesContainerTests._
+
+  override def beforeEach() = {
+    stream.reset()
+  }
+
+  implicit val materializer: ActorMaterializer = ActorMaterializer()
+
+  def instantDT(instant: Instant): Instant = 
Instant.from(instant.atZone(ZoneId.of("GMT+0")))
+
+  val Epoch = Instant.EPOCH
+  val EpochDateTime = instantDT(Epoch)
+
+  /** Transforms chunked JsObjects into formatted strings */
+  val toFormattedString: Flow[ByteString, String, NotUsed] =
+    
Flow[ByteString].map(_.utf8String.parseJson.convertTo[TypedLogLine].toString)
+
+  /** Reads logs into memory and awaits them */
+  def awaitLogs(source: Source[ByteString, Any], timeout: FiniteDuration = 
500.milliseconds): Vector[String] =
+    Await.result(source.via(toFormattedString).runWith(Sink.seq[String]), 
timeout).toVector
+
+  val containerId = ContainerId("id")
+
+  /**
+   * Constructs a testcontainer with overridden IO methods. Results of the 
override can be provided
+   * as parameters.
+   */
+  def kubernetesContainer(id: ContainerId = containerId, addr: 
ContainerAddress = ContainerAddress("ip"))(
+    ccRes: Future[RunResult] =
+      Future.successful(RunResult(intervalOf(1.millisecond), 
Right(ContainerResponse(true, "", None)))),
+    awaitLogs: FiniteDuration = 2.seconds)(implicit kubernetes: 
KubernetesApi): KubernetesContainer = {
+
+    new KubernetesContainer(id, addr) {
+      override protected def callContainer(
+        path: String,
+        body: JsObject,
+        timeout: FiniteDuration,
+        retry: Boolean = false)(implicit transid: TransactionId): 
Future[RunResult] = {
+        ccRes
+      }
+      override protected val waitForLogs = awaitLogs
+    }
+  }
+
+  behavior of "KubernetesContainer"
+
+  implicit val transid = TransactionId.testing
+  val parameters = Map(
+    "--cap-drop" -> Set("NET_RAW", "NET_ADMIN"),
+    "--ulimit" -> Set("nofile=1024:1024"),
+    "--pids-limit" -> Set("1024"))
+
+  /*
+   * CONTAINER CREATION
+   */
+  it should "create a new instance" in {
+    implicit val kubernetes = new TestKubernetesClient
+
+    val image = "image"
+    val userProvidedImage = false
+    val environment = Map("test" -> "hi")
+    val labels = Map("invoker" -> "0")
+    val name = "my_Container(1)"
+    val container = KubernetesContainer.create(
+      transid = transid,
+      image = image,
+      userProvidedImage = userProvidedImage,
+      environment = environment,
+      labels = labels,
+      name = name)
+
+    await(container)
+
+    kubernetes.runs should have size 1
+    kubernetes.inspects should have size 1
+    kubernetes.rms should have size 0
+
+    val (testName, testImage, testArgs) = kubernetes.runs.head
+    testName shouldBe "my-container1"
+    testImage shouldBe image
+    testArgs shouldBe Seq(
+      "--generator",
+      "run-pod/v1",
+      "--restart",
+      "Always",
+      "--limits",
+      "memory=256Mi",
+      "--env",
+      "test=hi",
+      "-l",
+      "invoker=0")
+  }
+
+  it should "pull a user provided image before creating the container" in {
+    implicit val kubernetes = new TestKubernetesClient
+
+    val container =
+      KubernetesContainer.create(transid = transid, name = "name", image = 
"image", userProvidedImage = true)
+    await(container)
+
+    kubernetes.runs should have size 1
+    kubernetes.inspects should have size 1
+    kubernetes.rms should have size 0
+  }
+
+  it should "remove the container if inspect fails" in {
+    implicit val kubernetes = new TestKubernetesClient {
+      override def inspectIPAddress(id: ContainerId)(implicit transid: 
TransactionId): Future[ContainerAddress] = {
+        inspects += id
+        Future.failed(new RuntimeException())
+      }
+    }
+
+    val container = KubernetesContainer.create(transid = transid, name = 
"name", image = "image")
+    a[WhiskContainerStartupError] should be thrownBy await(container)
+
+    kubernetes.runs should have size 1
+    kubernetes.inspects should have size 1
+    kubernetes.rms should have size 1
+  }
+
+  it should "provide a proper error if run fails for blackbox containers" in {
+    implicit val kubernetes = new TestKubernetesClient {
+      override def run(name: String, image: String, args: Seq[String])(
+        implicit transid: TransactionId): Future[ContainerId] = {
+        runs += ((name, image, args))
+        Future.failed(ProcessRunningException(1, "", ""))
+      }
+    }
+
+    val container =
+      KubernetesContainer.create(transid = transid, name = "name", image = 
"image", userProvidedImage = true)
+    a[WhiskContainerStartupError] should be thrownBy await(container)
+
+    kubernetes.runs should have size 1
+    kubernetes.inspects should have size 0
+    kubernetes.rms should have size 0
+  }
+
+  it should "provide a proper error if inspect fails for blackbox containers" 
in {
+    implicit val kubernetes = new TestKubernetesClient {
+      override def inspectIPAddress(id: ContainerId)(implicit transid: 
TransactionId): Future[ContainerAddress] = {
+        inspects += id
+        Future.failed(new RuntimeException())
+      }
+    }
+
+    val container =
+      KubernetesContainer.create(transid = transid, name = "name", image = 
"image", userProvidedImage = true)
+    a[WhiskContainerStartupError] should be thrownBy await(container)
+
+    kubernetes.runs should have size 1
+    kubernetes.inspects should have size 1
+    kubernetes.rms should have size 1
+  }
+
+  /*
+   * KUBERNETES COMMANDS
+   */
+  it should "destroy a container via Kubernetes" in {
+    implicit val kubernetes = stub[KubernetesApi]
+
+    val id = ContainerId("id")
+    val container = new KubernetesContainer(id, ContainerAddress("ip"))
+
+    container.destroy()
+
+    (kubernetes.rm(_: ContainerId)(_: TransactionId)).verify(id, transid)
+  }
+
+  /*
+   * INITIALIZE
+   *
+   * Only tests for quite simple cases. Disambiguation of errors is delegated 
to ActivationResponse
+   * and so are the tests for those.
+   */
+  it should "initialize a container" in {
+    implicit val kubernetes = stub[KubernetesApi]
+
+    val initTimeout = 1.second
+    val interval = intervalOf(1.millisecond)
+    val container = kubernetesContainer() {
+      Future.successful(RunResult(interval, Right(ContainerResponse(true, "", 
None))))
+    }
+
+    val initInterval = container.initialize(JsObject(), initTimeout)
+    await(initInterval, initTimeout) shouldBe interval
+
+    // assert the starting log is there
+    val start = LogMarker.parse(logLines.head)
+    start.token shouldBe INVOKER_ACTIVATION_INIT
+
+    // assert the end log is there
+    val end = LogMarker.parse(logLines.last)
+    end.token shouldBe INVOKER_ACTIVATION_INIT.asFinish
+    end.deltaToMarkerStart shouldBe Some(interval.duration.toMillis)
+  }
+
+  it should "properly deal with a timeout during initialization" in {
+    implicit val kubernetes = stub[KubernetesApi]
+
+    val initTimeout = 1.second
+    val interval = intervalOf(initTimeout + 1.nanoseconds)
+
+    val container = kubernetesContainer() {
+      Future.successful(RunResult(interval, Left(Timeout())))
+    }
+
+    val init = container.initialize(JsObject(), initTimeout)
+
+    val error = the[InitializationError] thrownBy await(init, initTimeout)
+    error.interval shouldBe interval
+    error.response.statusCode shouldBe ActivationResponse.ApplicationError
+
+    // assert the finish log is there
+    val end = LogMarker.parse(logLines.last)
+    end.token shouldBe INVOKER_ACTIVATION_INIT.asFinish
+  }
+
+  /*
+   * RUN
+   *
+   * Only tests for quite simple cases. Disambiguation of errors is delegated 
to ActivationResponse
+   * and so are the tests for those.
+   */
+  it should "run a container" in {
+    implicit val kubernetes = stub[KubernetesApi]
+
+    val interval = intervalOf(1.millisecond)
+    val result = JsObject()
+    val container = kubernetesContainer() {
+      Future.successful(RunResult(interval, Right(ContainerResponse(true, 
result.compactPrint, None))))
+    }
+
+    val runResult = container.run(JsObject(), JsObject(), 1.second)
+    await(runResult) shouldBe (interval, 
ActivationResponse.success(Some(result)))
+
+    // assert the starting log is there
+    val start = LogMarker.parse(logLines.head)
+    start.token shouldBe INVOKER_ACTIVATION_RUN
+
+    // assert the end log is there
+    val end = LogMarker.parse(logLines.last)
+    end.token shouldBe INVOKER_ACTIVATION_RUN.asFinish
+    end.deltaToMarkerStart shouldBe Some(interval.duration.toMillis)
+  }
+
+  it should "properly deal with a timeout during run" in {
+    implicit val kubernetes = stub[KubernetesApi]
+
+    val runTimeout = 1.second
+    val interval = intervalOf(runTimeout + 1.nanoseconds)
+
+    val container = kubernetesContainer() {
+      Future.successful(RunResult(interval, Left(Timeout())))
+    }
+
+    val runResult = container.run(JsObject(), JsObject(), runTimeout)
+    await(runResult) shouldBe (interval, ActivationResponse.applicationError(
+      Messages.timedoutActivation(runTimeout, false)))
+
+    // assert the finish log is there
+    val end = LogMarker.parse(logLines.last)
+    end.token shouldBe INVOKER_ACTIVATION_RUN.asFinish
+  }
+
+  /*
+   * LOGS
+   */
+  it should "read a simple log with sentinel" in {
+    val expectedLogEntry = TypedLogLine(currentTsp, "stdout", "This is a log 
entry.")
+    val logSrc = logSource(expectedLogEntry, appendSentinel = true)
+
+    implicit val kubernetes = new TestKubernetesClient {
+      override def logs(id: ContainerId, sinceTime: Option[Instant], 
waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        logCalls += ((id, sinceTime))
+        logSrc
+      }
+    }
+
+    val container = kubernetesContainer(id = containerId)()
+    // Read with tight limit to verify that no truncation occurs TODO: Need to 
figure out how to handle this with the Source-based kubernetes logs
+    val processedLogs = awaitLogs(container.logs(limit = 4096.B, 
waitForSentinel = true))
+
+    kubernetes.logCalls should have size 1
+    val (id, sinceTime) = kubernetes.logCalls(0)
+    id shouldBe containerId
+    sinceTime shouldBe None
+
+    processedLogs should have size 1
+    processedLogs shouldBe Vector(expectedLogEntry.rawString)
+  }
+
+  it should "read a simple log without sentinel" in {
+    val expectedLogEntry = TypedLogLine(currentTsp, "stdout", "This is a log 
entry.")
+    val logSrc = logSource(expectedLogEntry, appendSentinel = false)
+
+    implicit val kubernetes = new TestKubernetesClient {
+      override def logs(id: ContainerId, sinceTime: Option[Instant], 
waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        logCalls += ((id, sinceTime))
+        logSrc
+      }
+    }
+
+    val container = kubernetesContainer(id = containerId)()
+    // Read without tight limit so that the full read result is processed
+    val processedLogs = awaitLogs(container.logs(limit = 1.MB, waitForSentinel 
= false))
+
+    kubernetes.logCalls should have size 1
+    val (id, sinceTime) = kubernetes.logCalls(0)
+    id shouldBe containerId
+    sinceTime shouldBe None
+
+    processedLogs should have size 1
+    processedLogs shouldBe Vector(expectedLogEntry.rawString)
+  }
+
+  it should "fail log reading if error occurs during file reading" in {
+    implicit val kubernetes = new TestKubernetesClient {
+      override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        logCalls += ((id, sinceTime)) // record the id actually received so the id assertion below can fail
+        Source.failed(new IOException) // the log source fails as soon as it is materialized
+      }
+    }
+
+    val container = kubernetesContainer()()
+    an[IOException] should be thrownBy awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true))
+
+    kubernetes.logCalls should have size 1
+    val (id, sinceTime) = kubernetes.logCalls(0)
+    id shouldBe containerId
+    sinceTime shouldBe None
+  }
+
+  it should "read two consecutive logs with sentinel" in {
+    val firstLog = TypedLogLine(Instant.EPOCH, "stdout", "This is the first log.")
+    val secondLog = TypedLogLine(Instant.EPOCH.plusSeconds(1L), "stderr", "This is the second log.")
+    val logSources =
+      mutable.Queue(logSource(firstLog, appendSentinel = true), logSource(secondLog, appendSentinel = true))
+
+    implicit val kubernetes = new TestKubernetesClient {
+      override def logs(id: ContainerId, sinceTime: Option[Instant], waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        logCalls += ((id, sinceTime))
+        logSources.dequeue() // each logs call consumes the next prepared source, in order
+      }
+    }
+
+    val container = kubernetesContainer()()
+    // Read without tight limit so that the full read result is processed
+    val processedFirstLog = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true))
+    val processedSecondLog = awaitLogs(container.logs(limit = 1.MB, waitForSentinel = true))
+
+    kubernetes.logCalls should have size 2
+    val (_, sinceTime1) = kubernetes.logCalls(0)
+    sinceTime1 shouldBe None
+    val (_, sinceTime2) = kubernetes.logCalls(1)
+    sinceTime2 shouldBe Some(EpochDateTime) // second read should start behind the first line
+
+    processedFirstLog should have size 1
+    processedFirstLog shouldBe Vector(firstLog.rawString)
+    processedSecondLog should have size 1
+    processedSecondLog shouldBe Vector(secondLog.rawString)
+  }
+
+  it should "eventually terminate even if no sentinels can be found" in {
+    val expectedLog = TypedLogLine(currentTsp, "stdout", s"This is log entry.")
+    val rawLog = toLogs(expectedLog, appendSentinel = false)
+
+    rawLog should have size 1
+
+    implicit val kubernetes = new TestKubernetesClient {
+      override def logs(containerId: ContainerId, sinceTime: Option[Instant], 
waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        logCalls += ((containerId, sinceTime))
+        // "Fakes" an infinite source with only 1 entry
+        Source.tick(0.milliseconds, 10.seconds, rawLog.head)
+      }
+    }
+
+    val waitForLogs = 100.milliseconds
+    val container = kubernetesContainer()(awaitLogs = waitForLogs)
+    // Read without tight limit so that the full read result is processed
+
+    val (interval, processedLog) = durationOf(awaitLogs(container.logs(limit = 
1.MB, waitForSentinel = true)))
+
+    interval.toMillis should (be >= waitForLogs.toMillis and be < (waitForLogs 
* 2).toMillis)
+
+    kubernetes.logCalls should have size 1
+
+    /*    processedLog should have size expectedLog.length
+    processedLog shouldBe expectedLog.map(_.toFormattedString)*/
+  }
+
+  it should "include an incomplete warning if sentinels have not been found 
only if we wait for sentinels" in {
+    val expectedLogEntry =
+      TypedLogLine(currentTsp, "stdout", "This is a log entry.")
+
+    implicit val kubernetes = new TestKubernetesClient {
+      override def logs(containerId: ContainerId, sinceTime: Option[Instant], 
waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        logCalls += ((containerId, sinceTime))
+        logSource(Seq(expectedLogEntry, expectedLogEntry), appendSentinel = 
false)
+      }
+    }
+
+    val waitForLogs = 100.milliseconds
+    val container = kubernetesContainer()(awaitLogs = waitForLogs)
+    // Read with tight limit to verify that no truncation occurs
+    val processedLogs = awaitLogs(container.logs(limit = 4096.B, 
waitForSentinel = true))
+
+    kubernetes.logCalls should have size 1
+    val (id, sinceTime) = kubernetes.logCalls(0)
+    id shouldBe containerId
+    sinceTime shouldBe None
+
+    processedLogs should have size 3
+    processedLogs(0) shouldBe expectedLogEntry.rawString
+    processedLogs(1) shouldBe expectedLogEntry.rawString
+    processedLogs(2) should include(Messages.logFailure)
+
+    val processedLogsFalse = awaitLogs(container.logs(limit = 4096.B, 
waitForSentinel = false))
+    processedLogsFalse should have size 2
+    processedLogsFalse(0) shouldBe expectedLogEntry.rawString
+    processedLogsFalse(1) shouldBe expectedLogEntry.rawString
+  }
+
+  it should "strip sentinel lines if it waits or doesn't wait for them" in {
+    val expectedLogEntry =
+      TypedLogLine(currentTsp, "stdout", "This is a log entry.")
+
+    implicit val kubernetes = new TestKubernetesClient {
+      override def logs(containerId: ContainerId, sinceTime: Option[Instant], 
waitForSentinel: Boolean)(
+        implicit transid: TransactionId): Source[TypedLogLine, Any] = {
+        logCalls += ((containerId, sinceTime))
+        logSource(expectedLogEntry, appendSentinel = true)
+      }
+    }
+
+    val container = kubernetesContainer(id = containerId)()
+    val processedLogs = awaitLogs(container.logs(limit = 1.MB, waitForSentinel 
= true))
+    processedLogs should have size 1
+    processedLogs(0) shouldBe expectedLogEntry.rawString
+
+    val processedLogsFalse = awaitLogs(container.logs(limit = 1.MB, 
waitForSentinel = false))
+    processedLogsFalse should have size 1
+    processedLogsFalse(0) shouldBe expectedLogEntry.rawString
+  }
+
+  def currentTsp: Instant = Instant.now
+
+}
+
+object KubernetesContainerTests {
+
+  /** Emits the given lines, plus the sentinel lines when requested, as a Source. */
+  def logSource(logs: Seq[TypedLogLine], appendSentinel: Boolean): Source[TypedLogLine, Any] =
+    Source(toLogs(logs, appendSentinel).to[immutable.Seq])
+
+  def logSource(logLine: TypedLogLine, appendSentinel: Boolean): Source[TypedLogLine, Any] =
+    logSource(Seq(logLine), appendSentinel)
+
+  /** Returns the lines, followed by a stderr and a stdout sentinel line when appendSentinel is set. */
+  def toLogs(log: Seq[TypedLogLine], appendSentinel: Boolean): Seq[TypedLogLine] =
+    if (!appendSentinel) {
+      log
+    } else {
+      val sentinel = DockerContainer.ActivationSentinel.utf8String
+      val sentinelTime = log.lastOption.map(_.time).getOrElse(Instant.EPOCH)
+      log ++ Seq(TypedLogLine(sentinelTime, "stderr", sentinel), TypedLogLine(sentinelTime, "stdout", sentinel))
+    }
+
+  def toLogs(logLine: TypedLogLine, appendSentinel: Boolean): Seq[TypedLogLine] =
+    toLogs(Seq(logLine), appendSentinel)
+
+  implicit class TypedLogHelper(log: TypedLogLine) {
+    import KubernetesClient.formatK8STimestamp
+    /** Renders the line as "<formatted timestamp> <stream>: <log>". */
+    def rawString: String = "%s %s: %s".format(formatK8STimestamp(log.time).get.trim, log.stream, log.log)
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to