This is an automated email from the ASF dual-hosted git repository. markusthoemmes pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 5b3e0b6 Memory based loadbalancing. (#3747) 5b3e0b6 is described below commit 5b3e0b6a334b78fc783a2cd655f0f30ea58a68e8 Author: Christian Bickel <git...@cbickel.de> AuthorDate: Thu Aug 23 11:07:47 2018 +0200 Memory based loadbalancing. (#3747) --- ansible/group_vars/all | 4 +- ansible/roles/controller/tasks/deploy.yml | 4 +- ansible/roles/invoker/tasks/deploy.yml | 3 +- ansible/templates/whisk.properties.j2 | 4 - .../src/main/scala/whisk/core/WhiskConfig.scala | 3 - .../scala/whisk/core/containerpool/Container.scala | 32 ++-- .../core/containerpool/ContainerFactory.scala | 20 +- .../src/main/scala/whisk/core/entity/Size.scala | 17 ++ core/controller/src/main/resources/reference.conf | 2 +- .../ShardingContainerPoolBalancer.scala | 47 +++-- core/invoker/src/main/resources/application.conf | 3 +- .../whisk/core/containerpool/ContainerPool.scala | 207 +++++++++++++------- .../whisk/core/containerpool/ContainerProxy.scala | 38 ++-- .../scala/whisk/core/invoker/InvokerReactive.scala | 9 +- .../test/DockerToActivationFileLogStoreTests.scala | 2 +- .../mesos/test/MesosContainerFactoryTest.scala | 27 +-- .../containerpool/test/ContainerPoolTests.scala | 209 +++++++++++++++++++-- .../containerpool/test/ContainerProxyTests.scala | 2 +- .../scala/whisk/core/entity/test/SizeTests.scala | 42 +++++ .../test/ShardingContainerPoolBalancerTests.scala | 105 ++++++++--- 20 files changed, 561 insertions(+), 219 deletions(-) diff --git a/ansible/group_vars/all b/ansible/group_vars/all index 2114630..ffc658b 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -173,9 +173,7 @@ invoker: port: 12001 heap: "{{ invoker_heap | default('2g') }}" arguments: "{{ invoker_arguments | default('') }}" - numcore: 2 - coreshare: 2 - busyThreshold: "{{ invoker_busy_threshold | default(16) }}" + userMemory: "{{ invoker_user_memory | default('1024 m') }}" instances: "{{ groups['invokers'] | length }}" # Specify if it is allowed to deploy more than 1 invoker on a single machine. 
allowMultipleInstances: "{{ invoker_allow_multiple_instances | default(false) }}" diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index b39c7c2..d7f4f59 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -219,8 +219,8 @@ "{{ controller.ssl.storeFlavor }}" "CONFIG_whisk_controller_https_clientAuth": "{{ controller.ssl.clientAuth }}" - "CONFIG_whisk_loadbalancer_invokerBusyThreshold": - "{{ invoker.busyThreshold }}" + "CONFIG_whisk_loadbalancer_invokerUserMemory": + "{{ invoker.userMemory }}" "CONFIG_whisk_loadbalancer_blackboxFraction": "{{ controller.blackboxFraction }}" "CONFIG_whisk_loadbalancer_timeoutFactor": diff --git a/ansible/roles/invoker/tasks/deploy.yml b/ansible/roles/invoker/tasks/deploy.yml index 8fd93bf..ba17b70 100644 --- a/ansible/roles/invoker/tasks/deploy.yml +++ b/ansible/roles/invoker/tasks/deploy.yml @@ -204,8 +204,7 @@ "CONFIG_whisk_runtimes_localImagePrefix": "{{ runtimes_local_image_prefix | default() }}" "CONFIG_whisk_containerFactory_containerArgs_network": "{{ invoker_container_network_name | default('bridge') }}" "INVOKER_CONTAINER_POLICY": "{{ invoker_container_policy_name | default()}}" - "CONFIG_whisk_containerPool_numCore": "{{ invoker.numcore }}" - "CONFIG_whisk_containerPool_coreShare": "{{ invoker.coreshare }}" + "CONFIG_whisk_containerPool_userMemory": "{{ invoker.userMemory }}" "CONFIG_whisk_docker_client_parallelRuns": "{{ invoker_parallel_runs | default() }}" "CONFIG_whisk_docker_containerFactory_useRunc": "{{ invoker.useRunc }}" "WHISK_LOGS_DIR": "{{ whisk_logs_dir }}" diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2 index c18e22a..6b79896 100644 --- a/ansible/templates/whisk.properties.j2 +++ b/ansible/templates/whisk.properties.j2 @@ -59,8 +59,6 @@ controller.protocol={{ controller.protocol }} invoker.container.network=bridge invoker.container.policy={{ invoker_container_policy_name | default()}} invoker.container.dns={{ invoker_container_network_dns_servers | default()}} -invoker.numcore={{ invoker.numcore }} -invoker.coreshare={{ invoker.coreshare }} invoker.useRunc={{ invoker.useRunc }} main.docker.endpoint={{ hostvars[groups["controllers"]|first].ansible_host }}:{{ docker.port }} @@ -92,5 +90,3 @@ db.instances={{ db.instances }} apigw.auth.user={{apigw_auth_user}} apigw.auth.pwd={{apigw_auth_pwd}} apigw.host.v2={{apigw_host_v2}} - -loadbalancer.invokerBusyThreshold={{ invoker.busyThreshold }} diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala index 5f7a8db..df24317 100644 --- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala @@ -57,7 +57,6 @@ class WhiskConfig(requiredProperties: Map[String, String], val wskApiHost = this(WhiskConfig.wskApiProtocol) + "://" + this(WhiskConfig.wskApiHostname) + ":" + this( WhiskConfig.wskApiPort) val controllerBlackboxFraction = this.getAsDouble(WhiskConfig.controllerBlackboxFraction, 0.10) - val loadbalancerInvokerBusyThreshold = this.getAsInt(WhiskConfig.loadbalancerInvokerBusyThreshold, 16) val controllerInstances = this(WhiskConfig.controllerInstances) val edgeHost = this(WhiskConfig.edgeHostName) + ":" + this(WhiskConfig.edgeHostApiPort) @@ -163,8 +162,6 @@ object WhiskConfig { val controllerInstances = "controller.instances" val dbInstances = "db.instances" - val loadbalancerInvokerBusyThreshold = 
"loadbalancer.invokerBusyThreshold" - val kafkaHostList = "kafka.hosts" val zookeeperHostList = "zookeeper.hosts" diff --git a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala index 7c46615..ff0124e 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/Container.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/Container.scala @@ -17,31 +17,25 @@ package whisk.core.containerpool -import akka.actor.ActorSystem import java.time.Instant + +import akka.actor.ActorSystem +import akka.event.Logging.InfoLevel import akka.stream.scaladsl.Source import akka.util.ByteString import pureconfig._ -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.concurrent.duration.Duration -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.duration._ -import scala.util.Failure -import scala.util.Success -import spray.json.JsObject import spray.json.DefaultJsonProtocol._ -import whisk.common.Logging -import whisk.common.LoggingMarkers -import whisk.common.TransactionId -import whisk.core.entity.ActivationResponse -import whisk.core.entity.ActivationResponse.ContainerConnectionError -import whisk.core.entity.ActivationResponse.ContainerResponse -import whisk.core.entity.ByteSize -import whisk.http.Messages -import akka.event.Logging.InfoLevel +import spray.json.JsObject +import whisk.common.{Logging, LoggingMarkers, TransactionId} import whisk.core.ConfigKeys -import whisk.core.entity.ActivationEntityLimit +import whisk.core.entity.ActivationResponse.{ContainerConnectionError, ContainerResponse} +import whisk.core.entity.{ActivationEntityLimit, ActivationResponse, ByteSize} +import whisk.core.entity.size._ +import whisk.http.Messages + +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration.{Duration, FiniteDuration, _} +import scala.util.{Failure, Success} /** * An OpenWhisk biased container abstraction. This is **not only** an abstraction diff --git a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala index 3c56cf9..7b77f5f 100644 --- a/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala +++ b/common/scala/src/main/scala/whisk/core/containerpool/ContainerFactory.scala @@ -18,25 +18,18 @@ package whisk.core.containerpool import akka.actor.ActorSystem -import scala.concurrent.Future -import whisk.common.Logging -import whisk.common.TransactionId +import whisk.common.{Logging, TransactionId} import whisk.core.WhiskConfig -import whisk.core.entity.ByteSize -import whisk.core.entity.ExecManifest -import whisk.core.entity.InvokerInstanceId +import whisk.core.entity.{ByteSize, ExecManifest, InvokerInstanceId} import whisk.spi.Spi +import scala.concurrent.Future + case class ContainerArgsConfig(network: String, dnsServers: Seq[String] = Seq.empty, extraArgs: Map[String, Set[String]] = Map.empty) -case class ContainerPoolConfig(numCore: Int, coreShare: Int, akkaClient: Boolean) { - - /** - * The total number of containers is simply the number of cores dilated by the cpu sharing. - */ - def maxActiveContainers = numCore * coreShare +case class ContainerPoolConfig(userMemory: ByteSize, akkaClient: Boolean) { /** * The shareFactor indicates the number of containers that would share a single core, on average. 
@@ -45,7 +38,8 @@ case class ContainerPoolConfig(numCore: Int, coreShare: Int, akkaClean * On an idle/underloaded system, a container will still get to use underutilized CPU shares. */ private val totalShare = 1024.0 // This is a pre-defined value coming from docker and not our hard-coded value. - def cpuShare = (totalShare / maxActiveContainers).toInt + // Grant more CPU to a container if it allocates more memory. + def cpuShare(reservedMemory: ByteSize) = (totalShare / (userMemory.toBytes / reservedMemory.toBytes)).toInt } /** diff --git a/common/scala/src/main/scala/whisk/core/entity/Size.scala b/common/scala/src/main/scala/whisk/core/entity/Size.scala index a51eb2e..34f5bc6 100644 --- a/common/scala/src/main/scala/whisk/core/entity/Size.scala +++ b/common/scala/src/main/scala/whisk/core/entity/Size.scala @@ -69,6 +69,23 @@ case class ByteSize(size: Long, unit: SizeUnits.Unit) extends Ordered[ByteSize] ByteSize(commonSize, commonUnit) } + def *(other: Int): ByteSize = { + ByteSize(toBytes * other, SizeUnits.BYTE) + } + + def /(other: ByteSize): Double = { + // Without throwing the exception the result would be `Infinity` here + if (other.toBytes == 0) { + throw new ArithmeticException + } else { + (1.0 * toBytes) / (1.0 * other.toBytes) + } + } + + def /(other: Int): ByteSize = { + ByteSize(toBytes / other, SizeUnits.BYTE) + } + def compare(other: ByteSize) = toBytes compare other.toBytes override def equals(that: Any): Boolean = that match { diff --git a/core/controller/src/main/resources/reference.conf b/core/controller/src/main/resources/reference.conf index 3cf073c..c2e329e 100644 --- a/core/controller/src/main/resources/reference.conf +++ b/core/controller/src/main/resources/reference.conf @@ -6,7 +6,7 @@ whisk { use-cluster-bootstrap: false } loadbalancer { - invoker-busy-threshold: 4 + user-memory: 1024 m blackbox-fraction: 10% # factor to increase the timeout for forced active acks # timeout = time-limit.std * timeoutfactor + 1m diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala index 47be740..4adeb2a 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala @@ -89,13 +89,13 @@ import scala.util.{Failure, Success} * * ## Capacity checking * - * The maximum capacity per invoker is configured using `invoker-busy-threshold`, which is the maximum amount of actions + * The maximum capacity per invoker is configured using `user-memory`, which is the maximum amount of memory available to the actions * running in parallel on that invoker. * * Spare capacity is determined by what the loadbalancer thinks it scheduled to each invoker. Upon scheduling, an entry - * is made to update the books and a slot in a Semaphore is taken. That slot is only released after the response from - * the invoker (active-ack) arrives **or** after the active-ack times out. The Semaphore has as many slots as are - * configured via `invoker-busy-threshold`. + * is made to update the books and a slot for each MB of the action's memory limit is taken in a Semaphore. These slots + * are only released after the response from the invoker (active-ack) arrives **or** after the active-ack times out. + * The Semaphore has as many slots as MBs are configured in `user-memory`. 
* * Known caveats: * - In an overload scenario, activations are queued directly to the invokers, which makes the active-ack timeout @@ -232,7 +232,12 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con val hash = ShardingContainerPoolBalancer.generateHash(msg.user.namespace.name, action.fullyQualifiedName(false)) val homeInvoker = hash % invokersToUse.size val stepSize = stepSizes(hash % stepSizes.size) - ShardingContainerPoolBalancer.schedule(invokersToUse, schedulingState.invokerSlots, homeInvoker, stepSize) + ShardingContainerPoolBalancer.schedule( + invokersToUse, + schedulingState.invokerSlots, + action.limits.memory.megabytes, + homeInvoker, + stepSize) } else { None } @@ -381,7 +386,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con totalActivations.decrement() totalActivationMemory.add(entry.memory.toMB * (-1)) activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement()) - schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release()) + schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release(entry.memory.toMB.toInt)) if (!forced) { entry.timeoutHandler.cancel() @@ -456,6 +461,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider { * * @param invokers a list of available invokers to search in, including their state * @param dispatched semaphores for each invoker to give the slots away from + * @param slots the number of slots that need to be acquired (e.g. memory in MB) * @param index the index to start from (initially should be the "homeInvoker") * @param step the step size used to advance the index while searching for a free invoker * @return an invoker to schedule to or None if no invoker is available @@ -463,6 +469,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider { @tailrec def schedule(invokers: IndexedSeq[InvokerHealth], dispatched: IndexedSeq[ForcibleSemaphore], + slots: Int, index: Int, step: Int, stepsDone: Int = 0)(implicit logging: Logging, transId: TransactionId): Option[InvokerInstanceId] = { @@ -471,7 +478,7 @@ if (numInvokers > 0) { val invoker = invokers(index) // If the current invoker is healthy and we can get a slot - if (invoker.status.isUsable && dispatched(invoker.id.toInt).tryAcquire()) { + if (invoker.status.isUsable && dispatched(invoker.id.toInt).tryAcquire(slots)) { Some(invoker.id) } else { // If we've gone through all invokers @@ -480,7 +487,7 @@ if (healthyInvokers.nonEmpty) { // Choose a healthy invoker randomly val random = healthyInvokers(ThreadLocalRandom.current().nextInt(healthyInvokers.size)).id - dispatched(random.toInt).forceAcquire() + dispatched(random.toInt).forceAcquire(slots) logging.warn(this, s"system is overloaded. 
Chose invoker${random.toInt} by random assignment.") Some(random) } else { @@ -488,7 +495,7 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider { } } else { val newIndex = (index + step) % numInvokers - schedule(invokers, dispatched, newIndex, step, stepsDone + 1) + schedule(invokers, dispatched, slots, newIndex, step, stepsDone + 1) } } } else { @@ -518,7 +525,7 @@ case class ShardingContainerPoolBalancerState( lbConfig: ShardingContainerPoolBalancerConfig = loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer))(implicit logging: Logging) { - private val totalInvokerThreshold = lbConfig.invokerBusyThreshold + private val totalInvokerThreshold = lbConfig.invokerUserMemory private var currentInvokerThreshold = totalInvokerThreshold private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, lbConfig.blackboxFraction)) @@ -564,7 +571,7 @@ case class ShardingContainerPoolBalancerState( if (oldSize < newSize) { // Keeps the existing state.. _invokerSlots = _invokerSlots ++ IndexedSeq.fill(newSize - oldSize) { - new ForcibleSemaphore(currentInvokerThreshold) + new ForcibleSemaphore(currentInvokerThreshold.toMB.toInt) } } } @@ -587,9 +594,17 @@ case class ShardingContainerPoolBalancerState( val actualSize = newSize max 1 // if a cluster size < 1 is reported, falls back to a size of 1 (alone) if (_clusterSize != actualSize) { _clusterSize = actualSize - val newTreshold = (totalInvokerThreshold / actualSize) max 1 // letting this fall below 1 doesn't make sense + val newTreshold = if (totalInvokerThreshold / actualSize < MemoryLimit.minMemory) { + logging.warn( + this, + s"registered controllers: ${_clusterSize}: the slots per invoker fall below the min memory of one action.")( + TransactionId.loadbalancer) + MemoryLimit.minMemory // letting this fall below minMemory doesn't make sense + } else { + totalInvokerThreshold / actualSize + } currentInvokerThreshold = newTreshold - _invokerSlots = _invokerSlots.map(_ => new ForcibleSemaphore(currentInvokerThreshold)) + _invokerSlots = _invokerSlots.map(_ => new ForcibleSemaphore(currentInvokerThreshold.toMB.toInt)) logging.info( this, @@ -610,10 +625,12 @@ case class ClusterConfig(useClusterBootstrap: Boolean) * Configuration for the sharding container pool balancer. * * @param blackboxFraction the fraction of all invokers to use exclusively for blackboxes - * @param invokerBusyThreshold how many slots an invoker has available in total + * @param invokerUserMemory how many Bytes of memory an invoker has available in total for user containers * @param timeoutFactor factor to influence the timeout period for forced active acks (time-limit.std * timeoutFactor + 1m) */ -case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int, timeoutFactor: Int) +case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, + invokerUserMemory: ByteSize, + timeoutFactor: Int) /** * State kept for each activation until completion. 
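To make the capacity model in the scaladoc above concrete, here is a minimal, self-contained sketch of the idea; it is not part of this commit. One semaphore per invoker holds as many permits as the invoker has user memory in MB, and an action acquires as many permits as its memory limit has MBs. java.util.concurrent.Semaphore stands in for OpenWhisk's ForcibleSemaphore, and the invoker count, memory figures, and step handling are illustrative assumptions; the real schedule() above additionally force-acquires slots on a random healthy invoker when the system is overloaded.

    import java.util.concurrent.Semaphore

    object MemorySlotsSketch {
      // One semaphore per invoker; the permit count is the invoker's user memory in MB
      // (4 invokers with 1024 MB each mirror the `user-memory: 1024 m` default).
      val invokerSlots: IndexedSeq[Semaphore] = IndexedSeq.fill(4)(new Semaphore(1024))

      // An action with a 256 MB limit acquires 256 permits at once; they are handed
      // back via release() only when the active-ack arrives or times out.
      def trySchedule(homeInvoker: Int, step: Int, memoryMB: Int): Option[Int] = {
        val n = invokerSlots.size
        // Walk the ring of invokers starting at the home invoker; `step` should be
        // coprime to `n` so that every invoker is visited exactly once.
        (0 until n)
          .map(i => (homeInvoker + i * step) % n)
          .find(i => invokerSlots(i).tryAcquire(memoryMB))
      }

      def release(invoker: Int, memoryMB: Int): Unit =
        invokerSlots(invoker).release(memoryMB)
    }

Under these assumptions, trySchedule(homeInvoker = 2, step = 3, memoryMB = 256) either returns the index of an invoker that had 256 MB free, or None if the whole ring is exhausted.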
diff --git a/core/invoker/src/main/resources/application.conf b/core/invoker/src/main/resources/application.conf index 244220a..57989a8 100644 --- a/core/invoker/src/main/resources/application.conf +++ b/core/invoker/src/main/resources/application.conf @@ -37,8 +37,7 @@ whisk { } container-pool { - num-core: 4 # used for computing --cpushares, and max number of containers allowed - core-share: 2 # used for computing --cpushares, and max number of containers allowed + user-memory: 1024 m akka-client: false # if true, use PoolingContainerClient for HTTP from invoker to action container (otherwise use ApacheBlockingContainerClient) } diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala index 90f5d73..fd50a3c 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerPool.scala @@ -17,14 +17,15 @@ package whisk.core.containerpool -import scala.collection.immutable -import whisk.common.{AkkaLogging, LoggingMarkers, TransactionId} import akka.actor.{Actor, ActorRef, ActorRefFactory, Props} +import whisk.common.{AkkaLogging, LoggingMarkers, TransactionId} +import whisk.core.connector.MessageFeed import whisk.core.entity._ import whisk.core.entity.size._ -import whisk.core.connector.MessageFeed +import scala.collection.immutable import scala.concurrent.duration._ +import scala.util.Try sealed trait WorkerState case object Busy extends WorkerState @@ -57,11 +58,17 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, prewarmConfig: List[PrewarmingConfig] = List.empty, poolConfig: ContainerPoolConfig) extends Actor { + import ContainerPool.memoryConsumptionOf + implicit val logging = new AkkaLogging(context.system.log) var freePool = immutable.Map.empty[ActorRef, ContainerData] var busyPool = immutable.Map.empty[ActorRef, ContainerData] var prewarmedPool = immutable.Map.empty[ActorRef, ContainerData] + // If all memory slots are occupied and if there is currently no container to be removed, then the actions will be + // buffered here to preserve the order of computation. + // Otherwise actions with small memory limits could block actions with large memory limits. + var runBuffer = immutable.Queue.empty[Run] val logMessageInterval = 10.seconds prewarmConfig.foreach { config => @@ -91,62 +98,87 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, // their requests and send them back to the pool for rescheduling (this may happen if "docker" operations // fail for example, or a container has aged and was destroying itself when a new request was assigned) case r: Run => - val createdContainer = if (busyPool.size < poolConfig.maxActiveContainers) { + // Check if the message is resent from the buffer. Only the first message on the buffer can be resent. 
+ val isResentFromBuffer = runBuffer.nonEmpty && runBuffer.dequeueOption.exists(_._1.msg == r.msg) - // Schedule a job to a warm container - ContainerPool - .schedule(r.action, r.msg.user.namespace.name, freePool) - .map(container => { - (container, "warm") - }) - .orElse { - if (busyPool.size + freePool.size < poolConfig.maxActiveContainers) { - takePrewarmContainer(r.action) - .map(container => { - (container, "prewarmed") - }) - .orElse { - Some(createContainer(), "cold") - } - } else None - } - .orElse { - // Remove a container and create a new one for the given job - ContainerPool.remove(freePool).map { toDelete => - removeContainer(toDelete) - takePrewarmContainer(r.action) - .map(container => { - (container, "recreated") - }) - .getOrElse { - (createContainer(), "recreated") - } - } - } - } else None + // Only process the request if there are no other requests waiting for free slots, or if the current request is + // the next request to process. + // It is guaranteed that only the first message on the buffer is resent. + if (runBuffer.isEmpty || isResentFromBuffer) { + val createdContainer = + // Is there enough space on the invoker for this action to be executed? + if (hasPoolSpaceFor(busyPool, r.action.limits.memory.megabytes.MB)) { + // Schedule a job to a warm container + ContainerPool + .schedule(r.action, r.msg.user.namespace.name, freePool) + .map(container => (container, "warm")) + .orElse( + // There was no warm container. Try to take a prewarm container or a cold container. + + // Is there enough space to create a new container or do other containers have to be removed? + if (hasPoolSpaceFor(busyPool ++ freePool, r.action.limits.memory.megabytes.MB)) { + takePrewarmContainer(r.action) + .map(container => (container, "prewarmed")) + .orElse(Some(createContainer(r.action.limits.memory.megabytes.MB), "cold")) + } else None) + .orElse( + // Remove a container and create a new one for the given job + ContainerPool + // Only free up the amount of memory that is actually needed + .remove(freePool, Math.min(r.action.limits.memory.megabytes, memoryConsumptionOf(freePool)).MB) + .map(removeContainer) + // If the list had at least one entry, enough containers were removed to start the new container. After + // removing the containers, we are not interested anymore in the containers that have been removed. + .headOption + .map(_ => + takePrewarmContainer(r.action) + .map(container => (container, "recreatedPrewarm")) + .getOrElse(createContainer(r.action.limits.memory.megabytes.MB), "recreated"))) + } else None - createdContainer match { - case Some(((actor, data), containerState)) => - busyPool = busyPool + (actor -> data) - freePool = freePool - actor - actor ! r // forwards the run request to the container - logContainerStart(r, containerState) - case None => - // this can also happen if createContainer fails to start a new container, or - // if a job is rescheduled but the container it was allocated to has not yet destroyed itself - // (and a new container would over commit the pool) - val isErrorLogged = r.retryLogDeadline.map(_.isOverdue).getOrElse(true) - val retryLogDeadline = if (isErrorLogged) { - logging.error( - this, - s"Rescheduling Run message, too many message in the pool, freePoolSize: ${freePool.size}, " + - s"busyPoolSize: ${busyPool.size}, maxActiveContainers ${poolConfig.maxActiveContainers}, " + - s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}")(r.msg.transid) - Some(logMessageInterval.fromNow) - } else { - r.retryLogDeadline - } - self ! 
Run(r.action, r.msg, retryLogDeadline) + createdContainer match { + case Some(((actor, data), containerState)) => + busyPool = busyPool + (actor -> data) + freePool = freePool - actor + // Remove the action that gets executed now from the buffer and execute the next one afterwards. + if (isResentFromBuffer) { + // It is guaranteed that the currently executed message is the head of the queue if the message comes + // from the buffer. + val (_, newBuffer) = runBuffer.dequeue + runBuffer = newBuffer + runBuffer.dequeueOption.foreach { case (run, _) => self ! run } + } + actor ! r // forwards the run request to the container + logContainerStart(r, containerState) + case None => + // this can also happen if createContainer fails to start a new container, or + // if a job is rescheduled but the container it was allocated to has not yet destroyed itself + // (and a new container would over commit the pool) + val isErrorLogged = r.retryLogDeadline.map(_.isOverdue).getOrElse(true) + val retryLogDeadline = if (isErrorLogged) { + logging.error( + this, + s"Rescheduling Run message, too many messages in the pool, " + + s"freePoolSize: ${freePool.size} containers and ${memoryConsumptionOf(freePool)} MB, " + + s"busyPoolSize: ${busyPool.size} containers and ${memoryConsumptionOf(busyPool)} MB, " + + s"maxContainersMemory ${poolConfig.userMemory.toMB} MB, " + + s"userNamespace: ${r.msg.user.namespace.name}, action: ${r.action}, " + + s"needed memory: ${r.action.limits.memory.megabytes} MB")(r.msg.transid) + Some(logMessageInterval.fromNow) + } else { + r.retryLogDeadline + } + if (!isResentFromBuffer) { + // Add this request to the buffer, as it is not there yet. + runBuffer = runBuffer.enqueue(r) + } + // As this request is the first one in the buffer, try again to execute it. + self ! Run(r.action, r.msg, retryLogDeadline) + } + } else { + // There are currently actions waiting to be executed before this action gets executed. + // These waiting actions were not able to free up enough memory. + runBuffer = runBuffer.enqueue(r) } // Container is free to take more work @@ -181,22 +213,22 @@ } /** Creates a new container and updates state accordingly. */ - def createContainer(): (ActorRef, ContainerData) = { + def createContainer(memoryLimit: ByteSize): (ActorRef, ContainerData) = { val ref = childFactory(context) - val data = NoData() + val data = MemoryData(memoryLimit) freePool = freePool + (ref -> data) ref -> data } /** Creates a new prewarmed container */ - def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize) = + def prewarmContainer(exec: CodeExec[_], memoryLimit: ByteSize): Unit = childFactory(context) ! Start(exec, memoryLimit) /** * Takes a prewarm container out of the prewarmed pool - * iff a container with a matching kind is found. + * iff a container with a matching kind and memory is found. * - * @param kind the kind you want to invoke + * @param action the action that holds the kind and the required memory. 
* @return the container iff found */ def takePrewarmContainer(action: ExecutableWhiskAction): Option[(ActorRef, ContainerData)] = { @@ -213,7 +245,8 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, freePool = freePool + (ref -> data) prewarmedPool = prewarmedPool - ref // Create a new prewarm container - // NOTE: prewarming ignores the action code in exec, but this is dangerous as the field is accessible to the factory + // NOTE: prewarming ignores the action code in exec, but this is dangerous as the field is accessible to the + // factory prewarmContainer(action.exec, memory) (ref, data) } @@ -225,11 +258,32 @@ class ContainerPool(childFactory: ActorRefFactory => ActorRef, freePool = freePool - toDelete busyPool = busyPool - toDelete } + + /** + * Calculate if there is enough free memory within a given pool. + * + * @param pool the pool that has to be checked for enough free memory. + * @param memory the amount of memory to check for. + * @return true if there is enough space for the given amount of memory. + */ + def hasPoolSpaceFor[A](pool: Map[A, ContainerData], memory: ByteSize): Boolean = { + memoryConsumptionOf(pool) + memory.toMB <= poolConfig.userMemory.toMB + } } object ContainerPool { /** + * Calculate the memory consumption of a given pool. + * + * @param pool The pool with the containers. + * @return The memory consumption of all containers in the pool in Megabytes. + */ + protected[containerpool] def memoryConsumptionOf[A](pool: Map[A, ContainerData]): Long = { + pool.map(_._2.memoryLimit.toMB).sum + } + + /** * Finds the best container for a given job to run on. * * Selects an arbitrary warm container from the passed pool of idle containers @@ -255,22 +309,37 @@ object ContainerPool { /** * Finds the oldest previously used container to remove to make space for the job passed to run. + * Depending on the space that has to be allocated, several containers might be removed. * * NOTE: This method is never called to remove an action that is in the pool already, * since this would be picked up earlier in the scheduler and the container reused. * * @param pool a map of all free containers in the pool - * @return a container to be removed iff found + * @param memory the amount of memory that has to be freed up + * @return a list of containers to be removed iff found */ - protected[containerpool] def remove[A](pool: Map[A, ContainerData]): Option[A] = { + protected[containerpool] def remove[A](pool: Map[A, ContainerData], memory: ByteSize): List[A] = { val freeContainers = pool.collect { + // Only warm containers will be removed. Prewarmed containers always stay. case (ref, w: WarmedData) => ref -> w } - if (freeContainers.nonEmpty) { - val (ref, _) = freeContainers.minBy(_._2.lastUsed) - Some(ref) - } else None + if (memory > 0.B && freeContainers.nonEmpty && memoryConsumptionOf(freeContainers) >= memory.toMB) { + // Remove the oldest container if: + // - there is more memory required + // - there are still containers that can be removed + // - there are enough free containers that can be removed + val (ref, data) = freeContainers.minBy(_._2.lastUsed) + // Catch the exception if the remaining memory would be negative + val remainingMemory = Try(memory - data.memoryLimit).getOrElse(0.B) + List(ref) ++ remove(freeContainers - ref, remainingMemory) + } else { + // If this is the first call: all containers are currently in use, or removing containers cannot + // free up enough memory. 
+ // Or, if this is one of the recursive calls: enough containers have already been found to free up the + // necessary memory -> abort the recursion. + List.empty + } } def props(factory: ActorRefFactory => ActorRef, diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala index 92eda45..0ddd666 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala @@ -19,28 +19,26 @@ package whisk.core.containerpool import java.time.Instant -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.util.Success -import scala.util.Failure -import akka.actor.FSM -import akka.actor.Props -import akka.actor.Stash import akka.actor.Status.{Failure => FailureMessage} +import akka.actor.{FSM, Props, Stash} +import akka.event.Logging.InfoLevel import akka.pattern.pipe -import spray.json._ +import pureconfig.loadConfigOrThrow import spray.json.DefaultJsonProtocol._ +import spray.json._ import whisk.common.{AkkaLogging, Counter, LoggingMarkers, TransactionId} +import whisk.core.ConfigKeys import whisk.core.connector.ActivationMessage import whisk.core.containerpool.logging.LogCollectingException +import whisk.core.database.UserContext +import whisk.core.entity.ExecManifest.ImageName import whisk.core.entity._ import whisk.core.entity.size._ -import whisk.core.entity.ExecManifest.ImageName import whisk.http.Messages -import akka.event.Logging.InfoLevel -import pureconfig.loadConfigOrThrow -import whisk.core.ConfigKeys -import whisk.core.database.UserContext + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.{Failure, Success} // States sealed trait ContainerState @@ -54,14 +52,16 @@ case object Paused extends ContainerState case object Removing extends ContainerState // Data -sealed abstract class ContainerData(val lastUsed: Instant) -case class NoData() extends ContainerData(Instant.EPOCH) -case class PreWarmedData(container: Container, kind: String, memoryLimit: ByteSize) extends ContainerData(Instant.EPOCH) +sealed abstract class ContainerData(val lastUsed: Instant, val memoryLimit: ByteSize) +case class NoData() extends ContainerData(Instant.EPOCH, 0.B) +case class MemoryData(override val memoryLimit: ByteSize) extends ContainerData(Instant.EPOCH, memoryLimit) +case class PreWarmedData(container: Container, kind: String, override val memoryLimit: ByteSize) + extends ContainerData(Instant.EPOCH, memoryLimit) case class WarmedData(container: Container, invocationNamespace: EntityName, action: ExecutableWhiskAction, override val lastUsed: Instant) - extends ContainerData(lastUsed) + extends ContainerData(lastUsed, action.limits.memory.megabytes.MB) // Events received by the actor case class Start(exec: CodeExec[_], memoryLimit: ByteSize) @@ -120,7 +120,7 @@ class ContainerProxy( job.exec.image, job.exec.pull, job.memoryLimit, - poolConfig.cpuShare) + poolConfig.cpuShare(job.memoryLimit)) .map(container => PreWarmedData(container, job.exec.kind, job.memoryLimit)) .pipeTo(self) @@ -137,7 +137,7 @@ class ContainerProxy( job.action.exec.image, job.action.exec.pull, job.action.limits.memory.megabytes.MB, - poolConfig.cpuShare) + poolConfig.cpuShare(job.action.limits.memory.megabytes.MB)) // container factory will either yield a new container ready to execute the action, or // starting up the container failed; for the latter, it's either an internal error 
starting diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index 634e416..5f4fd8d 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -25,23 +25,24 @@ import akka.event.Logging.InfoLevel import akka.stream.ActorMaterializer import org.apache.kafka.common.errors.RecordTooLargeException import pureconfig._ +import spray.json.DefaultJsonProtocol._ import spray.json._ import whisk.common.tracing.WhiskTracerProvider import whisk.common._ -import whisk.core.{ConfigKeys, WhiskConfig} import whisk.core.connector._ import whisk.core.containerpool._ import whisk.core.containerpool.logging.LogStoreProvider import whisk.core.database._ import whisk.core.entity._ +import whisk.core.entity.size._ +import whisk.core.{ConfigKeys, WhiskConfig} import whisk.http.Messages import whisk.spi.SpiLoader import whisk.core.database.UserContext -import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} -import DefaultJsonProtocol._ class InvokerReactive( config: WhiskConfig, @@ -99,7 +100,7 @@ class InvokerReactive( /** Initialize message consumers */ private val topic = s"invoker${instance.toInt}" - private val maximumContainers = poolConfig.maxActiveContainers + private val maximumContainers = (poolConfig.userMemory / MemoryLimit.minMemory).toInt private val msgProvider = SpiLoader.get[MessagingProvider] private val consumer = msgProvider.getConsumer( config, diff --git a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala index e539eeb..8548cfc 100644 --- a/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/logging/test/DockerToActivationFileLogStoreTests.scala @@ -26,8 +26,8 @@ import common.{StreamLogging, WskActorSystem} import org.junit.runner.RunWith import org.scalatest.Matchers import org.scalatest.junit.JUnitRunner -import spray.json._ import spray.json.DefaultJsonProtocol._ +import spray.json._ import whisk.common.TransactionId import whisk.core.containerpool.logging.{DockerToActivationFileLogStore, LogLine} import whisk.core.entity._ diff --git a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala index 73ca88e..d5187ed 100644 --- a/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala +++ b/tests/src/test/scala/whisk/core/containerpool/mesos/test/MesosContainerFactoryTest.scala @@ -78,9 +78,10 @@ class MesosContainerFactoryTest lastTaskId } - val poolConfig = ContainerPoolConfig(8, 10, false) - val dockerCpuShares = poolConfig.cpuShare - val mesosCpus = poolConfig.cpuShare / 1024.0 + // 80 slots, each 265MB + val poolConfig = ContainerPoolConfig(21200.MB, false) + val actionMemory = 265.MB + val mesosCpus = poolConfig.cpuShare(actionMemory) / 1024.0 val containerArgsConfig = new ContainerArgsConfig("net1", Seq("dns1", "dns2"), Map("extra1" -> Set("e1", "e2"), "extra2" -> Set("e3", "e4"))) @@ -134,8 +135,8 @@ class MesosContainerFactoryTest "mesosContainer", 
ImageName("fakeImage"), false, - 1.MB, - poolConfig.cpuShare) + actionMemory, + poolConfig.cpuShare(actionMemory)) expectMsg( SubmitTask(TaskDef( @@ -143,7 +144,7 @@ class MesosContainerFactoryTest "mesosContainer", "fakeImage", mesosCpus, - 1, + actionMemory.toMB.toInt, List(8080), Some(0), false, @@ -184,15 +185,15 @@ class MesosContainerFactoryTest "mesosContainer", ImageName("fakeImage"), false, - 1.MB, - poolConfig.cpuShare) + actionMemory, + poolConfig.cpuShare(actionMemory)) probe.expectMsg( SubmitTask(TaskDef( lastTaskId, "mesosContainer", "fakeImage", mesosCpus, - 1, + actionMemory.toMB.toInt, List(8080), Some(0), false, @@ -255,8 +256,8 @@ class MesosContainerFactoryTest "mesosContainer", ImageName("fakeImage"), false, - 1.MB, - poolConfig.cpuShare) + actionMemory, + poolConfig.cpuShare(actionMemory)) probe.expectMsg( SubmitTask(TaskDef( @@ -264,7 +265,7 @@ class MesosContainerFactoryTest "mesosContainer", "fakeImage", mesosCpus, - 1, + actionMemory.toMB.toInt, List(8080), Some(0), false, @@ -293,7 +294,7 @@ class MesosContainerFactoryTest implicit val tid = TransactionId.testing implicit val m = ActorMaterializer() val logs = container - .logs(1.MB, false) + .logs(actionMemory, false) .via(DockerToActivationLogStore.toFormattedString) .runWith(Sink.seq) await(logs)(0) should endWith diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala index 20bd353..78317ad 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerPoolTests.scala @@ -88,8 +88,13 @@ class ContainerPoolTests val differentInvocationNamespace = EntityName("invocationSpace2") val action = ExecutableWhiskAction(EntityPath("actionSpace"), EntityName("actionName"), exec) val differentAction = action.copy(name = EntityName("actionName2")) + val largeAction = + action.copy( + name = EntityName("largeAction"), + limits = ActionLimits(memory = MemoryLimit(MemoryLimit.stdMemory * 2))) val runMessage = createRunMessage(action, invocationNamespace) + val runMessageLarge = createRunMessage(largeAction, invocationNamespace) val runMessageDifferentAction = createRunMessage(differentAction, invocationNamespace) val runMessageDifferentVersion = createRunMessage(action.copy().revision(DocRevision("v2")), invocationNamespace) val runMessageDifferentNamespace = createRunMessage(action, differentInvocationNamespace) @@ -113,7 +118,7 @@ class ContainerPoolTests (containers, factory) } - def poolConfig(numCore: Int, coreShare: Int) = ContainerPoolConfig(numCore, coreShare, false) + def poolConfig(userMemory: ByteSize) = ContainerPoolConfig(userMemory, false) behavior of "ContainerPool" @@ -126,7 +131,8 @@ class ContainerPoolTests it should "reuse a warm container" in within(timeout) { val (containers, factory) = testContainers(2) val feed = TestProbe() - val pool = system.actorOf(ContainerPool.props(factory, poolConfig(2, 2), feed.ref)) + // Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled. + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref)) pool ! 
runMessage containers(0).expectMsg(runMessage) @@ -140,7 +146,8 @@ it should "reuse a warm container when action is the same even if revision changes" in within(timeout) { val (containers, factory) = testContainers(2) val feed = TestProbe() - val pool = system.actorOf(ContainerPool.props(factory, poolConfig(2, 2), feed.ref)) + // Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled. + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref)) pool ! runMessage containers(0).expectMsg(runMessage) @@ -155,7 +162,8 @@ val (containers, factory) = testContainers(2) val feed = TestProbe() - val pool = system.actorOf(ContainerPool.props(factory, poolConfig(2, 2), feed.ref)) + // Actions are created with default memory limit (MemoryLimit.stdMemory). This means 4 actions can be scheduled. + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref)) pool ! runMessage containers(0).expectMsg(runMessage) // Note that the container doesn't respond, thus it's not free to take work @@ -169,7 +177,7 @@ val feed = TestProbe() // a pool with only 1 slot - val pool = system.actorOf(ContainerPool.props(factory, poolConfig(1, 1), feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory), feed.ref)) pool ! runMessage containers(0).expectMsg(runMessage) containers(0).send(pool, NeedWork(warmedData())) @@ -179,12 +187,35 @@ containers(1).expectMsg(runMessageDifferentEverything) } + it should "remove several containers to make space in the pool if it is already full and a different large action arrives" in within( + timeout) { + val (containers, factory) = testContainers(3) + val feed = TestProbe() + + // a pool with slots for 2 actions with default memory limit. + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(512.MB), feed.ref)) + pool ! runMessage + containers(0).expectMsg(runMessage) + pool ! runMessageDifferentAction // 2 * stdMemory taken -> full + containers(1).expectMsg(runMessageDifferentAction) + + containers(0).send(pool, NeedWork(warmedData())) // first action finished -> 1 * stdMemory taken + feed.expectMsg(MessageFeed.Processed) + containers(1).send(pool, NeedWork(warmedData())) // second action finished -> 1 * stdMemory taken + feed.expectMsg(MessageFeed.Processed) + + pool ! runMessageLarge // need to remove both actions to make space for the large action (needs 2 * stdMemory) + containers(0).expectMsg(Remove) + containers(1).expectMsg(Remove) + containers(2).expectMsg(runMessageLarge) + } + it should "cache a container if there is still space in the pool" in within(timeout) { val (containers, factory) = testContainers(2) val feed = TestProbe() // a pool with only 1 active slot but 2 slots in total - val pool = system.actorOf(ContainerPool.props(factory, poolConfig(1, 2), feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref)) // Run the first container pool ! runMessage @@ -210,7 +241,7 @@ val feed = TestProbe() // a pool with only 1 slot - val pool = system.actorOf(ContainerPool.props(factory, poolConfig(1, 1), feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory), feed.ref)) pool ! 
runMessage containers(0).expectMsg(runMessage) containers(0).send(pool, NeedWork(warmedData())) @@ -225,7 +256,7 @@ class ContainerPoolTests val feed = TestProbe() // a pool with only 1 slot - val pool = system.actorOf(ContainerPool.props(factory, poolConfig(1, 1), feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory), feed.ref)) pool ! runMessage containers(0).expectMsg(runMessage) containers(0).send(pool, RescheduleJob) // emulate container failure ... @@ -234,6 +265,34 @@ class ContainerPoolTests containers(1).expectMsg(runMessage) // job resent to new actor } + it should "not start a new container if there is not enough space in the pool" in within(timeout) { + val (containers, factory) = testContainers(2) + val feed = TestProbe() + + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref)) + + // Start first action + pool ! runMessage // 1 * stdMemory taken + containers(0).expectMsg(runMessage) + + // Send second action to the pool + pool ! runMessageLarge // message is too large to be processed immediately. + containers(1).expectNoMessage(100.milliseconds) + + // First action is finished + containers(0).send(pool, NeedWork(warmedData())) // pool is empty again. + feed.expectMsg(MessageFeed.Processed) + + // Second action should run now + containers(1).expectMsgPF() { + // The `Some` assures, that it has been retried while the first action was still blocking the invoker. + case Run(runMessageLarge.action, runMessageLarge.msg, Some(_)) => true + } + + containers(1).send(pool, NeedWork(warmedData())) + feed.expectMsg(MessageFeed.Processed) + } + /* * CONTAINER PREWARMING */ @@ -244,7 +303,7 @@ class ContainerPoolTests val pool = system.actorOf( ContainerPool - .props(factory, poolConfig(0, 0), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit)))) + .props(factory, poolConfig(0.MB), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit)))) containers(0).expectMsg(Start(exec, memoryLimit)) } @@ -255,7 +314,7 @@ class ContainerPoolTests val pool = system.actorOf( ContainerPool - .props(factory, poolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit)))) + .props(factory, poolConfig(MemoryLimit.stdMemory), feed.ref, List(PrewarmingConfig(1, exec, memoryLimit)))) containers(0).expectMsg(Start(exec, memoryLimit)) containers(0).send(pool, NeedWork(preWarmedData(exec.kind))) pool ! runMessage @@ -270,7 +329,11 @@ class ContainerPoolTests val pool = system.actorOf( ContainerPool - .props(factory, poolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, alternativeExec, memoryLimit)))) + .props( + factory, + poolConfig(MemoryLimit.stdMemory), + feed.ref, + List(PrewarmingConfig(1, alternativeExec, memoryLimit)))) containers(0).expectMsg(Start(alternativeExec, memoryLimit)) // container0 was prewarmed containers(0).send(pool, NeedWork(preWarmedData(alternativeExec.kind))) pool ! runMessage @@ -284,9 +347,8 @@ class ContainerPoolTests val alternativeLimit = 128.MB val pool = - system.actorOf( - ContainerPool - .props(factory, poolConfig(1, 1), feed.ref, List(PrewarmingConfig(1, exec, alternativeLimit)))) + system.actorOf(ContainerPool + .props(factory, poolConfig(MemoryLimit.stdMemory), feed.ref, List(PrewarmingConfig(1, exec, alternativeLimit)))) containers(0).expectMsg(Start(exec, alternativeLimit)) // container0 was prewarmed containers(0).send(pool, NeedWork(preWarmedData(exec.kind, alternativeLimit))) pool ! 
runMessage @@ -300,7 +362,7 @@ val (containers, factory) = testContainers(2) val feed = TestProbe() - val pool = system.actorOf(ContainerPool.props(factory, poolConfig(2, 2), feed.ref)) + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 4), feed.ref)) // container0 is created and used pool ! runMessage @@ -319,6 +381,97 @@ pool ! runMessage containers(1).expectMsg(runMessage) } + + /* + * Run buffer + */ + it should "first put messages into the queue and retry them, and then put messages only into the queue" in within( + timeout) { + val (containers, factory) = testContainers(2) + val feed = TestProbe() + + // Pool with 512 MB user memory + val pool = + system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref)) + + // Send action that blocks the pool + pool ! runMessageLarge + containers(0).expectMsg(runMessageLarge) + + // Send action that should be written to the queue and retried in the invoker + pool ! runMessage + containers(1).expectNoMessage(100.milliseconds) + + // Send another message that should not be retried, but put into the queue as well + pool ! runMessageDifferentAction + containers(2).expectNoMessage(100.milliseconds) + + // Action with 512 MB is finished + containers(0).send(pool, NeedWork(warmedData())) + feed.expectMsg(MessageFeed.Processed) + + // Action 1 should start immediately + containers(0).expectMsgPF() { + // The `Some` assures that it has been retried while the first action was still blocking the invoker. + case Run(runMessage.action, runMessage.msg, Some(_)) => true + } + // Action 2 should start immediately as well (without any retries, as there is already enough space in the pool) + containers(1).expectMsg(runMessageDifferentAction) + } + + it should "process activations in the order they arrive" in within(timeout) { + val (containers, factory) = testContainers(4) + val feed = TestProbe() + + // Pool with 512 MB user memory + val pool = system.actorOf(ContainerPool.props(factory, poolConfig(MemoryLimit.stdMemory * 2), feed.ref)) + + // Send 4 actions to the ContainerPool (Action 0, Action 2 and Action 3 with 256 MB each and Action 1 with 512 MB) + pool ! runMessage + containers(0).expectMsg(runMessage) + pool ! runMessageLarge + containers(1).expectNoMessage(100.milliseconds) + pool ! runMessageDifferentNamespace + containers(2).expectNoMessage(100.milliseconds) + pool ! runMessageDifferentAction + containers(3).expectNoMessage(100.milliseconds) + + // Action 0 is finished -> Large action should be executed now + containers(0).send(pool, NeedWork(warmedData())) + feed.expectMsg(MessageFeed.Processed) + containers(1).expectMsgPF() { + // The `Some` assures that it has been retried while the first action was still blocking the invoker. + case Run(runMessageLarge.action, runMessageLarge.msg, Some(_)) => true + } + + // Send another action to the container pool that would fit memory-wise + pool ! runMessageDifferentEverything + containers(4).expectNoMessage(100.milliseconds) + + // Action 1 is finished -> Action 2 and Action 3 should be executed now + containers(1).send(pool, NeedWork(warmedData())) + feed.expectMsg(MessageFeed.Processed) + containers(2).expectMsgPF() { + // The `Some` assures that it has been retried while the first action was still blocking the invoker. 
+ case Run(runMessageDifferentNamespace.action, runMessageDifferentNamespace.msg, Some(_)) => true + } + // Expect the original message (retryLogDeadline still None) to verify that this request was stored in the queue instead of being retried in the system + containers(3).expectMsg(runMessageDifferentAction) + + // Action 3 is finished -> Action 4 should start + containers(3).send(pool, NeedWork(warmedData())) + feed.expectMsg(MessageFeed.Processed) + containers(4).expectMsgPF() { + // The `Some` assures that it has been retried while the first action was still blocking the invoker. + case Run(runMessageDifferentEverything.action, runMessageDifferentEverything.msg, Some(_)) => true + } + + // Actions 2 and 4 are finished + containers(2).send(pool, NeedWork(warmedData())) + feed.expectMsg(MessageFeed.Processed) + containers(4).send(pool, NeedWork(warmedData())) + feed.expectMsg(MessageFeed.Processed) + } } /** @@ -419,18 +572,24 @@ behavior of "ContainerPool remove()" it should "not provide a container if pool is empty" in { - ContainerPool.remove(Map.empty) shouldBe None + ContainerPool.remove(Map.empty, MemoryLimit.stdMemory) shouldBe List.empty } it should "not provide a container from busy pool with non-warm containers" in { val pool = Map('none -> noData(), 'pre -> preWarmedData()) - ContainerPool.remove(pool) shouldBe None + ContainerPool.remove(pool, MemoryLimit.stdMemory) shouldBe List.empty + } + + it should "not provide a container from pool if there is not enough capacity" in { + val pool = Map('first -> warmedData()) + + ContainerPool.remove(pool, MemoryLimit.stdMemory * 2) shouldBe List.empty } it should "provide a container from pool with one single free container" in { val data = warmedData() val pool = Map('warm -> data) - ContainerPool.remove(pool) shouldBe Some('warm) + ContainerPool.remove(pool, MemoryLimit.stdMemory) shouldBe List('warm) } it should "provide oldest container from busy pool with multiple containers" in { @@ -441,6 +600,18 @@ val pool = Map('first -> first, 'second -> second, 'oldest -> oldest) - ContainerPool.remove(pool) shouldBe Some('oldest) + ContainerPool.remove(pool, MemoryLimit.stdMemory) shouldBe List('oldest) + } + + it should "provide a list of the oldest containers from the pool if several containers have to be removed" in { + val namespace = differentNamespace.asString + val first = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(1)) + val second = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(2)) + val third = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(3)) + val oldest = warmedData(namespace = namespace, lastUsed = Instant.ofEpochMilli(0)) + + val pool = Map('first -> first, 'second -> second, 'third -> third, 'oldest -> oldest) + + ContainerPool.remove(pool, MemoryLimit.stdMemory * 2) shouldBe List('oldest, 'first) } } diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala index a46647a..4a2a133 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala @@ -168,7 +168,7 @@ Future.successful(()) } - val poolConfig = ContainerPoolConfig(1, 2, false) + val poolConfig = ContainerPoolConfig(2.MB, false) behavior 
of "ContainerProxy" diff --git a/tests/src/test/scala/whisk/core/entity/test/SizeTests.scala b/tests/src/test/scala/whisk/core/entity/test/SizeTests.scala index c74f146..d800f14 100644 --- a/tests/src/test/scala/whisk/core/entity/test/SizeTests.scala +++ b/tests/src/test/scala/whisk/core/entity/test/SizeTests.scala @@ -93,6 +93,48 @@ class SizeTests extends FlatSpec with Matchers { } } + // Multiplication + it should "2 B * 10 = 20 B" in { + 2.B * 10 should be(20.B) + } + + it should "40 MB * 2 = 80 MB" in { + 40.MB * 2 should be(80.MB) + } + + // Division + it should "5 Byte / 2 Byte = 2.5" in { + 5.B / 2.B should be(2.5) + } + + it should "1 KB / 512 Byte = 2" in { + 1.KB / 512.B should be(2) + } + + it should "throw an exception if division is through 0 byte" in { + an[ArithmeticException] should be thrownBy { + 1.MB / 0.B + } + } + + it should "5 Byte / 2 = 2 Byte" in { + 5.B / 2 should be(2.B) + } + + it should "1 MB / 512 = 2 Byte" in { + 1.MB / 512 should be(2.KB) + } + + it should "not go into integer overflow for a few GB" in { + 4096.MB / 2 should be(2048.MB) + } + + it should "throw an exception if division is through 0" in { + an[ArithmeticException] should be thrownBy { + 1.MB / 0 + } + } + // Conversions it should "1024 B to KB = 1" in { (1024 B).toKB should be(1) diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala index 4e906d0..211268f 100644 --- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala +++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala @@ -19,12 +19,12 @@ package whisk.core.loadBalancer.test import common.StreamLogging import org.junit.runner.RunWith -import org.scalatest.{FlatSpec, Matchers} import org.scalatest.junit.JUnitRunner +import org.scalatest.{FlatSpec, Matchers} import whisk.common.{ForcibleSemaphore, TransactionId} -import whisk.core.entity.InvokerInstanceId -import whisk.core.loadBalancer._ +import whisk.core.entity.{ByteSize, InvokerInstanceId, MemoryLimit} import whisk.core.loadBalancer.InvokerState._ +import whisk.core.loadBalancer._ /** * Unit tests for the ContainerPool object. 
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 4e906d0..211268f 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -19,12 +19,12 @@ package whisk.core.loadBalancer.test
 
 import common.StreamLogging
 import org.junit.runner.RunWith
-import org.scalatest.{FlatSpec, Matchers}
 import org.scalatest.junit.JUnitRunner
+import org.scalatest.{FlatSpec, Matchers}
 import whisk.common.{ForcibleSemaphore, TransactionId}
-import whisk.core.entity.InvokerInstanceId
-import whisk.core.loadBalancer._
+import whisk.core.entity.{ByteSize, InvokerInstanceId, MemoryLimit}
 import whisk.core.loadBalancer.InvokerState._
+import whisk.core.loadBalancer._
 
 /**
  * Unit tests for the ContainerPool object.
@@ -43,13 +43,15 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
   def semaphores(count: Int, max: Int): IndexedSeq[ForcibleSemaphore] =
     IndexedSeq.fill(count)(new ForcibleSemaphore(max))
 
-  def lbConfig(blackboxFraction: Double, invokerBusyThreshold: Int) =
+  def lbConfig(blackboxFraction: Double, invokerBusyThreshold: ByteSize) =
     ShardingContainerPoolBalancerConfig(blackboxFraction, invokerBusyThreshold, 1)
 
   it should "update invoker's state, growing the slots data and keeping valid old data" in {
     // start empty
     val slots = 10
-    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, slots))
+    val memoryPerSlot = MemoryLimit.minMemory
+    val memory = memoryPerSlot * slots
+    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, memory))
     state.invokers shouldBe 'empty
     state.blackboxInvokers shouldBe 'empty
     state.managedInvokers shouldBe 'empty
@@ -65,13 +67,13 @@
     state.blackboxInvokers shouldBe update1 // fallback to at least one
     state.managedInvokers shouldBe update1 // fallback to at least one
     state.invokerSlots should have size update1.size
-    state.invokerSlots.head.availablePermits shouldBe slots
+    state.invokerSlots.head.availablePermits shouldBe memory.toMB
     state.managedStepSizes shouldBe Seq(1)
     state.blackboxStepSizes shouldBe Seq(1)
 
     // acquire a slot to alter invoker state
-    state.invokerSlots.head.tryAcquire()
-    state.invokerSlots.head.availablePermits shouldBe slots - 1
+    state.invokerSlots.head.tryAcquire(memoryPerSlot.toMB.toInt)
+    state.invokerSlots.head.availablePermits shouldBe (memory - memoryPerSlot).toMB.toInt
 
     // apply second update, growing the state
     val update2 = IndexedSeq(healthy(0), healthy(1))
@@ -81,15 +83,15 @@
     state.managedInvokers shouldBe IndexedSeq(update2.head)
     state.blackboxInvokers shouldBe IndexedSeq(update2.last)
     state.invokerSlots should have size update2.size
-    state.invokerSlots.head.availablePermits shouldBe slots - 1
-    state.invokerSlots(1).availablePermits shouldBe slots
+    state.invokerSlots.head.availablePermits shouldBe (memory - memoryPerSlot).toMB.toInt
+    state.invokerSlots(1).availablePermits shouldBe memory.toMB
     state.managedStepSizes shouldBe Seq(1)
     state.blackboxStepSizes shouldBe Seq(1)
   }
 
   it should "allow managed partition to overlap with blackbox for small N" in {
     Seq(0.1, 0.2, 0.3, 0.4, 0.5).foreach { bf =>
-      val state = ShardingContainerPoolBalancerState()(lbConfig(bf, 1))
+      val state = ShardingContainerPoolBalancerState()(lbConfig(bf, MemoryLimit.stdMemory))
 
       (1 to 100).toSeq.foreach { i =>
         state.updateInvokers((1 to i).map(_ => healthy(1)))
@@ -116,43 +118,49 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
   it should "update the cluster size, adjusting the invoker slots accordingly" in {
     val slots = 10
-    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, slots))
+    val memoryPerSlot = MemoryLimit.minMemory
+    val memory = memoryPerSlot * slots
+    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, memory))
     state.updateInvokers(IndexedSeq(healthy(0)))
 
-    state.invokerSlots.head.tryAcquire()
-    state.invokerSlots.head.availablePermits shouldBe slots - 1
+    state.invokerSlots.head.tryAcquire(memoryPerSlot.toMB.toInt)
+    state.invokerSlots.head.availablePermits shouldBe (memory - memoryPerSlot).toMB
 
     state.updateCluster(2)
-    state.invokerSlots.head.availablePermits shouldBe slots / 2 // state reset + divided by 2
+    state.invokerSlots.head.availablePermits shouldBe memory.toMB / 2 // state reset + divided by 2
   }
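The permit numbers asserted here and in the two tests that follow come from a single rule: each invoker's semaphore is seeded with the invoker's user memory in MB, shared evenly across the controller cluster, and never drops below the memory of the smallest schedulable action. A sketch of that arithmetic with hypothetical names (the real state keeps one ForcibleSemaphore per invoker and recreates them on updateCluster):

    // Hypothetical helper mirroring the slot sizing asserted in these tests:
    // memory per invoker, divided across the cluster, floored at the memory
    // of the smallest possible action.
    def permitsPerInvoker(userMemoryMB: Long, clusterSize: Int, minSlotMB: Long): Long = {
      val share = userMemoryMB / math.max(clusterSize, 1) // sizes < 1 count as "alone"
      math.max(share, minSlotMB)
    }

With 10 * minMemory of user memory this yields memory.toMB / 2 for a cluster of two, the full memory for cluster sizes of 0 or -1, and the minMemory floor for a cluster of 20, matching the expectations above and below.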
 
   it should "fall back to a size of 1 (alone) if cluster size is < 1" in {
     val slots = 10
-    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, slots))
+    val memoryPerSlot = MemoryLimit.minMemory
+    val memory = memoryPerSlot * slots
+    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, memory))
     state.updateInvokers(IndexedSeq(healthy(0)))
 
-    state.invokerSlots.head.availablePermits shouldBe slots
+    state.invokerSlots.head.availablePermits shouldBe memory.toMB
 
     state.updateCluster(2)
-    state.invokerSlots.head.availablePermits shouldBe slots / 2
+    state.invokerSlots.head.availablePermits shouldBe memory.toMB / 2
 
     state.updateCluster(0)
-    state.invokerSlots.head.availablePermits shouldBe slots
+    state.invokerSlots.head.availablePermits shouldBe memory.toMB
 
     state.updateCluster(-1)
-    state.invokerSlots.head.availablePermits shouldBe slots
+    state.invokerSlots.head.availablePermits shouldBe memory.toMB
   }
 
   it should "set the threshold to 1 if the cluster is bigger than there are slots on 1 invoker" in {
     val slots = 10
-    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, slots))
+    val memoryPerSlot = MemoryLimit.minMemory
+    val memory = memoryPerSlot * slots
+    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, memory))
     state.updateInvokers(IndexedSeq(healthy(0)))
 
-    state.invokerSlots.head.availablePermits shouldBe slots
+    state.invokerSlots.head.availablePermits shouldBe memory.toMB
 
     state.updateCluster(20)
 
-    state.invokerSlots.head.availablePermits shouldBe 1
+    state.invokerSlots.head.availablePermits shouldBe MemoryLimit.minMemory.toMB
   }
 
   behavior of "schedule"
@@ -160,7 +168,12 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
   implicit val transId = TransactionId.testing
 
   it should "return None on an empty invoker list" in {
-    ShardingContainerPoolBalancer.schedule(IndexedSeq.empty, IndexedSeq.empty, index = 0, step = 2) shouldBe None
+    ShardingContainerPoolBalancer.schedule(
+      IndexedSeq.empty,
+      IndexedSeq.empty,
+      MemoryLimit.minMemory.toMB.toInt,
+      index = 0,
+      step = 2) shouldBe None
   }
 
   it should "return None if no invokers are healthy" in {
@@ -168,7 +181,12 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
     val invokerCount = 3
     val invokerSlots = semaphores(invokerCount, 3)
     val invokers = (0 until invokerCount).map(unhealthy)
 
-    ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 0, step = 2) shouldBe None
+    ShardingContainerPoolBalancer.schedule(
+      invokers,
+      invokerSlots,
+      MemoryLimit.minMemory.toMB.toInt,
+      index = 0,
+      step = 2) shouldBe None
   }
 
   it should "choose the first available invoker, jumping in stepSize steps, falling back to randomized scheduling once all invokers are full" in {
@@ -178,13 +196,19 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
     val expectedResult = Seq(3, 3, 3, 5, 5, 5, 4, 4, 4)
     val result = expectedResult.map { _ =>
-      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 0, step = 2).get.toInt
+      ShardingContainerPoolBalancer
+        .schedule(invokers, invokerSlots, 1, index = 0, step = 2)
+        .get
+        .toInt
     }
 
     result shouldBe expectedResult
 
     val bruteResult = (0 to 100).map { _ =>
-      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 0, step = 2).get.toInt
+      ShardingContainerPoolBalancer
+        .schedule(invokers, invokerSlots, 1, index = 0, step = 2)
+        .get
+        .toInt
     }
 
     bruteResult should contain allOf (3, 4, 5)
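The scheduling loop behind these expectations walks the healthy invokers starting at the action's home index, jumping by a step size coprime to the invoker count so that every invoker is visited exactly once, and takes the first invoker with enough free memory; only when all invokers are full does it fall back to a random one. A minimal sketch under those assumptions, with a plain java.util.concurrent.Semaphore standing in for the ForcibleSemaphore used in the diff and invoker ids assumed equal to their positions:

    import java.util.concurrent.Semaphore
    import scala.annotation.tailrec
    import scala.util.Random

    // Hypothetical, simplified model of the schedule() loop: `slots` is the
    // action's memory in MB, `step` is coprime to the invoker count.
    def schedule(invokers: IndexedSeq[Int],   // ids of healthy invokers
                 free: IndexedSeq[Semaphore], // free MB per invoker, as permits
                 slots: Int,
                 index: Int,
                 step: Int): Option[Int] = {
      val n = invokers.size

      @tailrec
      def walk(i: Int, remaining: Int): Option[Int] =
        if (remaining == 0) {
          // Every invoker is full: pick a random one. The real code
          // force-acquires in this case, deliberately overcommitting it.
          Some(invokers(Random.nextInt(n)))
        } else if (free(i).tryAcquire(slots)) {
          Some(invokers(i)) // first invoker with `slots` MB free wins
        } else {
          walk((i + step) % n, remaining - 1)
        }

      if (n == 0) None else walk(index, n)
    }

This reproduces the deterministic prefix of expectedResult above (the home invoker until its permits run out, then the next reachable ones) as well as the randomized tail once every semaphore is drained.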
@@ -196,20 +220,43 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
     val expectedResult = Seq(0, 0, 0, 3, 3, 3)
     val result = expectedResult.map { _ =>
-      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 0, step = 1).get.toInt
+      ShardingContainerPoolBalancer
+        .schedule(invokers, invokerSlots, 1, index = 0, step = 1)
+        .get
+        .toInt
     }
 
     result shouldBe expectedResult
 
     // more schedules will result in randomized invokers, but the unhealthy and offline invokers should not be part of the result
     val bruteResult = (0 to 100).map { _ =>
-      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 0, step = 1).get.toInt
+      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, 1, index = 0, step = 1).get.toInt
     }
 
     bruteResult should contain allOf (0, 3)
     bruteResult should contain noneOf (1, 2)
   }
 
+  it should "only take invokers that have enough free slots" in {
+    val invokerCount = 3
+    // Each invoker has 4 slots
+    val invokerSlots = semaphores(invokerCount, 4)
+    val invokers = (0 until invokerCount).map(i => healthy(i))
+
+    // Ask for 3 slots -> the first invoker should be used
+    ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, 3, index = 0, step = 1).get.toInt shouldBe 0
+    // Ask for 2 slots -> the second invoker should be used
+    ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, 2, index = 0, step = 1).get.toInt shouldBe 1
+    // Ask for 1 slot -> the first invoker should be used
+    ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, 1, index = 0, step = 1).get.toInt shouldBe 0
+    // Ask for 4 slots -> the third invoker should be used
+    ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, 4, index = 0, step = 1).get.toInt shouldBe 2
+    // Ask for 2 slots -> the second invoker should be used
+    ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, 2, index = 0, step = 1).get.toInt shouldBe 1
+
+    invokerSlots.foreach(_.availablePermits shouldBe 0)
+  }
+
   behavior of "pairwiseCoprimeNumbersUntil"
 
   it should "return an empty set for malformed inputs" in {