This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new a87340b  Add more diagnostic information to completion ack processing (#4561)
a87340b is described below

commit a87340bf72914a2710542511415a5a0a047db1fc
Author: Sven Lange-Last <sven.lange-l...@de.ibm.com>
AuthorDate: Tue Jul 23 08:30:51 2019 +0200

    Add more diagnostic information to completion ack processing (#4561)
    
    * Add more diagnostic information to completion ack processing
    
    Forced completion acks can be a source of overloaded invokers because forced acks free up invoker slots in the load balancer. If the invoker is just "late" with running an activation so that the invoker sends the result / completion ack after the completion ack timeout has been reached in the load balancer, the load balancer may send new activations to the invoker while it is still fully occupied. As a result, these new activations have to wait for a container and can cause forced com [...]
    
    Goal of this change is to improve visibility and diagnostic information so that this mechanism can be better understood and fixed in the next step.
    
    * Add metrics for forced completion acks.
    * Log more diagnostic information when forcing a completion ack due to timeout.
    * Log a warning if a completion ack arrives after it has already been forced - this is an indication that action processing took too long. Today, this situation is only logged as debug message.
    * Improve code comments.
    * Provide a function for calculating the completion ack timeout. In the long term, this should be unified with the action wait timeout in sequence activations.
    
    * Add metrics to documentation
    
    * Address review feedback
    
    Streamline implementation of `CompletionAckType` and sub-types to save boilerplate code. This should make the code more readable.
    
    * Address review feedback
    
    Use LogMarkerToken singletons when emitting metrics related to completion acks instead of creating LogMarkerToken instances whenever emitting a counter.
    
    * Address review feedback
    
    Only use curly braces for expressions in string interpolations where required to prevent IntelliJ warnings.
    
    * Address review feedback
    
    Make LogMarkerToken instances private.
---
 .../org/apache/openwhisk/common/Logging.scala      |  43 +++++++++
 .../core/loadBalancer/CommonLoadBalancer.scala     | 107 ++++++++++++++++-----
 .../ShardingContainerPoolBalancer.scala            |  15 ++-
 docs/metrics.md                                    |  14 ++-
 4 files changed, 152 insertions(+), 27 deletions(-)
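
Note on the timeout function that this commit adds to CommonLoadBalancer.scala: the sketch below reproduces the arithmetic of `calculateCompletionAckTimeout` outside of OpenWhisk so that resulting values are easy to check. The object name and the two constants are assumptions for illustration only; the real values come from `TimeLimit.STD_DURATION` and `lbConfig.timeoutFactor`. A standard limit of 1 minute is assumed here, and the factor of 2 is the default named in the code comment of the commit.

```scala
import scala.concurrent.duration._

object CompletionAckTimeoutSketch {
  // Assumed stand-in for TimeLimit.STD_DURATION (illustrative value only).
  val stdDuration: FiniteDuration = 1.minute
  // Assumed stand-in for lbConfig.timeoutFactor; 2 is the default named in the commit's comment.
  val timeoutFactor: Long = 2L

  // Same formula as in the commit: max(limit, standard) * factor + 1 minute.
  def calculateCompletionAckTimeout(actionTimeLimit: FiniteDuration): FiniteDuration =
    (actionTimeLimit.max(stdDuration) * timeoutFactor) + 1.minute
}
```

Under these assumptions, a 10 second action limit is raised to the 1 minute standard and yields 2 * 1 min + 1 min = 3 minutes, while a 5 minute limit yields 2 * 5 min + 1 min = 11 minutes.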

diff --git a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
index 01c9754..c728c5a 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/common/Logging.scala
@@ -352,6 +352,26 @@ object LoggingMarkers {
   private val containerClient = "containerClient"
 
   /*
+   * The following markers are used to emit log messages as well as metrics. Add all LogMarkerTokens below to
+   * have a reference list of all metrics. The list below contains LogMarkerToken singletons (val) as well as
+   * LogMarkerToken creation functions (def). The LogMarkerToken creation functions allow to include variable
+   * information in metrics, such as the controller / invoker id or commands executed by a container factory.
+   *
+   * When using LogMarkerTokens for emitting metrics, you should use the convenience functions only once to
+   * create LogMarkerToken singletons instead of creating LogMarkerToken instances over and over again for each
+   * metric emit.
+   *
+   * Example:
+   * val MY_COUNTER_GREEN = LoggingMarkers.MY_COUNTER(GreenCounter)
+   * ...
+   * MetricEmitter.emitCounterMetric(MY_COUNTER_GREEN)
+   *
+   * instead of
+   *
+   * MetricEmitter.emitCounterMetric(LoggingMarkers.MY_COUNTER(GreenCounter))
+   */
+
+  /*
    * Controller related markers
    */
   def CONTROLLER_STARTUP(id: String) =
@@ -414,6 +434,29 @@ object LoggingMarkers {
       LogMarkerToken(loadbalancer + controllerInstance.asString, s"memory${actionType}Inflight", counter)(
         MeasurementUnit.none)
 
+  // Counter metrics for completion acks in load balancer
+  sealed abstract class CompletionAckType(val name: String) { def asString: String = name }
+  case object RegularCompletionAck extends CompletionAckType("regular")
+  case object ForcedCompletionAck extends CompletionAckType("forced")
+  case object HealthcheckCompletionAck extends CompletionAckType("healthcheck")
+  case object RegularAfterForcedCompletionAck extends CompletionAckType("regularAfterForced")
+  case object ForcedAfterRegularCompletionAck extends CompletionAckType("forcedAfterRegular")
+
+  // Convenience function to create log marker tokens used for emitting counter metrics related to completion acks.
+  def LOADBALANCER_COMPLETION_ACK(controllerInstance: ControllerInstanceId, completionAckType: CompletionAckType) =
+    if (TransactionId.metricsKamonTags)
+      LogMarkerToken(
+        loadbalancer,
+        "completionAck",
+        counter,
+        None,
+        Map("controller_id" -> controllerInstance.asString, "type" -> completionAckType.asString))(MeasurementUnit.none)
+    else
+      LogMarkerToken(
+        loadbalancer + controllerInstance.asString,
+        "completionAck_" + completionAckType.asString,
+        counter)(MeasurementUnit.none)
+
   // Time that is needed to execute the action
   val INVOKER_ACTIVATION_RUN =
     LogMarkerToken(invoker, "activationRun", start)(MeasurementUnit.time.milliseconds)
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
index 724a5f7..51bf4a8 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/CommonLoadBalancer.scala
@@ -84,17 +84,39 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
   override def totalActiveActivations: Future[Int] = Future.successful(totalActivations.intValue())
 
   /**
+   * Calculate the duration within which a completion ack must be received for an activation.
+   *
+   * Calculation is based on the passed action time limit. If the passed action time limit is shorter than
+   * the configured standard action time limit, the latter is used to avoid too tight timeouts.
+   *
+   * The base timeout is multiplied with a configurable timeout factor. This dilution controls how much slack you
+   * want to allow in your system before you start reporting failed activations. The default value of 2 bases
+   * on invoker behavior that a cold invocation's init duration may be as long as its run duration. Higher factors
+   * may account for additional wait times.
+   *
+   * Finally, a constant duration is added to the diluted timeout to be lenient towards general delays / wait times.
+   *
+   * @param actionTimeLimit the action's time limit
+   * @return the calculated time duration within which a completion ack must be received
+   */
+  private def calculateCompletionAckTimeout(actionTimeLimit: FiniteDuration): FiniteDuration = {
+    (actionTimeLimit.max(TimeLimit.STD_DURATION) * lbConfig.timeoutFactor) + 1.minute
+  }
+
+  /**
    * 2. Update local state with the activation to be executed scheduled.
    *
    * All activations are tracked in the activationSlots map. Additionally, blocking invokes
-   * are tracked in the activation results map. When a result is received via activeack, it
+   * are tracked in the activationPromises map. When a result is received via result ack, it
    * will cause the result to be forwarded to the caller waiting on the result, and cancel
    * the DB poll which is also trying to do the same.
+   * Once the completion ack arrives, activationSlots entry will be removed.
    */
   protected def setupActivation(msg: ActivationMessage,
                                 action: ExecutableWhiskActionMetaData,
                                 instance: InvokerInstanceId): Future[Either[ActivationId, WhiskActivation]] = {
 
+    // Needed for emitting metrics.
     totalActivations.increment()
     val isBlackboxInvocation = action.exec.pull
     val totalActivationMemory =
@@ -103,23 +125,28 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
 
     activationsPerNamespace.getOrElseUpdate(msg.user.namespace.uuid, new LongAdder()).increment()
 
-    // Timeout is a multiple of the configured maximum action duration. The minimum timeout is the configured standard
-    // value for action durations to avoid too tight timeouts.
-    // Timeouts in general are diluted by a configurable factor. In essence this factor controls how much slack you want
-    // to allow in your topics before you start reporting failed activations.
-    val timeout = (action.limits.timeout.duration.max(TimeLimit.STD_DURATION) * lbConfig.timeoutFactor) + 1.minute
+    // Completion Ack must be received within the calculated time.
+    val completionAckTimeout = calculateCompletionAckTimeout(action.limits.timeout.duration)
 
+    // If activation is blocking, store a promise that we can mark successful later on once the result ack
+    // arrives. Return a Future representing the promise to caller.
+    // If activation is non-blocking, return a successfully completed Future to caller.
     val resultPromise = if (msg.blocking) {
       activationPromises.getOrElseUpdate(msg.activationId, Promise[Either[ActivationId, WhiskActivation]]()).future
     } else Future.successful(Left(msg.activationId))
 
-    // Install a timeout handler for the catastrophic case where an active ack is not received at all
+    // Install a timeout handler for the catastrophic case where a completion ack is not received at all
     // (because say an invoker is down completely, or the connection to the message bus is disrupted) or when
-    // the active ack is significantly delayed (possibly dues to long queues but the subject should not be penalized);
+    // the completion ack is significantly delayed (possibly dues to long queues but the subject should not be penalized);
     // in this case, if the activation handler is still registered, remove it and update the books.
+    //
+    // Attention: a significantly delayed completion ack means that the invoker is still busy or will be busy in future
+    // with running the action. So the current strategy of freeing up the activation's memory in invoker
+    // book-keeping will allow the load balancer to send more activations to the invoker. This can lead to
+    // invoker overloads so that activations need to wait until other activations complete.
     activationSlots.getOrElseUpdate(
       msg.activationId, {
-        val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
+        val timeoutHandler = actorSystem.scheduler.scheduleOnce(completionAckTimeout) {
           processCompletion(msg.activationId, msg.transid, forced = true, isSystemError = false, invoker = instance)
         }
 
@@ -129,10 +156,12 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
           msg.user.namespace.uuid,
           instance,
           action.limits.memory.megabytes.MB,
+          action.limits.timeout.duration,
           action.limits.concurrency.maxConcurrent,
           action.fullyQualifiedName(true),
           timeoutHandler,
-          isBlackboxInvocation)
+          isBlackboxInvocation,
+          msg.blocking)
       })
 
     resultPromise
@@ -167,14 +196,11 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
     }
   }
 
-  /**
-   * Subscribes to active acks (completion messages from the invokers), and
-   * registers a handler for received active acks from invokers.
-   */
+  /** Subscribes to ack messages from the invokers (result / completion) and registers a handler for these messages. */
   private val activationFeed: ActorRef =
     feedFactory.createFeed(actorSystem, messagingProvider, processAcknowledgement)
 
-  /** 4. Get the active-ack message and parse it */
+  /** 4. Get the ack message and parse it */
   protected[loadBalancer] def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = Future {
     val raw = new String(bytes, StandardCharsets.UTF_8)
     AcknowledegmentMessage.parse(raw) match {
@@ -214,6 +240,18 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
 
   protected def releaseInvoker(invoker: InvokerInstanceId, entry: ActivationEntry)
 
+  // Singletons for counter metrics related to completion acks
+  protected val LOADBALANCER_COMPLETION_ACK_REGULAR =
+    LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, RegularCompletionAck)
+  protected val LOADBALANCER_COMPLETION_ACK_FORCED =
+    LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, ForcedCompletionAck)
+  protected val LOADBALANCER_COMPLETION_ACK_HEALTHCHECK =
+    LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, HealthcheckCompletionAck)
+  protected val LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED =
+    LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, RegularAfterForcedCompletionAck)
+  protected val LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR =
+    LoggingMarkers.LOADBALANCER_COMPLETION_ACK(controllerInstance, ForcedAfterRegularCompletionAck)
+
   /** 6. Process the completion ack and update the state */
   protected[loadBalancer] def processCompletion(aid: ActivationId,
                                                 tid: TransactionId,
@@ -238,7 +276,7 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
         totalActivations.decrement()
         val totalActivationMemory =
           if (entry.isBlackbox) totalBlackBoxActivationMemory else totalManagedActivationMemory
-        totalActivationMemory.add(entry.memory.toMB * (-1))
+        totalActivationMemory.add(entry.memoryLimit.toMB * (-1))
         activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
 
         releaseInvoker(invoker, entry)
@@ -248,16 +286,28 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
           // notice here that the activationPromises is not touched, because the expectation is that
           // the active ack is received as expected, and processing that message removed the promise
           // from the corresponding map
+          logging.info(this, s"received completion ack for '$aid', system error=$isSystemError")(tid)
+
+          MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR)
+
         } else {
           // the entry has timed out; if the active ack is still around, remove its entry also
           // and complete the promise with a failure if necessary
           activationPromises
             .remove(aid)
             .foreach(_.tryFailure(new Throwable("no completion or active ack received yet")))
+          val actionType = if (entry.isBlackbox) "blackbox" else "managed"
+          val blockingType = if (entry.isBlocking) "blocking" else "non-blocking"
+          val completionAckTimeout = calculateCompletionAckTimeout(entry.timeLimit)
+          logging.warn(
+            this,
+            s"forced completion ack for '$aid', action '${entry.fullyQualifiedEntityName}' ($actionType), $blockingType, mem limit ${entry.memoryLimit.toMB} MB, time limit ${entry.timeLimit.toMillis} ms, completion ack timeout $completionAckTimeout from $invoker")(
+            tid)
+
+          MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED)
         }
 
-        logging.info(this, s"${if (!forced) "received" else "forced"} completion ack for '$aid'")(tid)
-        // Active acks that are received here are strictly from user actions - health actions are not part of
+        // Completion acks that are received here are strictly from user actions - health actions are not part of
         // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
         // guard this
         invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
@@ -266,17 +316,28 @@ abstract class CommonLoadBalancer(config: WhiskConfig,
         // is important to pass to the invokerPool because they are used to determine if the invoker can be considered
         // healthy again.
         logging.info(this, s"received completion ack for health action on $invoker")(tid)
+
+        MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_HEALTHCHECK)
+
         // guard this
         invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
       case None if !forced =>
-        // Received an active-ack that has already been taken out of the state because of a timeout (forced active-ack).
+        // Received a completion ack that has already been taken out of the state because of a timeout (forced ack).
         // The result is ignored because a timeout has already been reported to the invokerPool per the force.
-        logging.debug(this, s"received completion ack for '$aid' which has no entry")(tid)
+        // Logging this condition as a warning because the invoker processed the activation and sent a completion
+        // message - but not in time.
+        logging.warn(
+          this,
+          s"received completion ack for '$aid' from $invoker which has no entry, system error=$isSystemError")(tid)
+
+        MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_REGULAR_AFTER_FORCED)
       case None =>
-        // The entry has already been removed by an active ack. This part of the code is reached by the timeout and can
-        // happen if active-ack and timeout happen roughly at the same time (the timeout was triggered before the active
-        // ack canceled the timer). As the active ack is already processed we don't have to do anything here.
+        // The entry has already been removed by a completion ack. This part of the code is reached by the timeout and can
+        // happen if completion ack and timeout happen roughly at the same time (the timeout was triggered before the completion
+        // ack canceled the timer). As the completion ack is already processed we don't have to do anything here.
         logging.debug(this, s"forced completion ack for '$aid' which has no entry")(tid)
+
+        MetricEmitter.emitCounterMetric(LOADBALANCER_COMPLETION_ACK_FORCED_AFTER_REGULAR)
     }
   }
 }
diff --git a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 4a57400..d03081d 100644
--- a/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/org/apache/openwhisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -41,6 +41,7 @@ import org.apache.openwhisk.spi.SpiLoader
 
 import scala.annotation.tailrec
 import scala.concurrent.Future
+import scala.concurrent.duration.FiniteDuration
 
 /**
  * A loadbalancer that schedules workload based on a hashing-algorithm.
@@ -325,7 +326,7 @@ class ShardingContainerPoolBalancer(
   override protected def releaseInvoker(invoker: InvokerInstanceId, entry: ActivationEntry) = {
     schedulingState.invokerSlots
       .lift(invoker.toInt)
-      .foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName, entry.maxConcurrent, entry.memory.toMB.toInt))
+      .foreach(_.releaseConcurrent(entry.fullyQualifiedEntityName, entry.maxConcurrent, entry.memoryLimit.toMB.toInt))
   }
 }
 
@@ -603,13 +604,21 @@ case class ShardingContainerPoolBalancerConfig(managedFraction: Double, blackbox
  * @param id id of the activation
  * @param namespaceId namespace that invoked the action
  * @param invokerName invoker the action is scheduled to
+ * @param memoryLimit memory limit of the invoked action
+ * @param timeLimit time limit of the invoked action
+ * @param maxConcurrent concurrency limit of the invoked action
+ * @param fullyQualifiedEntityName fully qualified name of the invoked action
  * @param timeoutHandler times out completion of this activation, should be canceled on good paths
+ * @param isBlackbox true if the invoked action is a blackbox action, otherwise false (managed action)
+ * @param isBlocking true if the action is invoked in a blocking fashion, i.e. "somebody" waits for the result
  */
 case class ActivationEntry(id: ActivationId,
                            namespaceId: UUID,
                            invokerName: InvokerInstanceId,
-                           memory: ByteSize,
+                           memoryLimit: ByteSize,
+                           timeLimit: FiniteDuration,
                            maxConcurrent: Int,
                            fullyQualifiedEntityName: FullyQualifiedEntityName,
                            timeoutHandler: Cancellable,
-                           isBlackbox: Boolean)
+                           isBlackbox: Boolean,
+                           isBlocking: Boolean)
diff --git a/docs/metrics.md b/docs/metrics.md
index b2bfc48..9d00b38 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -98,7 +98,7 @@ Histogram record the [distribution](http://kamon.io/documentation/0.6.x/kamon-co
 
 #### Gauges
 
-Gauges record the [distribution](https://kamon.io/docs/latest/core/metrics/#gauges) of given metric and there names are prefixed with `openwhisk.gauge`. For example `openwhisk.gauge.loadbalancer_totalHealthyInvoker_counter`. A gauge metrics provides the value at the given point and reports the same data unless the value has been changed be incremental or decremental than before. Gauges are useful for reporting metrics like kafka queue size or disk size.
+Gauges record the [distribution](https://kamon.io/docs/latest/core/metrics/#gauges) of given metric and their names are prefixed with `openwhisk.gauge`. For example `openwhisk.gauge.loadbalancer_totalHealthyInvoker_counter`. A gauge metrics provides the value at the given point and reports the same data unless the value has been changed be incremental or decremental than before. Gauges are useful for reporting metrics like kafka queue size or disk size.
 
 ### Metric Details
 
@@ -157,6 +157,18 @@ Metrics below are for invoker state as recorded within load balancer monitoring.
 * `openwhisk.gauge.loadbalancer_totalOfflineInvoker<invokerType>_counter` (gauge) - Records the count of managed invokers considered offline when no health pings arrive from the invokers. **invokerType** defines whether it is a managed or a blackbox invoker.
 * `openwhisk.gauge.loadbalancer_totalUnhealthyInvoker<invokerType>_counter` (gauge) - Records the count of managed invokers considered unhealthy when health pings arrive fine but the invokers report system errors. **invokerType** defines whether it is a managed or a blackbox invoker.
 
+Metrics below provide information about completion ack processing in load balancers. Depending on configuration setting `metrics_kamon_tags` (see above), a base metric with tags or a set of metrics without tags will be emitted.
+
+* Base metric `openwhisk.counter.loadbalancer_completionAck_counter`: count of processed regular or forced completion acks.
+* Tag `controller_id`: the controller's id.
+* Tag `type`: the exact type of completion ack.
+  * Type `regular`: a regular completion ack sent by an invoker and received in time. Does not include completion acks for healthcheck actions.
+  * Type `forced`: no completion ack was received in time and the timeout forced the completion ack to close.
+  * Type `healthcheck`: a regular completion ack for healthcheck actions sent by an invoker and received in time.
+  * Type `regularAfterForced`: a regular completion ack sent by an invoker and not received in time. The completion ack was already forced.
+  * Type `forcedAfterRegular`: a timeout tries to force a completion ack that has already been closed by a regular completion ack. A race condition that can occur if the regular completion ack is received near the timeout.
+* If `metrics_kamon_tags` is set to `false`, a set of metrics will be emitted constructed using following scheme: `openwhisk.counter.loadbalancer<controller_id>_completionAck_<type>_counter`.
+
 #### Invoker metrics
 
 ##### Container Init
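
The metric naming scheme documented in the docs/metrics.md hunk above can be sketched as follows. This is a standalone illustration and not the OpenWhisk API: `Metric`, `completionAckMetric` and the `kamonTags` parameter are hypothetical names, and the shortened case object names only mirror the `CompletionAckType` values from the commit.

```scala
object CompletionAckMetricSketch {
  // Mirrors the five completion ack types introduced by the commit (names shortened here).
  sealed abstract class CompletionAckType(val name: String)
  case object Regular extends CompletionAckType("regular")
  case object Forced extends CompletionAckType("forced")
  case object Healthcheck extends CompletionAckType("healthcheck")
  case object RegularAfterForced extends CompletionAckType("regularAfterForced")
  case object ForcedAfterRegular extends CompletionAckType("forcedAfterRegular")

  // Hypothetical container for a metric name plus its tags.
  final case class Metric(name: String, tags: Map[String, String])

  // kamonTags stands in for the `metrics_kamon_tags` configuration setting.
  def completionAckMetric(controllerId: String, ackType: CompletionAckType, kamonTags: Boolean): Metric =
    if (kamonTags)
      // One base metric; controller id and ack type are carried as tags.
      Metric(
        "openwhisk.counter.loadbalancer_completionAck_counter",
        Map("controller_id" -> controllerId, "type" -> ackType.name))
    else
      // No tags; controller id and ack type are encoded in the metric name.
      Metric(s"openwhisk.counter.loadbalancer${controllerId}_completionAck_${ackType.name}_counter", Map.empty)
}
```

With `kamonTags = false`, controller id `0` and type `forced`, the sketch yields the name `openwhisk.counter.loadbalancer0_completionAck_forced_counter`.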
