This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new be43ad9 ContainerFactory SPI (#2659) be43ad9 is described below commit be43ad90e0dce26c7fa76573605324be22817b30 Author: tysonnorris <tysonnor...@gmail.com> AuthorDate: Mon Sep 25 09:50:04 2017 -0700 ContainerFactory SPI (#2659) --- common/scala/src/main/resources/reference.conf | 1 + .../scala/whisk/core/containerpool/Container.scala | 199 +++++++++++++++++++++ .../core/containerpool/ContainerFactory.scala | 59 ++++++ .../whisk/core/containerpool}/HttpUtils.scala | 4 +- .../scala/whisk/core/containerpool/Container.scala | 83 --------- .../whisk/core/containerpool/ContainerProxy.scala | 2 +- .../core/containerpool/docker/DockerClient.scala | 17 +- .../docker/DockerClientWithFileAccess.scala | 10 +- .../containerpool/docker/DockerContainer.scala | 163 +++-------------- .../docker/DockerContainerFactory.scala | 101 +++++++++++ .../core/containerpool/docker/RuncClient.scala | 1 + .../scala/whisk/core/invoker/InvokerReactive.scala | 72 ++------ .../scala/actionContainers/ActionContainer.scala | 2 +- .../docker/test/ContainerConnectionTests.scala | 4 +- .../docker/test/DockerClientTests.scala | 8 +- .../test/DockerClientWithFileAccessTests.scala | 14 +- .../docker/test/DockerContainerTests.scala | 52 ++++-- .../docker/test/RuncClientTests.scala | 2 +- .../containerpool/test/ContainerProxyTests.scala | 22 ++- 19 files changed, 481 insertions(+), 335 deletions(-) diff --git a/common/scala/src/main/resources/reference.conf b/common/scala/src/main/resources/reference.conf index 52f30c3..50a36a5 100644 --- a/common/scala/src/main/resources/reference.conf +++ b/common/scala/src/main/resources/reference.conf @@ -1,4 +1,5 @@ whisk.spi{ ArtifactStoreProvider = whisk.core.database.CouchDbStoreProvider MessagingProvider = whisk.connector.kafka.KafkaMessagingProvider + ContainerFactoryProvider = whisk.core.containerpool.docker.DockerContainerFactoryProvider } \ No newline at end of file diff --git 
a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala new file mode 100644 index 0000000..0cecbe6 --- /dev/null +++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.containerpool + +import java.time.Instant +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration.Duration +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration._ +import scala.util.Failure +import scala.util.Success +import spray.json.JsObject +import spray.json.DefaultJsonProtocol._ +import whisk.common.Logging +import whisk.common.LoggingMarkers +import whisk.common.TransactionId +import whisk.core.entity.ActivationResponse +import whisk.core.entity.ActivationResponse.ContainerConnectionError +import whisk.core.entity.ActivationResponse.ContainerResponse +import whisk.core.entity.ByteSize +import whisk.core.entity.size._ +import whisk.http.Messages + +/** + * An OpenWhisk biased container abstraction. 
This is **not only** an abstraction + * for different container providers, but the implementation also needs to include + * OpenWhisk specific behavior, especially for initialize and run. + */ +case class ContainerId(val asString: String) { + require(asString.nonEmpty, "ContainerId must not be empty") +} +case class ContainerAddress(val host: String, val port: Int = 8080) { + require(host.nonEmpty, "ContainerIp must not be empty") +} + +trait Container { + + protected val id: ContainerId + protected val addr: ContainerAddress + protected implicit val logging: Logging + protected implicit val ec: ExecutionContext + + /** HTTP connection to the container, will be lazily established by callContainer */ + private var httpConnection: Option[HttpUtils] = None + + /** Stops the container from consuming CPU cycles. */ + def suspend()(implicit transid: TransactionId): Future[Unit] + + /** Dual of halt. */ + def resume()(implicit transid: TransactionId): Future[Unit] + + /** Obtains logs up to a given threshold from the container. Optionally waits for a sentinel to appear. */ + def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Future[Vector[String]] + + /** Completely destroys this instance of the container. */ + def destroy()(implicit transid: TransactionId): Future[Unit] = { + Future.successful(httpConnection.foreach(_.close())) + } + + /** Initializes code in the container. 
*/ + def initialize(initializer: JsObject, timeout: FiniteDuration)(implicit transid: TransactionId): Future[Interval] = { + val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION_INIT, s"sending initialization to $id $addr") + + val body = JsObject("value" -> initializer) + callContainer("/init", body, timeout, retry = true) + .andThen { // never fails + case Success(r: RunResult) => + transid.finished( + this, + start.copy(start = r.interval.start), + s"initialization result: ${r.toBriefString}", + endTime = r.interval.end) + case Failure(t) => + transid.failed(this, start, s"initializiation failed with $t") + } + .flatMap { result => + if (result.ok) { + Future.successful(result.interval) + } else if (result.interval.duration >= timeout) { + Future.failed( + InitializationError( + result.interval, + ActivationResponse.applicationError(Messages.timedoutActivation(timeout, true)))) + } else { + Future.failed( + InitializationError( + result.interval, + ActivationResponse.processInitResponseContent(result.response, logging))) + } + } + } + + /** Runs code in the container. 
*/ + def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)( + implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = { + val actionName = environment.fields.get("action_name").map(_.convertTo[String]).getOrElse("") + val start = + transid.started(this, LoggingMarkers.INVOKER_ACTIVATION_RUN, s"sending arguments to $actionName at $id $addr") + + val parameterWrapper = JsObject("value" -> parameters) + val body = JsObject(parameterWrapper.fields ++ environment.fields) + callContainer("/run", body, timeout, retry = false) + .andThen { // never fails + case Success(r: RunResult) => + transid.finished( + this, + start.copy(start = r.interval.start), + s"running result: ${r.toBriefString}", + endTime = r.interval.end) + case Failure(t) => + transid.failed(this, start, s"run failed with $t") + } + .map { result => + val response = if (result.interval.duration >= timeout) { + ActivationResponse.applicationError(Messages.timedoutActivation(timeout, false)) + } else { + ActivationResponse.processRunResponseContent(result.response, logging) + } + + (result.interval, response) + } + } + + /** + * Makes an HTTP request to the container. + * + * Note that `http.post` will not throw an exception, hence the generated Future cannot fail. 
+ * + * @param path relative path to use in the http request + * @param body body to send + * @param timeout timeout of the request + * @param retry whether or not to retry the request + */ + protected def callContainer(path: String, + body: JsObject, + timeout: FiniteDuration, + retry: Boolean = false): Future[RunResult] = { + val started = Instant.now() + val http = httpConnection.getOrElse { + val conn = new HttpUtils(s"${addr.host}:${addr.port}", timeout, 1.MB) + httpConnection = Some(conn) + conn + } + Future { + http.post(path, body, retry) + }.map { response => + val finished = Instant.now() + RunResult(Interval(started, finished), response) + } + } +} + +/** Indicates a general error with the container */ +sealed abstract class ContainerError(msg: String) extends Exception(msg) + +/** Indicates an error while starting a container */ +sealed abstract class ContainerStartupError(msg: String) extends ContainerError(msg) + +/** Indicates any error while starting a container either of a managed runtime or a non-application-specific blackbox container */ +case class WhiskContainerStartupError(msg: String) extends ContainerStartupError(msg) + +/** Indicates an application-specific error while starting a blackbox container */ +case class BlackboxStartupError(msg: String) extends ContainerStartupError(msg) + +/** Indicates an error while initializing a container */ +case class InitializationError(interval: Interval, response: ActivationResponse) extends Exception(response.toString) + +case class Interval(start: Instant, end: Instant) { + def duration = Duration.create(end.toEpochMilli() - start.toEpochMilli(), MILLISECONDS) +} + +case class RunResult(interval: Interval, response: Either[ContainerConnectionError, ContainerResponse]) { + def ok = response.right.exists(_.ok) + def toBriefString = response.fold(_.toString, _.toString) +} +object Interval { + + /** An interval starting now with zero duration. 
*/ + def zero = { + val now = Instant.now + Interval(now, now) + } +} diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala new file mode 100644 index 0000000..fb04f0f --- /dev/null +++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package whisk.core.containerpool + +import akka.actor.ActorSystem +import scala.concurrent.Future +import whisk.common.Logging +import whisk.common.TransactionId +import whisk.core.WhiskConfig +import whisk.core.entity.ByteSize +import whisk.core.entity.ExecManifest +import whisk.core.entity.InstanceId +import whisk.spi.Spi + +/** + * An abstraction for Container creation + */ +trait ContainerFactory { + + /** create a new Container */ + def createContainer(tid: TransactionId, + name: String, + actionImage: ExecManifest.ImageName, + userProvidedImage: Boolean, + memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container] + + /** perform any initialization */ + def init(): Unit + + /** cleanup any remaining Containers; should block until complete; should ONLY be run at startup/shutdown */ + def cleanup(): Unit +} + +/** + * An SPI for ContainerFactory creation + * All impls should use the parameters specified as additional args to "docker run" commands + */ +trait ContainerFactoryProvider extends Spi { + def getContainerFactory(actorSystem: ActorSystem, + logging: Logging, + config: WhiskConfig, + instance: InstanceId, + parameters: Map[String, Set[String]]): ContainerFactory +} diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/HttpUtils.scala b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala similarity index 98% rename from core/invoker/src/main/scala/whisk/core/containerpool/docker/HttpUtils.scala rename to common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala index a6b53a7..2815068 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/HttpUtils.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/HttpUtils.scala @@ -15,12 +15,10 @@ * limitations under the License. 
*/ -package whisk.core.containerpool.docker +package whisk.core.containerpool import java.nio.charset.StandardCharsets -import scala.Left -import scala.Right import scala.concurrent.duration.DurationInt import scala.concurrent.duration.FiniteDuration import scala.util.Failure diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/Container.scala b/core/invoker/src/main/scala/whisk/core/containerpool/Container.scala deleted file mode 100644 index bffc75e..0000000 --- a/core/invoker/src/main/scala/whisk/core/containerpool/Container.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package whisk.core.containerpool - -import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration - -import spray.json.JsObject -import whisk.common.TransactionId -import whisk.core.entity.ActivationResponse -import whisk.core.entity.ByteSize -import java.time.Instant -import scala.concurrent.duration._ - -/** - * An OpenWhisk biased container abstraction. This is **not only** an abstraction - * for different container providers, but the implementation also needs to include - * OpenWhisk specific behavior, especially for initialize and run. 
- */ -trait Container { - - /** Stops the container from consuming CPU cycles. */ - def suspend()(implicit transid: TransactionId): Future[Unit] - - /** Dual of halt. */ - def resume()(implicit transid: TransactionId): Future[Unit] - - /** Completely destroys this instance of the container. */ - def destroy()(implicit transid: TransactionId): Future[Unit] - - /** Initializes code in the container. */ - def initialize(initializer: JsObject, timeout: FiniteDuration)(implicit transid: TransactionId): Future[Interval] - - /** Runs code in the container. */ - def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)( - implicit transid: TransactionId): Future[(Interval, ActivationResponse)] - - /** Obtains logs up to a given threshold from the container. Optionally waits for a sentinel to appear. */ - def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Future[Vector[String]] -} - -/** Indicates a general error with the container */ -sealed abstract class ContainerError(msg: String) extends Exception(msg) - -/** Indicates an error while starting a container */ -sealed abstract class ContainerStartupError(msg: String) extends ContainerError(msg) - -/** Indicates any error while starting a container either of a managed runtime or a non-application-specific blackbox container */ -case class WhiskContainerStartupError(msg: String) extends ContainerStartupError(msg) - -/** Indicates an application-specific error while starting a blackbox container */ -case class BlackboxStartupError(msg: String) extends ContainerStartupError(msg) - -/** Indicates an error while initializing a container */ -case class InitializationError(interval: Interval, response: ActivationResponse) extends Exception(response.toString) - -case class Interval(start: Instant, end: Instant) { - def duration = Duration.create(end.toEpochMilli() - start.toEpochMilli(), MILLISECONDS) -} - -object Interval { - - /** An interval starting now with zero duration. 
*/ - def zero = { - val now = Instant.now - Interval(now, now) - } -} diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala index da66e5a..a9d3f82 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala @@ -100,7 +100,7 @@ class ContainerProxy(factory: (TransactionId, String, ImageName, Boolean, ByteSi extends FSM[ContainerState, ContainerData] with Stash { implicit val ec = context.system.dispatcher - val logging = new AkkaLogging(context.system.log) + implicit val logging = new AkkaLogging(context.system.log) startWith(Uninitialized, NoData()) diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala index 915e7f6..28b5b7a 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClient.scala @@ -20,18 +20,18 @@ package whisk.core.containerpool.docker import java.io.FileNotFoundException import java.nio.file.Files import java.nio.file.Paths - import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.Failure import scala.util.Success import scala.util.Try - import akka.event.Logging.ErrorLevel import whisk.common.Logging import whisk.common.LoggingMarkers import whisk.common.TransactionId import scala.collection.concurrent.TrieMap +import whisk.core.containerpool.ContainerId +import whisk.core.containerpool.ContainerAddress /** * Serves as interface to the docker CLI tool. 
@@ -64,11 +64,11 @@ class DockerClient(dockerHost: Option[String] = None)(executionContext: Executio def run(image: String, args: Seq[String] = Seq.empty[String])(implicit transid: TransactionId): Future[ContainerId] = runCmd((Seq("run", "-d") ++ args ++ Seq(image)): _*).map(ContainerId.apply) - def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerIp] = + def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerAddress] = runCmd("inspect", "--format", s"{{.NetworkSettings.Networks.${network}.IPAddress}}", id.asString).flatMap { _ match { case "<no value>" => Future.failed(new NoSuchElementException) - case stdout => Future.successful(ContainerIp(stdout)) + case stdout => Future.successful(ContainerAddress(stdout)) } } @@ -110,13 +110,6 @@ class DockerClient(dockerHost: Option[String] = None)(executionContext: Executio } } -case class ContainerId(val asString: String) { - require(asString.nonEmpty, "ContainerId must not be empty") -} -case class ContainerIp(val asString: String) { - require(asString.nonEmpty, "ContainerIp must not be empty") -} - trait DockerApi { /** @@ -139,7 +132,7 @@ trait DockerApi { * @param network name of the network to get the IP address from * @return ip of the container */ - def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerIp] + def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerAddress] /** * Pauses the container with the given id. 
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala index 0407346..444c365 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerClientWithFileAccess.scala @@ -22,16 +22,16 @@ import java.io.FileInputStream import java.io.IOException import java.nio.ByteBuffer import java.nio.file.Paths - import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.blocking import scala.io.Source - import spray.json.DefaultJsonProtocol._ import spray.json._ import whisk.common.Logging import whisk.common.TransactionId +import whisk.core.containerpool.ContainerId +import whisk.core.containerpool.ContainerAddress class DockerClientWithFileAccess( dockerHost: Option[String] = None, @@ -113,18 +113,18 @@ class DockerClientWithFileAccess( * @param network name of the network to get the IP address from * @return the ip address of the container */ - protected def ipAddressFromFile(id: ContainerId, network: String): Future[ContainerIp] = { + protected def ipAddressFromFile(id: ContainerId, network: String): Future[ContainerAddress] = { configFileContents(containerConfigFile(id)).map { json => val networks = json.fields("NetworkSettings").asJsObject.fields("Networks").asJsObject val specifiedNetwork = networks.fields(network).asJsObject val ipAddr = specifiedNetwork.fields("IPAddress") - ContainerIp(ipAddr.convertTo[String]) + ContainerAddress(ipAddr.convertTo[String]) } } // See extended trait for description override def inspectIPAddress(id: ContainerId, network: String)( - implicit transid: TransactionId): Future[ContainerIp] = { + implicit transid: TransactionId): Future[ContainerAddress] = { ipAddressFromFile(id, network).recoverWith { case _ => super.inspectIPAddress(id, network) } 
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala index 7e07247..a63c304 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainer.scala @@ -18,7 +18,6 @@ package whisk.core.containerpool.docker import java.nio.charset.StandardCharsets -import java.time.Instant import akka.actor.ActorSystem @@ -26,23 +25,15 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Failure -import scala.util.Success -import spray.json._ -import spray.json.DefaultJsonProtocol._ import whisk.common.Logging -import whisk.common.LoggingMarkers import whisk.common.TransactionId -import whisk.core.containerpool.Interval import whisk.core.containerpool.BlackboxStartupError import whisk.core.containerpool.Container -import whisk.core.containerpool.InitializationError +import whisk.core.containerpool.ContainerId +import whisk.core.containerpool.ContainerAddress import whisk.core.containerpool.WhiskContainerStartupError -import whisk.core.entity.ActivationResponse import whisk.core.entity.ByteSize import whisk.core.entity.size._ -import whisk.http.Messages -import whisk.core.entity.ActivationResponse.ContainerConnectionError -import whisk.core.entity.ActivationResponse.ContainerResponse object DockerContainer { @@ -69,28 +60,23 @@ object DockerContainer { environment: Map[String, String] = Map(), network: String = "bridge", dnsServers: Seq[String] = Seq(), - name: Option[String] = None)(implicit docker: DockerApiWithFileAccess, - runc: RuncApi, - as: ActorSystem, - ec: ExecutionContext, - log: Logging): Future[DockerContainer] = { + name: Option[String] = None, + dockerRunParameters: Map[String, Set[String]])(implicit docker: DockerApiWithFileAccess, + runc: RuncApi, + as: 
ActorSystem, + ec: ExecutionContext, + log: Logging): Future[DockerContainer] = { implicit val tid = transid - val environmentArgs = environment.map { + val environmentArgs = environment.flatMap { case (key, value) => Seq("-e", s"$key=$value") - }.flatten + } - val dnsArgs = dnsServers.map(Seq("--dns", _)).flatten + val params = dockerRunParameters.flatMap { + case (key, valueList) => valueList.toList.flatMap(Seq(key, _)) + } val args = Seq( - "--cap-drop", - "NET_RAW", - "--cap-drop", - "NET_ADMIN", - "--ulimit", - "nofile=1024:1024", - "--pids-limit", - "1024", "--cpu-shares", cpuShares.toString, "--memory", @@ -99,10 +85,9 @@ object DockerContainer { s"${memory.toMB}m", "--network", network) ++ - dnsArgs ++ environmentArgs ++ - name.map(n => Seq("--name", n)).getOrElse(Seq.empty) - + name.map(n => Seq("--name", n)).getOrElse(Seq.empty) ++ + params val pulled = if (userProvidedImage) { docker.pull(image).recoverWith { case _ => Future.failed(BlackboxStartupError(s"Failed to pull container image '${image}'.")) @@ -133,13 +118,14 @@ object DockerContainer { * * @constructor * @param id the id of the container - * @param ip the ip of the container + * @param addr the ip of the container */ -class DockerContainer(id: ContainerId, ip: ContainerIp)(implicit docker: DockerApiWithFileAccess, - runc: RuncApi, - as: ActorSystem, - ec: ExecutionContext, - logger: Logging) +class DockerContainer(protected val id: ContainerId, protected val addr: ContainerAddress)( + implicit docker: DockerApiWithFileAccess, + runc: RuncApi, + as: ActorSystem, + protected val ec: ExecutionContext, + protected val logging: Logging) extends Container with DockerActionLogDriver { @@ -149,78 +135,13 @@ class DockerContainer(id: ContainerId, ip: ContainerIp)(implicit docker: DockerA protected val logsRetryCount = 15 protected val logsRetryWait = 100.millis - /** HTTP connection to the container, will be lazily established by callContainer */ - private var httpConnection: Option[HttpUtils] = None - 
def suspend()(implicit transid: TransactionId): Future[Unit] = runc.pause(id) def resume()(implicit transid: TransactionId): Future[Unit] = runc.resume(id) - def destroy()(implicit transid: TransactionId): Future[Unit] = { - httpConnection.foreach(_.close()) + override def destroy()(implicit transid: TransactionId): Future[Unit] = { + super.destroy() docker.rm(id) } - def initialize(initializer: JsObject, timeout: FiniteDuration)(implicit transid: TransactionId): Future[Interval] = { - val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION_INIT, s"sending initialization to $id $ip") - - val body = JsObject("value" -> initializer) - callContainer("/init", body, timeout, retry = true) - .andThen { // never fails - case Success(r: RunResult) => - transid.finished( - this, - start.copy(start = r.interval.start), - s"initialization result: ${r.toBriefString}", - endTime = r.interval.end) - case Failure(t) => - transid.failed(this, start, s"initializiation failed with $t") - } - .flatMap { result => - if (result.ok) { - Future.successful(result.interval) - } else if (result.interval.duration >= timeout) { - Future.failed( - InitializationError( - result.interval, - ActivationResponse.applicationError(Messages.timedoutActivation(timeout, true)))) - } else { - Future.failed( - InitializationError( - result.interval, - ActivationResponse.processInitResponseContent(result.response, logger))) - } - } - } - - def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)( - implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = { - val actionName = environment.fields.get("action_name").map(_.convertTo[String]).getOrElse("") - val start = - transid.started(this, LoggingMarkers.INVOKER_ACTIVATION_RUN, s"sending arguments to $actionName at $id $ip") - - val parameterWrapper = JsObject("value" -> parameters) - val body = JsObject(parameterWrapper.fields ++ environment.fields) - callContainer("/run", body, timeout, retry = false) - 
.andThen { // never fails - case Success(r: RunResult) => - transid.finished( - this, - start.copy(start = r.interval.start), - s"running result: ${r.toBriefString}", - endTime = r.interval.end) - case Failure(t) => - transid.failed(this, start, s"run failed with $t") - } - .map { result => - val response = if (result.interval.duration >= timeout) { - ActivationResponse.applicationError(Messages.timedoutActivation(timeout, false)) - } else { - ActivationResponse.processRunResponseContent(result.response, logger) - } - - (result.interval, response) - } - } - /** * Obtains the container's stdout and stderr output and converts it to our own JSON format. * At the moment, this is done by reading the internal Docker log file for the container. @@ -254,7 +175,7 @@ class DockerContainer(id: ContainerId, ip: ContainerIp)(implicit docker: DockerA val (isComplete, isTruncated, formattedLogs) = processJsonDriverLogContents(rawLog, waitForSentinel, limit) if (retries > 0 && !isComplete && !isTruncated) { - logger.info(this, s"log cursor advanced but missing sentinel, trying $retries more times") + logging.info(this, s"log cursor advanced but missing sentinel, trying $retries more times") akka.pattern.after(logsRetryWait, as.scheduler)(readLogs(retries - 1)) } else { logFileOffset += rawLogBytes.position - rawLogBytes.arrayOffset @@ -263,43 +184,11 @@ class DockerContainer(id: ContainerId, ip: ContainerIp)(implicit docker: DockerA } .andThen { case Failure(e) => - logger.error(this, s"Failed to obtain logs of ${id.asString}: ${e.getClass} - ${e.getMessage}") + logging.error(this, s"Failed to obtain logs of ${id.asString}: ${e.getClass} - ${e.getMessage}") } } readLogs(logsRetryCount) } - /** - * Makes an HTTP request to the container. - * - * Note that `http.post` will not throw an exception, hence the generated Future cannot fail. 
- * - * @param path relative path to use in the http request - * @param body body to send - * @param timeout timeout of the request - * @param retry whether or not to retry the request - */ - protected def callContainer(path: String, - body: JsObject, - timeout: FiniteDuration, - retry: Boolean = false): Future[RunResult] = { - val started = Instant.now() - val http = httpConnection.getOrElse { - val conn = new HttpUtils(s"${ip.asString}:8080", timeout, 1.MB) - httpConnection = Some(conn) - conn - } - Future { - http.post(path, body, retry) - }.map { response => - val finished = Instant.now() - RunResult(Interval(started, finished), response) - } - } -} - -case class RunResult(interval: Interval, response: Either[ContainerConnectionError, ContainerResponse]) { - def ok = response.right.exists(_.ok) - def toBriefString = response.fold(_.toString, _.toString) } diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala new file mode 100644 index 0000000..61e9b7b --- /dev/null +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.containerpool.docker + +import akka.actor.ActorSystem +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import whisk.common.Logging +import whisk.common.TransactionId +import whisk.core.WhiskConfig +import whisk.core.containerpool.Container +import whisk.core.containerpool.ContainerFactory +import whisk.core.containerpool.ContainerFactoryProvider +import whisk.core.entity.ByteSize +import whisk.core.entity.ExecManifest +import whisk.core.entity.InstanceId +import scala.concurrent.duration._ + +class DockerContainerFactory(config: WhiskConfig, instance: InstanceId, parameters: Map[String, Set[String]])( + implicit actorSystem: ActorSystem, + ec: ExecutionContext, + logging: Logging) + extends ContainerFactory { + + /** Initialize container clients */ + implicit val docker = new DockerClientWithFileAccess()(ec) + implicit val runc = new RuncClient(ec) + + /** Create a container using docker cli */ + override def createContainer(tid: TransactionId, + name: String, + actionImage: ExecManifest.ImageName, + userProvidedImage: Boolean, + memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container] = { + val image = if (userProvidedImage) { + actionImage.publicImageName + } else { + actionImage.localImageName(config.dockerRegistry, config.dockerImagePrefix, Some(config.dockerImageTag)) + } + + DockerContainer.create( + tid, + image = image, + userProvidedImage = userProvidedImage, + memory = memory, + cpuShares = config.invokerCoreShare.toInt, + environment = Map("__OW_API_HOST" -> config.wskApiHost), + network = config.invokerContainerNetwork, + dnsServers = config.invokerContainerDns, + name = Some(name), + parameters) + } + + /** Perform cleanup on init */ + override def init(): Unit = cleanup() + + /** Cleans up all running wsk_ containers */ + override def 
cleanup(): Unit = { + val cleaning = docker.ps(Seq("name" -> s"wsk${instance.toInt}_"))(TransactionId.invokerNanny).flatMap { + containers => + val removals = containers.map { id => + runc + .resume(id)(TransactionId.invokerNanny) + .recoverWith { + // Ignore resume failures and try to remove anyway + case _ => Future.successful(()) + } + .flatMap { _ => + docker.rm(id)(TransactionId.invokerNanny) + } + } + Future.sequence(removals) + } + Await.ready(cleaning, 30.seconds) + } +} + +object DockerContainerFactoryProvider extends ContainerFactoryProvider { + override def getContainerFactory(actorSystem: ActorSystem, + logging: Logging, + config: WhiskConfig, + instanceId: InstanceId, + parameters: Map[String, Set[String]]): ContainerFactory = + new DockerContainerFactory(config, instanceId, parameters)(actorSystem, actorSystem.dispatcher, logging) +} diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala index 1a488b8..c398765 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/RuncClient.scala @@ -25,6 +25,7 @@ import scala.util.Success import whisk.common.LoggingMarkers import whisk.common.Logging import akka.event.Logging.ErrorLevel +import whisk.core.containerpool.ContainerId /** * Serves as interface to the docker CLI tool. 
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index 8a3d750..5148824 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -19,15 +19,11 @@ package whisk.core.invoker import java.nio.charset.StandardCharsets import java.time.Instant - -import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success - import org.apache.kafka.common.errors.RecordTooLargeException - import akka.actor.ActorRefFactory import akka.actor.ActorSystem import akka.actor.Props @@ -42,16 +38,13 @@ import whisk.core.connector.CompletionMessage import whisk.core.connector.MessageFeed import whisk.core.connector.MessageProducer import whisk.core.connector.MessagingProvider +import whisk.core.containerpool.ContainerFactoryProvider import whisk.core.containerpool.ContainerPool import whisk.core.containerpool.ContainerProxy import whisk.core.containerpool.PrewarmingConfig import whisk.core.containerpool.Run -import whisk.core.containerpool.docker.DockerClientWithFileAccess -import whisk.core.containerpool.docker.DockerContainer -import whisk.core.containerpool.docker.RuncClient import whisk.core.database.NoDocumentException import whisk.core.entity._ -import whisk.core.entity.ExecManifest.ImageName import whisk.core.entity.size._ import whisk.http.Messages import whisk.spi.SpiLoader @@ -61,6 +54,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa logging: Logging) { implicit val ec = actorSystem.dispatcher + implicit val cfg = config /** Initialize needed databases */ private val entityStore = WhiskEntityStore.datastore(config) @@ -81,53 +75,22 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa new MessageFeed("activation", logging, consumer, 
maximumContainers, 500.milliseconds, processActivationMessage) }) - /** Initialize container clients */ - implicit val docker = new DockerClientWithFileAccess()(ec) - implicit val runc = new RuncClient(ec) - - /** Cleans up all running wsk_ containers */ - def cleanup() = { - val cleaning = docker.ps(Seq("name" -> s"wsk${instance.toInt}_"))(TransactionId.invokerNanny).flatMap { - containers => - val removals = containers.map { id => - runc - .resume(id)(TransactionId.invokerNanny) - .recoverWith { - // Ignore resume failures and try to remove anyway - case _ => Future.successful(()) - } - .flatMap { _ => - docker.rm(id)(TransactionId.invokerNanny) - } - } - Future.sequence(removals) - } - - Await.ready(cleaning, 30.seconds) - } - cleanup() - sys.addShutdownHook(cleanup()) - /** Factory used by the ContainerProxy to physically create a new container. */ val containerFactory = - (tid: TransactionId, name: String, actionImage: ImageName, userProvidedImage: Boolean, memory: ByteSize) => { - val image = if (userProvidedImage) { - actionImage.publicImageName - } else { - actionImage.localImageName(config.dockerRegistry, config.dockerImagePrefix, Some(config.dockerImageTag)) - } - - DockerContainer.create( - tid, - image = image, - userProvidedImage = userProvidedImage, - memory = memory, - cpuShares = config.invokerCoreShare.toInt, - environment = Map("__OW_API_HOST" -> config.wskApiHost), - network = config.invokerContainerNetwork, - dnsServers = config.invokerContainerDns, - name = Some(name)) - } + SpiLoader + .get[ContainerFactoryProvider] + .getContainerFactory( + actorSystem, + logging, + config, + instance, + Map( + "--cap-drop" -> Set("NET_RAW", "NET_ADMIN"), + "--ulimit" -> Set("nofile=1024:1024"), + "--pids-limit" -> Set("1024"), + "--dns" -> config.invokerContainerDns.toSet)) + containerFactory.init() + sys.addShutdownHook(containerFactory.cleanup()) /** Sends an active-ack. 
*/ val ack = (tid: TransactionId, @@ -164,7 +127,8 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa } /** Creates a ContainerProxy Actor when being called. */ - val childFactory = (f: ActorRefFactory) => f.actorOf(ContainerProxy.props(containerFactory, ack, store, instance)) + val childFactory = (f: ActorRefFactory) => + f.actorOf(ContainerProxy.props(containerFactory.createContainer _, ack, store, instance)) val prewarmKind = "nodejs:6" val prewarmExec = ExecManifest.runtimesManifest diff --git a/tests/src/test/scala/actionContainers/ActionContainer.scala b/tests/src/test/scala/actionContainers/ActionContainer.scala index f60cbfe..94f6763 100644 --- a/tests/src/test/scala/actionContainers/ActionContainer.scala +++ b/tests/src/test/scala/actionContainers/ActionContainer.scala @@ -164,7 +164,7 @@ object ActionContainer { } private def syncPost(host: String, port: Int, endPoint: String, content: JsValue): (Int, Option[JsObject]) = { - whisk.core.containerpool.docker.HttpUtils.post(host, port, endPoint, content) + whisk.core.containerpool.HttpUtils.post(host, port, endPoint, content) } private class ActionContainerImpl() extends ActionContainer { diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala index 809165a..0e0867a 100644 --- a/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/ContainerConnectionTests.scala @@ -36,9 +36,9 @@ import org.scalatest.FlatSpec import org.scalatest.Matchers import spray.json.JsObject -import whisk.core.containerpool.docker.HttpUtils -import whisk.core.entity.ActivationResponse._ +import whisk.core.containerpool.HttpUtils import whisk.core.entity.size._ +import whisk.core.entity.ActivationResponse._ /** * Unit tests for HttpUtils which communicate with containers. 
diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala index 9146034..1235b92 100644 --- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientTests.scala @@ -23,21 +23,19 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import scala.concurrent.duration.DurationInt import scala.concurrent.duration.FiniteDuration - import org.junit.runner.RunWith import org.scalatest.BeforeAndAfterEach import org.scalatest.FlatSpec import org.scalatest.junit.JUnitRunner import org.scalatest.Matchers - import common.StreamLogging import whisk.common.LogMarker import whisk.common.LoggingMarkers.INVOKER_DOCKER_CMD import whisk.common.TransactionId -import whisk.core.containerpool.docker.ContainerId -import whisk.core.containerpool.docker.ContainerIp import whisk.core.containerpool.docker.DockerClient import scala.concurrent.Promise +import whisk.core.containerpool.ContainerId +import whisk.core.containerpool.ContainerAddress import whisk.utils.retry @RunWith(classOf[JUnitRunner]) @@ -171,7 +169,7 @@ class DockerClientTests extends FlatSpec with Matchers with StreamLogging with B val network = "userland" val inspectArgs = Seq("--format", s"{{.NetworkSettings.Networks.${network}.IPAddress}}", id.asString) - runAndVerify(dc.inspectIPAddress(id, network), "inspect", inspectArgs) shouldBe ContainerIp(stdout) + runAndVerify(dc.inspectIPAddress(id, network), "inspect", inspectArgs) shouldBe ContainerAddress(stdout) val image = "image" val runArgs = Seq("--memory", "256m", "--cpushares", "1024") diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala index 8bf736d..7c6acbf 
100644 --- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerClientWithFileAccessTests.scala @@ -40,8 +40,8 @@ import org.scalatest.fixture.{FlatSpec => FixtureFlatSpec} import common.StreamLogging import spray.json._ import whisk.common.TransactionId -import whisk.core.containerpool.docker.ContainerId -import whisk.core.containerpool.docker.ContainerIp +import whisk.core.containerpool.ContainerId +import whisk.core.containerpool.ContainerAddress import whisk.core.containerpool.docker.DockerClientWithFileAccess @RunWith(classOf[JUnitRunner]) @@ -57,25 +57,25 @@ class DockerClientWithFileAccessTestsIp extends FlatSpec with Matchers with Stre val dockerCommand = "docker" val networkInConfigFile = "networkConfig" val networkInDockerInspect = "networkInspect" - val ipInConfigFile = ContainerIp("10.0.0.1") - val ipInDockerInspect = ContainerIp("10.0.0.2") + val ipInConfigFile = ContainerAddress("10.0.0.1") + val ipInDockerInspect = ContainerAddress("10.0.0.2") val dockerConfig = JsObject( "NetworkSettings" -> JsObject( "Networks" -> JsObject(networkInConfigFile -> - JsObject("IPAddress" -> JsString(ipInConfigFile.asString))))) + JsObject("IPAddress" -> JsString(ipInConfigFile.host))))) /** Returns a DockerClient with mocked results */ - def dockerClient(execResult: Future[String] = Future.successful(ipInDockerInspect.asString), + def dockerClient(execResult: Future[String] = Future.successful(ipInDockerInspect.host), readResult: Future[JsObject] = Future.successful(dockerConfig)) = new DockerClientWithFileAccess()(global) { override val dockerCmd = Seq(dockerCommand) override def executeProcess(args: String*)(implicit ec: ExecutionContext) = execResult override def configFileContents(configFile: File) = readResult // Make protected ipAddressFromFile available for testing - requires reflectiveCalls - def publicIpAddressFromFile(id: ContainerId, network: 
String): Future[ContainerIp] = + def publicIpAddressFromFile(id: ContainerId, network: String): Future[ContainerAddress] = ipAddressFromFile(id, network) } diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala index 1451c5d..47e67c2 100644 --- a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerTests.scala @@ -21,7 +21,6 @@ import java.io.IOException import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import java.time.Instant - import scala.collection.mutable import scala.concurrent.Await import scala.concurrent.duration._ @@ -71,12 +70,12 @@ class DockerContainerTests * Constructs a testcontainer with overridden IO methods. Results of the override can be provided * as parameters. */ - def dockerContainer(id: ContainerId = containerId, ip: ContainerIp = ContainerIp("ip"))( + def dockerContainer(id: ContainerId = containerId, addr: ContainerAddress = ContainerAddress("ip"))( ccRes: Future[RunResult] = Future.successful(RunResult(intervalOf(1.millisecond), Right(ContainerResponse(true, "", None)))), retryCount: Int = 0)(implicit docker: DockerApiWithFileAccess, runc: RuncApi): DockerContainer = { - new DockerContainer(id, ip) { + new DockerContainer(id, addr) { override protected def callContainer(path: String, body: JsObject, timeout: FiniteDuration, @@ -94,6 +93,10 @@ class DockerContainerTests behavior of "DockerContainer" implicit val transid = TransactionId.testing + val parameters = Map( + "--cap-drop" -> Set("NET_RAW", "NET_ADMIN"), + "--ulimit" -> Set("nofile=1024:1024"), + "--pids-limit" -> Set("1024")) /* * CONTAINER CREATION @@ -108,7 +111,6 @@ class DockerContainerTests val environment = Map("test" -> "hi") val network = "testwork" val name = "myContainer" - val container = DockerContainer.create( 
transid = transid, image = image, @@ -116,7 +118,8 @@ class DockerContainerTests cpuShares = cpuShares, environment = environment, network = network, - name = Some(name)) + name = Some(name), + dockerRunParameters = parameters) await(container) @@ -148,7 +151,11 @@ class DockerContainerTests implicit val docker = new TestDockerClient implicit val runc = stub[RuncApi] - val container = DockerContainer.create(transid = transid, image = "image", userProvidedImage = true) + val container = DockerContainer.create( + transid = transid, + image = "image", + userProvidedImage = true, + dockerRunParameters = parameters) await(container) docker.pulls should have size 1 @@ -160,14 +167,14 @@ class DockerContainerTests it should "remove the container if inspect fails" in { implicit val docker = new TestDockerClient { override def inspectIPAddress(id: ContainerId, - network: String)(implicit transid: TransactionId): Future[ContainerIp] = { + network: String)(implicit transid: TransactionId): Future[ContainerAddress] = { inspects += ((id, network)) Future.failed(new RuntimeException()) } } implicit val runc = stub[RuncApi] - val container = DockerContainer.create(transid = transid, image = "image") + val container = DockerContainer.create(transid = transid, image = "image", dockerRunParameters = parameters) a[WhiskContainerStartupError] should be thrownBy await(container) docker.pulls should have size 0 @@ -186,7 +193,11 @@ class DockerContainerTests } implicit val runc = stub[RuncApi] - val container = DockerContainer.create(transid = transid, image = "image", userProvidedImage = true) + val container = DockerContainer.create( + transid = transid, + image = "image", + userProvidedImage = true, + dockerRunParameters = parameters) a[WhiskContainerStartupError] should be thrownBy await(container) docker.pulls should have size 1 @@ -198,14 +209,18 @@ class DockerContainerTests it should "provide a proper error if inspect fails for blackbox containers" in { implicit val docker = new 
TestDockerClient { override def inspectIPAddress(id: ContainerId, - network: String)(implicit transid: TransactionId): Future[ContainerIp] = { + network: String)(implicit transid: TransactionId): Future[ContainerAddress] = { inspects += ((id, network)) Future.failed(new RuntimeException()) } } implicit val runc = stub[RuncApi] - val container = DockerContainer.create(transid = transid, image = "image", userProvidedImage = true) + val container = DockerContainer.create( + transid = transid, + image = "image", + userProvidedImage = true, + dockerRunParameters = parameters) a[WhiskContainerStartupError] should be thrownBy await(container) docker.pulls should have size 1 @@ -223,7 +238,11 @@ class DockerContainerTests } implicit val runc = stub[RuncApi] - val container = DockerContainer.create(transid = transid, image = "image", userProvidedImage = true) + val container = DockerContainer.create( + transid = transid, + image = "image", + userProvidedImage = true, + dockerRunParameters = parameters) a[BlackboxStartupError] should be thrownBy await(container) docker.pulls should have size 1 @@ -240,7 +259,7 @@ class DockerContainerTests implicit val runc = stub[RuncApi] val id = ContainerId("id") - val container = new DockerContainer(id, ContainerIp("ip")) + val container = new DockerContainer(id, ContainerAddress("ip")) container.suspend() container.resume() @@ -254,7 +273,7 @@ class DockerContainerTests implicit val runc = stub[RuncApi] val id = ContainerId("id") - val container = new DockerContainer(id, ContainerIp("ip")) + val container = new DockerContainer(id, ContainerAddress("ip")) container.destroy() @@ -632,9 +651,10 @@ class DockerContainerTests Future.successful(ContainerId("testId")) } - def inspectIPAddress(id: ContainerId, network: String)(implicit transid: TransactionId): Future[ContainerIp] = { + def inspectIPAddress(id: ContainerId, network: String)( + implicit transid: TransactionId): Future[ContainerAddress] = { inspects += ((id, network)) - 
Future.successful(ContainerIp("testIp")) + Future.successful(ContainerAddress("testIp")) } def pause(id: ContainerId)(implicit transid: TransactionId): Future[Unit] = { diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala index 4d2db5e..b4e2e47 100644 --- a/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/RuncClientTests.scala @@ -30,7 +30,7 @@ import scala.concurrent.Await import org.scalatest.Matchers import whisk.core.containerpool.docker.RuncClient import common.StreamLogging -import whisk.core.containerpool.docker.ContainerId +import whisk.core.containerpool.ContainerId import whisk.common.TransactionId import org.scalatest.BeforeAndAfterEach import whisk.common.LogMarker diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala index 0f1e1b6..ca890a3 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala @@ -18,18 +18,15 @@ package whisk.core.containerpool.test import java.time.Instant - import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ - import org.junit.runner.RunWith import org.scalamock.scalatest.MockFactory import org.scalatest.BeforeAndAfterAll import org.scalatest.FlatSpecLike import org.scalatest.Matchers import org.scalatest.junit.JUnitRunner - import akka.actor.ActorRef import akka.actor.ActorSystem import akka.actor.FSM @@ -39,8 +36,11 @@ import akka.actor.FSM.Transition import akka.testkit.ImplicitSender import akka.testkit.TestKit import common.LoggedFunction +import common.StreamLogging +import scala.concurrent.ExecutionContext import spray.json._ import 
spray.json.DefaultJsonProtocol._ +import whisk.common.Logging import whisk.common.TransactionId import whisk.core.connector.ActivationMessage import whisk.core.containerpool._ @@ -56,11 +56,13 @@ class ContainerProxyTests with FlatSpecLike with Matchers with BeforeAndAfterAll - with MockFactory { + with MockFactory + with StreamLogging { override def afterAll = TestKit.shutdownActorSystem(system) val timeout = 5.seconds + val log = logging // Common entities to pass to the tests. We don't really care what's inside // those for the behavior testing here, as none of the contents will really @@ -504,6 +506,10 @@ class ContainerProxyTests * Implements all the good cases of a perfect run to facilitate error case overriding. */ class TestContainer extends Container { + protected val id = ContainerId("testcontainer") + protected val addr = ContainerAddress("0.0.0.0") + protected implicit val logging: Logging = log + protected implicit val ec: ExecutionContext = system.dispatcher var suspendCount = 0 var resumeCount = 0 var destroyCount = 0 @@ -519,18 +525,18 @@ class ContainerProxyTests resumeCount += 1 Future.successful(()) } - def destroy()(implicit transid: TransactionId): Future[Unit] = { + override def destroy()(implicit transid: TransactionId): Future[Unit] = { destroyCount += 1 - Future.successful(()) + super.destroy() } - def initialize(initializer: JsObject, timeout: FiniteDuration)( + override def initialize(initializer: JsObject, timeout: FiniteDuration)( implicit transid: TransactionId): Future[Interval] = { initializeCount += 1 initializer shouldBe action.containerInitializer timeout shouldBe action.limits.timeout.duration Future.successful(Interval.zero) } - def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)( + override def run(parameters: JsObject, environment: JsObject, timeout: FiniteDuration)( implicit transid: TransactionId): Future[(Interval, ActivationResponse)] = { runCount += 1 environment.fields("api_key") shouldBe 
message.user.authkey.toJson -- To stop receiving notification emails like this one, please contact "commits@openwhisk.apache.org" <commits@openwhisk.apache.org>.