cbickel closed pull request #3273: Wait for logs based on intervals not based 
on total processing time.
URL: https://github.com/apache/incubator-openwhisk/pull/3273
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
 
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
index 265a45031d..5d3083c620 100644
--- 
a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
+++ 
b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala
@@ -18,6 +18,7 @@
 package whisk.core.containerpool.docker
 
 import java.time.Instant
+import java.util.concurrent.TimeoutException
 import java.util.concurrent.atomic.AtomicLong
 
 import akka.actor.ActorSystem
@@ -80,7 +81,7 @@ object DockerContainer {
                                                             as: ActorSystem,
                                                             ec: 
ExecutionContext,
                                                             log: Logging): 
Future[DockerContainer] = {
-    implicit val tid = transid
+    implicit val tid: TransactionId = transid
 
     val environmentArgs = environment.flatMap {
       case (key, value) => Seq("-e", s"$key=$value")
@@ -246,18 +247,21 @@ class DockerContainer(protected val id: ContainerId,
         size
       }
       .via(new 
CompleteAfterOccurrences(_.containsSlice(DockerContainer.ActivationSentinel), 
2, waitForSentinel))
+      // As we're reading the logs after the activation has finished the 
invariant is that all loglines are already
+      // written and we mostly await them being flushed by the docker daemon. 
Therefore we can timeout based on the time
+      // between two loglines appear without relying on the log frequency in 
the action itself.
+      .idleTimeout(waitForLogs)
       .recover {
         case _: StreamLimitReachedException =>
           // While the stream has already ended by failing the limitWeighted 
stage above, we inject a truncation
           // notice downstream, which will be processed as usual. This will be 
the last element of the stream.
           ByteString(LogLine(Instant.now.toString, "stderr", 
Messages.truncateLogs(limit)).toJson.compactPrint)
-        case _: OccurrencesNotFoundException | _: FramingException =>
+        case _: OccurrencesNotFoundException | _: FramingException | _: 
TimeoutException =>
           // Stream has already ended and we insert a notice that data might 
be missing from the logs. While a
           // FramingException can also mean exceeding the limits, we cannot 
decide which case happened so we resort
           // to the general error message. This will be the last element of 
the stream.
           ByteString(LogLine(Instant.now.toString, "stderr", 
Messages.logFailure).toJson.compactPrint)
       }
-      .takeWithin(waitForLogs)
   }
 
   /** Delimiter used to split log-lines as written by the json-log-driver. */
@@ -279,9 +283,9 @@ class DockerContainer(protected val id: ContainerId,
  */
 class CompleteAfterOccurrences[T](isInEvent: T => Boolean, neededOccurrences: 
Int, errorOnNotEnough: Boolean)
     extends GraphStage[FlowShape[T, T]] {
-  val in = Inlet[T]("WaitForOccurances.in")
-  val out = Outlet[T]("WaitForOccurances.out")
-  override val shape = FlowShape.of(in, out)
+  val in: Inlet[T] = Inlet[T]("WaitForOccurrences.in")
+  val out: Outlet[T] = Outlet[T]("WaitForOccurrences.out")
+  override val shape: FlowShape[T, T] = FlowShape.of(in, out)
 
   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
     new GraphStageLogic(shape) with InHandler with OutHandler {
diff --git 
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
 
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
index 5f9898e6b2..ea7781036b 100644
--- 
a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
+++ 
b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala
@@ -34,7 +34,6 @@ import org.scalamock.scalatest.MockFactory
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.FlatSpec
 import whisk.core.containerpool.logging.{DockerToActivationLogStore, LogLine}
-
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.Matchers
 import common.{StreamLogging, WskActorSystem}
@@ -49,7 +48,6 @@ import whisk.core.entity.ActivationResponse.ContainerResponse
 import whisk.core.entity.ActivationResponse.Timeout
 import whisk.core.entity.size._
 import whisk.http.Messages
-
 import whisk.core.entity.size._
 
 /**
@@ -583,8 +581,9 @@ class DockerContainerTests
 
     docker.rawContainerLogsInvocations should have size 1
 
-    processedLog should have size expectedLog.length
-    processedLog shouldBe expectedLog.map(_.toFormattedString)
+    processedLog should have size expectedLog.length + 1 //error log should be 
appended
+    processedLog.head shouldBe expectedLog.head.toFormattedString
+    processedLog(1) should include(Messages.logFailure)
   }
 
   it should "truncate logs and advance reading position to end of current 
read" in {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to