This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e47321  Introduce active-ack timeout factor. (#3767)
9e47321 is described below

commit 9e473210d05e6b3759dbfb3bb8e1fd2bd1149ce0
Author: Martin Henke <martin.he...@web.de>
AuthorDate: Wed Jul 25 17:11:03 2018 +0200

    Introduce active-ack timeout factor. (#3767)
    
    If an active-ack does not appear after a certain timeout (action timeout + 1 minute), the active-ack is considered lost and we "force" it to keep the loadbalancer's state sane.
    
    In some cases where the system gets increasingly slow, this timeout is too narrow. This commit makes the value configurable so it can be adjusted rapidly and a good default can be found in production environments.
    
    Co-authored-by: Markus Thömmes <markusthoem...@me.com>
---
 ansible/group_vars/all                                     |  1 +
 ansible/roles/controller/tasks/deploy.yml                  |  2 ++
 core/controller/src/main/resources/reference.conf          |  4 ++++
 .../core/loadBalancer/ShardingContainerPoolBalancer.scala  | 14 +++++++++++---
 .../test/ShardingContainerPoolBalancerTests.scala          | 13 ++++++++-----
 5 files changed, 26 insertions(+), 8 deletions(-)
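The commit message above boils down to a small formula. A minimal sketch in Scala that restates it in isolation, assuming (as an illustration, not from the commit) that the standard action time limit TimeLimit.STD_DURATION is 60 seconds:

    import scala.concurrent.duration._

    // Restates the rule introduced by this commit:
    //   timeout = max(action time limit, standard limit) * timeoutFactor + 1 minute
    def activeAckTimeout(actionLimit: FiniteDuration,
                         stdLimit: FiniteDuration,
                         timeoutFactor: Int): FiniteDuration =
      (actionLimit.max(stdLimit) * timeoutFactor.toLong) + 1.minute

    // With the default factor of 2 and an assumed 60-second limit:
    //   max(60s, 60s) * 2 + 1m = 3 minutes
    assert(activeAckTimeout(60.seconds, 60.seconds, 2) == 3.minutes)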

diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 5e59411..aa32ede 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -66,6 +66,7 @@ controller:
   heap: "{{ controller_heap | default('2g') }}"
   arguments: "{{ controller_arguments | default('') }}"
   blackboxFraction: "{{ controller_blackbox_fraction | default(0.10) }}"
+  timeoutFactor: "{{ controller_timeout_factor | default(2) }}"
   instances: "{{ groups['controllers'] | length }}"
   localBookkeeping: "{{ controller_local_bookkeeping | default('false') }}"
   akka:
diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml
index 80c6a23..92c576d 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -226,6 +226,8 @@
         "{{ invoker.busyThreshold }}"
       "CONFIG_whisk_loadbalancer_blackboxFraction":
         "{{ controller.blackboxFraction }}"
+      "CONFIG_whisk_loadbalancer_timeoutFactor":
+        "{{ controller.timeoutFactor }}"
 
       "CONFIG_kamon_statsd_hostname": "{{ metrics.kamon.host }}"
       "CONFIG_kamon_statsd_port": "{{ metrics.kamon.port }}"
diff --git a/core/controller/src/main/resources/reference.conf b/core/controller/src/main/resources/reference.conf
index 85712a4..3cf073c 100644
--- a/core/controller/src/main/resources/reference.conf
+++ b/core/controller/src/main/resources/reference.conf
@@ -8,6 +8,10 @@ whisk {
   loadbalancer {
     invoker-busy-threshold: 4
     blackbox-fraction: 10%
+    # factor to increase the timeout for forced active acks
+    # timeout = time-limit.std * timeout-factor + 1m
+    # default is 2 because init and run can both use the configured timeout fully
+    timeout-factor = 2
   }
   controller {
     protocol: http
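The "default is 2" rationale in the comment above: on a cold start, an action's init and run phases may each consume the full configured time limit, so a legitimate activation can take roughly twice the limit before its ack arrives. A sketch with assumed numbers only (a hypothetical 60-second limit; the real values come from TimeLimit and deployment configuration):

    import scala.concurrent.duration._

    val timeLimit     = 60.seconds // assumed per-phase action limit
    val timeoutFactor = 2          // the default introduced here

    // Worst legitimate cold start: init uses the full limit, then run does too.
    val worstCase = timeLimit * 2L                              // 2 minutes
    val forced    = timeLimit * timeoutFactor.toLong + 1.minute // 3 minutes

    assert(forced > worstCase) // one minute of slack remains for queueing delays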
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index fa9c40f..dd84af9 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -156,6 +156,8 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
     None
   }
 
+  private val lbConfig = loadConfigOrThrow[ShardingContainerPoolBalancerConfig](ConfigKeys.loadbalancer)
+
   /** State related to invocations and throttling */
   private val activations = TrieMap[ActivationId, ActivationEntry]()
   private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
@@ -163,7 +165,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
   private val totalActivationMemory = new LongAdder()
 
   /** State needed for scheduling. */
-  private val schedulingState = ShardingContainerPoolBalancerState()()
+  private val schedulingState = ShardingContainerPoolBalancerState()(lbConfig)
 
   actorSystem.scheduler.schedule(0.seconds, 10.seconds) {
     MetricEmitter.emitHistogramMetric(LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance), totalActivations.longValue)
@@ -254,7 +256,12 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
     totalActivationMemory.add(action.limits.memory.megabytes)
     activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new LongAdder()).increment()
 
-    val timeout = action.limits.timeout.duration.max(TimeLimit.STD_DURATION) + 1.minute
+    // Timeout is a multiple of the configured maximum action duration. The minimum timeout is the configured standard
+    // value for action durations to avoid too tight timeouts.
+    // Timeouts in general are diluted by a configurable factor. In essence this factor controls how much slack you want
+    // to allow in your topics before you start reporting failed activations.
+    val timeout = (action.limits.timeout.duration.max(TimeLimit.STD_DURATION) * lbConfig.timeoutFactor) + 1.minute
+
     // Install a timeout handler for the catastrophic case where an active ack is not received at all
     // (because say an invoker is down completely, or the connection to the message bus is disrupted) or when
     // the active ack is significantly delayed (possibly due to long queues but the subject should not be penalized);
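The comment above refers to the handler that consumes this timeout: a one-shot scheduled task that must be harmless if the genuine ack wins the race. A minimal sketch of the pattern, assuming an Akka ActorSystem; forceActiveAck is a hypothetical stand-in for the balancer's actual completion bookkeeping:

    import akka.actor.ActorSystem
    import scala.concurrent.duration._

    def installTimeoutHandler(system: ActorSystem, timeout: FiniteDuration)(
        forceActiveAck: () => Unit): Unit = {
      import system.dispatcher // ExecutionContext for the scheduled task
      // Fires once after `timeout`; the balancer's state handling must make
      // this a no-op if the genuine active ack already arrived.
      system.scheduler.scheduleOnce(timeout)(forceActiveAck())
    }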
@@ -579,8 +586,9 @@ case class ClusterConfig(useClusterBootstrap: Boolean)
  *
  * @param blackboxFraction the fraction of all invokers to use exclusively for blackboxes
  * @param invokerBusyThreshold how many slots an invoker has available in total
+ * @param timeoutFactor factor to influence the timeout period for forced active acks (time-limit.std * timeoutFactor + 1m)
  */
-case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int)
+case class ShardingContainerPoolBalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int, timeoutFactor: Int)
 
 /**
  * State kept for each activation until completion.
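One detail of the config plumbing worth noting: the balancer loads this case class with pureconfig (the loadConfigOrThrow call above), whose default naming convention maps camelCase field names to kebab-case keys, which is how timeoutFactor picks up the timeout-factor key added to reference.conf. A sketch of that mapping in isolation, assuming a pureconfig version with automatic derivation in scope:

    import com.typesafe.config.ConfigFactory
    import pureconfig._

    // Mirrors the shape of ShardingContainerPoolBalancerConfig for illustration.
    case class BalancerConfig(blackboxFraction: Double, invokerBusyThreshold: Int, timeoutFactor: Int)

    val conf = ConfigFactory.parseString("""
      blackbox-fraction: 0.1
      invoker-busy-threshold: 4
      timeout-factor: 2
    """)

    // Kebab-case keys bind to camelCase fields under the default naming hint.
    val cfg = loadConfigOrThrow[BalancerConfig](conf)
    assert(cfg.timeoutFactor == 2)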
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 9b2567e..96a2cc3 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -42,10 +42,13 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
   def semaphores(count: Int, max: Int): IndexedSeq[ForcableSemaphore] =
     IndexedSeq.fill(count)(new ForcableSemaphore(max))
 
+  def lbConfig(blackboxFraction: Double, invokerBusyThreshold: Int) =
+    ShardingContainerPoolBalancerConfig(blackboxFraction, invokerBusyThreshold, 1)
+
   it should "update invoker's state, growing the slots data and keeping valid old data" in {
     // start empty
     val slots = 10
-    val state = ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(0.5, slots))
+    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, slots))
     state.invokers shouldBe 'empty
     state.blackboxInvokers shouldBe 'empty
     state.managedInvokers shouldBe 'empty
@@ -85,7 +88,7 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
 
   it should "allow managed partition to overlap with blackbox for small N" in {
     Seq(0.1, 0.2, 0.3, 0.4, 0.5).foreach { bf =>
-      val state = ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(bf, 1))
+      val state = ShardingContainerPoolBalancerState()(lbConfig(bf, 1))
 
       (1 to 100).toSeq.foreach { i =>
         state.updateInvokers((1 to i).map(_ => healthy(1)))
@@ -112,7 +115,7 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
 
   it should "update the cluster size, adjusting the invoker slots accordingly" in {
     val slots = 10
-    val state = ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(0.5, slots))
+    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, slots))
     state.updateInvokers(IndexedSeq(healthy(0)))
 
     state.invokerSlots.head.tryAcquire()
@@ -124,7 +127,7 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
 
   it should "fallback to a size of 1 (alone) if cluster size is < 1" in {
     val slots = 10
-    val state = ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(0.5, slots))
+    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, slots))
     state.updateInvokers(IndexedSeq(healthy(0)))
 
     state.invokerSlots.head.availablePermits shouldBe slots
@@ -141,7 +144,7 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
 
   it should "set the threshold to 1 if the cluster is bigger than there are slots on 1 invoker" in {
     val slots = 10
-    val state = ShardingContainerPoolBalancerState()(ShardingContainerPoolBalancerConfig(0.5, slots))
+    val state = ShardingContainerPoolBalancerState()(lbConfig(0.5, slots))
     state.updateInvokers(IndexedSeq(healthy(0)))
 
     state.invokerSlots.head.availablePermits shouldBe slots
