This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new d7c9fd6 Implement ContainerFactory.cpuShare to calculate per container share. (#3211) d7c9fd6 is described below commit d7c9fd661a8a5b339f20b3f549bfa4fa47124706 Author: tysonnorris <tysonnor...@gmail.com> AuthorDate: Fri Apr 13 01:10:44 2018 -0700 Implement ContainerFactory.cpuShare to calculate per container share. (#3211) Fixes #3110. --- ansible/roles/invoker/tasks/deploy.yml | 6 +- .../src/main/scala/whisk/core/WhiskConfig.scala | 8 +- .../core/containerpool/ContainerFactory.scala | 20 +++- .../whisk/core/mesos/MesosContainerFactory.scala | 5 +- core/invoker/src/main/resources/application.conf | 12 +- .../whisk/core/containerpool/ContainerPool.scala | 19 ++-- .../whisk/core/containerpool/ContainerProxy.scala | 14 ++- .../docker/DockerContainerFactory.scala | 46 +++++--- .../kubernetes/KubernetesContainerFactory.scala | 3 +- .../main/scala/whisk/core/invoker/Invoker.scala | 6 +- .../scala/whisk/core/invoker/InvokerReactive.scala | 19 ++-- tests/src/test/scala/common/LoggedFunction.scala | 10 ++ .../docker/test/DockerContainerFactoryTests.scala | 122 +++++++++++++++++++++ .../mesos/test/MesosContainerFactoryTest.scala | 38 +++++-- .../test/ContainerArgsConfigTest.scala | 6 +- .../containerpool/test/ContainerPoolTests.scala | 29 +++-- .../containerpool/test/ContainerProxyTests.scala | 54 ++++++--- 17 files changed, 315 insertions(+), 102 deletions(-) diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index e9302eb..a1aca12 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -192,9 +192,9 @@ {% for item in (invoker_container_network_dns_servers | default()).split(' ') %} -e CONFIG_whisk_containerFactory_containerArgs_dnsServers_{{loop.index0}}={{ item }} {% endfor %} - -e INVOKER_NUMCORE='{{ invoker.numcore }}' - -e INVOKER_CORESHARE='{{ invoker.coreshare }}' - -e INVOKER_USE_RUNC='{{ invoker.useRunc }}' + -e CONFIG_whisk_containerPool_numCore='{{ invoker.numcore }}' + -e CONFIG_whisk_containerPool_coreShare='{{ invoker.coreshare }}' + -e CONFIG_whisk_docker_containerFactory_useRunc='{{ invoker.useRunc }}' -e INVOKER_NAME='{{ groups['invokers'].index(inventory_hostname) }}' -e WHISK_LOGS_DIR='{{ whisk_logs_dir }}' -e METRICS_KAMON='{{ metrics.kamon.enabled }}' diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala index 8779323..eb22e9d 100644 --- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala @@ -58,9 +58,6 @@ class WhiskConfig(requiredProperties: Map[String, String], val dockerImagePrefix = this(WhiskConfig.dockerImagePrefix) val dockerImageTag = this(WhiskConfig.dockerImageTag) - val invokerNumCore = this(WhiskConfig.invokerNumCore) - val invokerCoreShare = this(WhiskConfig.invokerCoreShare) - val invokerUseRunc = this.getAsBoolean(WhiskConfig.invokerUseRunc, true) val invokerName = this(WhiskConfig.invokerName) val wskApiHost = this(WhiskConfig.wskApiProtocol) + "://" + this(WhiskConfig.wskApiHostname) + ":" + this( @@ -170,9 +167,6 @@ object WhiskConfig { val dockerImagePrefix = "docker.image.prefix" val dockerImageTag = "docker.image.tag" - val invokerNumCore = "invoker.numcore" - val invokerCoreShare = "invoker.coreshare" - val invokerUseRunc = "invoker.use.runc" val invokerName = "invoker.name" val wskApiProtocol = "whisk.api.host.proto" @@ -233,10 +227,12 @@ object ConfigKeys { val docker = "whisk.docker" val dockerTimeouts = s"$docker.timeouts" + val dockerContainerFactory = s"${docker}.container-factory" val runc = "whisk.runc" val runcTimeouts = s"$runc.timeouts" val containerFactory = "whisk.container-factory" val containerArgs = s"$containerFactory.container-args" + val containerPool = "whisk.container-pool" val blacklist = "whisk.blacklist" val kubernetes = "whisk.kubernetes" diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala index 17860c0..35d3b8b 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala @@ -31,6 +31,23 @@ case class ContainerArgsConfig(network: String, dnsServers: Seq[String] = Seq.empty, extraArgs: Map[String, Set[String]] = Map.empty) +case class ContainerPoolConfig(numCore: Int, coreShare: Int) { + + /** + * The total number of containers is simply the number of cores dilated by the cpu sharing. + */ + def maxActiveContainers = numCore * coreShare + + /** + * The shareFactor indicates the number of containers that would share a single core, on average. + * cpuShare is a docker option (-c) whereby a container's CPU access is limited. + * A value of 1024 is the full share so a strict resource division with a shareFactor of 2 would yield 512. + * On an idle/underloaded system, a container will still get to use underutilized CPU shares. + */ + private val totalShare = 1024.0 // This is a pre-defined value coming from docker and not our hard-coded value. + def cpuShare = (totalShare / maxActiveContainers).toInt +} + /** * An abstraction for Container creation */ @@ -41,7 +58,8 @@ trait ContainerFactory { name: String, actionImage: ExecManifest.ImageName, userProvidedImage: Boolean, - memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container] + memory: ByteSize, + cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] /** perform any initialization */ def init(): Unit diff --git a/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala b/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala index 9d6204b..c9e634e 100644 --- a/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala +++ b/common/scala/src/main/scala/whisk/core/mesos/MesosContainerFactory.scala @@ -100,7 +100,8 @@ class MesosContainerFactory(config: WhiskConfig, name: String, actionImage: ExecManifest.ImageName, userProvidedImage: Boolean, - memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container] = { + memory: ByteSize, + cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = { implicit val transid = tid val image = if (userProvidedImage) { actionImage.publicImageName @@ -117,7 +118,7 @@ class MesosContainerFactory(config: WhiskConfig, image = image, userProvidedImage = userProvidedImage, memory = memory, - cpuShares = config.invokerCoreShare.toInt, + cpuShares = cpuShares, environment = Map("__OW_API_HOST" -> config.wskApiHost), network = containerArgs.network, dnsServers = containerArgs.dnsServers, diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf index 3ee046d..aaff48f 100644 --- a/core/invoker/src/main/resources/application.conf +++ b/core/invoker/src/main/resources/application.conf @@ -18,6 +18,16 @@ whisk { unpause: 10 seconds } + docker.container-factory { + # Use runc (docker-runc) for pause/resume functionality in DockerContainerFactory + use-runc: true + } + + container-pool { + num-core: 4 # used for computing --cpushares, and max number of containers allowed + core-share: 2 # used for computing --cpushares, and max number of containers allowed + } + kubernetes { # Timeouts for k8s commands. Set to "Inf" to disable timeout. timeouts { @@ -42,7 +52,7 @@ whisk { container-factory.container-args { network: bridge dns-servers: [] - extra-args: {} + extra-args: {} # to pass additional args to 'docker run'; format is `{key1: [v1, v2], key2: [v1, v2]}` } container-proxy { diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala index 801fc09..1835569 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala @@ -54,16 +54,14 @@ case class WorkerData(data: ContainerData, state: WorkerState) * (kind, memory) and there is space in the pool. * * @param childFactory method to create new container proxy actor - * @param maxActiveContainers maximum amount of containers doing work - * @param maxPoolSize maximum size of containers allowed in the pool * @param feed actor to request more work from * @param prewarmConfig optional settings for container prewarming + * @param poolConfig config for the ContainerPool */ class ContainerPool(childFactory: ActorRefFactory => ActorRef, - maxActiveContainers: Int, - maxPoolSize: Int, feed: ActorRef, - prewarmConfig: Option[PrewarmingConfig] = None) + prewarmConfig: Option[PrewarmingConfig] = None, + poolConfig: ContainerPoolConfig) extends Actor { implicit val logging = new AkkaLogging(context.system.log) @@ -98,7 +96,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, // their requests and send them back to the pool for rescheduling (this may happen if "docker" operations // fail for example, or a container has aged and was destroying itself when a new request was assigned) case r: Run => - val createdContainer = if (busyPool.size < maxActiveContainers) { + val createdContainer = if (busyPool.size < poolConfig.maxActiveContainers) { // Schedule a job to a warm container ContainerPool @@ -107,7 +105,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, (container, "warm") }) .orElse { - if (busyPool.size + freePool.size < maxPoolSize) { + if (busyPool.size + freePool.size < poolConfig.maxActiveContainers) { takePrewarmContainer(r.action) .map(container => { (container, "prewarmed") @@ -147,7 +145,7 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, logging.error( this, s"Rescheduling Run message, too many message in the pool, freePoolSize: ${freePool.size}, " + - s"busyPoolSize: ${busyPool.size}, maxActiveContainers $maxActiveContainers, " + + s"busyPoolSize: ${busyPool.size}, maxActiveContainers ${poolConfig.maxActiveContainers}, " + s"userNamespace: ${r.msg.user.namespace}, action: ${r.action}")(r.msg.transid) Some(logMessageInterval.fromNow) } else { @@ -282,11 +280,10 @@ object ContainerPool { } def props(factory: ActorRefFactory => ActorRef, - maxActive: Int, - size: Int, + poolConfig: ContainerPoolConfig, feed: ActorRef, prewarmConfig: Option[PrewarmingConfig] = None) = - Props(new ContainerPool(factory, maxActive, size, feed, prewarmConfig)) + Props(new ContainerPool(factory, feed, prewarmConfig, poolConfig)) } /** Contains settings needed to perform container prewarming */ diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala index 99557e6..dd9885e 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala @@ -94,11 +94,12 @@ case object RescheduleJob // job is sent back to parent and could not be process * @param pauseGrace time to wait for new work before pausing the container */ class ContainerProxy( - factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container], + factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container], sendActiveAck: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any], storeActivation: (TransactionId, WhiskActivation) => Future[Any], collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs], instance: InstanceId, + poolConfig: ContainerPoolConfig, unusedTimeout: FiniteDuration, pauseGrace: FiniteDuration) extends FSM[ContainerState, ContainerData] @@ -117,7 +118,8 @@ class ContainerProxy( ContainerProxy.containerName(instance, "prewarm", job.exec.kind), job.exec.image, job.exec.pull, - job.memoryLimit) + job.memoryLimit, + poolConfig.cpuShare) .map(container => PreWarmedData(container, job.exec.kind, job.memoryLimit)) .pipeTo(self) @@ -133,7 +135,8 @@ class ContainerProxy( ContainerProxy.containerName(instance, job.msg.user.namespace.name, job.action.name.name), job.action.exec.image, job.action.exec.pull, - job.action.limits.memory.megabytes.MB) + job.action.limits.memory.megabytes.MB, + poolConfig.cpuShare) // container factory will either yield a new container ready to execute the action, or // starting up the container failed; for the latter, it's either an internal error starting @@ -413,14 +416,15 @@ final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, paus object ContainerProxy { def props( - factory: (TransactionId, String, ImageName, Boolean, ByteSize) => Future[Container], + factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container], ack: (TransactionId, WhiskActivation, Boolean, InstanceId) => Future[Any], store: (TransactionId, WhiskActivation) => Future[Any], collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs], instance: InstanceId, + poolConfig: ContainerPoolConfig, unusedTimeout: FiniteDuration = timeouts.idleContainer, pauseGrace: FiniteDuration = timeouts.pauseGrace) = - Props(new ContainerProxy(factory, ack, store, collectLogs, instance, unusedTimeout, pauseGrace)) + Props(new ContainerProxy(factory, ack, store, collectLogs, instance, poolConfig, unusedTimeout, pauseGrace)) // Needs to be thread-safe as it's used by multiple proxies concurrently. private val containerCount = new Counter diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala index ff0dfb5..3f2ee86 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/docker/DockerContainerFactory.scala @@ -36,26 +36,28 @@ import pureconfig._ import whisk.core.ConfigKeys import whisk.core.containerpool.ContainerArgsConfig -class DockerContainerFactory(config: WhiskConfig, - instance: InstanceId, +case class DockerContainerFactoryConfig(useRunc: Boolean) + +class DockerContainerFactory(instance: InstanceId, parameters: Map[String, Set[String]], - containerArgs: ContainerArgsConfig = - loadConfigOrThrow[ContainerArgsConfig](ConfigKeys.containerArgs))( + containerArgsConfig: ContainerArgsConfig = + loadConfigOrThrow[ContainerArgsConfig](ConfigKeys.containerArgs), + dockerContainerFactoryConfig: DockerContainerFactoryConfig = + loadConfigOrThrow[DockerContainerFactoryConfig](ConfigKeys.dockerContainerFactory))( implicit actorSystem: ActorSystem, ec: ExecutionContext, - logging: Logging) + logging: Logging, + docker: DockerApiWithFileAccess, + runc: RuncApi) extends ContainerFactory { - /** Initialize container clients */ - implicit val docker = new DockerClientWithFileAccess()(ec) - implicit val runc = new RuncClient()(ec) - /** Create a container using docker cli */ override def createContainer(tid: TransactionId, name: String, actionImage: ExecManifest.ImageName, userProvidedImage: Boolean, - memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container] = { + memory: ByteSize, + cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = { val image = if (userProvidedImage) { actionImage.publicImageName } else { @@ -67,13 +69,13 @@ class DockerContainerFactory(config: WhiskConfig, image = image, userProvidedImage = userProvidedImage, memory = memory, - cpuShares = config.invokerCoreShare.toInt, + cpuShares = cpuShares, environment = Map("__OW_API_HOST" -> config.wskApiHost), - network = containerArgs.network, - dnsServers = containerArgs.dnsServers, + network = containerArgsConfig.network, + dnsServers = containerArgsConfig.dnsServers, name = Some(name), - useRunc = config.invokerUseRunc, - parameters ++ containerArgs.extraArgs) + useRunc = dockerContainerFactoryConfig.useRunc, + parameters ++ containerArgsConfig.extraArgs.map { case (k, v) => ("--" + k, v) }) } /** Perform cleanup on init */ @@ -111,7 +113,7 @@ class DockerContainerFactory(config: WhiskConfig, containers => logging.info(this, s"removing ${containers.size} action containers.") val removals = containers.map { id => - (if (config.invokerUseRunc) { + (if (dockerContainerFactoryConfig.useRunc) { runc.resume(id) } else { docker.unpause(id) @@ -135,6 +137,14 @@ object DockerContainerFactoryProvider extends ContainerFactoryProvider { logging: Logging, config: WhiskConfig, instanceId: InstanceId, - parameters: Map[String, Set[String]]): ContainerFactory = - new DockerContainerFactory(config, instanceId, parameters)(actorSystem, actorSystem.dispatcher, logging) + parameters: Map[String, Set[String]]): ContainerFactory = { + + new DockerContainerFactory(instanceId, parameters)( + actorSystem, + actorSystem.dispatcher, + logging, + new DockerClientWithFileAccess()(actorSystem.dispatcher)(logging, actorSystem), + new RuncClient()(actorSystem.dispatcher)(logging, actorSystem)) + } + } diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala index 8b2d918..e332b84 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainerFactory.scala @@ -64,7 +64,8 @@ class KubernetesContainerFactory(label: String, config: WhiskConfig)(implicit ac name: String, actionImage: ImageName, userProvidedImage: Boolean, - memory: ByteSize)(implicit config: WhiskConfig, logging: Logging): Future[Container] = { + memory: ByteSize, + cpuShares: Int)(implicit config: WhiskConfig, logging: Logging): Future[Container] = { val image = if (userProvidedImage) { actionImage.publicImageName } else { diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala index e8e1dae..58a4d1d 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala @@ -56,11 +56,7 @@ object Invoker { ExecManifest.requiredProperties ++ kafkaHosts ++ zookeeperHosts ++ - wskApiHost ++ Map( - dockerImageTag -> "latest", - invokerNumCore -> "4", - invokerCoreShare -> "2", - invokerUseRunc -> "true") ++ + wskApiHost ++ Map(dockerImageTag -> "latest") ++ Map(invokerName -> "") def main(args: Array[String]): Unit = { diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index 0729103..8601e4c 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -41,7 +41,11 @@ import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.util.{Failure, Success} -class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: MessageProducer)( +class InvokerReactive( + config: WhiskConfig, + instance: InstanceId, + producer: MessageProducer, + poolConfig: ContainerPoolConfig = loadConfigOrThrow[ContainerPoolConfig](ConfigKeys.containerPool))( implicit actorSystem: ActorSystem, logging: Logging) { @@ -91,7 +95,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa /** Initialize message consumers */ private val topic = s"invoker${instance.toInt}" - private val maximumContainers = config.invokerNumCore.toInt * config.invokerCoreShare.toInt + private val maximumContainers = poolConfig.maxActiveContainers private val msgProvider = SpiLoader.get[MessagingProvider] private val consumer = msgProvider.getConsumer( config, @@ -140,7 +144,9 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa /** Creates a ContainerProxy Actor when being called. */ private val childFactory = (f: ActorRefFactory) => - f.actorOf(ContainerProxy.props(containerFactory.createContainer, ack, store, logsProvider.collectLogs, instance)) + f.actorOf( + ContainerProxy + .props(containerFactory.createContainer, ack, store, logsProvider.collectLogs, instance, poolConfig)) private val prewarmKind = "nodejs:6" private val prewarmExec = ExecManifest.runtimesManifest @@ -149,12 +155,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa .get private val pool = actorSystem.actorOf( - ContainerPool.props( - childFactory, - maximumContainers, - maximumContainers, - activationFeed, - Some(PrewarmingConfig(2, prewarmExec, 256.MB)))) + ContainerPool.props(childFactory, poolConfig, activationFeed, Some(PrewarmingConfig(2, prewarmExec, 256.MB)))) /** Is called when an ActivationMessage is read from Kafka */ def processActivationMessage(bytes: Array[Byte]): Future[Unit] = { diff --git a/tests/src/test/scala/common/LoggedFunction.scala b/tests/src/test/scala/common/LoggedFunction.scala index 3789de7..e8b7261 100644 --- a/tests/src/test/scala/common/LoggedFunction.scala +++ b/tests/src/test/scala/common/LoggedFunction.scala @@ -72,6 +72,15 @@ class LoggedFunction5[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) ex body(v1, v2, v3, v4, v5) } } +class LoggedFunction6[A1, A2, A3, A4, A5, A6, B](body: (A1, A2, A3, A4, A5, A6) => B) + extends Function6[A1, A2, A3, A4, A5, A6, B] { + val calls = mutable.Buffer[(A1, A2, A3, A4, A5, A6)]() + + override def apply(v1: A1, v2: A2, v3: A3, v4: A4, v5: A5, v6: A6): B = { + calls += ((v1, v2, v3, v4, v5, v6)) + body(v1, v2, v3, v4, v5, v6) + } +} object LoggedFunction { def apply[A1, B](body: (A1) => B) = new LoggedFunction1(body) @@ -79,4 +88,5 @@ object LoggedFunction { def apply[A1, A2, A3, B](body: (A1, A2, A3) => B) = new LoggedFunction3(body) def apply[A1, A2, A3, A4, B](body: (A1, A2, A3, A4) => B) = new LoggedFunction4(body) def apply[A1, A2, A3, A4, A5, B](body: (A1, A2, A3, A4, A5) => B) = new LoggedFunction5(body) + def apply[A1, A2, A3, A4, A5, A6, B](body: (A1, A2, A3, A4, A5, A6) => B) = new LoggedFunction6(body) } diff --git a/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala new file mode 100644 index 0000000..31139dc --- /dev/null +++ b/tests/src/test/scala/whisk/core/containerpool/docker/test/DockerContainerFactoryTests.scala @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.containerpool.docker.test + +import common.StreamLogging +import common.TimingHelpers +import common.WskActorSystem +import org.junit.runner.RunWith +import org.scalamock.scalatest.MockFactory +import org.scalatest.BeforeAndAfterEach +import org.scalatest.FlatSpec +import org.scalatest.Matchers +import org.scalatest.junit.JUnitRunner +import scala.concurrent.Await +import scala.concurrent.Future +import scala.concurrent.duration._ +import whisk.common.TransactionId +import whisk.core.WhiskConfig +import whisk.core.WhiskConfig._ +import whisk.core.containerpool.ContainerAddress +import whisk.core.containerpool.ContainerArgsConfig +import whisk.core.containerpool.ContainerId +import whisk.core.containerpool.docker.DockerApiWithFileAccess +import whisk.core.containerpool.docker.DockerContainerFactory +import whisk.core.containerpool.docker.DockerContainerFactoryConfig +import whisk.core.containerpool.docker.RuncApi +import whisk.core.entity.ExecManifest +import whisk.core.entity.InstanceId +import whisk.core.entity.size._ + +@RunWith(classOf[JUnitRunner]) +class DockerContainerFactoryTests + extends FlatSpec + with Matchers + with MockFactory + with StreamLogging + with BeforeAndAfterEach + with WskActorSystem + with TimingHelpers { + + implicit val config = new WhiskConfig( + ExecManifest.requiredProperties ++ Map(dockerImagePrefix -> "testing", dockerImageTag -> "testtag")) + ExecManifest.initialize(config) should be a 'success + + behavior of "DockerContainerFactory" + + it should "set the docker run args based on ContainerArgsConfig" in { + + val image = ExecManifest.runtimesManifest.manifests("nodejs").image + + implicit val tid = TransactionId.testing + val dockerApiStub = mock[DockerApiWithFileAccess] + //setup run expectation + (dockerApiStub + .run(_: String, _: Seq[String])(_: TransactionId)) + .expects( + image.localImageName(config.dockerRegistry, config.dockerImagePrefix, Some(config.dockerImageTag)), + List( + "--cpu-shares", + "32", //should be calculated as 1024/(numcore * sharefactor) via ContainerFactory.cpuShare + "--memory", + "10m", + "--memory-swap", + "10m", + "--network", + "net1", + "-e", + "__OW_API_HOST=://:", + "--dns", + "dns1", + "--dns", + "dns2", + "--name", + "testContainer", + "--env", + "e1", + "--env", + "e2"), + *) + .returning(Future.successful { ContainerId("fakecontainerid") }) + //setup inspect expectation + (dockerApiStub + .inspectIPAddress(_: ContainerId, _: String)(_: TransactionId)) + .expects(ContainerId("fakecontainerid"), "net1", *) + .returning(Future.successful { ContainerAddress("1.2.3.4", 1234) }) + //setup rm expectation + (dockerApiStub + .rm(_: ContainerId)(_: TransactionId)) + .expects(ContainerId("fakecontainerid"), *) + .returning(Future.successful(Unit)) + + val factory = + new DockerContainerFactory( + InstanceId(0), + Map(), + ContainerArgsConfig("net1", Seq("dns1", "dns2"), Map("env" -> Set("e1", "e2"))), + DockerContainerFactoryConfig(true))(actorSystem, executionContext, logging, dockerApiStub, mock[RuncApi]) + + val cf = factory.createContainer(tid, "testContainer", image, false, 10.MB, 32) + + val c = Await.result(cf, 5000.milliseconds) + + Await.result(c.destroy(), 500.milliseconds) + + } + +} diff --git a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala index deb23ff..9a46509 100644 --- a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala +++ b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala @@ -49,6 +49,7 @@ import whisk.common.TransactionId import whisk.core.WhiskConfig import whisk.core.WhiskConfig._ import whisk.core.containerpool.ContainerArgsConfig +import whisk.core.containerpool.ContainerPoolConfig import whisk.core.containerpool.logging.DockerToActivationLogStore import whisk.core.entity.ExecManifest.ImageName import whisk.core.entity.size._ @@ -66,7 +67,7 @@ class MesosContainerFactoryTest def await[A](f: Future[A], timeout: FiniteDuration = 500.milliseconds) = Await.result[A](f, timeout) implicit val wskConfig = - new WhiskConfig(Map(invokerCoreShare -> "2", dockerImageTag -> "latest", wskApiHostname -> "apihost") ++ wskApiHost) + new WhiskConfig(Map(dockerImageTag -> "latest", wskApiHostname -> "apihost") ++ wskApiHost) var count = 0 var lastTaskId: String = null def testTaskId() = { @@ -75,8 +76,9 @@ class MesosContainerFactoryTest lastTaskId } - //TODO: adjust this once the invokerCoreShare issue is fixed see #3110 - def cpus() = wskConfig.invokerCoreShare.toInt / 1024.0 // + val poolConfig = ContainerPoolConfig(8, 10) + val dockerCpuShares = poolConfig.cpuShare + val mesosCpus = poolConfig.cpuShare / 1024.0 val containerArgsConfig = new ContainerArgsConfig("net1", Seq("dns1", "dns2"), Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4"))) @@ -116,14 +118,20 @@ class MesosContainerFactoryTest testTaskId) expectMsg(Subscribe) - factory.createContainer(TransactionId.testing, "mesosContainer", ImageName("fakeImage"), false, 1.MB) + factory.createContainer( + TransactionId.testing, + "mesosContainer", + ImageName("fakeImage"), + false, + 1.MB, + poolConfig.cpuShare) expectMsg( SubmitTask(TaskDef( lastTaskId, "mesosContainer", "fakeImage:" + wskConfig.dockerImageTag, - cpus, + mesosCpus, 1, List(8080), Some(0), @@ -159,13 +167,19 @@ class MesosContainerFactoryTest probe.reply(new SubscribeComplete) //create the container - val c = factory.createContainer(TransactionId.testing, "mesosContainer", ImageName("fakeImage"), false, 1.MB) + val c = factory.createContainer( + TransactionId.testing, + "mesosContainer", + ImageName("fakeImage"), + false, + 1.MB, + poolConfig.cpuShare) probe.expectMsg( SubmitTask(TaskDef( lastTaskId, "mesosContainer", "fakeImage:" + wskConfig.dockerImageTag, - cpus, + mesosCpus, 1, List(8080), Some(0), @@ -228,14 +242,20 @@ class MesosContainerFactoryTest probe.reply(new SubscribeComplete) //create the container - val c = factory.createContainer(TransactionId.testing, "mesosContainer", ImageName("fakeImage"), false, 1.MB) + val c = factory.createContainer( + TransactionId.testing, + "mesosContainer", + ImageName("fakeImage"), + false, + 1.MB, + poolConfig.cpuShare) probe.expectMsg( SubmitTask(TaskDef( lastTaskId, "mesosContainer", "fakeImage:" + wskConfig.dockerImageTag, - cpus, + mesosCpus, 1, List(8080), Some(0), diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerArgsConfigTest.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerArgsConfigTest.scala index 9990c15..82ad231 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerArgsConfigTest.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerArgsConfigTest.scala @@ -41,8 +41,8 @@ class ContainerArgsConfigTest extends FlatSpec with Matchers { System.setProperty("whisk.container-factory.container-args.extra-args.label.0", "l1") System.setProperty("whisk.container-factory.container-args.extra-args.label.1", "l2") System.setProperty("whisk.container-factory.container-args.extra-args.label.3", "l3") - System.setProperty("whisk.container-factory.container-args.extra-args.environment.0", "e1") - System.setProperty("whisk.container-factory.container-args.extra-args.environment.1", "e2") + System.setProperty("whisk.container-factory.container-args.extra-args.env.0", "e1") + System.setProperty("whisk.container-factory.container-args.extra-args.env.1", "e2") System.setProperty("whisk.container-factory.container-args.dns-servers.0", "google.com") System.setProperty("whisk.container-factory.container-args.dns-servers.1", "1.2.3.4") @@ -52,7 +52,7 @@ class ContainerArgsConfigTest extends FlatSpec with Matchers { config.dnsServers shouldBe Seq[String]("google.com", "1.2.3.4") //check map parsing of extra-args config config.extraArgs.get("label") shouldBe Some(Set("l1", "l2", "l3")) - config.extraArgs.get("environment") shouldBe Some(Set("e1", "e2")) + config.extraArgs.get("env") shouldBe Some(Set("e1", "e2")) } } diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala index ff6d49e..e41cea6 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala @@ -123,7 +123,7 @@ class ContainerPoolTests it should "reuse a warm container" in within(timeout) { val (containers, factory) = testContainers(2) val feed = TestProbe() - val pool = system.actorOf(ContainerPool.props(factory, 2, 2, feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref)) pool ! runMessage containers(0).expectMsg(runMessage) @@ -137,7 +137,7 @@ class ContainerPoolTests it should "reuse a warm container when action is the same even if revision changes" in within(timeout) { val (containers, factory) = testContainers(2) val feed = TestProbe() - val pool = system.actorOf(ContainerPool.props(factory, 2, 2, feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref)) pool ! runMessage containers(0).expectMsg(runMessage) @@ -152,7 +152,7 @@ class ContainerPoolTests val (containers, factory) = testContainers(2) val feed = TestProbe() - val pool = system.actorOf(ContainerPool.props(factory, 2, 2, feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref)) pool ! runMessage containers(0).expectMsg(runMessage) // Note that the container doesn't respond, thus it's not free to take work @@ -166,7 +166,7 @@ class ContainerPoolTests val feed = TestProbe() // a pool with only 1 slot - val pool = system.actorOf(ContainerPool.props(factory, 1, 1, feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref)) pool ! runMessage containers(0).expectMsg(runMessage) containers(0).send(pool, NeedWork(warmedData())) @@ -181,7 +181,7 @@ class ContainerPoolTests val feed = TestProbe() // a pool with only 1 active slot but 2 slots in total - val pool = system.actorOf(ContainerPool.props(factory, 1, 2, feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 2), feed.ref)) // Run the first container pool ! runMessage @@ -207,7 +207,7 @@ class ContainerPoolTests val feed = TestProbe() // a pool with only 1 slot - val pool = system.actorOf(ContainerPool.props(factory, 1, 1, feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref)) pool ! runMessage containers(0).expectMsg(runMessage) containers(0).send(pool, NeedWork(warmedData())) @@ -222,7 +222,7 @@ class ContainerPoolTests val feed = TestProbe() // a pool with only 1 slot - val pool = system.actorOf(ContainerPool.props(factory, 1, 1, feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref)) pool ! runMessage containers(0).expectMsg(runMessage) containers(0).send(pool, RescheduleJob) // emulate container failure ... @@ -239,7 +239,8 @@ class ContainerPoolTests val feed = TestProbe() val pool = - system.actorOf(ContainerPool.props(factory, 0, 0, feed.ref, Some(PrewarmingConfig(1, exec, memoryLimit)))) + system.actorOf( + ContainerPool.props(factory, ContainerPoolConfig(0, 0), feed.ref, Some(PrewarmingConfig(1, exec, memoryLimit)))) containers(0).expectMsg(Start(exec, memoryLimit)) } @@ -248,7 +249,8 @@ class ContainerPoolTests val feed = TestProbe() val pool = - system.actorOf(ContainerPool.props(factory, 1, 1, feed.ref, Some(PrewarmingConfig(1, exec, memoryLimit)))) + system.actorOf( + ContainerPool.props(factory, ContainerPoolConfig(1, 1), feed.ref, Some(PrewarmingConfig(1, exec, memoryLimit)))) containers(0).expectMsg(Start(exec, memoryLimit)) containers(0).send(pool, NeedWork(preWarmedData(exec.kind))) pool ! runMessage @@ -262,7 +264,8 @@ class ContainerPoolTests val alternativeExec = CodeExecAsString(RuntimeManifest("anotherKind", ImageName("testImage")), "testCode", None) val pool = system.actorOf( - ContainerPool.props(factory, 1, 1, feed.ref, Some(PrewarmingConfig(1, alternativeExec, memoryLimit)))) + ContainerPool + .props(factory, ContainerPoolConfig(1, 1), feed.ref, Some(PrewarmingConfig(1, alternativeExec, memoryLimit)))) containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0 was prewarmed containers(0).send(pool, NeedWork(preWarmedData(alternativeExec.kind))) pool ! runMessage @@ -276,7 +279,9 @@ class ContainerPoolTests val alternativeLimit = 128.MB val pool = - system.actorOf(ContainerPool.props(factory, 1, 1, feed.ref, Some(PrewarmingConfig(1, exec, alternativeLimit)))) + system.actorOf( + ContainerPool + .props(factory, ContainerPoolConfig(1, 1), feed.ref, Some(PrewarmingConfig(1, exec, alternativeLimit)))) containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was prewarmed containers(0).send(pool, NeedWork(preWarmedData(exec.kind, alternativeLimit))) pool ! runMessage @@ -290,7 +295,7 @@ class ContainerPoolTests val (containers, factory) = testContainers(2) val feed = TestProbe() - val pool = system.actorOf(ContainerPool.props(factory, 2, 2, feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, ContainerPoolConfig(2, 2), feed.ref)) // container0 is created and used pool ! runMessage diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala index 5a13084..7f2e553 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala @@ -158,7 +158,7 @@ class ContainerProxyTests /** Creates an inspectable factory */ def createFactory(response: Future[Container]) = LoggedFunction { - (_: TransactionId, _: String, _: ImageName, _: Boolean, _: ByteSize) => + (_: TransactionId, _: String, _: ImageName, _: Boolean, _: ByteSize, _: Int) => response } @@ -176,6 +176,8 @@ class ContainerProxyTests Future.successful(()) } + val poolConfig = ContainerPoolConfig(1, 2) + behavior of "ContainerProxy" /* @@ -188,12 +190,19 @@ class ContainerProxyTests val machine = childActorOf( ContainerProxy - .props(factory, createAcker, store, createCollector(), InstanceId(0, Some("myname")), pauseGrace = timeout)) + .props( + factory, + createAcker, + store, + createCollector(), + InstanceId(0, Some("myname")), + poolConfig, + pauseGrace = timeout)) registerCallback(machine) preWarm(machine) factory.calls should have size 1 - val (tid, name, _, _, memory) = factory.calls(0) + val (tid, name, _, _, memory, cpuShares) = factory.calls(0) tid shouldBe TransactionId.invokerWarmup name should fullyMatch regex """wskmyname\d+_\d+_prewarm_actionKind""" memory shouldBe memoryLimit @@ -208,7 +217,8 @@ class ContainerProxyTests val collector = createCollector() val machine = - childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout)) + childActorOf( + ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) preWarm(machine) @@ -243,7 +253,8 @@ class ContainerProxyTests val collector = createCollector() val machine = - childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout)) + childActorOf( + ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) preWarm(machine) @@ -289,7 +300,8 @@ class ContainerProxyTests val collector = createCollector() val machine = - childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout)) + childActorOf( + ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) preWarm(machine) @@ -326,7 +338,8 @@ class ContainerProxyTests val collector = createCollector() val machine = - childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout)) + childActorOf( + ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) run(machine, Uninitialized) @@ -358,7 +371,8 @@ class ContainerProxyTests val collector = createCollector() val machine = - childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout)) + childActorOf( + ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) machine ! Run(action, message) expectMsg(Transition(machine, Uninitialized, Running)) @@ -392,7 +406,8 @@ class ContainerProxyTests val collector = createCollector() val machine = - childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout)) + childActorOf( + ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) machine ! Run(action, message) expectMsg(Transition(machine, Uninitialized, Running)) @@ -430,7 +445,8 @@ class ContainerProxyTests val collector = createCollector() val machine = - childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout)) + childActorOf( + ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) machine ! Run(action, message) expectMsg(Transition(machine, Uninitialized, Running)) @@ -459,7 +475,8 @@ class ContainerProxyTests createCollector(Future.failed(LogCollectingException(ActivationLogs(partialLogs)))) val machine = - childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout)) + childActorOf( + ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) machine ! Run(action, message) expectMsg(Transition(machine, Uninitialized, Running)) @@ -487,7 +504,8 @@ class ContainerProxyTests val collector = createCollector(Future.failed(new Exception)) val machine = - childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout)) + childActorOf( + ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) machine ! Run(action, message) expectMsg(Transition(machine, Uninitialized, Running)) @@ -519,7 +537,8 @@ class ContainerProxyTests val store = createStore val machine = - childActorOf(ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), pauseGrace = timeout)) + childActorOf( + ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) run(machine, Uninitialized) // first run an activation timeout(machine) // times out Ready state so container suspends @@ -553,7 +572,8 @@ class ContainerProxyTests val store = createStore val machine = - childActorOf(ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), pauseGrace = timeout)) + childActorOf( + ContainerProxy.props(factory, acker, store, createCollector(), InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) run(machine, Uninitialized) timeout(machine) // times out Ready state so container suspends @@ -588,7 +608,8 @@ class ContainerProxyTests val collector = createCollector() val machine = - childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout)) + childActorOf( + ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) // Start running the action @@ -638,7 +659,8 @@ class ContainerProxyTests val collector = createCollector() val machine = - childActorOf(ContainerProxy.props(factory, acker, store, collector, InstanceId(0), pauseGrace = timeout)) + childActorOf( + ContainerProxy.props(factory, acker, store, collector, InstanceId(0), poolConfig, pauseGrace = timeout)) registerCallback(machine) run(machine, Uninitialized) timeout(machine) -- To stop receiving notification emails like this one, please contact markusthoem...@apache.org.