This is an automated email from the ASF dual-hosted git repository. cbickel pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push: new 9dd34f2 Throttle the system based on active-ack timeouts. (#3875) 9dd34f2 is described below commit 9dd34f2f7f82a52d8e2400559781626bda8b8d02 Author: Markus Thömmes <markusthoem...@me.com> AuthorDate: Thu Jul 26 13:45:16 2018 +0200 Throttle the system based on active-ack timeouts. (#3875) Today, we have an arbitrary system-wide limit of maximum concurrent connections. In general that is fine, but it doesn't have a direct correlation to what's actually happening in the system. This adds a new state to each monitored invoker: Overloaded. An invoker will go into the overloaded state if active-acks are starting to time out. Eventually, if the system is really overloaded, all Invokers will be in the overloaded state, which will cause the loadbalancer to return a failure. This failure now results in a 503 - System overloaded message back to the user. --- ansible/README.md | 2 - ansible/group_vars/all | 1 - ansible/roles/controller/tasks/deploy.yml | 2 - ansible/templates/whisk.properties.j2 | 1 - .../src/main/scala/whisk/common/Logging.scala | 4 +- .../src/main/scala/whisk/common/RingBuffer.scala | 2 +- .../src/main/scala/whisk/core/WhiskConfig.scala | 2 - .../main/scala/whisk/core/controller/Actions.scala | 4 + .../scala/whisk/core/controller/Controller.scala | 4 +- .../scala/whisk/core/controller/WebActions.scala | 5 + .../core/entitlement/ActivationThrottler.scala | 19 +-- .../scala/whisk/core/entitlement/Entitlement.scala | 29 +--- .../core/loadBalancer/InvokerSupervision.scala | 176 ++++++++++++--------- .../ShardingContainerPoolBalancer.scala | 49 ++++-- tests/performance/preparation/deploy.sh | 2 +- .../test/InvokerSupervisionTests.scala | 67 ++++++-- .../test/ShardingContainerPoolBalancerTests.scala | 3 +- 17 files changed, 211 insertions(+), 161 deletions(-) diff --git a/ansible/README.md b/ansible/README.md index f4f147c..ec7d086 100644 --- a/ansible/README.md +++ b/ansible/README.md @@ -348,12 
+348,10 @@ The default system throttling limits are configured in this file [./group_vars/a limits: invocationsPerMinute: "{{ limit_invocations_per_minute | default(60) }}" concurrentInvocations: "{{ limit_invocations_concurrent | default(30) }}" - concurrentInvocationsSystem: "{{ limit_invocations_concurrent_system | default(5000) }}" firesPerMinute: "{{ limit_fires_per_minute | default(60) }}" sequenceMaxLength: "{{ limit_sequence_max_length | default(50) }}" ``` - The `limits.invocationsPerMinute` represents the allowed namespace action invocations per minute. - The `limits.concurrentInvocations` represents the maximum concurrent invocations allowed per namespace. -- The `limits.concurrentInvocationsSystem` represents the maximum concurrent invocations the system will allow across all namespaces. - The `limits.firesPerMinute` represents the allowed namespace trigger firings per minute. - The `limits.sequenceMaxLength` represents the maximum length of a sequence action. diff --git a/ansible/group_vars/all b/ansible/group_vars/all index aa32ede..2114630 100644 --- a/ansible/group_vars/all +++ b/ansible/group_vars/all @@ -53,7 +53,6 @@ runtimesManifest: "{{ runtimes_manifest | default(lookup('file', openwhisk_home limits: invocationsPerMinute: "{{ limit_invocations_per_minute | default(60) }}" concurrentInvocations: "{{ limit_invocations_concurrent | default(30) }}" - concurrentInvocationsSystem: "{{ limit_invocations_concurrent_system | default(5000) }}" firesPerMinute: "{{ limit_fires_per_minute | default(60) }}" sequenceMaxLength: "{{ limit_sequence_max_length | default(50) }}" diff --git a/ansible/roles/controller/tasks/deploy.yml b/ansible/roles/controller/tasks/deploy.yml index 92c576d..11d7269 100644 --- a/ansible/roles/controller/tasks/deploy.yml +++ b/ansible/roles/controller/tasks/deploy.yml @@ -178,8 +178,6 @@ "LIMITS_ACTIONS_INVOKES_PERMINUTE": "{{ limits.invocationsPerMinute }}" "LIMITS_ACTIONS_INVOKES_CONCURRENT": "{{ limits.concurrentInvocations }}" 
- "LIMITS_ACTIONS_INVOKES_CONCURRENTINSYSTEM": - "{{ limits.concurrentInvocationsSystem }}" "LIMITS_TRIGGERS_FIRES_PERMINUTE": "{{ limits.firesPerMinute }}" "LIMITS_ACTIONS_SEQUENCE_MAXLENGTH": "{{ limits.sequenceMaxLength }}" diff --git a/ansible/templates/whisk.properties.j2 b/ansible/templates/whisk.properties.j2 index bfdba3b..ce3f2db 100644 --- a/ansible/templates/whisk.properties.j2 +++ b/ansible/templates/whisk.properties.j2 @@ -35,7 +35,6 @@ runtimes.manifest={{ runtimesManifest | to_json }} limits.actions.invokes.perMinute={{ limits.invocationsPerMinute }} limits.actions.invokes.concurrent={{ limits.concurrentInvocations }} -limits.actions.invokes.concurrentInSystem={{ limits.concurrentInvocationsSystem }} limits.triggers.fires.perMinute={{ limits.firesPerMinute }} limits.actions.sequence.maxLength={{ limits.sequenceMaxLength }} diff --git a/common/scala/src/main/scala/whisk/common/Logging.scala b/common/scala/src/main/scala/whisk/common/Logging.scala index 17bc2d3..56ab754 100644 --- a/common/scala/src/main/scala/whisk/common/Logging.scala +++ b/common/scala/src/main/scala/whisk/common/Logging.scala @@ -272,8 +272,8 @@ object LoggingMarkers { def INVOKER_STARTUP(i: Int) = LogMarkerToken(invoker, s"startup$i", count) // Check invoker healthy state from loadbalancer - val LOADBALANCER_INVOKER_OFFLINE = LogMarkerToken(loadbalancer, "invokerOffline", count) - val LOADBALANCER_INVOKER_UNHEALTHY = LogMarkerToken(loadbalancer, "invokerUnhealthy", count) + def LOADBALANCER_INVOKER_STATUS_CHANGE(state: String) = + LogMarkerToken(loadbalancer, "invokerState", count, Some(state)) val LOADBALANCER_ACTIVATION_START = LogMarkerToken(loadbalancer, "activations", count) def LOADBALANCER_ACTIVATIONS_INFLIGHT(controllerInstance: ControllerInstanceId) = diff --git a/common/scala/src/main/scala/whisk/common/RingBuffer.scala b/common/scala/src/main/scala/whisk/common/RingBuffer.scala index 4f5a6c7..8c4713d 100644 --- a/common/scala/src/main/scala/whisk/common/RingBuffer.scala 
+++ b/common/scala/src/main/scala/whisk/common/RingBuffer.scala @@ -28,5 +28,5 @@ class RingBuffer[T](size: Int) { def add(el: T) = inner.add(el) - def toList() = inner.toArray().asInstanceOf[Array[T]].toList + def toList = inner.toArray().asInstanceOf[Array[T]].toList } diff --git a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala index fd6eeec..7396c02 100644 --- a/common/scala/src/main/scala/whisk/core/WhiskConfig.scala +++ b/common/scala/src/main/scala/whisk/core/WhiskConfig.scala @@ -78,7 +78,6 @@ class WhiskConfig(requiredProperties: Map[String, String], val actionInvokePerMinuteLimit = this(WhiskConfig.actionInvokePerMinuteLimit) val actionInvokeConcurrentLimit = this(WhiskConfig.actionInvokeConcurrentLimit) val triggerFirePerMinuteLimit = this(WhiskConfig.triggerFirePerMinuteLimit) - val actionInvokeSystemOverloadLimit = this(WhiskConfig.actionInvokeSystemOverloadLimit) val actionSequenceLimit = this(WhiskConfig.actionSequenceMaxLimit) val controllerSeedNodes = this(WhiskConfig.controllerSeedNodes) } @@ -189,7 +188,6 @@ object WhiskConfig { val actionSequenceMaxLimit = "limits.actions.sequence.maxLength" val actionInvokePerMinuteLimit = "limits.actions.invokes.perMinute" val actionInvokeConcurrentLimit = "limits.actions.invokes.concurrent" - val actionInvokeSystemOverloadLimit = "limits.actions.invokes.concurrentInSystem" val triggerFirePerMinuteLimit = "limits.triggers.fires.perMinute" val controllerSeedNodes = "akka.cluster.seed.nodes" } diff --git a/core/controller/src/main/scala/whisk/core/controller/Actions.scala b/core/controller/src/main/scala/whisk/core/controller/Actions.scala index 4bfdb65..12c8ec6 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Actions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Actions.scala @@ -45,6 +45,7 @@ import whisk.http.Messages import whisk.http.Messages._ import whisk.core.entitlement.Resource import 
whisk.core.entitlement.Collection +import whisk.core.loadBalancer.LoadBalancerException /** * A singleton object which defines the properties that must be present in a configuration @@ -280,6 +281,9 @@ trait WhiskActionsApi extends WhiskCollectionAPI with PostActionActivation with case Failure(RejectRequest(code, message)) => logging.debug(this, s"[POST] action rejected with code $code: $message") terminate(code, message) + case Failure(t: LoadBalancerException) => + logging.error(this, s"[POST] failed in loadbalancer: ${t.getMessage}") + terminate(ServiceUnavailable) case Failure(t: Throwable) => logging.error(this, s"[POST] action activation failed: ${t.getMessage}") terminate(InternalServerError) diff --git a/core/controller/src/main/scala/whisk/core/controller/Controller.scala b/core/controller/src/main/scala/whisk/core/controller/Controller.scala index 8167a13..c130727 100644 --- a/core/controller/src/main/scala/whisk/core/controller/Controller.scala +++ b/core/controller/src/main/scala/whisk/core/controller/Controller.scala @@ -42,7 +42,7 @@ import whisk.core.entitlement._ import whisk.core.entity._ import whisk.core.entity.ActivationId.ActivationIdGenerator import whisk.core.entity.ExecManifest.Runtimes -import whisk.core.loadBalancer.{Healthy, LoadBalancerProvider} +import whisk.core.loadBalancer.{InvokerState, LoadBalancerProvider} import whisk.http.BasicHttpService import whisk.http.BasicRasService import whisk.spi.SpiLoader @@ -151,7 +151,7 @@ class Controller(val instance: ControllerInstanceId, complete { loadBalancer .invokerHealth() - .map(_.count(_.status == Healthy).toJson) + .map(_.count(_.status == InvokerState.Healthy).toJson) } } } diff --git a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala index f373a1f..e195561 100644 --- a/core/controller/src/main/scala/whisk/core/controller/WebActions.scala +++ 
b/core/controller/src/main/scala/whisk/core/controller/WebActions.scala @@ -52,6 +52,7 @@ import whisk.core.controller.actions.PostActionActivation import whisk.core.database._ import whisk.core.entity._ import whisk.core.entity.types._ +import whisk.core.loadBalancer.LoadBalancerException import whisk.http.ErrorResponse.terminate import whisk.http.Messages import whisk.http.LenientSprayJsonSupport._ @@ -673,6 +674,10 @@ trait WhiskWebActionsApi extends Directives with ValidateRequestSize with PostAc case Failure(t: RejectRequest) => terminate(t.code, t.message) + case Failure(t: LoadBalancerException) => + logging.error(this, s"failed in loadbalancer: $t") + terminate(ServiceUnavailable) + case Failure(t) => logging.error(this, s"exception in completeRequest: $t") terminate(InternalServerError) diff --git a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala index 92d5434..b48a4dd 100644 --- a/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala +++ b/core/controller/src/main/scala/whisk/core/entitlement/ActivationThrottler.scala @@ -30,14 +30,11 @@ import scala.concurrent.{ExecutionContext, Future} * * @param loadBalancer contains active quotas * @param concurrencyLimit a calculated limit relative to the user using the system - * @param systemOverloadLimit the limit when the system is considered overloaded */ -class ActivationThrottler(loadBalancer: LoadBalancer, concurrencyLimit: Identity => Int, systemOverloadLimit: Int)( +class ActivationThrottler(loadBalancer: LoadBalancer, concurrencyLimit: Identity => Int)( implicit logging: Logging, executionContext: ExecutionContext) { - logging.info(this, s"systemOverloadLimit = $systemOverloadLimit")(TransactionId.controller) - /** * Checks whether the operation should be allowed to proceed. 
*/ @@ -50,20 +47,6 @@ class ActivationThrottler(loadBalancer: LoadBalancer, concurrencyLimit: Identity ConcurrentRateLimit(concurrentActivations, currentLimit) } } - - /** - * Checks whether the system is in a generally overloaded state. - */ - def isOverloaded()(implicit tid: TransactionId): Future[Boolean] = { - loadBalancer.totalActiveActivations.map { concurrentActivations => - val overloaded = concurrentActivations > systemOverloadLimit - if (overloaded) - logging.info( - this, - s"concurrent activations in system = $concurrentActivations, below limit = $systemOverloadLimit") - overloaded - } - } } sealed trait RateLimit { diff --git a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala index 078530a..a553357 100644 --- a/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala +++ b/core/controller/src/main/scala/whisk/core/entitlement/Entitlement.scala @@ -35,7 +35,6 @@ import whisk.core.entity._ import whisk.core.loadBalancer.{LoadBalancer, ShardingContainerPoolBalancer} import whisk.http.ErrorResponse import whisk.http.Messages -import whisk.http.Messages._ import whisk.core.connector.MessagingProvider import whisk.spi.SpiLoader import whisk.spi.Spi @@ -74,8 +73,7 @@ protected[core] object EntitlementProvider { val requiredProperties = Map( WhiskConfig.actionInvokePerMinuteLimit -> null, WhiskConfig.actionInvokeConcurrentLimit -> null, - WhiskConfig.triggerFirePerMinuteLimit -> null, - WhiskConfig.actionInvokeSystemOverloadLimit -> null) + WhiskConfig.triggerFirePerMinuteLimit -> null) } /** @@ -148,8 +146,7 @@ protected[core] abstract class EntitlementProvider( private val concurrentInvokeThrottler = new ActivationThrottler( loadBalancer, - activationThrottleCalculator(config.actionInvokeConcurrentLimit.toInt, _.limits.concurrentInvocations), - config.actionInvokeSystemOverloadLimit.toInt) + 
activationThrottleCalculator(config.actionInvokeConcurrentLimit.toInt, _.limits.concurrentInvocations)) private val messagingProvider = SpiLoader.get[MessagingProvider] private val eventProducer = messagingProvider.getProducer(this.config) @@ -196,8 +193,7 @@ protected[core] abstract class EntitlementProvider( protected[core] def checkThrottles(user: Identity)(implicit transid: TransactionId): Future[Unit] = { logging.debug(this, s"checking user '${user.subject}' has not exceeded activation quota") - checkSystemOverload(ACTIVATE) - .flatMap(_ => checkThrottleOverload(Future.successful(invokeRateThrottler.check(user)), user)) + checkThrottleOverload(Future.successful(invokeRateThrottler.check(user)), user) .flatMap(_ => checkThrottleOverload(concurrentInvokeThrottler.check(user), user)) } @@ -257,8 +253,7 @@ protected[core] abstract class EntitlementProvider( val throttleCheck = if (noThrottle) Future.successful(()) else - checkSystemOverload(right) - .flatMap(_ => checkUserThrottle(user, right, resources)) + checkUserThrottle(user, right, resources) .flatMap(_ => checkConcurrentUserThrottle(user, right, resources)) throttleCheck .flatMap(_ => checkPrivilege(user, right, resources)) @@ -312,22 +307,6 @@ protected[core] abstract class EntitlementProvider( } /** - * Limits activations if the system is overloaded. - * - * @param right the privilege, if ACTIVATE then check quota else return None - * @return future completing successfully if system is not overloaded else failing with a rejection - */ - protected def checkSystemOverload(right: Privilege)(implicit transid: TransactionId): Future[Unit] = { - concurrentInvokeThrottler.isOverloaded.flatMap { isOverloaded => - val systemOverload = right == ACTIVATE && isOverloaded - if (systemOverload) { - logging.error(this, "system is overloaded") - Future.failed(RejectRequest(TooManyRequests, systemOverloaded)) - } else Future.successful(()) - } - } - - /** * Limits activations if subject exceeds their own limits. 
* If the requested right is an activation, the set of resources must contain an activation of an action or filter to be throttled. * While it is possible for the set of resources to contain more than one action or trigger, the plurality is ignored and treated diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala index 15646b8..223b747 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/InvokerSupervision.scala @@ -20,7 +20,7 @@ package whisk.core.loadBalancer import java.nio.charset.StandardCharsets import scala.collection.immutable -import scala.concurrent.{Await, Future} +import scala.concurrent.{Await, ExecutionContext, Future} import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success @@ -45,19 +45,47 @@ case object GetStatus case object Tick // States an Invoker can be in -sealed trait InvokerState { val asString: String } -case object Offline extends InvokerState { val asString = "down" } -case object Healthy extends InvokerState { val asString = "up" } -case object UnHealthy extends InvokerState { val asString = "unhealthy" } +sealed trait InvokerState { + val asString: String + val isUsable: Boolean +} + +object InvokerState { + // Invokers in this state can be used to schedule workload to + sealed trait Usable extends InvokerState { val isUsable = true } + // No workload should be scheduled to invokers in this state + sealed trait Unusable extends InvokerState { val isUsable = false } + + // A completely healthy invoker, pings arriving fine, no system errors + case object Healthy extends Usable { val asString = "up" } + // Pings are arriving fine, the invoker returns system errors though + case object Unhealthy extends Unusable { val asString = "unhealthy" } + // Pings are arriving fine, the invoker does not 
respond with active-acks in the expected time though + case object Unresponsible extends Unusable { val asString = "unresponsible" } + // Pings are not arriving for this invoker + case object Offline extends Unusable { val asString = "down" } +} + +// Possible answers of an activation +sealed trait InvocationFinishedResult +object InvocationFinishedResult { + // The activation could be successfully executed from the system's point of view. That includes user- and application + // errors + case object Success extends InvocationFinishedResult + // The activation could not be executed because of a system error + case object SystemError extends InvocationFinishedResult + // The active-ack did not arrive before it timed out + case object Timeout extends InvocationFinishedResult +} case class ActivationRequest(msg: ActivationMessage, invoker: InvokerInstanceId) -case class InvocationFinishedMessage(invokerInstance: InvokerInstanceId, successful: Boolean) +case class InvocationFinishedMessage(invokerInstance: InvokerInstanceId, result: InvocationFinishedResult) // Sent to a monitor if the state changed case class CurrentInvokerPoolState(newState: IndexedSeq[InvokerHealth]) // Data stored in the Invoker -final case class InvokerInfo(buffer: RingBuffer[Boolean]) +final case class InvokerInfo(buffer: RingBuffer[InvocationFinishedResult]) /** * Actor representing a pool of invokers @@ -76,10 +104,12 @@ class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef monitor: Option[ActorRef]) extends Actor { - implicit val transid = TransactionId.invokerHealth - implicit val logging = new AkkaLogging(context.system.log) - implicit val timeout = Timeout(5.seconds) - implicit val ec = context.dispatcher + import InvokerState._ + + implicit val transid: TransactionId = TransactionId.invokerHealth + implicit val logging: Logging = new AkkaLogging(context.system.log) + implicit val timeout: Timeout = Timeout(5.seconds) + implicit val ec: ExecutionContext = 
context.dispatcher // State of the actor. Mutable vars with immutable collections prevents closures or messages // from leaking the state for external mutation @@ -87,7 +117,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef var refToInstance = immutable.Map.empty[ActorRef, InvokerInstanceId] var status = IndexedSeq[InvokerHealth]() - def receive = { + def receive: Receive = { case p: PingMessage => val invoker = instanceToRef.getOrElse(p.instance, registerInvoker(p.instance)) instanceToRef = instanceToRef.updated(p.instance, invoker) @@ -116,15 +146,15 @@ class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef case msg: ActivationRequest => sendActivationToInvoker(msg.msg, msg.invoker).pipeTo(sender) } - def logStatus() = { + def logStatus(): Unit = { monitor.foreach(_ ! CurrentInvokerPoolState(status)) val pretty = status.map(i => s"${i.id.toInt} -> ${i.status}") logging.info(this, s"invoker status changed to ${pretty.mkString(", ")}") } /** Receive Ping messages from invokers. 
*/ - val pingPollDuration = 1.second - val invokerPingFeed = context.system.actorOf(Props { + val pingPollDuration: FiniteDuration = 1.second + val invokerPingFeed: ActorRef = context.system.actorOf(Props { new MessageFeed( "ping", logging, @@ -149,7 +179,7 @@ class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef } /** Pads a list to a given length using the given function to compute entries */ - def padToIndexed[A](list: IndexedSeq[A], n: Int, f: (Int) => A) = list ++ (list.size until n).map(f) + def padToIndexed[A](list: IndexedSeq[A], n: Int, f: (Int) => A): IndexedSeq[A] = list ++ (list.size until n).map(f) // Register a new invoker def registerInvoker(instanceId: InvokerInstanceId): ActorRef = { @@ -170,9 +200,9 @@ class InvokerPool(childFactory: (ActorRefFactory, InvokerInstanceId) => ActorRef object InvokerPool { private def createTestActionForInvokerHealth(db: EntityStore, action: WhiskAction): Future[Unit] = { - implicit val tid = TransactionId.loadbalancer - implicit val ec = db.executionContext - implicit val logging = db.logging + implicit val tid: TransactionId = TransactionId.loadbalancer + implicit val ec: ExecutionContext = db.executionContext + implicit val logging: Logging = db.logging WhiskAction .get(db, action.docid) @@ -214,12 +244,12 @@ object InvokerPool { def props(f: (ActorRefFactory, InvokerInstanceId) => ActorRef, p: (ActivationMessage, InvokerInstanceId) => Future[RecordMetadata], pc: MessageConsumer, - m: Option[ActorRef] = None) = { + m: Option[ActorRef] = None): Props = { Props(new InvokerPool(f, p, pc, m)) } /** A stub identity for invoking the test action. This does not need to be a valid identity. */ - val healthActionIdentity = { + val healthActionIdentity: Identity = { val whiskSystem = "whisk.system" val uuid = UUID() Identity( @@ -230,13 +260,13 @@ object InvokerPool { } /** An action to use for monitoring invoker health. 
*/ - def healthAction(i: ControllerInstanceId) = ExecManifest.runtimesManifest.resolveDefaultRuntime("nodejs:6").map { - manifest => + def healthAction(i: ControllerInstanceId): Option[WhiskAction] = + ExecManifest.runtimesManifest.resolveDefaultRuntime("nodejs:6").map { manifest => new WhiskAction( namespace = healthActionIdentity.namespace.name.toPath, name = EntityName(s"invokerHealthTestAction${i.asString}"), exec = CodeExecAsString(manifest, """function main(params) { return params; }""", None)) - } + } } /** @@ -247,47 +277,47 @@ object InvokerPool { */ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: ControllerInstanceId) extends FSM[InvokerState, InvokerInfo] { - implicit val transid = TransactionId.invokerHealth - implicit val logging = new AkkaLogging(context.system.log) + + import InvokerState._ + + implicit val transid: TransactionId = TransactionId.invokerHealth + implicit val logging: Logging = new AkkaLogging(context.system.log) val name = s"invoker${invokerInstance.toInt}" - val healthyTimeout = 10.seconds + val healthyTimeout: FiniteDuration = 10.seconds // This is done at this point to not intermingle with the state-machine // especially their timeouts. def customReceive: Receive = { case _: RecordMetadata => // The response of putting testactions to the MessageProducer. We don't have to do anything with them. } - override def receive = customReceive.orElse(super.receive) + override def receive: Receive = customReceive.orElse(super.receive) - /** - * Always start UnHealthy. Then the invoker receives some test activations and becomes Healthy. - */ - startWith(UnHealthy, InvokerInfo(new RingBuffer[Boolean](InvokerActor.bufferSize))) + /** Always start UnHealthy. Then the invoker receives some test activations and becomes Healthy. */ + startWith(Unhealthy, InvokerInfo(new RingBuffer[InvocationFinishedResult](InvokerActor.bufferSize))) - /** - * An Offline invoker represents an existing but broken - * invoker. 
This means, that it does not send pings anymore. - */ + /** An Offline invoker represents an existing but broken invoker. This means, that it does not send pings anymore. */ when(Offline) { - case Event(_: PingMessage, _) => goto(UnHealthy) + case Event(_: PingMessage, _) => goto(Unhealthy) } - /** - * An UnHealthy invoker represents an invoker that was not able to handle actions successfully. - */ - when(UnHealthy, stateTimeout = healthyTimeout) { + // To be used for all states that should send test actions to reverify the invoker + val healthPingingState: StateFunction = { case Event(_: PingMessage, _) => stay case Event(StateTimeout, _) => goto(Offline) - case Event(Tick, info) => { + case Event(Tick, _) => invokeTestAction() stay - } } + /** An Unhealthy invoker represents an invoker that was not able to handle actions successfully. */ + when(Unhealthy, stateTimeout = healthyTimeout)(healthPingingState) + + /** An Unresponsible invoker represents an invoker that is not responding with active acks in a timely manner */ + when(Unresponsible, stateTimeout = healthyTimeout)(healthPingingState) + /** - * A Healthy invoker is characterized by continuously getting - * pings. It will go offline if that state is not confirmed + * A Healthy invoker is characterized by continuously getting pings. It will go offline if that state is not confirmed * for 20 seconds. */ when(Healthy, stateTimeout = healthyTimeout) { @@ -295,70 +325,68 @@ class InvokerActor(invokerInstance: InvokerInstanceId, controllerInstance: Contr case Event(StateTimeout, _) => goto(Offline) } - /** - * Handle the completion of an Activation in every state. - */ + /** Handle the completion of an Activation in every state. 
*/ whenUnhandled { - case Event(cm: InvocationFinishedMessage, info) => handleCompletionMessage(cm.successful, info.buffer) + case Event(cm: InvocationFinishedMessage, info) => handleCompletionMessage(cm.result, info.buffer) } /** Logging on Transition change */ onTransition { - case _ -> Offline => - transid.mark( - this, - LoggingMarkers.LOADBALANCER_INVOKER_OFFLINE, - s"$name is offline", - akka.event.Logging.WarningLevel) - case _ -> UnHealthy => + case _ -> newState if !newState.isUsable => transid.mark( this, - LoggingMarkers.LOADBALANCER_INVOKER_UNHEALTHY, - s"$name is unhealthy", + LoggingMarkers.LOADBALANCER_INVOKER_STATUS_CHANGE(newState.asString), + s"$name is ${newState.asString}", akka.event.Logging.WarningLevel) - case _ -> Healthy => logging.info(this, s"$name is healthy") + case _ -> newState if newState.isUsable => logging.info(this, s"$name is ${newState.asString}") } - /** Scheduler to send test activations when the invoker is unhealthy. */ - onTransition { - case _ -> UnHealthy => { + // To be used for all states that should send test actions to reverify the invoker + def healthPingingTransitionHandler(state: InvokerState): TransitionHandler = { + case _ -> `state` => invokeTestAction() - setTimer(InvokerActor.timerName, Tick, 1.minute, true) - } - case UnHealthy -> _ => cancelTimer(InvokerActor.timerName) + setTimer(InvokerActor.timerName, Tick, 1.minute, repeat = true) + case `state` -> _ => cancelTimer(InvokerActor.timerName) } + onTransition(healthPingingTransitionHandler(Unhealthy)) + onTransition(healthPingingTransitionHandler(Unresponsible)) + initialize() /** * Handling for active acks. This method saves the result (successful or unsuccessful) * into an RingBuffer and checks, if the InvokerActor has to be changed to UnHealthy. 
* - * @param wasActivationSuccessful: result of Activation + * @param result: result of Activation * @param buffer to be used */ - private def handleCompletionMessage(wasActivationSuccessful: Boolean, buffer: RingBuffer[Boolean]) = { - buffer.add(wasActivationSuccessful) + private def handleCompletionMessage(result: InvocationFinishedResult, + buffer: RingBuffer[InvocationFinishedResult]) = { + buffer.add(result) // If the action is successful it seems like the Invoker is Healthy again. So we execute immediately // a new test action to remove the errors out of the RingBuffer as fast as possible. // The actions that arrive while the invoker is unhealthy are most likely health actions. // It is possible they are normal user actions as well. This can happen if such actions were in the // invoker queue or in progress while the invoker's status flipped to Unhealthy. - if (wasActivationSuccessful && stateName == UnHealthy) { + if (result == InvocationFinishedResult.Success && stateName == Unhealthy) { invokeTestAction() } // Stay in online if the activations was successful. // Stay in offline, if an activeAck reaches the controller. 
- if ((stateName == Healthy && wasActivationSuccessful) || stateName == Offline) { + if ((stateName == Healthy && result == InvocationFinishedResult.Success) || stateName == Offline) { stay } else { - // Goto UnHealthy if there are more errors than accepted in buffer, else goto Healthy - if (buffer.toList.count(_ == true) >= InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance) { - gotoIfNotThere(Healthy) + val entries = buffer.toList + // Goto Unhealthy or Unresponsible respectively if there are more errors than accepted in buffer, else goto Healthy + if (entries.count(_ == InvocationFinishedResult.SystemError) > InvokerActor.bufferErrorTolerance) { + gotoIfNotThere(Unhealthy) + } else if (entries.count(_ == InvocationFinishedResult.Timeout) > InvokerActor.bufferErrorTolerance) { + gotoIfNotThere(Unresponsible) } else { - gotoIfNotThere(UnHealthy) + gotoIfNotThere(Healthy) } } } diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala index dd84af9..3cdce1d 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala @@ -244,7 +244,17 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con entry.promise.future } } - .getOrElse(Future.failed(LoadBalancerException("No invokers available"))) + .getOrElse { + // report the state of all invokers + val actionType = if (!action.exec.pull) "non-blackbox" else "blackbox" + val invokerStates = invokersToUse.foldLeft(Map.empty[InvokerState, Int]) { (agg, curr) => + val count = agg.getOrElse(curr.status, 0) + 1 + agg + (curr.status -> count) + } + + logging.error(this, s"failed to schedule $actionType action, invokers to use: $invokerStates") + Future.failed(LoadBalancerException("No invokers available")) + } } /** 
2. Update local state with the to be executed activation */ @@ -353,8 +363,18 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con invoker: InvokerInstanceId): Unit = { val aid = response.fold(l => l, r => r.activationId) - // treat left as success (as it is the result of a message exceeding the bus limit) - val isSuccess = response.fold(_ => true, r => !r.response.isWhiskError) + val invocationResult = if (forced) { + InvocationFinishedResult.Timeout + } else { + // If the response contains a system error, report that, otherwise report Success + // Left generally is considered a Success, since that could be a message not fitting into Kafka + val isSystemError = response.fold(_ => false, _.response.isWhiskError) + if (isSystemError) { + InvocationFinishedResult.SystemError + } else { + InvocationFinishedResult.Success + } + } activations.remove(aid) match { case Some(entry) => @@ -373,16 +393,21 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid) // Active acks that are received here are strictly from user actions - health actions are not part of // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion. - invokerPool ! InvocationFinishedMessage(invoker, isSuccess) + invokerPool ! InvocationFinishedMessage(invoker, invocationResult) + case None if tid == TransactionId.invokerHealth => + // Health actions do not have an ActivationEntry as they are written on the message bus directly. Their result + // is important to pass to the invokerPool because they are used to determine if the invoker can be considered + // healthy again. + logging.info(this, s"received active ack for health action on $invoker")(tid) + invokerPool ! 
InvocationFinishedMessage(invoker, invocationResult) case None if !forced => - // the entry has already been removed but we receive an active ack for this activation Id. - // This happens for health actions, because they don't have an entry in Loadbalancerdata or - // for activations that already timed out. - invokerPool ! InvocationFinishedMessage(invoker, isSuccess) + // Received an active-ack that has already been taken out of the state because of a timeout (forced active-ack). + // The result is ignored because a timeout has already been reported to the invokerPool per the force. logging.debug(this, s"received active ack for '$aid' which has no entry")(tid) case None => - // the entry has already been removed by an active ack. This part of the code is reached by the timeout. - // As the active ack is already processed we don't have to do anything here. + // The entry has already been removed by an active ack. This part of the code is reached by the timeout and can + // happen if active-ack and timeout happen roughly at the same time (the timeout was triggered before the active + // ack canceled the timer). As the active ack is already processed we don't have to do anything here. 
logging.debug(this, s"forced active ack for '$aid' which has no entry")(tid) } } @@ -446,12 +471,12 @@ object ShardingContainerPoolBalancer extends LoadBalancerProvider { if (numInvokers > 0) { val invoker = invokers(index) // If the current invoker is healthy and we can get a slot - if (invoker.status == Healthy && dispatched(invoker.id.toInt).tryAcquire()) { + if (invoker.status.isUsable && dispatched(invoker.id.toInt).tryAcquire()) { Some(invoker.id) } else { // If we've gone through all invokers if (stepsDone == numInvokers + 1) { - val healthyInvokers = invokers.filter(_.status == Healthy) + val healthyInvokers = invokers.filter(_.status.isUsable) if (healthyInvokers.nonEmpty) { // Choose a healthy invoker randomly val random = healthyInvokers(ThreadLocalRandom.current().nextInt(healthyInvokers.size)).id diff --git a/tests/performance/preparation/deploy.sh b/tests/performance/preparation/deploy.sh index 6e01097..ba4377e 100755 --- a/tests/performance/preparation/deploy.sh +++ b/tests/performance/preparation/deploy.sh @@ -25,7 +25,7 @@ TERM=dumb ./gradlew distDocker -PdockerImagePrefix=testing $GRADLE_PROJS_SKIP # Deploy Openwhisk cd $ROOTDIR/ansible -ANSIBLE_CMD="$ANSIBLE_CMD -e limit_invocations_per_minute=999999 -e limit_invocations_concurrent=999999 -e limit_invocations_concurrent_system=999999 -e controller_client_auth=false" +ANSIBLE_CMD="$ANSIBLE_CMD -e limit_invocations_per_minute=999999 -e limit_invocations_concurrent=999999 -e controller_client_auth=false" $ANSIBLE_CMD setup.yml $ANSIBLE_CMD prereq.yml diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala index 36d0e42..7537ab0 100644 --- a/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala +++ b/tests/src/test/scala/whisk/core/loadBalancer/test/InvokerSupervisionTests.scala @@ -51,13 +51,12 @@ import whisk.core.entity.ActivationId.ActivationIdGenerator 
import whisk.core.entity._ import whisk.core.loadBalancer.ActivationRequest import whisk.core.loadBalancer.GetStatus -import whisk.core.loadBalancer.Healthy +import whisk.core.loadBalancer.InvokerState._ +import whisk.core.loadBalancer.InvocationFinishedResult import whisk.core.loadBalancer.InvocationFinishedMessage import whisk.core.loadBalancer.InvokerActor import whisk.core.loadBalancer.InvokerPool import whisk.core.loadBalancer.InvokerState -import whisk.core.loadBalancer.Offline -import whisk.core.loadBalancer.UnHealthy import whisk.core.loadBalancer.InvokerHealth import whisk.utils.retry import whisk.core.connector.test.TestConnector @@ -163,7 +162,7 @@ class InvokerSupervisionTests allStates(supervisor) shouldBe zipWithInstance(IndexedSeq(Healthy)) // Send message and expect receive in invoker - val msg = InvocationFinishedMessage(invokerInstance, true) + val msg = InvocationFinishedMessage(invokerInstance, InvocationFinishedResult.Success) supervisor ! msg invoker.expectMsg(msg) } @@ -214,34 +213,68 @@ class InvokerSupervisionTests within(timeout.duration) { pool.send(invoker, SubscribeTransitionCallBack(pool.ref)) - pool.expectMsg(CurrentState(invoker, UnHealthy)) + pool.expectMsg(CurrentState(invoker, Unhealthy)) timeout(invoker) - pool.expectMsg(Transition(invoker, UnHealthy, Offline)) + pool.expectMsg(Transition(invoker, Unhealthy, Offline)) invoker ! 
PingMessage(InvokerInstanceId(0)) - pool.expectMsg(Transition(invoker, Offline, UnHealthy)) + pool.expectMsg(Transition(invoker, Offline, Unhealthy)) } } - // unhealthy -> healthy + // unhealthy -> healthy -> unhealthy -> healthy it should "goto healthy again, if unhealthy and error buffer has enough successful invocations" in { val pool = TestProbe() val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0"))) within(timeout.duration) { pool.send(invoker, SubscribeTransitionCallBack(pool.ref)) - pool.expectMsg(CurrentState(invoker, UnHealthy)) + pool.expectMsg(CurrentState(invoker, Unhealthy)) + + (1 to InvokerActor.bufferSize).foreach { _ => + invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success) + } + pool.expectMsg(Transition(invoker, Unhealthy, Healthy)) // Fill buffer with errors (1 to InvokerActor.bufferSize).foreach { _ => - invoker ! InvocationFinishedMessage(InvokerInstanceId(0), false) + invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.SystemError) + } + pool.expectMsg(Transition(invoker, Healthy, Unhealthy)) + + // Fill buffer with successful invocations to become healthy again (one below errorTolerance) + (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ => + invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success) + } + pool.expectMsg(Transition(invoker, Unhealthy, Healthy)) + } + } + + // unhealthy -> healthy -> overloaded -> healthy + it should "goto healthy again, if overloaded and error buffer has enough successful invocations" in { + val pool = TestProbe() + val invoker = pool.system.actorOf(InvokerActor.props(InvokerInstanceId(0), ControllerInstanceId("0"))) + + within(timeout.duration) { + pool.send(invoker, SubscribeTransitionCallBack(pool.ref)) + pool.expectMsg(CurrentState(invoker, Unhealthy)) + + (1 to InvokerActor.bufferSize).foreach { _ => + invoker ! 
InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success) + } + pool.expectMsg(Transition(invoker, Unhealthy, Healthy)) + + // Fill buffer with timeouts + (1 to InvokerActor.bufferSize).foreach { _ => + invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Timeout) } + pool.expectMsg(Transition(invoker, Healthy, Unresponsible)) // Fill buffer with successful invocations to become healthy again (one below errorTolerance) (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ => - invoker ! InvocationFinishedMessage(InvokerInstanceId(0), true) + invoker ! InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success) } - pool.expectMsg(Transition(invoker, UnHealthy, Healthy)) + pool.expectMsg(Transition(invoker, Unresponsible, Healthy)) } } @@ -253,25 +286,25 @@ class InvokerSupervisionTests within(timeout.duration) { pool.send(invoker, SubscribeTransitionCallBack(pool.ref)) - pool.expectMsg(CurrentState(invoker, UnHealthy)) + pool.expectMsg(CurrentState(invoker, Unhealthy)) timeout(invoker) - pool.expectMsg(Transition(invoker, UnHealthy, Offline)) + pool.expectMsg(Transition(invoker, Unhealthy, Offline)) invoker ! PingMessage(InvokerInstanceId(0)) - pool.expectMsg(Transition(invoker, Offline, UnHealthy)) + pool.expectMsg(Transition(invoker, Offline, Unhealthy)) } } it should "start timer to send testactions when unhealthy" in { val invoker = TestFSMRef(new InvokerActor(InvokerInstanceId(0), ControllerInstanceId("0"))) - invoker.stateName shouldBe UnHealthy + invoker.stateName shouldBe Unhealthy invoker.isTimerActive(InvokerActor.timerName) shouldBe true // Fill buffer with successful invocations to become healthy again (one below errorTolerance) (1 to InvokerActor.bufferSize - InvokerActor.bufferErrorTolerance).foreach { _ => - invoker ! InvocationFinishedMessage(InvokerInstanceId(0), true) + invoker ! 
InvocationFinishedMessage(InvokerInstanceId(0), InvocationFinishedResult.Success) } invoker.stateName shouldBe Healthy diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala index 96a2cc3..0755b99 100644 --- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala +++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala @@ -24,6 +24,7 @@ import org.scalatest.junit.JUnitRunner import whisk.common.{ForcableSemaphore, TransactionId} import whisk.core.entity.InvokerInstanceId import whisk.core.loadBalancer._ +import whisk.core.loadBalancer.InvokerState._ /** * Unit tests for the ContainerPool object. @@ -36,7 +37,7 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str behavior of "ShardingContainerPoolBalancerState" def healthy(i: Int) = new InvokerHealth(InvokerInstanceId(i), Healthy) - def unhealthy(i: Int) = new InvokerHealth(InvokerInstanceId(i), UnHealthy) + def unhealthy(i: Int) = new InvokerHealth(InvokerInstanceId(i), Unhealthy) def offline(i: Int) = new InvokerHealth(InvokerInstanceId(i), Offline) def semaphores(count: Int, max: Int): IndexedSeq[ForcableSemaphore] =