dgrove-oss closed pull request #3378: LogStoreProvider using invokerAgent for KubernetesContainerPool
URL: https://github.com/apache/incubator-openwhisk/pull/3378
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not display it otherwise):

diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
index 4be36a730a..0a7f9e8c14 100644
--- 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
+++ 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala
@@ -22,21 +22,22 @@ import java.time.Instant
 
 import akka.NotUsed
 import akka.actor.ActorSystem
+import akka.http.scaladsl.model.HttpRequest
 import akka.stream.alpakka.file.scaladsl.LogRotatorSink
-import akka.stream.{Graph, SinkShape, UniformFanOutShape}
+import akka.stream.{ActorMaterializer, Graph, SinkShape, UniformFanOutShape}
 import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergeHub, Sink, 
Source}
 import akka.util.ByteString
 
 import whisk.common.TransactionId
 import whisk.core.containerpool.Container
-import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, 
WhiskActivation}
+import whisk.core.entity._
 import whisk.core.entity.size._
 import whisk.http.Messages
 
 import spray.json._
 import spray.json.DefaultJsonProtocol._
 
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future}
 
 /**
  * Docker based implementation of a LogStore.
@@ -48,7 +49,17 @@ import scala.concurrent.Future
  * Additionally writes logs to a separate file which can be processed by any 
backend service asynchronously.
  */
 class DockerToActivationFileLogStore(system: ActorSystem, 
destinationDirectory: Path = Paths.get("logs"))
-    extends DockerToActivationLogStore(system) {
+    extends LogDriverForwarderLogStore(system) {
+
+  implicit val ec: ExecutionContext = system.dispatcher
+  implicit val mat: ActorMaterializer = ActorMaterializer()(system)
+
+  /** Selects the "json-file" log driver, i.e. the log-driver that writes container output out to file. */
+  override val containerParameters: Map[String, Set[String]] = Map("--log-driver" -> Set("json-file"))
+
+  /**
+   * Returns the logs stored in the activation record itself.
+   * Logs are already part of the activation record, so no external lookup is needed.
+   */
+  override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] =
+    Future.successful(activation.logs)
 
   /**
    * End of an event as written to a file. Closes the json-object and also 
appends a newline.
@@ -92,22 +103,14 @@ class DockerToActivationFileLogStore(system: ActorSystem, 
destinationDirectory:
     }))
     .run()
 
-  override def collectLogs(transid: TransactionId,
-                           user: Identity,
-                           activation: WhiskActivation,
-                           container: Container,
-                           action: ExecutableWhiskAction): 
Future[ActivationLogs] = {
-
-    val logs = container.logs(action.limits.logs.asMegaBytes, 
action.exec.sentinelledLogs)(transid)
-
-    // Adding the userId field to every written record, so any background 
process can properly correlate.
-    val userIdField = Map("namespaceId" -> user.authkey.uuid.toJson)
-
-    val additionalMetadata = Map(
-      "activationId" -> activation.activationId.asString.toJson,
-      "action" -> action.fullyQualifiedName(false).asString.toJson) ++ 
userIdField
+  def forwardLogs(transid: TransactionId,
+                  container: Container,
+                  sizeLimit: ByteSize,
+                  sentinelledLogs: Boolean,
+                  additionalMetadata: Map[String, JsValue],
+                  augmentedActivation: JsObject): Future[ActivationLogs] = {
 
-    val augmentedActivation = JsObject(activation.toJson.fields ++ userIdField)
+    val logs = container.logs(sizeLimit, sentinelledLogs)(transid)
 
     // Manually construct JSON fields to omit parsing the whole structure
     val metadata = ByteString("," + fieldsString(additionalMetadata))
@@ -124,7 +127,7 @@ class DockerToActivationFileLogStore(system: ActorSystem, 
destinationDirectory:
     val combined = OwSink.combine(toSeq, toFile)(Broadcast[ByteString](_))
 
     logs.runWith(combined)._1.flatMap { seq =>
-      val possibleErrors = Set(Messages.logFailure, 
Messages.truncateLogs(action.limits.logs.asMegaBytes))
+      val possibleErrors = Set(Messages.logFailure, 
Messages.truncateLogs(sizeLimit))
       val errored = seq.lastOption.exists(last => 
possibleErrors.exists(last.contains))
       val logs = ActivationLogs(seq.toVector)
       if (!errored) {
diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverForwarderLogStore.scala
 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverForwarderLogStore.scala
new file mode 100644
index 0000000000..6cea32d0f4
--- /dev/null
+++ 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverForwarderLogStore.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import akka.actor.ActorSystem
+import spray.json.JsObject
+import whisk.core.entity._
+import whisk.common.TransactionId
+import whisk.core.containerpool.Container
+import spray.json._
+import spray.json.DefaultJsonProtocol._
+
+import scala.concurrent.Future
+
+/**
+ * A LogDriverLogStore skeleton that forwards enriched logs to an external store
+ * at the end of each activation execution. Forwarding may either be done in the invoker,
+ * in which case the logs may also be returned from collectLogs, or by an external agent,
+ * in which case collectLogs will return an empty ActivationLogs instance.
+ *
+ * This class only assembles the correlation metadata; subclasses decide how the
+ * logs are actually forwarded by implementing `forwardLogs`.
+ */
+abstract class LogDriverForwarderLogStore(actorSystem: ActorSystem) extends LogDriverLogStore(actorSystem) {
+
+  /**
+   * Builds the per-line metadata and the namespace-augmented activation record, then
+   * delegates to `forwardLogs` using the action's log size limit and `exec.sentinelledLogs` setting.
+   */
+  override def collectLogs(transid: TransactionId,
+                           user: Identity,
+                           activation: WhiskActivation,
+                           container: Container,
+                           action: ExecutableWhiskAction): Future[ActivationLogs] = {
+    // Every written record carries the namespace id so any background process can properly correlate.
+    val namespaceField = Map("namespaceId" -> user.authkey.uuid.toJson)
+
+    // Insertion order matters: it determines the order of the fields once serialized.
+    val lineMetadata = Map(
+      "activationId" -> activation.activationId.asString.toJson,
+      "action" -> action.fullyQualifiedName(false).asString.toJson) ++ namespaceField
+
+    val enrichedActivation = JsObject(activation.toJson.fields ++ namespaceField)
+
+    forwardLogs(
+      transid,
+      container,
+      action.limits.logs.asMegaBytes,
+      action.exec.sentinelledLogs,
+      lineMetadata,
+      enrichedActivation)
+  }
+
+  /**
+   * Forwards the logs of a finished activation.
+   *
+   * @param transId transaction id of the activation
+   * @param container the container the activation ran in
+   * @param sizeLimit the action's log size limit
+   * @param sentinelledLogs the action's `exec.sentinelledLogs` setting
+   * @param additionalMetadata fields (activation id, action name, namespace id) intended for every forwarded record
+   * @param augmentedActivation the activation record with the namespace id field added
+   * @return the logs to keep with the activation; empty when forwarding is done by an external agent
+   */
+  def forwardLogs(transId: TransactionId,
+                  container: Container,
+                  sizeLimit: ByteSize,
+                  sentinelledLogs: Boolean,
+                  additionalMetadata: Map[String, JsValue],
+                  augmentedActivation: JsObject): Future[ActivationLogs]
+}
diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
index 5320d4d1d7..8b909ca013 100644
--- 
a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
+++ 
b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala
@@ -38,7 +38,7 @@ import scala.concurrent.Future
 class LogDriverLogStore(actorSystem: ActorSystem) extends LogStore {
 
   /** Indicate --log-driver and --log-opt flags via ContainerArgsConfig.extraArgs */
-  override def containerParameters = Map.empty
+  override def containerParameters: Map[String, Set[String]] = Map.empty
 
   def collectLogs(transid: TransactionId,
                   user: Identity,
diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
index 5f7c7d6413..21ac745a0c 100644
--- 
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
+++ 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala
@@ -26,9 +26,10 @@ import java.time.format.DateTimeFormatterBuilder
 
 import akka.actor.ActorSystem
 import akka.event.Logging.{ErrorLevel, InfoLevel}
-import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
+import akka.http.scaladsl.model._
 import akka.http.scaladsl.model.Uri.Path
 import akka.http.scaladsl.model.Uri.Query
+import akka.http.scaladsl.unmarshalling.Unmarshal
 import akka.stream.{Attributes, Outlet, SourceShape}
 import akka.http.scaladsl.Http
 import akka.stream.ActorMaterializer
@@ -57,10 +58,12 @@ import scala.util.Try
 import spray.json._
 import spray.json.DefaultJsonProtocol._
 import collection.JavaConverters._
+
 import io.fabric8.kubernetes.client.ConfigBuilder
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 import okhttp3.{Call, Callback, Request, Response}
 import okio.BufferedSource
+import whisk.core.entity.ByteSize
 
 import scala.annotation.tailrec
 import scala.collection.mutable
@@ -223,6 +226,28 @@ class KubernetesClient(
     }
   }
 
+  /**
+   * Asks the invoker-agent on the container's worker node to forward the container's logs,
+   * starting at `lastOffset`, and returns the offset the agent reports having reached.
+   *
+   * The returned future fails if the invoker-agent is disabled, if it answers with a
+   * non-success status, or if its reply body cannot be parsed as a Long offset.
+   */
+  def forwardLogs(container: KubernetesContainer,
+                  lastOffset: Long,
+                  sizeLimit: ByteSize,
+                  sentinelledLogs: Boolean,
+                  additionalMetadata: Map[String, JsValue],
+                  augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Long] = {
+    if (config.invokerAgent.enabled) {
+      val serializedData = Map(
+        "lastOffset" -> JsNumber(lastOffset),
+        "sizeLimit" -> JsNumber(sizeLimit.toBytes),
+        "sentinelledLogs" -> JsBoolean(sentinelledLogs),
+        "encodedLogLineMetadata" -> JsString(fieldsString(additionalMetadata)),
+        "encodedActivation" -> JsString(augmentedActivation.compactPrint))
+      agentCommand("logs", container, serializedData)
+        .flatMap { response =>
+          if (response.status.isSuccess) {
+            Unmarshal(response.entity).to[String].flatMap { body =>
+              // The reply body is the new offset; tolerate surrounding whitespace such as a trailing newline.
+              scala.util.Try(body.trim.toLong).toOption match {
+                case Some(newOffset) => Future.successful(newOffset)
+                case None =>
+                  Future.failed(new IllegalStateException(s"invoker-agent returned an invalid log offset: '$body'"))
+              }
+            }
+          } else {
+            // Drain the unread entity so the pooled connection is released before failing.
+            response.discardEntityBytes()
+            Future.failed(new IllegalStateException(
+              s"invoker-agent could not forward logs of container ${container.nativeContainerId}: ${response.status}"))
+          }
+        }
+    } else {
+      Future.failed(new UnsupportedOperationException("forwardLogs requires whisk.kubernetes.invokerAgent=true"))
+    }
+  }
+
   def logs(container: KubernetesContainer, sinceTime: Option[Instant], 
waitForSentinel: Boolean = false)(
     implicit transid: TransactionId): Source[TypedLogLine, Any] = {
 
@@ -234,6 +259,13 @@ class KubernetesClient(
 
   }
 
+  /**
+   * Renders `fields` as comma-separated JSON members (`"key":value`) without enclosing braces.
+   * Keys are rendered through JsString so any quotes or control characters in them are escaped.
+   */
+  private def fieldsString(fields: Map[String, JsValue]): String =
+    fields
+      .map {
+        case (key, value) => s"${JsString(key).compactPrint}:${value.compactPrint}"
+      }
+      .mkString(",")
+
   private def toContainer(pod: Pod): KubernetesContainer = {
     val id = ContainerId(pod.getMetadata.getName)
     val addr = ContainerAddress(pod.getStatus.getPodIP)
@@ -246,13 +278,15 @@ class KubernetesClient(
   }
 
   // Forward a command to invoker-agent daemonset instance on container's worker node
-  private def agentCommand(command: String, container: KubernetesContainer): Future[HttpResponse] = {
+  // A non-empty `payload` is sent as an application/json entity; with the default empty payload
+  // the request carries no body at all.
+  private def agentCommand(command: String,
+                           container: KubernetesContainer,
+                           payload: Map[String, JsValue] = Map.empty): Future[HttpResponse] = {
     val uri = Uri()
       .withScheme("http")
       .withHost(container.workerIP)
       .withPort(config.invokerAgent.port)
       .withPath(Path / command / container.nativeContainerId)
-    Http().singleRequest(HttpRequest(uri = uri))
+    val entity =
+      if (payload.isEmpty) HttpEntity.Empty
+      else HttpEntity(ContentTypes.`application/json`, payload.toJson.compactPrint)
+    Http().singleRequest(HttpRequest(uri = uri, entity = entity))
   }
 
   private def runCmd(args: Seq[String], timeout: Duration)(implicit transid: 
TransactionId): Future[String] = {
@@ -304,6 +338,13 @@ trait KubernetesApi {
 
   def resume(container: KubernetesContainer)(implicit transid: TransactionId): 
Future[Unit]
 
+  /**
+   * Forwards the logs of `container`, starting at offset `lastOffset` in its log file.
+   *
+   * @param sizeLimit the action's log size limit
+   * @param sentinelledLogs whether the action's logs use sentinels
+   * @param additionalMetadata fields intended for every forwarded log line
+   * @param augmentedActivation the activation record to forward alongside the logs
+   * @return the offset to pass as `lastOffset` on the next call for the same container
+   */
+  def forwardLogs(container: KubernetesContainer,
+                  lastOffset: Long,
+                  sizeLimit: ByteSize,
+                  sentinelledLogs: Boolean,
+                  additionalMetadata: Map[String, JsValue],
+                  augmentedActivation: JsObject)(implicit transid: 
TransactionId): Future[Long]
+
   def logs(container: KubernetesContainer, sinceTime: Option[Instant], 
waitForSentinel: Boolean = false)(
     implicit transid: TransactionId): Source[TypedLogLine, Any]
 }
diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
index f1d03d1919..f4d4e388e9 100644
--- 
a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ 
b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -24,6 +24,7 @@ import akka.stream.StreamLimitReachedException
 import akka.stream.scaladsl.Framing.FramingException
 import akka.stream.scaladsl.Source
 import akka.util.ByteString
+import spray.json.{JsObject, JsValue}
 
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
@@ -97,6 +98,9 @@ class KubernetesContainer(protected[core] val id: ContainerId,
   /** The last read timestamp in the log file */
   private val lastTimestamp = new AtomicReference[Option[Instant]](None)
 
+  /**
+   * The last offset read in the remote log file; replaced by forwardLogs after each successful call.
+   * A primitive AtomicLong avoids boxing each offset the way AtomicReference[Long] does.
+   */
+  private val lastOffset = new java.util.concurrent.atomic.AtomicLong(0L)
+
   protected val waitForLogs: FiniteDuration = 2.seconds
 
   def suspend()(implicit transid: TransactionId): Future[Unit] = 
kubernetes.suspend(this)
@@ -110,6 +114,17 @@ class KubernetesContainer(protected[core] val id: 
ContainerId,
 
   private val stringSentinel = DockerContainer.ActivationSentinel.utf8String
 
+  /**
+   * Delegates log forwarding for this container to the KubernetesApi, resuming from the offset
+   * reached by the previous call. The stored offset is only replaced once the call succeeds.
+   */
+  def forwardLogs(sizeLimit: ByteSize,
+                  sentinelledLogs: Boolean,
+                  additionalMetadata: Map[String, JsValue],
+                  augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Unit] = {
+    val startOffset = lastOffset.get()
+    val forwarded =
+      kubernetes.forwardLogs(this, startOffset, sizeLimit, sentinelledLogs, additionalMetadata, augmentedActivation)
+    forwarded.map(reachedOffset => lastOffset.set(reachedOffset))
+  }
+
   def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: 
TransactionId): Source[ByteString, Any] = {
 
     kubernetes
diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/logging/InvokerAgentLogStore.scala
 
b/core/invoker/src/main/scala/whisk/core/containerpool/logging/InvokerAgentLogStore.scala
new file mode 100644
index 0000000000..dc39da79de
--- /dev/null
+++ 
b/core/invoker/src/main/scala/whisk/core/containerpool/logging/InvokerAgentLogStore.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.containerpool.logging
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import whisk.common.TransactionId
+import whisk.core.containerpool.Container
+import whisk.core.entity._
+import spray.json._
+
+import whisk.core.containerpool.kubernetes.KubernetesContainer
+
+import scala.concurrent.{ExecutionContext, Future}
+
+/**
+ * An implementation of a LogDriverForwarder for Kubernetes that delegates
+ * to the InvokerAgent running on the container's worker node to forward
+ * the logs appropriately to an external logging service.
+ * Logs are not brought back to the invoker and thus are not available
+ * except via the external logging service.
+ */
+class InvokerAgentLogStore(system: ActorSystem) extends LogDriverForwarderLogStore(system) {
+
+  implicit val ec: ExecutionContext = system.dispatcher
+  implicit val mat: ActorMaterializer = ActorMaterializer()(system)
+
+  /**
+   * Forwards the logs of a KubernetesContainer and returns an empty ActivationLogs once done,
+   * since the logs stay with the external logging service. Any other container type is rejected
+   * with a LogCollectingException.
+   */
+  override def forwardLogs(transid: TransactionId,
+                           container: Container,
+                           sizeLimit: ByteSize,
+                           sentinelledLogs: Boolean,
+                           additionalMetadata: Map[String, JsValue],
+                           augmentedActivation: JsObject): Future[ActivationLogs] =
+    container match {
+      case kc: KubernetesContainer =>
+        kc.forwardLogs(sizeLimit, sentinelledLogs, additionalMetadata, augmentedActivation)(transid)
+          .map(_ => ActivationLogs())
+      case other =>
+        // Name the rejected container type instead of failing with an empty, unexplained log list.
+        val reason =
+          s"InvokerAgentLogStore requires a KubernetesContainer but got ${other.getClass.getSimpleName}"
+        Future.failed(LogCollectingException(ActivationLogs(Vector(reason))))
+    }
+}
+
+/** LogStoreProvider that creates an InvokerAgentLogStore bound to the given actor system. */
+object InvokerAgentLogStoreProvider extends LogStoreProvider {
+  override def logStore(actorSystem: ActorSystem): LogStore =
+    new InvokerAgentLogStore(actorSystem)
+}
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
index 0eb4f95688..fa2e103c3f 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala
@@ -37,6 +37,7 @@ import org.scalatest.Matchers
 import org.scalatest.time.{Seconds, Span}
 import common.{StreamLogging, WskActorSystem}
 import okio.Buffer
+import spray.json.{JsObject, JsValue}
 import whisk.common.TransactionId
 import whisk.core.containerpool.{ContainerAddress, ContainerId}
 import whisk.core.containerpool.kubernetes.{
@@ -194,6 +195,7 @@ object KubernetesClientTests {
     var rmByLabels = mutable.Buffer.empty[(String, String)]
     var resumes = mutable.Buffer.empty[ContainerId]
     var suspends = mutable.Buffer.empty[ContainerId]
+    // Records (container id, lastOffset argument) for every forwardLogs call, in call order.
+    var forwardLogs = mutable.Buffer.empty[(ContainerId, Long)]
     var logCalls = mutable.Buffer.empty[(ContainerId, Option[Instant])]
 
     def run(name: String,
@@ -231,6 +233,16 @@ object KubernetesClientTests {
       Future.successful({})
     }
 
+    // Shares its name with the `forwardLogs` call-recording buffer; a call with a single index
+    // argument, e.g. `forwardLogs(0)`, resolves to the buffer rather than to this method.
+    def forwardLogs(container: KubernetesContainer,
+                    lastOffset: Long,
+                    sizeLimit: ByteSize,
+                    sentinelledLogs: Boolean,
+                    additionalMetadata: Map[String, JsValue],
+                    augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Long] = {
+      forwardLogs += ((container.id, lastOffset))
+      // For testing, pretend the agent read exactly sizeLimit bytes.
+      Future.successful(lastOffset + sizeLimit.toBytes)
+    }
+
     def logs(container: KubernetesContainer, sinceTime: Option[Instant], 
waitForSentinel: Boolean = false)(
       implicit transid: TransactionId): Source[TypedLogLine, Any] = {
       logCalls += ((container.id, sinceTime))
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index 3415b68861..0b9e565b1e 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -292,6 +292,26 @@ class KubernetesContainerTests
     end.token shouldBe INVOKER_ACTIVATION_RUN.asFinish
   }
 
+  /*
+   * LOG FORWARDING
+   */
+  it should "maintain lastOffset across calls to forwardLogs" in {
+    implicit val kubernetes = new TestKubernetesClient
+    val id = ContainerId("id")
+    val container = new KubernetesContainer(id, ContainerAddress("ip"), "127.0.0.1", "docker://foo")
+    val logChunk = 10.kilobytes
+    val smallChunk = 42.bytes
+
+    // The test client advances the offset by the size limit passed to each call.
+    Seq(logChunk, smallChunk, logChunk, smallChunk).foreach { sizeLimit =>
+      Await.result(
+        container.forwardLogs(
+          sizeLimit,
+          sentinelledLogs = false,
+          additionalMetadata = Map.empty,
+          augmentedActivation = JsObject()),
+        500.milliseconds)
+    }
+
+    // Double parentheses pass the tuple explicitly instead of relying on argument auto-tupling.
+    kubernetes.forwardLogs(0) shouldBe ((id, 0L))
+    kubernetes.forwardLogs(1) shouldBe ((id, logChunk.toBytes))
+    kubernetes.forwardLogs(2) shouldBe ((id, logChunk.toBytes + smallChunk.toBytes))
+    kubernetes.forwardLogs(3) shouldBe ((id, 2 * logChunk.toBytes + smallChunk.toBytes))
+  }
+
   /*
    * LOGS
    */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to