This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new ebe7788  Wait for logs based on intervals not based on total processing time. (#3273)
ebe7788 is described below

commit ebe7788b5261bb845e43b74525fa2ad2cacbf80b
Author: Markus Thömmes <markusthoem...@me.com>
AuthorDate: Mon Feb 12 10:59:28 2018 +0100

    Wait for logs based on intervals not based on total processing time. (#3273)
    
    Writing a large chunk of logs can take quite some time to process. The
    standard timeout for this process is currently 2 seconds. It is bounded
    because an action developer might break the action proxy so that sentinels
    never appear at all, which would cause us to wait for sentinels indefinitely.
    
    As we process logs after an activation has run, though, we can safely rely
    on the time **between** two log lines not exceeding a certain threshold.
    That way, the complete processing is not bounded by an arbitrary timeout
    (which can even be too short for large volumes) yet is still tight enough
    to exit early if sentinels really are missing.
    
    Furthermore, an error line is inserted if this timeout fires, to inform the
    user that something might have gone wrong.
---
 .../core/containerpool/docker/DockerContainer.scala      | 16 ++++++++++------
 .../containerpool/docker/test/DockerContainerTests.scala |  7 +++----
 2 files changed, 13 insertions(+), 10 deletions(-)
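
A minimal sketch of the interval-based wait described above (illustrative only,
not taken from the patch; logSource and waitForLogs are assumed names): Akka
Streams' idleTimeout fails the stream with a TimeoutException once the gap
between two consecutive elements exceeds the limit, and recover turns that
failure into one final notice element.

    import java.util.concurrent.TimeoutException
    import scala.concurrent.duration.FiniteDuration
    import akka.stream.scaladsl.Source
    import akka.util.ByteString

    // Sketch: bound log collection by the idle time between log lines,
    // not by the total time the whole read is allowed to take.
    def boundedLogs(logSource: Source[ByteString, Any], waitForLogs: FiniteDuration): Source[ByteString, Any] =
      logSource
        .idleTimeout(waitForLogs) // fails with TimeoutException if no line arrives within waitForLogs
        .recover {
          case _: TimeoutException =>
            // Emit a final notice instead of failing the whole log collection.
            ByteString("log collection timed out while waiting for the next line")
        }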

diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index 265a450..5d3083c 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -18,6 +18,7 @@
 package whisk.core.containerpool.docker
 
 import java.time.Instant
+import java.util.concurrent.TimeoutException
 import java.util.concurrent.atomic.AtomicLong
 
 import akka.actor.ActorSystem
@@ -80,7 +81,7 @@ object DockerContainer {
                                                             as: ActorSystem,
                                                             ec: ExecutionContext,
                                                             log: Logging): Future[DockerContainer] = {
-    implicit val tid = transid
+    implicit val tid: TransactionId = transid
 
     val environmentArgs = environment.flatMap {
       case (key, value) => Seq("-e", s"$key=$value")
@@ -246,18 +247,21 @@ class DockerContainer(protected val id: ContainerId,
         size
       }
       .via(new CompleteAfterOccurrences(_.containsSlice(DockerContainer.ActivationSentinel), 2, waitForSentinel))
+      // As we're reading the logs after the activation has finished, the invariant is that all loglines are
+      // already written and we mostly await them being flushed by the docker daemon. Therefore we can time out
+      // based on the time between two loglines appearing, without relying on the log frequency of the action itself.
+      .idleTimeout(waitForLogs)
       .recover {
         case _: StreamLimitReachedException =>
           // While the stream has already ended by failing the limitWeighted stage above, we inject a truncation
           // notice downstream, which will be processed as usual. This will be the last element of the stream.
           ByteString(LogLine(Instant.now.toString, "stderr", Messages.truncateLogs(limit)).toJson.compactPrint)
-        case _: OccurrencesNotFoundException | _: FramingException =>
+        case _: OccurrencesNotFoundException | _: FramingException | _: TimeoutException =>
           // Stream has already ended and we insert a notice that data might be missing from the logs. While a
           // FramingException can also mean exceeding the limits, we cannot decide which case happened so we resort
           // to the general error message. This will be the last element of the stream.
           ByteString(LogLine(Instant.now.toString, "stderr", Messages.logFailure).toJson.compactPrint)
       }
-      .takeWithin(waitForLogs)
   }
 
   /** Delimiter used to split log-lines as written by the json-log-driver. */
@@ -279,9 +283,9 @@ class DockerContainer(protected val id: ContainerId,
  */
 class CompleteAfterOccurrences[T](isInEvent: T => Boolean, neededOccurrences: Int, errorOnNotEnough: Boolean)
     extends GraphStage[FlowShape[T, T]] {
-  val in = Inlet[T]("WaitForOccurances.in")
-  val out = Outlet[T]("WaitForOccurances.out")
-  override val shape = FlowShape.of(in, out)
+  val in: Inlet[T] = Inlet[T]("WaitForOccurrences.in")
+  val out: Outlet[T] = Outlet[T]("WaitForOccurrences.out")
+  override val shape: FlowShape[T, T] = FlowShape.of(in, out)
 
   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
     new GraphStageLogic(shape) with InHandler with OutHandler {
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 5f9898e..ea77810 100644
--- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -34,7 +34,6 @@ import org.scalamock.scalatest.MockFactory
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.FlatSpec
 import whisk.core.containerpool.logging.{DockerToActivationLogStore, LogLine}
-
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.Matchers
 import common.{StreamLogging, WskActorSystem}
@@ -49,7 +48,6 @@ import whisk.core.entity.ActivationResponse.ContainerResponse
 import whisk.core.entity.ActivationResponse.Timeout
 import whisk.core.entity.size._
 import whisk.http.Messages
-
 import whisk.core.entity.size._
 
 /**
@@ -583,8 +581,9 @@ class DockerContainerTests
 
     docker.rawContainerLogsInvocations should have size 1
 
-    processedLog should have size expectedLog.length
-    processedLog shouldBe expectedLog.map(_.toFormattedString)
+    processedLog should have size expectedLog.length + 1 //error log should be appended
+    processedLog.head shouldBe expectedLog.head.toFormattedString
+    processedLog(1) should include(Messages.logFailure)
   }
 
   it should "truncate logs and advance reading position to end of current read" in {

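For context, a hedged contrast between the old takeWithin bound and the new
idleTimeout bound (illustrative only, not part of the patch; the exact element
counts depend on scheduling): takeWithin cuts the stream off after a fixed total
duration, while idleTimeout only triggers if the gap between two consecutive
elements exceeds the limit.

    import scala.concurrent.Await
    import scala.concurrent.duration._
    import akka.actor.ActorSystem
    import akka.stream.{ActorMaterializer, ThrottleMode}
    import akka.stream.scaladsl.{Sink, Source}

    object TimeoutContrast extends App {
      implicit val system: ActorSystem = ActorSystem("timeout-contrast")
      implicit val materializer: ActorMaterializer = ActorMaterializer()

      // Ten elements, one roughly every 100 milliseconds (about one second in total).
      val slowButSteady = Source(1 to 10).throttle(1, 100.millis, 1, ThrottleMode.shaping)

      // takeWithin truncates after half a second even though elements keep arriving.
      val truncated = Await.result(slowButSteady.takeWithin(500.millis).runWith(Sink.seq), 5.seconds)

      // idleTimeout lets the full second pass because no single gap exceeds half a second.
      val complete = Await.result(slowButSteady.idleTimeout(500.millis).runWith(Sink.seq), 5.seconds)

      println(s"takeWithin kept ${truncated.size} elements, idleTimeout kept ${complete.size}")
      system.terminate()
    }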