cbickel closed pull request #3273: Wait for logs based on intervals not based on total processing time. URL: https://github.com/apache/incubator-openwhisk/pull/3273
This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign pull request (one opened from a fork) once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala index 265a45031d..5d3083c620 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala @@ -18,6 +18,7 @@ package whisk.core.containerpool.docker import java.time.Instant +import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicLong import akka.actor.ActorSystem @@ -80,7 +81,7 @@ object DockerContainer { as: ActorSystem, ec: ExecutionContext, log: Logging): Future[DockerContainer] = { - implicit val tid = transid + implicit val tid: TransactionId = transid val environmentArgs = environment.flatMap { case (key, value) => Seq("-e", s"$key=$value") @@ -246,18 +247,21 @@ class DockerContainer(protected val id: ContainerId, size } .via(new CompleteAfterOccurrences(_.containsSlice(DockerContainer.ActivationSentinel), 2, waitForSentinel)) + // As we're reading the logs after the activation has finished the invariant is that all loglines are already + // written and we mostly await them being flushed by the docker daemon. Therefore we can timeout based on the time + // between two loglines appear without relying on the log frequency in the action itself. + .idleTimeout(waitForLogs) .recover { case _: StreamLimitReachedException => // While the stream has already ended by failing the limitWeighted stage above, we inject a truncation // notice downstream, which will be processed as usual. This will be the last element of the stream. 
ByteString(LogLine(Instant.now.toString, "stderr", Messages.truncateLogs(limit)).toJson.compactPrint) - case _: OccurrencesNotFoundException | _: FramingException => + case _: OccurrencesNotFoundException | _: FramingException | _: TimeoutException => // Stream has already ended and we insert a notice that data might be missing from the logs. While a // FramingException can also mean exceeding the limits, we cannot decide which case happened so we resort // to the general error message. This will be the last element of the stream. ByteString(LogLine(Instant.now.toString, "stderr", Messages.logFailure).toJson.compactPrint) } - .takeWithin(waitForLogs) } /** Delimiter used to split log-lines as written by the json-log-driver. */ @@ -279,9 +283,9 @@ class DockerContainer(protected val id: ContainerId, */ class CompleteAfterOccurrences[T](isInEvent: T => Boolean, neededOccurrences: Int, errorOnNotEnough: Boolean) extends GraphStage[FlowShape[T, T]] { - val in = Inlet[T]("WaitForOccurances.in") - val out = Outlet[T]("WaitForOccurances.out") - override val shape = FlowShape.of(in, out) + val in: Inlet[T] = Inlet[T]("WaitForOccurrences.in") + val out: Outlet[T] = Outlet[T]("WaitForOccurrences.out") + override val shape: FlowShape[T, T] = FlowShape.of(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala index 5f9898e6b2..ea7781036b 100644 --- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala @@ -34,7 +34,6 @@ import org.scalamock.scalatest.MockFactory import org.scalatest.BeforeAndAfterEach import org.scalatest.FlatSpec import 
whisk.core.containerpool.logging.{DockerToActivationLogStore, LogLine} - import org.scalatest.junit.JUnitRunner import org.scalatest.Matchers import common.{StreamLogging, WskActorSystem} @@ -49,7 +48,6 @@ import whisk.core.entity.ActivationResponse.ContainerResponse import whisk.core.entity.ActivationResponse.Timeout import whisk.core.entity.size._ import whisk.http.Messages - import whisk.core.entity.size._ /** @@ -583,8 +581,9 @@ class DockerContainerTests docker.rawContainerLogsInvocations should have size 1 - processedLog should have size expectedLog.length - processedLog shouldBe expectedLog.map(_.toFormattedString) + processedLog should have size expectedLog.length + 1 //error log should be appended + processedLog.head shouldBe expectedLog.head.toFormattedString + processedLog(1) should include(Messages.logFailure) } it should "truncate logs and advance reading position to end of current read" in { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services