This is an automated email from the ASF dual-hosted git repository.

cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new a2d9781  Add a loadbalancer with local state and horizontal invoker 
sharding. (#3240)
a2d9781 is described below

commit a2d978108a97a36ad063ceb61dad63419392f076
Author: Markus Thömmes <markusthoem...@me.com>
AuthorDate: Wed Feb 14 08:12:26 2018 +0100

    Add a loadbalancer with local state and horizontal invoker sharding. (#3240)
    
    The current ContainerPoolBalancer suffers a couple of problems and 
bottlenecks:
    
    1. **Inconsistent state:** The data-structures keeping the state for that 
loadbalancer are not thread-safely handled, meaning there can be queuing to 
some invokers even though there is free capacity on other invokers.
    2. **Asynchronously shared state:** Sharing the state is needed for a 
high-available deployment of multiple controllers and for horizontal scale in 
those. Said state-sharing makes point 1 even worse and isn't anywhere fast 
enough to be able to efficiently schedule quick bursts.
    3. **Bottlenecks:** Getting the state from the outside (like for the 
ActivationThrottle) is a very costly operation (at least in the sharedstate 
case) and actually bottlenecks the whole invocation path. Getting the current 
state of the invokers is a second bottleneck, where one request is made to the 
corresponding actor for each invocation.
    
    This new implementation aims to solve the problems mentioned above as 
follows:
    
    1. **All state is local:** There is no shared state. Resources are managed 
through horizontal sharding. Horizontal sharding means: The invokers' slots are 
evenly divided between the loadbalancers in existence. If we deploy 2 
loadbalancers and each invoker has 16 slots, each of the loadbalancers will 
have access to 8 slots on each invoker.
    2. **Slots are given away atomically:** When scheduling an activation, the 
slot is immediatly assigned to that activation (implemented through 
Semaphores). That means: Even in concurrent schedules, there will not be an 
overload on an invoker as long as there is capacity left on that invoker.
    3. **Asynchronous updates of slow data:** Slowly changing data, like a 
change in the invoker's state, is asynchronously handled and updated to a local 
version of the state. Querying the state is as cheap as it can be.
---
 ansible/group_vars/all                             |   2 +
 ansible/roles/controller/tasks/deploy.yml          |   1 +
 .../scala/whisk/common/ForcableSemaphore.scala     | 124 ++++++
 .../scala/whisk/core/controller/Controller.scala   |   2 +
 .../core/entitlement/ActivationThrottler.scala     |  28 +-
 .../scala/whisk/core/entitlement/Entitlement.scala |  80 ++--
 .../whisk/core/entitlement/RateThrottler.scala     |  11 +-
 .../core/loadBalancer/ContainerPoolBalancer.scala  |   2 +
 .../core/loadBalancer/InvokerSupervision.scala     |  12 +-
 .../whisk/core/loadBalancer/LoadBalancer.scala     |   5 +
 .../ShardingContainerPoolBalancer.scala            | 453 +++++++++++++++++++++
 docs/deploy.md                                     |   6 +
 tests/src/test/scala/limits/ThrottleTests.scala    |  21 +-
 .../whisk/common/ForcableSemaphoreTests.scala      |  88 ++++
 .../core/controller/test/RateThrottleTests.scala   |   6 +-
 .../test/ShardingContainerPoolBalancerTests.scala  | 174 ++++++++
 16 files changed, 947 insertions(+), 68 deletions(-)

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 54c2d68..8f14ea6 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -56,6 +56,8 @@ controller:
       seedNodes: "{{ groups['controllers'] | map('extract', hostvars, 
'ansible_host') | list }}"
   # We recommend to enable HA for the controllers only, if bookkeeping data 
are shared too. (localBookkeeping: false)
   ha: "{{ controller_enable_ha | default(True) and groups['controllers'] | 
length > 1 }}"
+  loadbalancer:
+    spi: "{{ controller_loadbalancer_spi | default('') }}"
   loglevel: "{{ controller_loglevel | default(whisk_loglevel) | 
default('INFO') }}"
 
 jmx:
diff --git a/ansible/roles/controller/tasks/deploy.yml 
b/ansible/roles/controller/tasks/deploy.yml
index 9c19dd4..3fa07b5 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -157,6 +157,7 @@
       "CONFIG_kamon_statsd_port": "{{ metrics.kamon.port }}"
 
       "CONFIG_whisk_spi_LogStoreProvider": "{{ userLogs.spi }}"
+      "CONFIG_whisk_spi_LoadBalancerProvider": "{{ controller.loadbalancer.spi 
}}"
       
       "CONFIG_logback_log_level": "{{ controller.loglevel }}"
 
diff --git a/common/scala/src/main/scala/whisk/common/ForcableSemaphore.scala 
b/common/scala/src/main/scala/whisk/common/ForcableSemaphore.scala
new file mode 100644
index 0000000..9544a30
--- /dev/null
+++ b/common/scala/src/main/scala/whisk/common/ForcableSemaphore.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.common
+
+import java.util.concurrent.locks.AbstractQueuedSynchronizer
+
+import scala.annotation.tailrec
+
+/**
+ * A Semaphore, which in addition to the usual features has means to force 
more clients to get permits.
+ *
+ * Like any usual Semaphore, this implementation will give away at most 
`maxAllowed` permits when used the "usual" way.
+ * In addition to that, it also has a `forceAcquire` method which will push 
the Semaphore's remaining permits into a
+ * negative value. Getting permits using `tryAcquire` will only be possible 
once the permits value is in a positive
+ * state again.
+ *
+ * As this is (now) only used for the loadbalancer's scheduling, this does not 
implement the "whole" Java Semaphore's
+ * interface but only the methods needed.
+ *
+ * @param maxAllowed maximum number of permits given away by `tryAcquire`
+ */
+class ForcableSemaphore(maxAllowed: Int) {
+  class Sync extends AbstractQueuedSynchronizer {
+    setState(maxAllowed)
+
+    def permits: Int = getState
+
+    /** Try to release a permit and return whether or not that operation was 
successful. */
+    @tailrec
+    override final def tryReleaseShared(releases: Int): Boolean = {
+      val current = getState
+      val next = current + releases
+      if (next < current) { // integer overflow
+        throw new Error("Maximum permit count exceeded, permit variable 
overflowed")
+      }
+      if (compareAndSetState(current, next)) {
+        true
+      } else {
+        tryReleaseShared(releases)
+      }
+    }
+
+    /**
+     * Try to acquire a permit and return whether or not that operation was 
successful. Requests may not finish in FIFO
+     * order, hence this method is not necessarily fair.
+     */
+    @tailrec
+    final def nonFairTryAcquireShared(acquires: Int): Int = {
+      val available = getState
+      val remaining = available - acquires
+      if (remaining < 0 || compareAndSetState(available, remaining)) {
+        remaining
+      } else {
+        nonFairTryAcquireShared(acquires)
+      }
+    }
+
+    /**
+     * Basically the same as `nonFairTryAcquireShared`, but does bound to a 
minimal value of 0 so permits can get
+     * negative.
+     */
+    @tailrec
+    final def forceAquireShared(acquires: Int): Unit = {
+      val available = getState
+      val remaining = available - acquires
+      if (!compareAndSetState(available, remaining)) {
+        forceAquireShared(acquires)
+      }
+    }
+  }
+
+  val sync = new Sync
+
+  /**
+   * Acquires the given numbers of permits.
+   *
+   * @param acquires the number of permits to get
+   * @return `true`, iff the internal semaphore's number of permits is 
positive, `false` if negative
+   */
+  def tryAcquire(acquires: Int = 1): Boolean = {
+    require(acquires > 0, "cannot acquire negative or no permits")
+    sync.nonFairTryAcquireShared(acquires) >= 0
+  }
+
+  /**
+   * Forces the amount of permits.
+   *
+   * This possibly pushes the internal number of available permits to a 
negative value.
+   *
+   * @param acquires the number of permits to get
+   */
+  def forceAcquire(acquires: Int = 1): Unit = {
+    require(acquires > 0, "cannot force acquire negative or no permits")
+    sync.forceAquireShared(acquires)
+  }
+
+  /**
+   * Releases the given amount of permits
+   *
+   * @param acquires the number of permits to release
+   */
+  def release(acquires: Int = 1): Unit = {
+    require(acquires > 0, "cannot release negative or no permits")
+    sync.releaseShared(acquires)
+  }
+
+  /** Returns the number of currently available permits. Possibly negative. */
+  def availablePermits: Int = sync.permits
+}
diff --git 
a/core/controller/src/main/scala/whisk/core/controller/Controller.scala 
b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
index 2ccbf2f..00901bf 100644
--- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala
+++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala
@@ -118,6 +118,8 @@ class Controller(val instance: InstanceId,
   // initialize backend services
   private implicit val loadBalancer =
     SpiLoader.get[LoadBalancerProvider].loadBalancer(whiskConfig, instance)
+  logging.info(this, s"loadbalancer initialized: 
${loadBalancer.getClass.getSimpleName}")(TransactionId.controller)
+
   private implicit val entitlementProvider = new 
LocalEntitlementProvider(whiskConfig, loadBalancer)
   private implicit val activationIdFactory = new ActivationIdGenerator {}
   private implicit val logStore = 
SpiLoader.get[LogStoreProvider].logStore(actorSystem)
diff --git 
a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
 
b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
index 1a787e5..563e7fe 100644
--- 
a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
+++ 
b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala
@@ -26,32 +26,28 @@ import whisk.http.Messages
 import scala.concurrent.{ExecutionContext, Future}
 
 /**
- * Determines user limits and activation counts as seen by the invoker and the 
loadbalancer
- * in a scheduled, repeating task for other services to get the cached 
information to be able
- * to calculate and determine whether the namespace currently invoking a new 
action should
- * be allowed to do so.
+ * Determine whether the namespace currently invoking a new action should be 
allowed to do so.
  *
- * @param loadbalancer contains active quotas
- * @param defaultConcurrencyLimit the default max allowed concurrent operations
+ * @param loadBalancer contains active quotas
+ * @param concurrencyLimit a calculated limit relative to the user using the 
system
  * @param systemOverloadLimit the limit when the system is considered 
overloaded
  */
-class ActivationThrottler(loadBalancer: LoadBalancer, defaultConcurrencyLimit: 
Int, systemOverloadLimit: Int)(
+class ActivationThrottler(loadBalancer: LoadBalancer, concurrencyLimit: 
Identity => Int, systemOverloadLimit: Int)(
   implicit logging: Logging,
   executionContext: ExecutionContext) {
 
-  logging.info(this, s"concurrencyLimit = $defaultConcurrencyLimit, 
systemOverloadLimit = $systemOverloadLimit")(
-    TransactionId.controller)
+  logging.info(this, s"systemOverloadLimit = 
$systemOverloadLimit")(TransactionId.controller)
 
   /**
    * Checks whether the operation should be allowed to proceed.
    */
   def check(user: Identity)(implicit tid: TransactionId): Future[RateLimit] = {
     loadBalancer.activeActivationsFor(user.uuid).map { concurrentActivations =>
-      val concurrencyLimit = 
user.limits.concurrentInvocations.getOrElse(defaultConcurrencyLimit)
+      val currentLimit = concurrencyLimit(user)
       logging.debug(
         this,
-        s"namespace = ${user.uuid.asString}, concurrent activations = 
$concurrentActivations, below limit = $concurrencyLimit")
-      ConcurrentRateLimit(concurrentActivations, concurrencyLimit)
+        s"namespace = ${user.uuid.asString}, concurrent activations = 
$concurrentActivations, below limit = $currentLimit")
+      ConcurrentRateLimit(concurrentActivations, currentLimit)
     }
   }
 
@@ -76,11 +72,11 @@ sealed trait RateLimit {
 }
 
 case class ConcurrentRateLimit(count: Int, allowed: Int) extends RateLimit {
-  val ok = count < allowed // must have slack for the current activation 
request
-  override def errorMsg = Messages.tooManyConcurrentRequests(count, allowed)
+  val ok: Boolean = count < allowed // must have slack for the current 
activation request
+  override def errorMsg: String = Messages.tooManyConcurrentRequests(count, 
allowed)
 }
 
 case class TimedRateLimit(count: Int, allowed: Int) extends RateLimit {
-  val ok = count <= allowed // the count is already updated to account for the 
current request
-  override def errorMsg = Messages.tooManyRequests(count, allowed)
+  val ok: Boolean = count <= allowed // the count is already updated to 
account for the current request
+  override def errorMsg: String = Messages.tooManyRequests(count, allowed)
 }
diff --git 
a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala 
b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
index 9f97d71..d148eb7 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala
@@ -19,23 +19,20 @@ package whisk.core.entitlement
 
 import scala.collection.concurrent.TrieMap
 import scala.collection.immutable.Set
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future}
 import scala.util.Failure
 import scala.util.Success
-
 import akka.actor.ActorSystem
 import akka.http.scaladsl.model.StatusCodes.Forbidden
 import akka.http.scaladsl.model.StatusCodes.TooManyRequests
-
 import whisk.core.entitlement.Privilege.ACTIVATE
-import whisk.core.entitlement.Privilege._
 import whisk.core.entitlement.Privilege.REJECT
 import whisk.common.Logging
 import whisk.common.TransactionId
 import whisk.core.WhiskConfig
 import whisk.core.controller.RejectRequest
 import whisk.core.entity._
-import whisk.core.loadBalancer.LoadBalancer
+import whisk.core.loadBalancer.{LoadBalancer, ShardingContainerPoolBalancer}
 import whisk.http.ErrorResponse
 import whisk.http.Messages
 import whisk.http.Messages._
@@ -57,10 +54,10 @@ protected[core] case class Resource(namespace: EntityPath,
                                     collection: Collection,
                                     entity: Option[String],
                                     env: Option[Parameters] = None) {
-  def parent = collection.path + EntityPath.PATHSEP + namespace
-  def id = parent + entity.map(EntityPath.PATHSEP + _).getOrElse("")
-  def fqname = namespace.asString + entity.map(EntityPath.PATHSEP + 
_).getOrElse("")
-  override def toString = id
+  def parent: String = collection.path + EntityPath.PATHSEP + namespace
+  def id: String = parent + entity.map(EntityPath.PATHSEP + _).getOrElse("")
+  def fqname: String = namespace.asString + entity.map(EntityPath.PATHSEP + 
_).getOrElse("")
+  override def toString: String = id
 }
 
 protected[core] object EntitlementProvider {
@@ -82,42 +79,69 @@ protected[core] abstract class EntitlementProvider(config: 
WhiskConfig, loadBala
   implicit actorSystem: ActorSystem,
   logging: Logging) {
 
-  private implicit val executionContext = actorSystem.dispatcher
-
-  /**
-   * The number of controllers if HA is enabled, 1 otherwise
-   */
-  private val diviser = if (config.controllerHighAvailability) 
config.controllerInstances.toInt else 1
+  private implicit val executionContext: ExecutionContext = 
actorSystem.dispatcher
 
   /**
    * Allows 20% of additional requests on top of the limit to mitigate 
possible unfair round-robin loadbalancing between
    * controllers
    */
   private val overcommit = if (config.controllerHighAvailability) 1.2 else 1
+  private def dilateLimit(limit: Int): Int = Math.ceil(limit.toDouble * 
overcommit).toInt
 
   /**
-   * Adjust the throttles for a single controller with the diviser and the 
overcommit.
+   * Calculates a possibly dilated limit relative to the current user.
    *
-   * @param originalThrottle The throttle that needs to be adjusted for this 
controller.
+   * @param defaultLimit the default limit across the whole system
+   * @param user the user to apply that limit to
+   * @return a calculated limit
    */
-  private def dilateThrottle(originalThrottle: Int): Int = {
-    Math.ceil((originalThrottle.toDouble / diviser.toDouble) * 
overcommit).toInt
+  private def calculateLimit(defaultLimit: Int, overrideLimit: Identity => 
Option[Int])(user: Identity): Int = {
+    val absoluteLimit = overrideLimit(user).getOrElse(defaultLimit)
+    dilateLimit(absoluteLimit)
+  }
+
+  /**
+   * Calculates a limit which applies only to this instance individually.
+   *
+   * The state needed to correctly check this limit is not shared between all 
instances, which want to check that
+   * limit, so it needs to be divided between the parties who want to perform 
that check.
+   *
+   * @param defaultLimit the default limit across the whole system
+   * @param user the user to apply that limit to
+   * @return a calculated limit
+   */
+  private def calculateIndividualLimit(defaultLimit: Int, overrideLimit: 
Identity => Option[Int])(
+    user: Identity): Int = {
+    val limit = calculateLimit(defaultLimit, overrideLimit)(user)
+    if (limit == 0) {
+      0
+    } else {
+      // Edge case: Iff the divided limit is < 1 no loadbalancer would allow 
an action to be executed, thus we range
+      // bound to at least 1
+      (limit / loadBalancer.clusterSize).max(1)
+    }
   }
 
   private val invokeRateThrottler =
     new RateThrottler(
       "actions per minute",
-      dilateThrottle(config.actionInvokePerMinuteLimit.toInt),
-      _.limits.invocationsPerMinute.map(dilateThrottle))
+      calculateIndividualLimit(config.actionInvokePerMinuteLimit.toInt, 
_.limits.invocationsPerMinute))
   private val triggerRateThrottler =
     new RateThrottler(
       "triggers per minute",
-      dilateThrottle(config.triggerFirePerMinuteLimit.toInt),
-      _.limits.firesPerMinute.map(dilateThrottle))
-  private val concurrentInvokeThrottler = new ActivationThrottler(
-    loadBalancer,
-    config.actionInvokeConcurrentLimit.toInt,
-    config.actionInvokeSystemOverloadLimit.toInt)
+      calculateIndividualLimit(config.triggerFirePerMinuteLimit.toInt, 
_.limits.firesPerMinute))
+
+  private val activationThrottleCalculator = loadBalancer match {
+    // This loadbalancer applies sharding and does not share any state
+    case _: ShardingContainerPoolBalancer => calculateIndividualLimit _
+    // Activation relevant data is shared by all other loadbalancers
+    case _ => calculateLimit _
+  }
+  private val concurrentInvokeThrottler =
+    new ActivationThrottler(
+      loadBalancer,
+      activationThrottleCalculator(config.actionInvokeConcurrentLimit.toInt, 
_.limits.concurrentInvocations),
+      config.actionInvokeSystemOverloadLimit.toInt)
 
   /**
    * Grants a subject the right to access a resources.
@@ -262,7 +286,7 @@ protected[core] abstract class EntitlementProvider(config: 
WhiskConfig, loadBala
     implicit transid: TransactionId): Future[Set[(Resource, Boolean)]] = {
     // check the default namespace first, bypassing additional checks if 
permitted
     val defaultNamespaces = Set(user.namespace.asString)
-    implicit val es = this
+    implicit val es: EntitlementProvider = this
 
     Future.sequence {
       resources.map { resource =>
diff --git 
a/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala 
b/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala
index fd0b56a..9ce7216 100644
--- a/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala
+++ b/core/controller/src/main/scala/whisk/core/entitlement/RateThrottler.scala
@@ -30,10 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger
  *
  * For now, we throttle only at a 1-minute granularity.
  */
-class RateThrottler(description: String, defaultMaxPerMinute: Int, 
overrideMaxPerMinute: Identity => Option[Int])(
-  implicit logging: Logging) {
-
-  logging.debug(this, s"$description: defaultMaxPerMinute = 
$defaultMaxPerMinute")(TransactionId.controller)
+class RateThrottler(description: String, maxPerMinute: Identity => 
Int)(implicit logging: Logging) {
 
   /**
    * Maintains map of subject namespace to operations rates.
@@ -50,7 +47,7 @@ class RateThrottler(description: String, defaultMaxPerMinute: 
Int, overrideMaxPe
   def check(user: Identity)(implicit transid: TransactionId): RateLimit = {
     val uuid = user.uuid // this is namespace identifier
     val throttle = rateMap.getOrElseUpdate(uuid, new RateInfo)
-    val limit = overrideMaxPerMinute(user).getOrElse(defaultMaxPerMinute)
+    val limit = maxPerMinute(user)
     val rate = TimedRateLimit(throttle.update(limit), limit)
     logging.debug(this, s"namespace = ${uuid.asString} rate = ${rate.count}, 
limit = $limit")
     rate
@@ -61,7 +58,7 @@ class RateThrottler(description: String, defaultMaxPerMinute: 
Int, overrideMaxPe
  * Tracks the activation rate of one subject at minute-granularity.
  */
 private class RateInfo {
-  @volatile var lastMin = getCurrentMinute
+  @volatile var lastMin: Long = getCurrentMinute
   val lastMinCount = new AtomicInteger()
 
   /**
@@ -77,7 +74,7 @@ private class RateInfo {
     lastMinCount.incrementAndGet()
   }
 
-  def roll() = {
+  def roll(): Unit = {
     val curMin = getCurrentMinute
     if (curMin != lastMin) {
       lastMin = curMin
diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
 
b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
index 46cb15d..f3c2f0d 100644
--- 
a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
+++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
@@ -92,6 +92,8 @@ class ContainerPoolBalancer(config: WhiskConfig, 
controllerInstance: InstanceId)
 
   override def totalActiveActivations = loadBalancerData.totalActivationCount
 
+  override def clusterSize = if (config.controllerHighAvailability) 
config.controllerInstances.toInt else 1
+
   /**
    * Tries to fill in the result slot (i.e., complete the promise) when a 
completion message arrives.
    * The promise is removed form the map when the result arrives or upon 
timeout.
diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
 
b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
index 0c75176..1ac686d 100644
--- 
a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
+++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala
@@ -53,6 +53,9 @@ case object UnHealthy extends InvokerState { val asString = 
"unhealthy" }
 case class ActivationRequest(msg: ActivationMessage, invoker: InstanceId)
 case class InvocationFinishedMessage(invokerInstance: InstanceId, successful: 
Boolean)
 
+// Sent to a monitor if the state changed
+case class CurrentInvokerPoolState(newState: IndexedSeq[InvokerHealth])
+
 // Data stored in the Invoker
 final case class InvokerInfo(buffer: RingBuffer[Boolean])
 
@@ -69,7 +72,8 @@ final case class InvokerInfo(buffer: RingBuffer[Boolean])
  */
 class InvokerPool(childFactory: (ActorRefFactory, InstanceId) => ActorRef,
                   sendActivationToInvoker: (ActivationMessage, InstanceId) => 
Future[RecordMetadata],
-                  pingConsumer: MessageConsumer)
+                  pingConsumer: MessageConsumer,
+                  monitor: Option[ActorRef])
     extends Actor {
 
   implicit val transid = TransactionId.invokerHealth
@@ -113,6 +117,7 @@ class InvokerPool(childFactory: (ActorRefFactory, 
InstanceId) => ActorRef,
   }
 
   def logStatus() = {
+    monitor.foreach(_ ! CurrentInvokerPoolState(status))
     val pretty = status.map(i => s"${i.id.toInt} -> ${i.status}")
     logging.info(this, s"invoker status changed to ${pretty.mkString(", ")}")
   }
@@ -208,8 +213,9 @@ object InvokerPool {
 
   def props(f: (ActorRefFactory, InstanceId) => ActorRef,
             p: (ActivationMessage, InstanceId) => Future[RecordMetadata],
-            pc: MessageConsumer) = {
-    Props(new InvokerPool(f, p, pc))
+            pc: MessageConsumer,
+            m: Option[ActorRef] = None) = {
+    Props(new InvokerPool(f, p, pc, m))
   }
 
   /** A stub identity for invoking the test action. This does not need to be a 
valid identity. */
diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala 
b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
index 8f2227f..ab8db1c 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/LoadBalancer.scala
@@ -38,6 +38,8 @@ class InvokerHealth(val id: InstanceId, val status: 
InvokerState) {
     case that: InvokerHealth => that.id == this.id && that.status == 
this.status
     case _                   => false
   }
+
+  override def toString = s"InvokerHealth($id, $status)"
 }
 
 trait LoadBalancer {
@@ -69,6 +71,9 @@ trait LoadBalancer {
 
   /** Gets the number of in-flight activations in the system. */
   def totalActiveActivations: Future[Int]
+
+  /** Gets the size of the cluster all loadbalancers are acting in */
+  def clusterSize: Int = 1
 }
 
 /**
diff --git 
a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
 
b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
new file mode 100644
index 0000000..5da863d
--- /dev/null
+++ 
b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.loadBalancer
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.LongAdder
+import java.util.concurrent.ThreadLocalRandom
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.cluster.ClusterEvent._
+import akka.cluster.{Cluster, Member, MemberStatus}
+import akka.event.Logging.InfoLevel
+import akka.stream.ActorMaterializer
+import org.apache.kafka.clients.producer.RecordMetadata
+import pureconfig._
+import whisk.common._
+import whisk.core.WhiskConfig._
+import whisk.core.connector._
+import whisk.core.entity._
+import whisk.core.{ConfigKeys, WhiskConfig}
+import whisk.spi.SpiLoader
+
+import scala.annotation.tailrec
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success}
+
+/**
+ * A loadbalancer that uses "horizontal" sharding to not collide with fellow 
loadbalancers.
+ *
+ * Horizontal sharding means, that each invoker's capacity is evenly divided 
between the loadbalancers. If an invoker
+ * has at most 16 slots available, those will be divided to 8 slots for each 
loadbalancer (if there are 2).
+ */
+class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: 
InstanceId)(
+  implicit val actorSystem: ActorSystem,
+  logging: Logging,
+  materializer: ActorMaterializer)
+    extends LoadBalancer {
+
+  private implicit val executionContext: ExecutionContext = 
actorSystem.dispatcher
+
+  /** Build a cluster of all loadbalancers */
+  val seedNodesProvider = new 
StaticSeedNodesProvider(config.controllerSeedNodes, actorSystem.name)
+  val cluster = Cluster(actorSystem)
+  cluster.joinSeedNodes(seedNodesProvider.getSeedNodes())
+
+  /** Used to manage an action for testing invoker health */
+  private val entityStore = WhiskEntityStore.datastore(config)
+
+  /** State related to invocations and throttling */
+  private val activations = TrieMap[ActivationId, ActivationEntry]()
+  private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
+  private val totalActivations = new LongAdder()
+
+  /** State needed for scheduling. */
+  private val schedulingState = ShardingContainerPoolBalancerState()()
+
+  /**
+   * Monitors invoker supervision and the cluster to update the state 
sequentially
+   *
+   * All state updates should go through this actor to guarantee, that 
`updateState` and `updateCluster` are called
+   * mutually exclusive and not concurrently.
+   */
+  private val monitor = actorSystem.actorOf(Props(new Actor {
+    override def preStart(): Unit = {
+      cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])
+    }
+
+    // all members of the cluster that are available
+    var availableMembers = Set.empty[Member]
+
+    override def receive: Receive = {
+      case CurrentInvokerPoolState(newState) =>
+        schedulingState.updateInvokers(newState)
+
+      // State of the cluster as it is right now
+      case CurrentClusterState(members, _, _, _, _) =>
+        availableMembers = members.filter(_.status == MemberStatus.Up)
+        schedulingState.updateCluster(availableMembers.size)
+
+      // General lifecycle events and events concerning the reachability of 
members. Split-brain is not a huge concern
+      // in this case as only the invoker-threshold is adjusted according to 
the perceived cluster-size.
+      // Taking the unreachable member out of the cluster from that 
point-of-view results in a better experience
+      // even under split-brain-conditions, as that (in the worst-case) 
results in premature overloading of invokers vs.
+      // going into overflow mode prematurely.
+      case event: ClusterDomainEvent =>
+        availableMembers = event match {
+          case MemberUp(member)          => availableMembers + member
+          case ReachableMember(member)   => availableMembers + member
+          case MemberRemoved(member, _)  => availableMembers - member
+          case UnreachableMember(member) => availableMembers - member
+          case _                         => availableMembers
+        }
+
+        schedulingState.updateCluster(availableMembers.size)
+    }
+  }))
+
+  /** Loadbalancer interface methods */
+  override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] = 
Future.successful(schedulingState.invokers)
+  override def activeActivationsFor(namespace: UUID): Future[Int] =
+    
Future.successful(activationsPerNamespace.get(namespace).map(_.intValue()).getOrElse(0))
+  override def totalActiveActivations: Future[Int] = 
Future.successful(totalActivations.intValue())
+  override def clusterSize: Int = schedulingState.clusterSize
+
+  /** 1. Publish a message to the loadbalancer */
+  override def publish(action: ExecutableWhiskActionMetaData, msg: 
ActivationMessage)(
+    implicit transid: TransactionId): Future[Future[Either[ActivationId, 
WhiskActivation]]] = {
+
+    val invokersToUse = if (!action.exec.pull) schedulingState.managedInvokers 
else schedulingState.blackboxInvokers
+    val chosen = if (invokersToUse.nonEmpty) {
+      val hash = 
ShardingContainerPoolBalancer.generateHash(msg.user.namespace, 
action.fullyQualifiedName(false))
+      val homeInvoker = hash % invokersToUse.size
+      val stepSize = schedulingState.stepSizes(hash % 
schedulingState.stepSizes.size)
+      ShardingContainerPoolBalancer.schedule(invokersToUse, 
schedulingState.invokerSlots, homeInvoker, stepSize)
+    } else {
+      None
+    }
+
+    chosen
+      .map { invoker =>
+        val entry = setupActivation(msg, action, invoker)
+        sendActivationToInvoker(messageProducer, msg, invoker).map { _ =>
+          entry.promise.future
+        }
+      }
+      .getOrElse(Future.failed(LoadBalancerException("No invokers available")))
+  }
+
+  /** 2. Update local state with the to be executed activation */
+  private def setupActivation(msg: ActivationMessage,
+                              action: ExecutableWhiskActionMetaData,
+                              instance: InstanceId): ActivationEntry = {
+
+    totalActivations.increment()
+    activationsPerNamespace.getOrElseUpdate(msg.user.uuid, new 
LongAdder()).increment()
+
+    val timeout = action.limits.timeout.duration.max(TimeLimit.STD_DURATION) + 
1.minute
+    // Install a timeout handler for the catastrophic case where an active ack 
is not received at all
+    // (because say an invoker is down completely, or the connection to the 
message bus is disrupted) or when
+    // the active ack is significantly delayed (possibly dues to long queues 
but the subject should not be penalized);
+    // in this case, if the activation handler is still registered, remove it 
and update the books.
+    activations.getOrElseUpdate(
+      msg.activationId, {
+        val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
+          processCompletion(Left(msg.activationId), msg.transid, forced = 
true, invoker = instance)
+        }
+
+        // please note: timeoutHandler.cancel must be called on all 
non-timeout paths, e.g. Success
+        ActivationEntry(
+          msg.activationId,
+          msg.user.uuid,
+          instance,
+          timeoutHandler,
+          Promise[Either[ActivationId, WhiskActivation]]())
+      })
+  }
+
+  private val messagingProvider = SpiLoader.get[MessagingProvider]
+  private val messageProducer = messagingProvider.getProducer(config, 
executionContext)
+
+  /** 3. Send the activation to the invoker */
+  private def sendActivationToInvoker(producer: MessageProducer,
+                                      msg: ActivationMessage,
+                                      invoker: InstanceId): 
Future[RecordMetadata] = {
+    implicit val transid: TransactionId = msg.transid
+
+    val topic = s"invoker${invoker.toInt}"
+
+    
MetricEmitter.emitCounterMetric(LoggingMarkers.LOADBALANCER_ACTIVATION_START(msg.user.uuid.asString))
+    val start = transid.started(
+      this,
+      LoggingMarkers.CONTROLLER_KAFKA,
+      s"posting topic '$topic' with activation id '${msg.activationId}'",
+      logLevel = InfoLevel)
+
+    producer.send(topic, msg).andThen {
+      case Success(status) =>
+        transid.finished(
+          this,
+          start,
+          s"posted to 
${status.topic()}[${status.partition()}][${status.offset()}]",
+          logLevel = InfoLevel)
+      case Failure(e) => transid.failed(this, start, s"error on posting to 
topic $topic")
+    }
+  }
+
+  /**
+   * Subscribes to active acks (completion messages from the invokers), and
+   * registers a handler for received active acks from invokers.
+   */
+  private val maxActiveAcksPerPoll = 128
+  private val activeAckPollDuration = 1.second
+  private val activeAckConsumer =
+    messagingProvider.getConsumer(
+      config,
+      "completions",
+      s"completed${controllerInstance.toInt}",
+      maxPeek = maxActiveAcksPerPoll)
+
+  private val activationFeed = actorSystem.actorOf(Props {
+    new MessageFeed(
+      "activeack",
+      logging,
+      activeAckConsumer,
+      maxActiveAcksPerPoll,
+      activeAckPollDuration,
+      processActiveAck)
+  })
+
+  /** 4. Get the active-ack message and parse it */
+  private def processActiveAck(bytes: Array[Byte]): Future[Unit] = Future {
+    val raw = new String(bytes, StandardCharsets.UTF_8)
+    CompletionMessage.parse(raw) match {
+      case Success(m: CompletionMessage) =>
+        processCompletion(m.response, m.transid, forced = false, invoker = 
m.invoker)
+        activationFeed ! MessageFeed.Processed
+
+      case Failure(t) =>
+        activationFeed ! MessageFeed.Processed
+        logging.error(this, s"failed processing message: $raw with $t")
+    }
+  }
+
+  /** 5. Process the active-ack and update the state accordingly */
+  private def processCompletion(response: Either[ActivationId, 
WhiskActivation],
+                                tid: TransactionId,
+                                forced: Boolean,
+                                invoker: InstanceId): Unit = {
+    val aid = response.fold(l => l, r => r.activationId)
+
+    // treat left as success (as it is the result of a message exceeding the 
bus limit)
+    val isSuccess = response.fold(l => true, r => !r.response.isWhiskError)
+
+    activations.remove(aid) match {
+      case Some(entry) =>
+        if (!forced) {
+          entry.timeoutHandler.cancel()
+          entry.promise.trySuccess(response)
+        } else {
+          entry.promise.tryFailure(new Throwable("no active ack received"))
+        }
+
+        totalActivations.decrement()
+        activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
+        schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release())
+
+        logging.info(this, s"${if (!forced) "received" else "forced"} active 
ack for '$aid'")(tid)
+        // Active acks that are received here are strictly from user actions - 
health actions are not part of
+        // the load balancer's activation map. Inform the invoker pool 
supervisor of the user action completion.
+        invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
+      case None if !forced =>
+        // the entry has already been removed but we receive an active ack for 
this activation Id.
+        // This happens for health actions, because they don't have an entry 
in Loadbalancerdata or
+        // for activations that already timed out.
+        invokerPool ! InvocationFinishedMessage(invoker, isSuccess)
+        logging.debug(this, s"received active ack for '$aid' which has no 
entry")(tid)
+      case None =>
+        // the entry has already been removed by an active ack. This part of 
the code is reached by the timeout.
+        // As the active ack is already processed we don't have to do anything 
here.
+        logging.debug(this, s"forced active ack for '$aid' which has no 
entry")(tid)
+    }
+  }
+
+  private val invokerPool = {
+    InvokerPool.prepare(controllerInstance, WhiskEntityStore.datastore(config))
+
+    actorSystem.actorOf(
+      InvokerPool.props(
+        (f, i) => f.actorOf(InvokerActor.props(i, controllerInstance)),
+        (m, i) => sendActivationToInvoker(messageProducer, m, i),
+        messagingProvider.getConsumer(config, 
s"health${controllerInstance.toInt}", "health", maxPeek = 128),
+        Some(monitor)))
+  }
+}
+
+object ShardingContainerPoolBalancer extends LoadBalancerProvider {
+
+  override def loadBalancer(whiskConfig: WhiskConfig, instance: InstanceId)(
+    implicit actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): LoadBalancer = new 
ShardingContainerPoolBalancer(whiskConfig, instance)
+
+  def requiredProperties: Map[String, String] =
+    kafkaHosts ++
+      Map(controllerLocalBookkeeping -> null, controllerSeedNodes -> null)
+
+  def generateHash(namespace: EntityName, action: FullyQualifiedEntityName): 
Int = {
+    (namespace.asString.hashCode() ^ action.asString.hashCode()).abs
+  }
+
+  /**
+   * Scans through all invokers and searches for an invoker tries to get a 
free slot on an invoker. If no slot can be
+   * obtained, randomly picks a healthy invoker.
+   *
+   * @param invokers a list of available invokers to search in, including 
their state
+   * @param dispatched semaphores for each invoker to give the slots away from
+   * @param index the index to start from (initially should be the 
"homeInvoker"
+   * @param step stable identifier of the entity to be scheduled
+   * @return an invoker to schedule to or None of no invoker is available
+   */
+  @tailrec
+  def schedule(invokers: IndexedSeq[InvokerHealth],
+               dispatched: IndexedSeq[ForcableSemaphore],
+               index: Int,
+               step: Int,
+               stepsDone: Int = 0): Option[InstanceId] = {
+    val numInvokers = invokers.size
+
+    if (numInvokers > 0) {
+      val invoker = invokers(index)
+      // If the current invoker is healthy and we can get a slot
+      if (invoker.status == Healthy && 
dispatched(invoker.id.toInt).tryAcquire()) {
+        Some(invoker.id)
+      } else {
+        // If we've gone through all invokers
+        if (stepsDone == numInvokers + 1) {
+          val healthyInvokers = invokers.filter(_.status == Healthy)
+          if (healthyInvokers.nonEmpty) {
+            // Choose a healthy invoker randomly
+            val random = 
ThreadLocalRandom.current().nextInt(healthyInvokers.size)
+            dispatched(random).forceAcquire()
+            Some(healthyInvokers(random).id)
+          } else {
+            None
+          }
+        } else {
+          val newIndex = (index + step) % numInvokers
+          schedule(invokers, dispatched, newIndex, step, stepsDone + 1)
+        }
+      }
+    } else {
+      None
+    }
+  }
+}
+
+/**
+ * Holds the state necessary for scheduling of actions.
+ *
+ * @param _invokers all of the known invokers in the system
+ * @param _managedInvokers all invokers for managed runtimes
+ * @param _blackboxInvokers all invokers for blackbox runtimes
+ * @param _stepSizes the step-sizes possible for the current invoker count
+ * @param _invokerSlots state of accessible slots of each invoker
+ */
+case class ShardingContainerPoolBalancerState(
+  private var _invokers: IndexedSeq[InvokerHealth] = 
IndexedSeq.empty[InvokerHealth],
+  private var _managedInvokers: IndexedSeq[InvokerHealth] = 
IndexedSeq.empty[InvokerHealth],
+  private var _blackboxInvokers: IndexedSeq[InvokerHealth] = 
IndexedSeq.empty[InvokerHealth],
+  private var _stepSizes: Seq[Int] = 
ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(0),
+  private var _invokerSlots: IndexedSeq[ForcableSemaphore] = 
IndexedSeq.empty[ForcableSemaphore],
+  private var _clusterSize: Int = 1)(
+  lbConfig: ShardingContainerPoolBalancerConfig =
+    
loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer))(implicit
 logging: Logging) {
+
+  private val totalInvokerThreshold = lbConfig.invokerBusyThreshold
+  private var currentInvokerThreshold = totalInvokerThreshold
+
+  private val blackboxFraction: Double = Math.max(0.0, Math.min(1.0, 
lbConfig.blackboxFraction))
+  logging.info(this, s"blackboxFraction = 
$blackboxFraction")(TransactionId.loadbalancer)
+
+  /** Getters for the variables, setting from the outside is only allowed 
through the update methods below */
+  def invokers: IndexedSeq[InvokerHealth] = _invokers
+  def managedInvokers: IndexedSeq[InvokerHealth] = _managedInvokers
+  def blackboxInvokers: IndexedSeq[InvokerHealth] = _blackboxInvokers
+  def stepSizes: Seq[Int] = _stepSizes
+  def invokerSlots: IndexedSeq[ForcableSemaphore] = _invokerSlots
+  def clusterSize: Int = _clusterSize
+
+  /**
+   * Updates the scheduling state with the new invokers.
+   *
+   * This is okay to not happen atomically since dirty reads of the values set 
are not dangerous. It is important though
+   * to update the "invokers" variables last, since they will determine the 
range of invokers to choose from.
+   *
+   * Handling a shrinking invokers list is not necessary, because InvokerPool 
won't shrink its own list but rather
+   * report the invoker as "Offline".
+   *
+   * It is important that this method does not run concurrently to itself 
and/or to `updateCluster`
+   */
+  def updateInvokers(newInvokers: IndexedSeq[InvokerHealth]): Unit = {
+    val oldSize = _invokers.size
+    val newSize = newInvokers.size
+
+    if (oldSize != newSize) {
+      _stepSizes = ContainerPoolBalancer.pairwiseCoprimeNumbersUntil(newSize)
+      if (oldSize < newSize) {
+        // Keeps the existing state..
+        _invokerSlots = _invokerSlots.padTo(newSize, new 
ForcableSemaphore(currentInvokerThreshold))
+      }
+    }
+
+    val blackboxes = Math.max(1, (newSize.toDouble * blackboxFraction).toInt)
+    val managed = Math.max(1, newSize - blackboxes)
+
+    _invokers = newInvokers
+    _blackboxInvokers = _invokers.takeRight(blackboxes)
+    _managedInvokers = _invokers.take(managed)
+
+    logging.info(
+      this,
+      s"loadbalancer invoker status updated. managedInvokers = $managed 
blackboxInvokers = $blackboxes")(
+      TransactionId.loadbalancer)
+  }
+
+  /**
+   * Updates the size of a cluster. Throws away all state for simplicity.
+   *
+   * This is okay to not happen atomically, since a dirty read of the values 
set are not dangerous. At worst the
+   * scheduler works on outdated invoker-load data which is acceptable.
+   *
+   * It is important that this method does not run concurrently to itself 
and/or to `updateState`
+   */
+  def updateCluster(newSize: Int): Unit = {
+    val actualSize = newSize max 1 // if a cluster size < 1 is reported, falls 
back to a size of 1 (alone)
+    if (_clusterSize != actualSize) {
+      _clusterSize = actualSize
+      val newTreshold = (totalInvokerThreshold / actualSize) max 1 // letting 
this fall below 1 doesn't make sense
+      currentInvokerThreshold = newTreshold
+      _invokerSlots = _invokerSlots.map(_ => new 
ForcableSemaphore(currentInvokerThreshold))
+
+      logging.info(
+        this,
+        s"loadbalancer cluster size changed to $actualSize active nodes. 
invokerThreshold = $currentInvokerThreshold")(
+        TransactionId.loadbalancer)
+    }
+  }
+}
+
+/**
+ * Configuration for the sharding container pool balancer.
+ *
+ * @param blackboxFraction the fraction of all invokers to use exclusively for 
blackboxes
+ * @param invokerBusyThreshold how many slots an invoker has available in total
+ */
+case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, 
invokerBusyThreshold: Int)
diff --git a/docs/deploy.md b/docs/deploy.md
index 99bf5ab..49c2636 100644
--- a/docs/deploy.md
+++ b/docs/deploy.md
@@ -17,6 +17,12 @@ How to down the members.
 Link to akka clustering documentation:
 https://doc.akka.io/docs/akka/2.5.4/scala/cluster-usage.html
 
+## Shared state vs. Sharding
+
+OpenWhisk supports both a shared state and a sharding model. By default the 
shared-state loadbalancer is used. The sharding loadbalancer is the newer 
implementation and scheduled to eventually supersede the shared-state 
implementation and become the default. To configure your system to use the 
sharding implementation, set `controller_loadbalancer_spi` to 
`whisk.core.loadBalancer.ShardingContainerPoolBalancer`.
+
+The sharding loadbalancer has the caveat of being limited in its scalability 
in its current implementation. It uses "horizontal" sharding, which means that 
the slots on each invoker are evenly divided to the loadbalancers. For example: 
In a system with 2 loadbalancers and invokers which have 16 slots each, each 
loadbalancer would get 8 slots on each invoker. In this specific case, a 
cluster of loadbalancers > 16 instances does not make sense, since each 
loadbalancer would only have a fra [...]
+
 # Invoker use of docker-runc
 
 To improve performance, Invokers attempt to maintain warm containers for 
frequently executed actions. To optimize resource usage, the action containers 
are paused/unpaused between invocations.  The system can be configured to use 
either docker-runc or docker to perform the pause/unpause operations by setting 
the value of the environment variable INVOKER_USE_RUNC to true or false 
respectively. If not set, it will default to true (use docker-runc).
diff --git a/tests/src/test/scala/limits/ThrottleTests.scala 
b/tests/src/test/scala/limits/ThrottleTests.scala
index dfee5c0..bfbc3f6 100644
--- a/tests/src/test/scala/limits/ThrottleTests.scala
+++ b/tests/src/test/scala/limits/ThrottleTests.scala
@@ -188,15 +188,12 @@ class ThrottleTests
     val numGroups = (totalInvokes / maximumConcurrentInvokes) + 1
     val invokesPerGroup = (totalInvokes / numGroups) + 1
     val interGroupSleep = 5.seconds
-    val results = (1 to numGroups)
-      .map { i =>
-        if (i != 1) { Thread.sleep(interGroupSleep.toMillis) }
-        untilThrottled(invokesPerGroup) { () =>
-          wsk.action.invoke(name, Map("payload" -> "testWord".toJson), 
expectedExitCode = DONTCARE_EXIT)
-        }
+    val results = (1 to numGroups).flatMap { i =>
+      if (i != 1) { Thread.sleep(interGroupSleep.toMillis) }
+      untilThrottled(invokesPerGroup) { () =>
+        wsk.action.invoke(name, Map("payload" -> "testWord".toJson), 
expectedExitCode = DONTCARE_EXIT)
       }
-      .flatten
-      .toList
+    }.toList
     val afterInvokes = Instant.now
 
     try {
@@ -244,10 +241,12 @@ class ThrottleTests
         action.create(name, timeoutAction)
     }
 
-    // The sleep is necessary as the load balancer currently has a latency 
before recognizing concurency.
+    // The sleep is necessary as the load balancer currently has a latency 
before recognizing concurrency.
     val sleep = 15.seconds
-    val slowInvokes = maximumConcurrentInvokes
-    val fastInvokes = 2
+    // Adding a bit of overcommit since some loadbalancers rely on some 
overcommit. This won't hurt those who don't
+    // since all activations are taken into account to check for throttled 
invokes below.
+    val slowInvokes = (maximumConcurrentInvokes * 1.2).toInt
+    val fastInvokes = 4
     val fastInvokeDuration = 4.seconds
     val slowInvokeDuration = sleep + fastInvokeDuration
 
diff --git a/tests/src/test/scala/whisk/common/ForcableSemaphoreTests.scala 
b/tests/src/test/scala/whisk/common/ForcableSemaphoreTests.scala
new file mode 100644
index 0000000..bcd3871
--- /dev/null
+++ b/tests/src/test/scala/whisk/common/ForcableSemaphoreTests.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.common
+
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class ForcableSemaphoreTests extends FlatSpec with Matchers {
+  behavior of "ForcableSemaphore"
+
+  it should "not allow to acquire, force or release negative amounts of 
permits" in {
+    val s = new ForcableSemaphore(2)
+    an[IllegalArgumentException] should be thrownBy s.tryAcquire(0)
+    an[IllegalArgumentException] should be thrownBy s.tryAcquire(-1)
+
+    an[IllegalArgumentException] should be thrownBy s.forceAcquire(0)
+    an[IllegalArgumentException] should be thrownBy s.forceAcquire(-1)
+
+    an[IllegalArgumentException] should be thrownBy s.release(0)
+    an[IllegalArgumentException] should be thrownBy s.release(-1)
+  }
+
+  it should "allow to acquire the defined amount of permits only" in {
+    val s = new ForcableSemaphore(2)
+    s.tryAcquire() shouldBe true // 1 permit left
+    s.tryAcquire() shouldBe true // 0 permits left
+    s.tryAcquire() shouldBe false
+
+    val s2 = new ForcableSemaphore(4)
+    s2.tryAcquire(5) shouldBe false // only 4 permits available
+    s2.tryAcquire(3) shouldBe true // 1 permit left
+    s2.tryAcquire(2) shouldBe false // only 1 permit available
+    s2.tryAcquire() shouldBe true
+  }
+
+  it should "allow to release permits again" in {
+    val s = new ForcableSemaphore(2)
+    s.tryAcquire() shouldBe true // 1 permit left
+    s.tryAcquire() shouldBe true // 0 permits left
+    s.tryAcquire() shouldBe false
+    s.release() // 1 permit left
+    s.tryAcquire() shouldBe true
+    s.release(2) // 1 permit left
+    s.tryAcquire(2) shouldBe true
+  }
+
+  it should "allow to force permits, delaying the acceptance of 'usual' 
permits until all of forced permits are released" in {
+    val s = new ForcableSemaphore(2)
+    s.tryAcquire(2) shouldBe true // 0 permits left
+    s.forceAcquire(5) // -5 permits left
+    s.tryAcquire() shouldBe false
+    s.release(4) // -1 permits left
+    s.tryAcquire() shouldBe false
+    s.release() // 0 permits left
+    s.tryAcquire() shouldBe false
+    s.release() // 1 permit left
+    s.tryAcquire() shouldBe true
+  }
+
+  it should "not give away more permits even under concurrent load" in {
+    // 100 iterations of this test
+    (0 until 100).foreach { _ =>
+      val s = new ForcableSemaphore(32)
+      // try to acquire more permits than allowed in parallel
+      val acquires = (0 until 64).par.map(_ => s.tryAcquire()).seq
+
+      val result = Seq.fill(32)(true) ++ Seq.fill(32)(false)
+      acquires should contain theSameElementsAs result
+    }
+  }
+}
diff --git 
a/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala 
b/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala
index 720e738..3e2ea2a 100644
--- a/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala
+++ b/tests/src/test/scala/whisk/core/controller/test/RateThrottleTests.scala
@@ -44,8 +44,8 @@ class RateThrottleTests extends FlatSpec with Matchers with 
StreamLogging {
   behavior of "Rate Throttle"
 
   it should "throttle when rate exceeds allowed threshold" in {
-    new RateThrottler("test", 0, 
_.limits.invocationsPerMinute).check(subject).ok shouldBe false
-    val rt = new RateThrottler("test", 1, _.limits.invocationsPerMinute)
+    new RateThrottler("test", _ => 0).check(subject).ok shouldBe false
+    val rt = new RateThrottler("test", _ => 1)
     rt.check(subject).ok shouldBe true
     rt.check(subject).ok shouldBe false
     rt.check(subject).ok shouldBe false
@@ -55,7 +55,7 @@ class RateThrottleTests extends FlatSpec with Matchers with 
StreamLogging {
 
   it should "check against an alternative limit if passed in" in {
     val withLimits = subject.copy(limits = UserLimits(invocationsPerMinute = 
Some(5)))
-    val rt = new RateThrottler("test", 1, _.limits.invocationsPerMinute)
+    val rt = new RateThrottler("test", u => 
u.limits.invocationsPerMinute.getOrElse(1))
     rt.check(withLimits).ok shouldBe true // 1
     rt.check(withLimits).ok shouldBe true // 2
     rt.check(withLimits).ok shouldBe true // 3
diff --git 
a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
 
b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
new file mode 100644
index 0000000..ca8442f
--- /dev/null
+++ 
b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.loadBalancer.test
+
+import common.StreamLogging
+import org.junit.runner.RunWith
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.junit.JUnitRunner
+import whisk.common.ForcableSemaphore
+import whisk.core.entity.InstanceId
+import whisk.core.loadBalancer._
+
+/**
+ * Unit tests for the ContainerPool object.
+ *
+ * These tests test only the "static" methods "schedule" and "remove"
+ * of the ContainerPool object.
+ */
+@RunWith(classOf[JUnitRunner])
+class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with 
StreamLogging {
+  behavior of "ShardingContainerPoolBalancerState"
+
+  def healthy(i: Int) = new InvokerHealth(InstanceId(i), Healthy)
+  def unhealthy(i: Int) = new InvokerHealth(InstanceId(i), UnHealthy)
+  def offline(i: Int) = new InvokerHealth(InstanceId(i), Offline)
+
+  def semaphores(count: Int, max: Int): IndexedSeq[ForcableSemaphore] =
+    IndexedSeq.fill(count)(new ForcableSemaphore(max))
+
+  it should "update invoker's state, growing the slots data and keeping valid 
old data" in {
+    // start empty
+    val slots = 10
+    val state = 
ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(0.5, 
slots))
+    state.invokers shouldBe 'empty
+    state.blackboxInvokers shouldBe 'empty
+    state.managedInvokers shouldBe 'empty
+    state.invokerSlots shouldBe 'empty
+    state.stepSizes shouldBe Seq()
+
+    // apply one update, verify everything is updated accordingly
+    val update1 = IndexedSeq(healthy(0))
+    state.updateInvokers(update1)
+
+    state.invokers shouldBe update1
+    state.blackboxInvokers shouldBe update1 // fallback to at least one
+    state.managedInvokers shouldBe update1 // fallback to at least one
+    state.invokerSlots.head.availablePermits shouldBe slots
+    state.stepSizes shouldBe Seq(1)
+
+    // aquire a slot to alter invoker state
+    state.invokerSlots.head.tryAcquire()
+    state.invokerSlots.head.availablePermits shouldBe slots - 1
+
+    // apply second update, growing the state
+    val update2 = IndexedSeq(healthy(0), healthy(1))
+    state.updateInvokers(update2)
+
+    state.invokers shouldBe update2
+    state.managedInvokers shouldBe IndexedSeq(update2.head)
+    state.blackboxInvokers shouldBe IndexedSeq(update2.last)
+    state.invokerSlots.head.availablePermits shouldBe slots - 1
+    state.invokerSlots(1).availablePermits shouldBe slots
+    state.stepSizes shouldBe Seq(1)
+  }
+
+  it should "update the cluster size, adjusting the invoker slots accordingly" 
in {
+    val slots = 10
+    val state = 
ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(0.5, 
slots))
+    state.updateInvokers(IndexedSeq(healthy(0)))
+
+    state.invokerSlots.head.tryAcquire()
+    state.invokerSlots.head.availablePermits shouldBe slots - 1
+
+    state.updateCluster(2)
+    state.invokerSlots.head.availablePermits shouldBe slots / 2 // state reset 
+ divided by 2
+  }
+
+  it should "fallback to a size of 1 (alone) if cluster size is < 1" in {
+    val slots = 10
+    val state = 
ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(0.5, 
slots))
+    state.updateInvokers(IndexedSeq(healthy(0)))
+
+    state.invokerSlots.head.availablePermits shouldBe slots
+
+    state.updateCluster(2)
+    state.invokerSlots.head.availablePermits shouldBe slots / 2
+
+    state.updateCluster(0)
+    state.invokerSlots.head.availablePermits shouldBe slots
+
+    state.updateCluster(-1)
+    state.invokerSlots.head.availablePermits shouldBe slots
+  }
+
+  it should "set the threshold to 1 if the cluster is bigger than there are 
slots on 1 invoker" in {
+    val slots = 10
+    val state = 
ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(0.5, 
slots))
+    state.updateInvokers(IndexedSeq(healthy(0)))
+
+    state.invokerSlots.head.availablePermits shouldBe slots
+
+    state.updateCluster(20)
+
+    state.invokerSlots.head.availablePermits shouldBe 1
+  }
+
+  behavior of "schedule"
+
+  it should "return None on an empty invoker list" in {
+    ShardingContainerPoolBalancer.schedule(IndexedSeq.empty, IndexedSeq.empty, 
index = 0, step = 2) shouldBe None
+  }
+
+  it should "return None if no invokers are healthy" in {
+    val invokerCount = 3
+    val invokerSlots = semaphores(invokerCount, 3)
+    val invokers = (0 until invokerCount).map(unhealthy)
+
+    ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 0, 
step = 2) shouldBe None
+  }
+
+  it should "choose the first available invoker, jumping in stepSize steps, 
falling back to randomized scheduling once all invokers are full" in {
+    val invokerCount = 3
+    val invokerSlots = semaphores(invokerCount + 3, 3) // needs to be offset 
by 3 as well
+    val invokers = (0 until invokerCount).map(i => healthy(i + 3)) // offset 
by 3 to asset InstanceId is returned
+
+    val expectedResult = Seq(3, 3, 3, 5, 5, 5, 4, 4, 4)
+    val result = expectedResult.map { _ =>
+      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 
0, step = 2).get.toInt
+    }
+
+    result shouldBe expectedResult
+
+    val bruteResult = (0 to 100).map { _ =>
+      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 
0, step = 2).get.toInt
+    }
+
+    bruteResult should contain allOf (3, 4, 5)
+  }
+
+  it should "ignore unhealthy or offline invokers" in {
+    val invokers = IndexedSeq(healthy(0), unhealthy(1), offline(2), healthy(3))
+    val invokerSlots = semaphores(invokers.size, 3)
+
+    val expectedResult = Seq(0, 0, 0, 3, 3, 3)
+    val result = expectedResult.map { _ =>
+      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 
0, step = 1).get.toInt
+    }
+
+    result shouldBe expectedResult
+
+    // more schedules will result in randomized invokers, but the unhealthy 
and offline invokers should not be part
+    val bruteResult = (0 to 100).map { _ =>
+      ShardingContainerPoolBalancer.schedule(invokers, invokerSlots, index = 
0, step = 1).get.toInt
+    }
+
+    bruteResult should contain allOf (0, 3)
+    bruteResult should contain noneOf (1, 2)
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
cbic...@apache.org.

Reply via email to