dgrove-oss closed pull request #3378: LogStoreProvider using invokerAgent for KubernetesContainerPool. URL: https://github.com/apache/incubator-openwhisk/pull/3378
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala index 4be36a730a..0a7f9e8c14 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/DockerToActivationFileLogStore.scala @@ -22,21 +22,22 @@ import java.time.Instant import akka.NotUsed import akka.actor.ActorSystem +import akka.http.scaladsl.model.HttpRequest import akka.stream.alpakka.file.scaladsl.LogRotatorSink -import akka.stream.{Graph, SinkShape, UniformFanOutShape} +import akka.stream.{ActorMaterializer, Graph, SinkShape, UniformFanOutShape} import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Keep, MergeHub, Sink, Source} import akka.util.ByteString import whisk.common.TransactionId import whisk.core.containerpool.Container -import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation} +import whisk.core.entity._ import whisk.core.entity.size._ import whisk.http.Messages import spray.json._ import spray.json.DefaultJsonProtocol._ -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} /** * Docker based implementation of a LogStore. @@ -48,7 +49,17 @@ import scala.concurrent.Future * Additionally writes logs to a separate file which can be processed by any backend service asynchronously. 
*/ class DockerToActivationFileLogStore(system: ActorSystem, destinationDirectory: Path = Paths.get("logs")) - extends DockerToActivationLogStore(system) { + extends LogDriverForwarderLogStore(system) { + + implicit val ec: ExecutionContext = system.dispatcher + implicit val mat: ActorMaterializer = ActorMaterializer()(system) + + /* "json-file" is the log-driver that writes out to file */ + override val containerParameters = Map("--log-driver" -> Set("json-file")) + + /* As logs are already part of the activation record, just return that bit of it */ + override def fetchLogs(user: Identity, activation: WhiskActivation, request: HttpRequest): Future[ActivationLogs] = + Future.successful(activation.logs) /** * End of an event as written to a file. Closes the json-object and also appends a newline. @@ -92,22 +103,14 @@ class DockerToActivationFileLogStore(system: ActorSystem, destinationDirectory: })) .run() - override def collectLogs(transid: TransactionId, - user: Identity, - activation: WhiskActivation, - container: Container, - action: ExecutableWhiskAction): Future[ActivationLogs] = { - - val logs = container.logs(action.limits.logs.asMegaBytes, action.exec.sentinelledLogs)(transid) - - // Adding the userId field to every written record, so any background process can properly correlate. 
- val userIdField = Map("namespaceId" -> user.authkey.uuid.toJson) - - val additionalMetadata = Map( - "activationId" -> activation.activationId.asString.toJson, - "action" -> action.fullyQualifiedName(false).asString.toJson) ++ userIdField + def forwardLogs(transid: TransactionId, + container: Container, + sizeLimit: ByteSize, + sentinelledLogs: Boolean, + additionalMetadata: Map[String, JsValue], + augmentedActivation: JsObject): Future[ActivationLogs] = { - val augmentedActivation = JsObject(activation.toJson.fields ++ userIdField) + val logs = container.logs(sizeLimit, sentinelledLogs)(transid) // Manually construct JSON fields to omit parsing the whole structure val metadata = ByteString("," + fieldsString(additionalMetadata)) @@ -124,7 +127,7 @@ class DockerToActivationFileLogStore(system: ActorSystem, destinationDirectory: val combined = OwSink.combine(toSeq, toFile)(Broadcast[ByteString](_)) logs.runWith(combined)._1.flatMap { seq => - val possibleErrors = Set(Messages.logFailure, Messages.truncateLogs(action.limits.logs.asMegaBytes)) + val possibleErrors = Set(Messages.logFailure, Messages.truncateLogs(sizeLimit)) val errored = seq.lastOption.exists(last => possibleErrors.exists(last.contains)) val logs = ActivationLogs(seq.toVector) if (!errored) { diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverForwarderLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverForwarderLogStore.scala new file mode 100644 index 0000000000..6cea32d0f4 --- /dev/null +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverForwarderLogStore.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.containerpool.logging + +import akka.actor.ActorSystem +import spray.json.JsObject +import whisk.core.entity._ +import whisk.common.TransactionId +import whisk.core.containerpool.Container +import spray.json._ +import spray.json.DefaultJsonProtocol._ + +import scala.concurrent.Future + +/** + * A LogDriverLogStore skeleton that forwards enriched logs to an external store + * at the end of each activation execution. Forwarding may either be done in the invoker, + * in which case the logs may also be returned from collectLogs, or by an external agent, + * in which case collectLogs will return an empty ActivationLogs instance. + */ +abstract class LogDriverForwarderLogStore(actorSystem: ActorSystem) extends LogDriverLogStore(actorSystem) { + + override def collectLogs(transid: TransactionId, + user: Identity, + activation: WhiskActivation, + container: Container, + action: ExecutableWhiskAction): Future[ActivationLogs] = { + + val sizeLimit = action.limits.logs.asMegaBytes + val sentinelledLogs = action.exec.sentinelledLogs + + // Add the userId field to every written record, so any background process can properly correlate. 
+ val userIdField = Map("namespaceId" -> user.authkey.uuid.toJson) + + val additionalMetadata = Map( + "activationId" -> activation.activationId.asString.toJson, + "action" -> action.fullyQualifiedName(false).asString.toJson) ++ userIdField + + val augmentedActivation = JsObject(activation.toJson.fields ++ userIdField) + + forwardLogs(transid, container, sizeLimit, sentinelledLogs, additionalMetadata, augmentedActivation) + } + + def forwardLogs(transId: TransactionId, + container: Container, + sizeLimit: ByteSize, + sentinelledLogs: Boolean, + additionalMetadata: Map[String, JsValue], + augmentedActivation: JsObject): Future[ActivationLogs] +} diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala index 5320d4d1d7..8b909ca013 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogDriverLogStore.scala @@ -38,7 +38,7 @@ import scala.concurrent.Future class LogDriverLogStore(actorSystem: ActorSystem) extends LogStore { /** Indicate --log-driver and --log-opt flags via ContainerArgsConfig.extraArgs */ - override def containerParameters = Map.empty + override def containerParameters = Map.empty[String, Set[String]] def collectLogs(transid: TransactionId, user: Identity, diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala index 5f7c7d6413..21ac745a0c 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClient.scala @@ -26,9 +26,10 @@ import java.time.format.DateTimeFormatterBuilder import akka.actor.ActorSystem import akka.event.Logging.{ErrorLevel, InfoLevel} -import 
akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri} +import akka.http.scaladsl.model._ import akka.http.scaladsl.model.Uri.Path import akka.http.scaladsl.model.Uri.Query +import akka.http.scaladsl.unmarshalling.Unmarshal import akka.stream.{Attributes, Outlet, SourceShape} import akka.http.scaladsl.Http import akka.stream.ActorMaterializer @@ -57,10 +58,12 @@ import scala.util.Try import spray.json._ import spray.json.DefaultJsonProtocol._ import collection.JavaConverters._ + import io.fabric8.kubernetes.client.ConfigBuilder import io.fabric8.kubernetes.client.DefaultKubernetesClient import okhttp3.{Call, Callback, Request, Response} import okio.BufferedSource +import whisk.core.entity.ByteSize import scala.annotation.tailrec import scala.collection.mutable @@ -223,6 +226,28 @@ class KubernetesClient( } } + def forwardLogs(container: KubernetesContainer, + lastOffset: Long, + sizeLimit: ByteSize, + sentinelledLogs: Boolean, + additionalMetadata: Map[String, JsValue], + augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Long] = { + if (config.invokerAgent.enabled) { + val serializedData = Map( + "lastOffset" -> JsNumber(lastOffset), + "sizeLimit" -> JsNumber(sizeLimit.toBytes), + "sentinelledLogs" -> JsBoolean(sentinelledLogs), + "encodedLogLineMetadata" -> JsString(fieldsString(additionalMetadata)), + "encodedActivation" -> JsString(augmentedActivation.compactPrint)) + agentCommand("logs", container, serializedData) + .flatMap { response => + Unmarshal(response.entity).to[String].map(s => s.toLong) + } + } else { + Future.failed(new UnsupportedOperationException("forwardLogs requires whisk.kubernetes.invokerAgent=true")) + } + } + def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean = false)( implicit transid: TransactionId): Source[TypedLogLine, Any] = { @@ -234,6 +259,13 @@ class KubernetesClient( } + private def fieldsString(fields: Map[String, JsValue]) = + fields + .map { + case (key, value) 
=> s""""$key":${value.compactPrint}""" + } + .mkString(",") + private def toContainer(pod: Pod): KubernetesContainer = { val id = ContainerId(pod.getMetadata.getName) val addr = ContainerAddress(pod.getStatus.getPodIP) @@ -246,13 +278,15 @@ class KubernetesClient( } // Forward a command to invoker-agent daemonset instance on container's worker node - private def agentCommand(command: String, container: KubernetesContainer): Future[HttpResponse] = { + private def agentCommand(command: String, + container: KubernetesContainer, + payload: Map[String, JsValue] = Map.empty): Future[HttpResponse] = { val uri = Uri() .withScheme("http") .withHost(container.workerIP) .withPort(config.invokerAgent.port) .withPath(Path / command / container.nativeContainerId) - Http().singleRequest(HttpRequest(uri = uri)) + Http().singleRequest(HttpRequest(uri = uri, entity = payload.toJson.compactPrint)) } private def runCmd(args: Seq[String], timeout: Duration)(implicit transid: TransactionId): Future[String] = { @@ -304,6 +338,13 @@ trait KubernetesApi { def resume(container: KubernetesContainer)(implicit transid: TransactionId): Future[Unit] + def forwardLogs(container: KubernetesContainer, + lastOffset: Long, + sizeLimit: ByteSize, + sentinelledLogs: Boolean, + additionalMetadata: Map[String, JsValue], + augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Long] + def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean = false)( implicit transid: TransactionId): Source[TypedLogLine, Any] } diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala index f1d03d1919..f4d4e388e9 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala @@ -24,6 +24,7 @@ import 
akka.stream.StreamLimitReachedException import akka.stream.scaladsl.Framing.FramingException import akka.stream.scaladsl.Source import akka.util.ByteString +import spray.json.{JsObject, JsValue} import scala.concurrent.ExecutionContext import scala.concurrent.Future @@ -97,6 +98,9 @@ class KubernetesContainer(protected[core] val id: ContainerId, /** The last read timestamp in the log file */ private val lastTimestamp = new AtomicReference[Option[Instant]](None) + /** The last offset read in the remote log file */ + private val lastOffset = new AtomicReference[Long](0) + protected val waitForLogs: FiniteDuration = 2.seconds def suspend()(implicit transid: TransactionId): Future[Unit] = kubernetes.suspend(this) @@ -110,6 +114,17 @@ class KubernetesContainer(protected[core] val id: ContainerId, private val stringSentinel = DockerContainer.ActivationSentinel.utf8String + def forwardLogs(sizeLimit: ByteSize, + sentinelledLogs: Boolean, + additionalMetadata: Map[String, JsValue], + augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Unit] = { + kubernetes + .forwardLogs(this, lastOffset.get(), sizeLimit, sentinelledLogs, additionalMetadata, augmentedActivation) + .map { newOffset => + lastOffset.set(newOffset) + } + } + def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = { kubernetes diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/logging/InvokerAgentLogStore.scala b/core/invoker/src/main/scala/whisk/core/containerpool/logging/InvokerAgentLogStore.scala new file mode 100644 index 0000000000..dc39da79de --- /dev/null +++ b/core/invoker/src/main/scala/whisk/core/containerpool/logging/InvokerAgentLogStore.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.containerpool.logging + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import whisk.common.TransactionId +import whisk.core.containerpool.Container +import whisk.core.entity._ +import spray.json._ + +import whisk.core.containerpool.kubernetes.KubernetesContainer + +import scala.concurrent.{ExecutionContext, Future} + +/** + * An implementation of a LogDriverForwarder for Kubernetes that delegates + * to the InvokerAgent running on the container's worker node to forward + * the logs appropriately to an external logging service. + * Logs are not brought back to the invoker and thus are not available + * except via the external logging service. 
+ */ +class InvokerAgentLogStore(system: ActorSystem) extends LogDriverForwarderLogStore(system) { + + implicit val ec: ExecutionContext = system.dispatcher + implicit val mat: ActorMaterializer = ActorMaterializer()(system) + + override def forwardLogs(transid: TransactionId, + container: Container, + sizeLimit: ByteSize, + sentinelledLogs: Boolean, + additionalMetadata: Map[String, JsValue], + augmentedActivation: JsObject): Future[ActivationLogs] = { + + container match { + case kc: KubernetesContainer => { + kc.forwardLogs(sizeLimit, sentinelledLogs, additionalMetadata, augmentedActivation)(transid) + .map { _ => + ActivationLogs() + } + } + case _ => Future.failed(LogCollectingException(ActivationLogs())) + } + } +} + +object InvokerAgentLogStoreProvider extends LogStoreProvider { + override def logStore(actorSystem: ActorSystem): LogStore = new InvokerAgentLogStore(actorSystem) +} diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala index 0eb4f95688..fa2e103c3f 100644 --- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala @@ -37,6 +37,7 @@ import org.scalatest.Matchers import org.scalatest.time.{Seconds, Span} import common.{StreamLogging, WskActorSystem} import okio.Buffer +import spray.json.{JsObject, JsValue} import whisk.common.TransactionId import whisk.core.containerpool.{ContainerAddress, ContainerId} import whisk.core.containerpool.kubernetes.{ @@ -194,6 +195,7 @@ object KubernetesClientTests { var rmByLabels = mutable.Buffer.empty[(String, String)] var resumes = mutable.Buffer.empty[ContainerId] var suspends = mutable.Buffer.empty[ContainerId] + var forwardLogs = mutable.Buffer.empty[(ContainerId, Long)] var logCalls = mutable.Buffer.empty[(ContainerId, Option[Instant])] def 
run(name: String, @@ -231,6 +233,16 @@ object KubernetesClientTests { Future.successful({}) } + def forwardLogs(container: KubernetesContainer, + lastOffset: Long, + sizeLimit: ByteSize, + sentinelledLogs: Boolean, + additionalMetadata: Map[String, JsValue], + augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Long] = { + forwardLogs += ((container.id, lastOffset)) + return Future.successful(lastOffset + sizeLimit.toBytes) // for testing, pretend we read size limit bytes + } + def logs(container: KubernetesContainer, sinceTime: Option[Instant], waitForSentinel: Boolean = false)( implicit transid: TransactionId): Source[TypedLogLine, Any] = { logCalls += ((container.id, sinceTime)) diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala index 3415b68861..0b9e565b1e 100644 --- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala @@ -292,6 +292,26 @@ class KubernetesContainerTests end.token shouldBe INVOKER_ACTIVATION_RUN.asFinish } + /* + * LOG FORWARDING + */ + it should "container should maintain lastOffset across calls to forwardLogs" in { + implicit val kubernetes = new TestKubernetesClient + val id = ContainerId("id") + val container = new KubernetesContainer(id, ContainerAddress("ip"), "127.0.0.1", "docker://foo") + val logChunk = 10.kilobytes + + Await.result(container.forwardLogs(logChunk, false, Map.empty, JsObject()), 500.milliseconds) + Await.result(container.forwardLogs(42.bytes, false, Map.empty, JsObject()), 500.milliseconds) + Await.result(container.forwardLogs(logChunk, false, Map.empty, JsObject()), 500.milliseconds) + Await.result(container.forwardLogs(42.bytes, false, Map.empty, JsObject()), 500.milliseconds) + + kubernetes.forwardLogs(0) shouldBe 
(id, 0) + kubernetes.forwardLogs(1) shouldBe (id, logChunk.toBytes) + kubernetes.forwardLogs(2) shouldBe (id, logChunk.toBytes + 42) + kubernetes.forwardLogs(3) shouldBe (id, 2 * logChunk.toBytes + 42) + } + /* * LOGS */ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services