This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 15bb04a Introduce a ContainerClient interface and an akka based implementation. (#3812) 15bb04a is described below commit 15bb04a449f621d262c2687a7b8417241f3856b8 Author: tysonnorris <tysonnor...@gmail.com> AuthorDate: Thu Jul 26 14:25:44 2018 -0700 Introduce a ContainerClient interface and an akka based implementation. (#3812) HttpUtils (http client for invoker -> action container) uses org.apache.http client that is synchronous and poor performing for concurrent requests. I ran into problems using it with concurrent activation support. Instead of trying to force that client to work, this is work towards replacing it (or re-replacing it) with akka http based client. --- .../src/main/scala/whisk/common/Logging.scala | 3 + .../core/containerpool/AkkaContainerClient.scala | 222 +++++++++++++++++++++ ...s.scala => ApacheBlockingContainerClient.scala} | 52 +++-- .../scala/whisk/core/containerpool/Container.scala | 34 +++- .../whisk/core/containerpool/ContainerClient.scala | 30 +++ .../core/containerpool/ContainerFactory.scala | 2 +- .../whisk/core/database/CouchDbRestStore.scala | 7 +- .../main/scala/whisk/core/mesos/MesosTask.scala | 3 +- .../main/scala/whisk/http/PoolingRestClient.scala | 23 ++- core/invoker/src/main/resources/application.conf | 1 + .../containerpool/docker/DockerContainer.scala | 50 +++-- .../kubernetes/KubernetesContainer.scala | 4 +- .../scala/actionContainers/ActionContainer.scala | 11 +- .../docker/test/AkkaContainerClientTests.scala | 214 ++++++++++++++++++++ ...la => ApacheBlockingContainerClientTests.scala} | 66 ++++-- .../kubernetes/test/KubernetesClientTests.scala | 8 +- .../test/DockerToActivationLogStoreTests.scala | 5 +- .../mesos/test/MesosContainerFactoryTest.scala | 2 +- .../containerpool/test/ContainerPoolTests.scala | 28 +-- .../containerpool/test/ContainerProxyTests.scala | 3 +- 20 files changed, 660 insertions(+), 108 deletions(-) diff --git 
a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala index 56ab754..31d42a7 100644 --- a/common/scala/src/main/scala/whisk/common/Logging.scala +++ b/common/scala/src/main/scala/whisk/common/Logging.scala @@ -250,6 +250,7 @@ object LoggingMarkers { private val activation = "activation" private val kafka = "kafka" private val loadbalancer = "loadbalancer" + private val containerClient = "containerClient" /* * Controller related markers @@ -297,6 +298,8 @@ object LoggingMarkers { def INVOKER_KUBECTL_CMD(cmd: String) = LogMarkerToken(invoker, "kubectl", start, Some(cmd), Map("cmd" -> cmd)) def INVOKER_CONTAINER_START(containerState: String) = LogMarkerToken(invoker, "containerStart", count, Some(containerState), Map("containerState" -> containerState)) + val CONTAINER_CLIENT_RETRIES = + LogMarkerToken(containerClient, "retries", count) // Kafka related markers def KAFKA_QUEUE(topic: String) = LogMarkerToken(kafka, topic, count) diff --git a/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala b/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala new file mode 100644 index 0000000..a133989 --- /dev/null +++ b/common/scala/src/main/scala/whisk/core/containerpool/AkkaContainerClient.scala @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.containerpool + +import akka.actor.ActorSystem +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ +import akka.http.scaladsl.marshalling.Marshal +import akka.http.scaladsl.model.HttpMethods +import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.model.HttpResponse +import akka.http.scaladsl.model.MediaTypes +import akka.http.scaladsl.model.MessageEntity +import akka.http.scaladsl.model.StatusCodes +import akka.http.scaladsl.model.headers.Accept +import akka.http.scaladsl.model.headers.Connection +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.stream.StreamTcpException +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.util.ByteString +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.TimeoutException +import scala.concurrent.duration._ +import scala.util.Try +import scala.util.control.NonFatal +import spray.json._ +import whisk.common.Logging +import whisk.common.LoggingMarkers.CONTAINER_CLIENT_RETRIES +import whisk.common.MetricEmitter +import whisk.common.TransactionId +import whisk.core.entity.ActivationResponse.ContainerHttpError +import whisk.core.entity.ActivationResponse._ +import whisk.core.entity.ByteSize +import whisk.core.entity.size.SizeLong +import whisk.http.PoolingRestClient + +/** + * This HTTP client is used only in the invoker to communicate with the action container. 
+ * It allows posting a JSON object and receiving a JSON object back; that is, the + * content type and the accept headers are both 'application/json'. + * This implementation uses the akka http host-level client API. + * + * @param hostname the host name + * @param port the port + * @param timeout the timeout (a FiniteDuration) to wait for a response + * @param maxResponse the maximum size in bytes the connection will accept + * @param queueSize once all connections are used, how big a queue to allow for additional requests + * @param retryInterval duration between retries for TCP connection errors + */ +protected class AkkaContainerClient( + hostname: String, + port: Int, + timeout: FiniteDuration, + maxResponse: ByteSize, + queueSize: Int, + retryInterval: FiniteDuration = 100.milliseconds)(implicit logging: Logging, as: ActorSystem) + extends PoolingRestClient("http", hostname, port, queueSize, timeout = Some(timeout)) + with ContainerClient { + + def close() = Await.result(shutdown(), 30.seconds) + + /** + * Posts to hostname/endpoint the given JSON object. + * Waits up to timeout before aborting on a good connection. + * If the endpoint is not ready, retry up to timeout. + * Every retry reduces the available timeout so that this method should not + * wait longer than the total timeout (within a small slack allowance).
+ * + * @param endpoint the path of the API call relative to hostname + * @param body the JSON value to post (this is usually a JSON object) + * @param retry whether or not to retry on connection failure + * @return Left(Error Message) or Right(Status Code, Response as UTF-8 String) + */ + def post(endpoint: String, body: JsValue, retry: Boolean)( + implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] = { + + //create the request + val req = Marshal(body).to[MessageEntity].map { b => + //DO NOT reuse the connection + //For details on Connection: Close handling, see: + // - https://doc.akka.io/docs/akka-http/current/common/http-model.html#http-headers + // - http://github.com/akka/akka-http/tree/v10.1.3/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/ResponseRendererSpec.scala#L470-L571 + HttpRequest(HttpMethods.POST, endpoint, entity = b) + .withHeaders(Connection("close"), Accept(MediaTypes.`application/json`)) + } + + retryingRequest(req, timeout, retry) + .flatMap { + case (response, retries) => { + if (retries > 0) { + logging.debug(this, s"completed request to $endpoint after $retries retries") + MetricEmitter.emitHistogramMetric(CONTAINER_CLIENT_RETRIES, retries) + } + + response.entity.contentLengthOption match { + case Some(contentLength) if response.status != StatusCodes.NoContent => + if (contentLength <= maxResponse.toBytes) { + Unmarshal(response.entity.withSizeLimit(maxResponse.toBytes)).to[String].map { o => + Right(ContainerResponse(response.status.intValue, o, None)) + } + } else { + truncated(response.entity.dataBytes).map { s => + Right(ContainerResponse(response.status.intValue, s, Some(contentLength.B, maxResponse))) + } + } + case _ => + //handle missing Content-Length as NoResponseReceived + //also handle 204 as NoResponseReceived, for parity with ApacheBlockingContainerClient + response.discardEntityBytes().future.map(_ => Left(NoResponseReceived())) + } + } + } + .recover { + case t:
TimeoutException => Left(Timeout(t)) + case NonFatal(t) => Left(ConnectionError(t)) + } + } + //returns a Future HttpResponse -> Int (where Int is the retryCount) + private def retryingRequest(req: Future[HttpRequest], + timeout: FiniteDuration, + retry: Boolean, + retryCount: Int = 0): Future[(HttpResponse, Int)] = { + request(req) + .map((_, retryCount)) + .recoverWith { + case t: StreamTcpException if retry => + val newTimeout = timeout - retryInterval + if (newTimeout > Duration.Zero) { + akka.pattern.after(retryInterval, as.scheduler)(retryingRequest(req, newTimeout, retry, retryCount + 1)) + } else { + logging.warn( + this, + s"POST failed after $retryCount retries with $t - no more retries because timeout exceeded.") + Future.failed(new TimeoutException(t.getMessage)) + } + } + } + + private def truncated(responseBytes: Source[ByteString, _], + previouslyCaptured: ByteString = ByteString.empty): Future[String] = { + responseBytes.prefixAndTail(1).runWith(Sink.head).flatMap { + case (Nil, tail) => + //ignore the tail (MUST CONSUME ENTIRE ENTITY!) + tail.runWith(Sink.ignore).map(_ => previouslyCaptured.utf8String) + case (Seq(prefix), tail) => + val truncatedResponse = previouslyCaptured ++ prefix + if (truncatedResponse.size < maxResponse.toBytes) { + truncated(tail, truncatedResponse) + } else { + //ignore the tail (MUST CONSUME ENTIRE ENTITY!) + //captured string MAY be larger than the max response, so take only maxResponse bytes to get the exact length + tail.runWith(Sink.ignore).map(_ => truncatedResponse.take(maxResponse.toBytes.toInt).utf8String) + } + } + } +} + +object AkkaContainerClient { + + /** A helper method to post one single request to a connection. Used for container tests. 
*/ + def post(host: String, port: Int, endPoint: String, content: JsValue, timeout: FiniteDuration)( + implicit logging: Logging, + as: ActorSystem, + ec: ExecutionContext, + tid: TransactionId): (Int, Option[JsObject]) = { + val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1) + val response = executeRequest(connection, endPoint, content) + val result = Await.result(response, timeout + 10.seconds) //additional timeout to complete futures + connection.close() + result + } + + /** A helper method to post multiple concurrent requests to a single connection. Used for container tests. */ + def concurrentPost(host: String, port: Int, endPoint: String, contents: Seq[JsValue], timeout: FiniteDuration)( + implicit logging: Logging, + tid: TransactionId, + as: ActorSystem, + ec: ExecutionContext): Seq[(Int, Option[JsObject])] = { + val connection = new AkkaContainerClient(host, port, timeout, 1.MB, 1) + val futureResults = contents.map { executeRequest(connection, endPoint, _) } + val results = Await.result(Future.sequence(futureResults), timeout + 10.seconds) //additional timeout to complete futures + connection.close() + results + } + + private def executeRequest(connection: AkkaContainerClient, endpoint: String, content: JsValue)( + implicit logging: Logging, + as: ActorSystem, + ec: ExecutionContext, + tid: TransactionId): Future[(Int, Option[JsObject])] = { + + val res = connection + .post(endpoint, content, true) + .map({ + case Right(r) => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption) + case Left(NoResponseReceived()) => throw new IllegalStateException("no response from container") + case Left(Timeout(_)) => throw new java.util.concurrent.TimeoutException() + case Left(ConnectionError(t: java.net.SocketTimeoutException)) => + throw new java.util.concurrent.TimeoutException() + case Left(ConnectionError(t)) => throw new IllegalStateException(t.getMessage) + }) + + res + } +} diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala b/common/scala/src/main/scala/whisk/core/containerpool/ApacheBlockingContainerClient.scala similarity index 85% rename from common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala rename to common/scala/src/main/scala/whisk/core/containerpool/ApacheBlockingContainerClient.scala index 4ee7363..a6d71c3 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/ApacheBlockingContainerClient.scala @@ -38,11 +38,15 @@ import whisk.core.entity.ByteSize import whisk.core.entity.size.SizeLong import scala.annotation.tailrec +import scala.concurrent._ import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ import scala.util.{Failure, Success, Try} import scala.util.control.NoStackTrace +// Used internally to wrap all exceptions for which the request can be retried +protected[containerpool] case class RetryableConnectionError(t: Throwable) extends Exception(t) with NoStackTrace + /** * This HTTP client is used only in the invoker to communicate with the action container. * It allows to POST a JSON object and receive JSON object back; that is the @@ -56,13 +60,16 @@ import scala.util.control.NoStackTrace * @param maxResponse the maximum size in bytes the connection will accept * @param maxConcurrent the maximum number of concurrent requests allowed (Default is 1) */ -protected class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse: ByteSize, maxConcurrent: Int = 1)( - implicit logging: Logging) { +protected class ApacheBlockingContainerClient(hostname: String, + timeout: FiniteDuration, + maxResponse: ByteSize, + maxConcurrent: Int = 1)(implicit logging: Logging, ec: ExecutionContext) + extends ContainerClient { /** * Closes the HttpClient and all resources allocated by it. * - * This will close the HttpClient that is generated for this instance of HttpUtils. 
That will also cause the + * This will close the HttpClient that is generated for this instance of ApacheBlockingContainerClient. That will also cause the * ConnectionManager to be closed alongside. */ def close(): Unit = HttpClientUtils.closeQuietly(connection) @@ -80,7 +87,7 @@ protected class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse * @return Left(Error Message) or Right(Status Code, Response as UTF-8 String) */ def post(endpoint: String, body: JsValue, retry: Boolean)( - implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = { + implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] = { val entity = new StringEntity(body.compactPrint, StandardCharsets.UTF_8) entity.setContentType("application/json") @@ -88,12 +95,13 @@ protected class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse request.addHeader(HttpHeaders.ACCEPT, "application/json") request.setEntity(entity) - execute(request, timeout, maxConcurrent, retry) + Future { + blocking { + execute(request, timeout, maxConcurrent, retry) + } + } } - // Used internally to wrap all exceptions for which the request can be retried - private case class RetryableConnectionError(t: Throwable) extends Exception(t) with NoStackTrace - // Annotation will make the compiler complain if no tail recursion is possible @tailrec private def execute(request: HttpRequestBase, timeout: FiniteDuration, maxConcurrent: Int, retry: Boolean)( implicit tid: TransactionId): Either[ContainerHttpError, ContainerResponse] = { @@ -191,15 +199,19 @@ protected class HttpUtils(hostname: String, timeout: FiniteDuration, maxResponse .build } -object HttpUtils { +object ApacheBlockingContainerClient { /** A helper method to post one single request to a connection. Used for container tests. 
*/ - def post(host: String, port: Int, endPoint: String, content: JsValue)(implicit logging: Logging, - tid: TransactionId): (Int, Option[JsObject]) = { - val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB) + def post(host: String, port: Int, endPoint: String, content: JsValue)( + implicit logging: Logging, + tid: TransactionId, + ec: ExecutionContext): (Int, Option[JsObject]) = { + val timeout = 90.seconds + val connection = new ApacheBlockingContainerClient(s"$host:$port", timeout, 1.MB) val response = executeRequest(connection, endPoint, content) + val result = Await.result(response, timeout) connection.close() - response + result } /** A helper method to post multiple concurrent requests to a single connection. Used for container tests. */ @@ -207,17 +219,20 @@ object HttpUtils { implicit logging: Logging, tid: TransactionId, ec: ExecutionContext): Seq[(Int, Option[JsObject])] = { - val connection = new HttpUtils(s"$host:$port", 90.seconds, 1.MB, contents.size) - val futureResults = contents.map(content => Future { executeRequest(connection, endPoint, content) }) + val connection = new ApacheBlockingContainerClient(s"$host:$port", 90.seconds, 1.MB, contents.size) + val futureResults = contents.map { content => + executeRequest(connection, endPoint, content) + } val results = Await.result(Future.sequence(futureResults), timeout) connection.close() results } - private def executeRequest(connection: HttpUtils, endpoint: String, content: JsValue)( + private def executeRequest(connection: ApacheBlockingContainerClient, endpoint: String, content: JsValue)( implicit logging: Logging, - tid: TransactionId): (Int, Option[JsObject]) = { - connection.post(endpoint, content, retry = true) match { + tid: TransactionId, + ec: ExecutionContext): Future[(Int, Option[JsObject])] = { + connection.post(endpoint, content, retry = true) map { case Right(r) => (r.statusCode, Try(r.entity.parseJson.asJsObject).toOption) case Left(NoResponseReceived()) => throw new 
IllegalStateException("no response from container") case Left(Timeout(_)) => throw new java.util.concurrent.TimeoutException() @@ -225,5 +240,6 @@ object HttpUtils { throw new java.util.concurrent.TimeoutException() case Left(ConnectionError(t)) => throw new IllegalStateException(t.getMessage) } + } } diff --git a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala index 49c692b..7c46615 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala @@ -17,11 +17,11 @@ package whisk.core.containerpool +import akka.actor.ActorSystem import java.time.Instant - import akka.stream.scaladsl.Source import akka.util.ByteString - +import pureconfig._ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.Duration @@ -38,9 +38,10 @@ import whisk.core.entity.ActivationResponse import whisk.core.entity.ActivationResponse.ContainerConnectionError import whisk.core.entity.ActivationResponse.ContainerResponse import whisk.core.entity.ByteSize -import whisk.core.entity.size._ import whisk.http.Messages import akka.event.Logging.InfoLevel +import whisk.core.ConfigKeys +import whisk.core.entity.ActivationEntityLimit /** * An OpenWhisk biased container abstraction. 
This is **not only** an abstraction @@ -56,13 +57,17 @@ case class ContainerAddress(val host: String, val port: Int = 8080) { trait Container { + implicit protected val as: ActorSystem protected val id: ContainerId protected val addr: ContainerAddress protected implicit val logging: Logging protected implicit val ec: ExecutionContext + protected[containerpool] val config: ContainerPoolConfig = + loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool) + /** HTTP connection to the container, will be lazily established by callContainer */ - protected var httpConnection: Option[HttpUtils] = None + protected var httpConnection: Option[ContainerClient] = None /** Stops the container from consuming CPU cycles. */ def suspend()(implicit transid: TransactionId): Future[Unit] @@ -166,16 +171,23 @@ trait Container { implicit transid: TransactionId): Future[RunResult] = { val started = Instant.now() val http = httpConnection.getOrElse { - val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, 1.MB) + val conn = if (config.akkaClient) { + new AkkaContainerClient(addr.host, addr.port, timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024) + } else { + new ApacheBlockingContainerClient( + s"${addr.host}:${addr.port}", + timeout, + ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT) + } httpConnection = Some(conn) conn } - Future { - http.post(path, body, retry) - }.map { response => - val finished = Instant.now() - RunResult(Interval(started, finished), response) - } + http + .post(path, body, retry) + .map { response => + val finished = Instant.now() + RunResult(Interval(started, finished), response) + } } } diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerClient.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerClient.scala new file mode 100644 index 0000000..dfcd231 --- /dev/null +++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerClient.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.containerpool + +import scala.concurrent.Future +import spray.json._ +import whisk.common.TransactionId +import whisk.core.entity.ActivationResponse.ContainerHttpError +import whisk.core.entity.ActivationResponse._ + +trait ContainerClient extends AutoCloseable { + def post(endpoint: String, body: JsValue, retry: Boolean)( + implicit tid: TransactionId): Future[Either[ContainerHttpError, ContainerResponse]] + +} diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala index 5c972f0..3c56cf9 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala @@ -31,7 +31,7 @@ case class ContainerArgsConfig(network: String, dnsServers: Seq[String] = Seq.empty, extraArgs: Map[String, Set[String]] = Map.empty) -case class ContainerPoolConfig(numCore: Int, coreShare: Int) { +case class ContainerPoolConfig(numCore: Int, coreShare: Int, akkaClient: Boolean) { /** * The total number of containers is simply the number of cores dilated by the cpu sharing. 
diff --git a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala index 1e43eb7..aa35358 100644 --- a/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala +++ b/common/scala/src/main/scala/whisk/core/database/CouchDbRestStore.scala @@ -23,6 +23,8 @@ import akka.http.scaladsl.model._ import akka.stream.ActorMaterializer import akka.stream.scaladsl._ import akka.util.ByteString +import scala.concurrent.Await +import scala.concurrent.duration._ import spray.json._ import whisk.common.{Logging, LoggingMarkers, MetricEmitter, TransactionId} import whisk.core.database.StoreUtils._ @@ -30,8 +32,7 @@ import whisk.core.entity.Attachments.Attached import whisk.core.entity.{BulkEntityResult, DocInfo, DocumentReader, UUID} import whisk.http.Messages -import scala.concurrent.duration._ -import scala.concurrent.{Await, Future} +import scala.concurrent.Future import scala.util.Try /** @@ -513,7 +514,7 @@ class CouchDbRestStore[DocumentAbstraction <: DocumentSerializer](dbProtocol: St .getOrElse(Future.successful(true)) // For CouchDB it is expected that the entire document is deleted. 
override def shutdown(): Unit = { - Await.ready(client.shutdown(), 1.minute) + Await.result(client.shutdown(), 30.seconds) attachmentStore.foreach(_.shutdown()) } diff --git a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala index 0e50543..db3f87c 100644 --- a/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala +++ b/common/scala/src/main/scala/whisk/core/mesos/MesosTask.scala @@ -119,7 +119,7 @@ object MesosTask { log.info(this, s"launched task with state ${taskDetails.taskStatus.getState} at ${taskHost}:${taskPort}") val containerIp = new ContainerAddress(taskHost, taskPort) val containerId = new ContainerId(taskId); - new MesosTask(containerId, containerIp, ec, log, taskId, mesosClientActor, mesosConfig) + new MesosTask(containerId, containerIp, ec, log, as, taskId, mesosClientActor, mesosConfig) }) } @@ -134,6 +134,7 @@ class MesosTask(override protected val id: ContainerId, override protected val addr: ContainerAddress, override protected val ec: ExecutionContext, override protected val logging: Logging, + override protected val as: ActorSystem, taskId: String, mesosClientActor: ActorRef, mesosConfig: MesosConfig) diff --git a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala index 5e24b29..909fffb 100644 --- a/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala +++ b/common/scala/src/main/scala/whisk/http/PoolingRestClient.scala @@ -22,12 +22,13 @@ import akka.http.scaladsl.Http import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import akka.http.scaladsl.marshalling._ import akka.http.scaladsl.model._ +import akka.http.scaladsl.settings.ConnectionPoolSettings import akka.http.scaladsl.unmarshalling._ import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult} import akka.stream.scaladsl.{Flow, _} import spray.json._ - import 
scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.duration._ import scala.util.{Failure, Success, Try} /** @@ -43,18 +44,24 @@ class PoolingRestClient( host: String, port: Int, queueSize: Int, - httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None)( - implicit system: ActorSystem) { + httpFlow: Option[Flow[(HttpRequest, Promise[HttpResponse]), (Try[HttpResponse], Promise[HttpResponse]), Any]] = None, + timeout: Option[FiniteDuration] = None)(implicit system: ActorSystem) { require(protocol == "http" || protocol == "https", "Protocol must be one of { http, https }.") protected implicit val context: ExecutionContext = system.dispatcher protected implicit val materializer: ActorMaterializer = ActorMaterializer() + //if specified, override the ClientConnection idle-timeout value + private val timeoutSettings = { + val ps = ConnectionPoolSettings(system.settings.config) + timeout.map(t => ps.withUpdatedConnectionSettings(_.withIdleTimeout(t))).getOrElse(ps) + } + // Creates or retrieves a connection pool for the host. private val pool = if (protocol == "http") { - Http().cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port) + Http().cachedHostConnectionPool[Promise[HttpResponse]](host = host, port = port, settings = timeoutSettings) } else { - Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port) + Http().cachedHostConnectionPoolHttps[Promise[HttpResponse]](host = host, port = port, settings = timeoutSettings) } // Additional queue in case all connections are busy. 
Should hardly ever be @@ -64,8 +71,10 @@ class PoolingRestClient( .queue(queueSize, OverflowStrategy.dropNew) .via(httpFlow.getOrElse(pool)) .toMat(Sink.foreach({ - case ((Success(response), p)) => p.success(response) - case ((Failure(error), p)) => p.failure(error) + case (Success(response), p) => + p.success(response) + case (Failure(error), p) => + p.failure(error) }))(Keep.left) .run diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf index 00c3339..8edb386 100644 --- a/core/invoker/src/main/resources/application.conf +++ b/core/invoker/src/main/resources/application.conf @@ -29,6 +29,7 @@ whisk { container-pool { num-core: 4 # used for computing --cpushares, and max number of containers allowed core-share: 2 # used for computing --cpushares, and max number of containers allowed + akka-client: false # if true, use AkkaContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient) } kubernetes { diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala index 499ae42..99ff3c3 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala @@ -20,12 +20,10 @@ package whisk.core.containerpool.docker import java.time.Instant import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicLong - import akka.actor.ActorSystem import akka.stream._ import akka.stream.scaladsl.Framing.FramingException import spray.json._ - import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import whisk.common.Logging @@ -166,7 +164,7 @@ class DockerContainer(protected val id: ContainerId, protected val addr: ContainerAddress, protected val useRunc: Boolean)(implicit docker:
DockerApiWithFileAccess, runc: RuncApi, - as: ActorSystem, + override protected val as: ActorSystem, protected val ec: ExecutionContext, protected val logging: Logging) extends Container { @@ -210,29 +208,37 @@ class DockerContainer(protected val id: ContainerId, implicit transid: TransactionId): Future[RunResult] = { val started = Instant.now() val http = httpConnection.getOrElse { - val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT) + val conn = if (config.akkaClient) { + new AkkaContainerClient(addr.host, addr.port, timeout, ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT, 1024) + } else { + new ApacheBlockingContainerClient( + s"${addr.host}:${addr.port}", + timeout, + ActivationEntityLimit.MAX_ACTIVATION_ENTITY_LIMIT) + } httpConnection = Some(conn) conn } - Future { - http.post(path, body, retry) - }.flatMap { response => - val finished = Instant.now() - response.left - .map { - // Only check for memory exhaustion if there was a - // terminal connection error. - case error: ConnectionError => - isOomKilled().map { - case true => MemoryExhausted() - case false => error - } - case other => Future.successful(other) - } - .fold(_.map(Left(_)), right => Future.successful(Right(right))) - .map(res => RunResult(Interval(started, finished), res)) - } + http + .post(path, body, retry) + .flatMap { response => + val finished = Instant.now() + + response.left + .map { + // Only check for memory exhaustion if there was a + // terminal connection error. 
+ case error: ConnectionError => + isOomKilled().map { + case true => MemoryExhausted() + case false => error + } + case other => Future.successful(other) + } + .fold(_.map(Left(_)), right => Future.successful(Right(right))) + .map(res => RunResult(Interval(started, finished), res)) + } } /** diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala index 9f0049c..f812add 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala @@ -17,15 +17,14 @@ package whisk.core.containerpool.kubernetes +import akka.actor.ActorSystem import java.time.Instant import java.util.concurrent.atomic.{AtomicLong, AtomicReference} - import akka.stream.StreamLimitReachedException import akka.stream.scaladsl.Framing.FramingException import akka.stream.scaladsl.Source import akka.util.ByteString import spray.json._ - import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ @@ -91,6 +90,7 @@ class KubernetesContainer(protected[core] val id: ContainerId, protected[core] val addr: ContainerAddress, protected[core] val workerIP: String, protected[core] val nativeContainerId: String)(implicit kubernetes: KubernetesApi, + override protected val as: ActorSystem, protected val ec: ExecutionContext, protected val logging: Logging) extends Container { diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala index a168889..69f45ec 100644 --- a/tests/src/test/scala/actionContainers/ActionContainer.scala +++ b/tests/src/test/scala/actionContainers/ActionContainer.scala @@ -229,18 +229,21 @@ object ActionContainer { } private def syncPost(host: String, port: Int, endPoint: String, content: JsValue)( - 
implicit logging: Logging): (Int, Option[JsObject]) = { + implicit logging: Logging, + as: ActorSystem): (Int, Option[JsObject]) = { implicit val transid = TransactionId.testing - whisk.core.containerpool.HttpUtils.post(host, port, endPoint, content) + whisk.core.containerpool.AkkaContainerClient.post(host, port, endPoint, content, 30.seconds) } private def concurrentSyncPost(host: String, port: Int, endPoint: String, contents: Seq[JsValue])( implicit logging: Logging, - ec: ExecutionContext): Seq[(Int, Option[JsObject])] = { + ec: ExecutionContext, + as: ActorSystem): Seq[(Int, Option[JsObject])] = { implicit val transid = TransactionId.testing - whisk.core.containerpool.HttpUtils.concurrentPost(host, port, endPoint, contents, 30.seconds) + whisk.core.containerpool.AkkaContainerClient.concurrentPost(host, port, endPoint, contents, 30.seconds) } + } diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/AkkaContainerClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/AkkaContainerClientTests.scala new file mode 100644 index 0000000..1afdeb2 --- /dev/null +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/AkkaContainerClientTests.scala @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.containerpool.docker.test + +import common.StreamLogging +import common.WskActorSystem +import java.nio.charset.StandardCharsets +import java.time.Instant +import org.apache.http.HttpRequest +import org.apache.http.HttpResponse +import org.apache.http.entity.StringEntity +import org.apache.http.localserver.LocalServerTestBase +import org.apache.http.protocol.HttpContext +import org.apache.http.protocol.HttpRequestHandler +import org.junit.runner.RunWith +import org.scalatest.BeforeAndAfter +import org.scalatest.BeforeAndAfterAll +import org.scalatest.FlatSpec +import org.scalatest.Matchers +import org.scalatest.junit.JUnitRunner +import scala.concurrent.Await +import scala.concurrent.TimeoutException +import scala.concurrent.duration._ +import spray.json.JsObject +import whisk.common.TransactionId +import whisk.core.containerpool.AkkaContainerClient +import whisk.core.entity.ActivationResponse._ +import whisk.core.entity.size._ + +/** + * Unit tests for AkkaContainerClientTests which communicate with containers. 
+ */ +@RunWith(classOf[JUnitRunner]) +class AkkaContainerClientTests + extends FlatSpec + with Matchers + with BeforeAndAfter + with BeforeAndAfterAll + with StreamLogging + with WskActorSystem { + + implicit val transid = TransactionId.testing + implicit val ec = actorSystem.dispatcher + + var testHang: FiniteDuration = 0.second + var testStatusCode: Int = 200 + var testResponse: String = null + var testConnectionFailCount: Int = 0 + + val mockServer = new LocalServerTestBase { + var failcount = 0 + override def setUp() = { + super.setUp() + this.serverBootstrap + .registerHandler( + "/init", + new HttpRequestHandler() { + override def handle(request: HttpRequest, response: HttpResponse, context: HttpContext) = { + if (testHang.length > 0) { + Thread.sleep(testHang.toMillis) + } + if (testConnectionFailCount > 0 && failcount < testConnectionFailCount) { + failcount += 1 + println("failing in test") + throw new RuntimeException("failing...") + } + response.setStatusCode(testStatusCode); + if (testResponse != null) { + response.setEntity(new StringEntity(testResponse, StandardCharsets.UTF_8)) + } + } + }) + } + } + + mockServer.setUp() + val httpHost = mockServer.start() + val hostWithPort = s"${httpHost.getHostName}:${httpHost.getPort}" + + before { + testHang = 0.second + testStatusCode = 200 + testResponse = null + testConnectionFailCount = 0 + stream.reset() + } + + override def afterAll = { + mockServer.shutDown() + } + + behavior of "AkkaContainerClient" + + it should "not wait longer than set timeout" in { + val timeout = 5.seconds + val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 100) + testHang = timeout * 2 + val start = Instant.now() + val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds) + + val end = Instant.now() + val waited = end.toEpochMilli - start.toEpochMilli + result shouldBe 'left + waited should be > timeout.toMillis + waited should be < (timeout * 
2).toMillis + } + + it should "handle empty entity response" in { + val timeout = 5.seconds + val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 100) + testStatusCode = 204 + val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds) + result shouldBe Left(NoResponseReceived()) + } + + it should "retry till timeout on StreamTcpException" in { + val timeout = 5.seconds + val connection = new AkkaContainerClient("0.0.0.0", 12345, timeout, 1.B, 100) + val start = Instant.now() + val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds) + val end = Instant.now() + val waited = end.toEpochMilli - start.toEpochMilli + result match { + case Left(Timeout(_: TimeoutException)) => // good + case _ => fail(s"$result was not a Timeout(TimeoutException)") + } + waited should be > timeout.toMillis + waited should be < (timeout * 2).toMillis + } + + it should "retry till success within timeout limit" in { + val timeout = 5.seconds + val retryInterval = 500.milliseconds + val connection = + new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout, 1.B, 100, retryInterval) + val start = Instant.now() + testConnectionFailCount = 5 + testResponse = "" + val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds) + val end = Instant.now() + val waited = end.toEpochMilli - start.toEpochMilli + result shouldBe Right { + ContainerResponse(true, "", None) + } + + waited should be > (testConnectionFailCount * retryInterval).toMillis + waited should be < timeout.toMillis + } + + it should "not truncate responses within limit" in { + val timeout = 1.minute.toMillis + val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, 50.B, 100) + Seq(true, false).foreach { success => + Seq(null, "", "abc", """{"a":"B"}""", """["a", "b"]""").foreach { r => + testStatusCode = if (success) 200 else 500 + 
testResponse = r + val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds) + result shouldBe Right { + ContainerResponse(okStatus = success, if (r != null) r else "", None) + } + } + } + } + + it should "truncate responses that exceed limit" in { + val timeout = 1.minute.toMillis + val limit = 1.B + val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, limit, 100) + Seq(true, false).foreach { success => + Seq("abc", """{"a":"B"}""", """["a", "b"]""").foreach { r => + testStatusCode = if (success) 200 else 500 + testResponse = r + val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds) + result shouldBe Right { + ContainerResponse(okStatus = success, r.take(limit.toBytes.toInt), Some((r.length.B, limit))) + } + } + } + } + + it should "truncate large responses that exceed limit" in { + val timeout = 1.minute.toMillis + //use a limit large enough to not fit into a single ByteString as response entity is parsed into multiple ByteStrings + //seems like this varies, but often is ~64k or ~128k + val limit = 300.KB + val connection = new AkkaContainerClient(httpHost.getHostName, httpHost.getPort, timeout.millis, limit, 100) + Seq(true, false).foreach { success => + // Generate a response that's 1MB + val response = "0" * 1024 * 1024 + testStatusCode = if (success) 200 else 500 + testResponse = response + val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds) + result shouldBe Right { + ContainerResponse(okStatus = success, response.take(limit.toBytes.toInt), Some((response.length.B, limit))) + } + + } + } +} diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala similarity index 60% rename from 
tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala rename to tests/src/test/scala/whisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala index 3816cfc..a38fa52 100644 --- a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/ApacheBlockingContainerClientTests.scala @@ -19,7 +19,6 @@ package whisk.core.containerpool.docker.test import java.nio.charset.StandardCharsets import java.time.Instant - import scala.concurrent.duration._ import org.apache.http.HttpRequest import org.apache.http.HttpResponse @@ -35,23 +34,30 @@ import org.scalatest.FlatSpec import org.scalatest.Matchers import spray.json.JsObject import common.StreamLogging +import common.WskActorSystem +import org.apache.http.conn.HttpHostConnectException +import scala.concurrent.Await import whisk.common.TransactionId -import whisk.core.containerpool.HttpUtils +import whisk.core.containerpool.ApacheBlockingContainerClient +import whisk.core.containerpool.RetryableConnectionError +import whisk.core.entity.ActivationResponse.Timeout import whisk.core.entity.size._ import whisk.core.entity.ActivationResponse._ /** - * Unit tests for HttpUtils which communicate with containers. + * Unit tests for ApacheBlockingContainerClient which communicate with containers. 
*/ @RunWith(classOf[JUnitRunner]) -class ContainerConnectionTests +class ApacheBlockingContainerClientTests extends FlatSpec with Matchers with BeforeAndAfter with BeforeAndAfterAll - with StreamLogging { + with StreamLogging + with WskActorSystem { implicit val transid = TransactionId.testing + implicit val ec = actorSystem.dispatcher var testHang: FiniteDuration = 0.second var testStatusCode: Int = 200 @@ -89,14 +95,15 @@ class ContainerConnectionTests mockServer.shutDown() } - behavior of "Container HTTP Utils" + behavior of "ApacheBlockingContainerClient" it should "not wait longer than set timeout" in { val timeout = 5.seconds - val connection = new HttpUtils(hostWithPort, timeout, 1.B) + val connection = new ApacheBlockingContainerClient(hostWithPort, timeout, 1.B) testHang = timeout * 2 val start = Instant.now() - val result = connection.post("/init", JsObject.empty, retry = true) + val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds) + val end = Instant.now() val waited = end.toEpochMilli - start.toEpochMilli result shouldBe 'left @@ -106,22 +113,41 @@ class ContainerConnectionTests it should "handle empty entity response" in { val timeout = 5.seconds - val connection = new HttpUtils(hostWithPort, timeout, 1.B) + val connection = new ApacheBlockingContainerClient(hostWithPort, timeout, 1.B) testStatusCode = 204 - val result = connection.post("/init", JsObject.empty, retry = true) + val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds) result shouldBe Left(NoResponseReceived()) } + it should "retry till timeout on HttpHostConnectException" in { + val timeout = 5.seconds + val badHostAndPort = "0.0.0.0:12345" + val connection = new ApacheBlockingContainerClient(badHostAndPort, timeout, 1.B) + testStatusCode = 204 + val start = Instant.now() + val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds) + val end = Instant.now() + val waited = 
end.toEpochMilli - start.toEpochMilli + result match { + case Left(Timeout(RetryableConnectionError(_: HttpHostConnectException))) => // all good + case _ => + fail(s"$result was not a Timeout(RetryableConnectionError(HttpHostConnectException)))") + } + + waited should be > timeout.toMillis + waited should be < (timeout * 2).toMillis + } + it should "not truncate responses within limit" in { val timeout = 1.minute.toMillis - val connection = new HttpUtils(hostWithPort, timeout.millis, 50.B) - Seq(true, false).foreach { code => + val connection = new ApacheBlockingContainerClient(hostWithPort, timeout.millis, 50.B) + Seq(true, false).foreach { success => Seq(null, "", "abc", """{"a":"B"}""", """["a", "b"]""").foreach { r => - testStatusCode = if (code) 200 else 500 + testStatusCode = if (success) 200 else 500 testResponse = r - val result = connection.post("/init", JsObject.empty, retry = true) + val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds) result shouldBe Right { - ContainerResponse(okStatus = code, if (r != null) r else "", None) + ContainerResponse(okStatus = success, if (r != null) r else "", None) } } } @@ -131,14 +157,14 @@ class ContainerConnectionTests val timeout = 1.minute.toMillis val limit = 1.B val excess = limit + 1.B - val connection = new HttpUtils(hostWithPort, timeout.millis, limit) - Seq(true, false).foreach { code => + val connection = new ApacheBlockingContainerClient(hostWithPort, timeout.millis, limit) + Seq(true, false).foreach { success => Seq("abc", """{"a":"B"}""", """["a", "b"]""").foreach { r => - testStatusCode = if (code) 200 else 500 + testStatusCode = if (success) 200 else 500 testResponse = r - val result = connection.post("/init", JsObject.empty, retry = true) + val result = Await.result(connection.post("/init", JsObject.empty, retry = true), 10.seconds) result shouldBe Right { - ContainerResponse(okStatus = code, r.take(limit.toBytes.toInt), Some((r.length.B, limit))) + 
ContainerResponse(okStatus = success, r.take(limit.toBytes.toInt), Some((r.length.B, limit))) } } } diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala index 5190a2f..1f584d7 100644 --- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesClientTests.scala @@ -92,7 +92,7 @@ class KubernetesClientTests def kubernetesContainer(id: ContainerId) = new KubernetesContainer(id, ContainerAddress("ip"), "ip", "docker://" + id.asString)(kubernetesClient { Future.successful("") - }, global, logging) + }, actorSystem, global, logging) behavior of "KubernetesClient" @@ -188,7 +188,7 @@ object KubernetesClientTests { implicit def strToInstant(str: String): Instant = strToDate(str).get - class TestKubernetesClient extends KubernetesApi with StreamLogging { + class TestKubernetesClient(implicit as: ActorSystem) extends KubernetesApi with StreamLogging { var runs = mutable.Buffer.empty[(String, String, Map[String, String], Map[String, String])] var rms = mutable.Buffer.empty[ContainerId] var rmByLabels = mutable.Buffer.empty[(String, String)] @@ -238,7 +238,9 @@ object KubernetesClientTests { } } - class TestKubernetesClientWithInvokerAgent extends TestKubernetesClient with KubernetesApiWithInvokerAgent { + class TestKubernetesClientWithInvokerAgent(implicit as: ActorSystem) + extends TestKubernetesClient + with KubernetesApiWithInvokerAgent { var agentCommands = mutable.Buffer.empty[(ContainerId, String, Option[Map[String, JsValue]])] var forwardLogs = mutable.Buffer.empty[(ContainerId, Long)] diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala index c63cad5..2de3543 
100644 --- a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationLogStoreTests.scala @@ -17,6 +17,7 @@ package whisk.core.containerpool.logging.test +import akka.actor.ActorSystem import common.{StreamLogging, WskActorSystem} import org.junit.runner.RunWith import org.scalatest.{FlatSpec, Matchers} @@ -25,14 +26,12 @@ import whisk.core.containerpool.logging.{DockerToActivationLogStoreProvider, Log import whisk.core.entity.ExecManifest.{ImageName, RuntimeManifest} import whisk.core.entity._ import java.time.Instant - import akka.stream.scaladsl.Source import akka.util.ByteString import spray.json._ import whisk.common.{Logging, TransactionId} import whisk.core.containerpool.{Container, ContainerAddress, ContainerId} import whisk.http.Messages - import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ @@ -107,5 +106,7 @@ class DockerToActivationLogStoreTests extends FlatSpec with Matchers with WskAct def resume()(implicit transid: TransactionId): Future[Unit] = ??? 
def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId) = lines + + override implicit protected val as: ActorSystem = actorSystem } } diff --git a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala index 51d899c..b686d56 100644 --- a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala +++ b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala @@ -78,7 +78,7 @@ class MesosContainerFactoryTest lastTaskId } - val poolConfig = ContainerPoolConfig(8, 10) + val poolConfig = ContainerPoolConfig(8, 10, false) val dockerCpuShares = poolConfig.cpuShare val mesosCpus = poolConfig.cpuShare / 1024.0 diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala index 66630b2..3de575d 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala @@ -113,6 +113,8 @@ class ContainerPoolTests (containers, factory) } + def poolConfig(numCore: Int, coreShare: Int) = ContainerPoolConfig(numCore, coreShare, false) + behavior of "ContainerPool" /* @@ -124,7 +126,7 @@ class ContainerPoolTests it should "reuse a warm container" in within(timeout) { val (containers, factory) = testContainers(2) val feed = TestProbe() - val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(2, 2), feed.ref)) pool ! 
runMessage containers(0).expectMsg(runMessage) @@ -138,7 +140,7 @@ class ContainerPoolTests it should "reuse a warm container when action is the same even if revision changes" in within(timeout) { val (containers, factory) = testContainers(2) val feed = TestProbe() - val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(2, 2), feed.ref)) pool ! runMessage containers(0).expectMsg(runMessage) @@ -153,7 +155,7 @@ class ContainerPoolTests val (containers, factory) = testContainers(2) val feed = TestProbe() - val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(2, 2), feed.ref)) pool ! runMessage containers(0).expectMsg(runMessage) // Note that the container doesn't respond, thus it's not free to take work @@ -167,7 +169,7 @@ class ContainerPoolTests val feed = TestProbe() // a pool with only 1 slot - val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(1, 1), feed.ref)) pool ! runMessage containers(0).expectMsg(runMessage) containers(0).send(pool, NeedWork(warmedData())) @@ -182,7 +184,7 @@ class ContainerPoolTests val feed = TestProbe() // a pool with only 1 active slot but 2 slots in total - val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 2), feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(1, 2), feed.ref)) // Run the first container pool ! runMessage @@ -208,7 +210,7 @@ class ContainerPoolTests val feed = TestProbe() // a pool with only 1 slot - val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(1, 1), feed.ref)) pool ! 
runMessage containers(0).expectMsg(runMessage) containers(0).send(pool, NeedWork(warmedData())) @@ -223,7 +225,7 @@ class ContainerPoolTests val feed = TestProbe() // a pool with only 1 slot - val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(1, 1), feed.ref)) pool ! runMessage containers(0).expectMsg(runMessage) containers(0).send(pool, RescheduleJob) // emulate container failure ... @@ -241,7 +243,8 @@ class ContainerPoolTests val pool = system.actorOf( - ContainerPool.props(factory, ContainerPoolConfig(0, 0), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit)))) + ContainerPool + .props(factory, poolConfig(0, 0), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit)))) containers(0).expectMsg(Start(exec, memoryLimit)) } @@ -251,7 +254,8 @@ class ContainerPoolTests val pool = system.actorOf( - ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit)))) + ContainerPool + .props(factory, poolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit)))) containers(0).expectMsg(Start(exec, memoryLimit)) containers(0).send(pool, NeedWork(preWarmedData(exec.kind))) pool ! runMessage @@ -266,7 +270,7 @@ class ContainerPoolTests val pool = system.actorOf( ContainerPool - .props(factory, ContainerPoolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, alternativeExec, memoryLimit)))) + .props(factory, poolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, alternativeExec, memoryLimit)))) containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0 was prewarmed containers(0).send(pool, NeedWork(preWarmedData(alternativeExec.kind))) pool ! 
runMessage @@ -282,7 +286,7 @@ class ContainerPoolTests val pool = system.actorOf( ContainerPool - .props(factory, ContainerPoolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, exec, alternativeLimit)))) + .props(factory, poolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, exec, alternativeLimit)))) containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was prewarmed containers(0).send(pool, NeedWork(preWarmedData(exec.kind, alternativeLimit))) pool ! runMessage @@ -296,7 +300,7 @@ class ContainerPoolTests val (containers, factory) = testContainers(2) val feed = TestProbe() - val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(2, 2), feed.ref)) // container0 is created and used pool ! runMessage diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala index 9c5ef66..25b0303 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala @@ -167,7 +167,7 @@ class ContainerProxyTests Future.successful(()) } - val poolConfig = ContainerPoolConfig(1, 2) + val poolConfig = ContainerPoolConfig(1, 2, false) behavior of "ContainerProxy" @@ -732,6 +732,7 @@ class ContainerProxyTests protected val addr = ContainerAddress("0.0.0.0") protected implicit val logging: Logging = log protected implicit val ec: ExecutionContext = system.dispatcher + override implicit protected val as: ActorSystem = system var suspendCount = 0 var resumeCount = 0 var destroyCount = 0