cbickel closed pull request #3424: Send active-ack in any case of a parseable message. URL: https://github.com/apache/incubator-openwhisk/pull/3424
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala index 0f466d3246..97d2008db9 100644 --- a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala +++ b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala @@ -205,6 +205,8 @@ object Messages { } } + val namespacesBlacklisted = "The action was not invoked due to a blacklisted namespace." + val actionRemovedWhileInvoking = "Action could not be found or may have been deleted." val actionMismatchWhileInvoking = "Action version is not compatible and cannot be invoked." val actionFetchErrorWhileInvoking = "Action could not be fetched." diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index afe0c899e2..0729103a6f 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -37,7 +37,7 @@ import whisk.core.entity.size._ import whisk.http.Messages import whisk.spi.SpiLoader -import scala.concurrent.Future +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.util.{Failure, Success} @@ -46,8 +46,8 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa logging: Logging) { implicit val materializer: ActorMaterializer = ActorMaterializer() - implicit val ec = actorSystem.dispatcher - implicit val cfg = config + implicit val ec: ExecutionContext = actorSystem.dispatcher + implicit val cfg: WhiskConfig = config private val logsProvider = SpiLoader.get[LogStoreProvider].logStore(actorSystem) logging.info(this, s"LogStoreProvider: ${logsProvider.getClass}") @@ -59,7 +59,7 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa * task or actor because further operation does not make sense if something * goes wrong here. Initialization will throw an exception upon failure. */ - val containerFactory = + private val containerFactory = SpiLoader .get[ContainerFactoryProvider] .getContainerFactory( @@ -90,26 +90,26 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa } /** Initialize message consumers */ - val topic = s"invoker${instance.toInt}" - val maximumContainers = config.invokerNumCore.toInt * config.invokerCoreShare.toInt - val msgProvider = SpiLoader.get[MessagingProvider] - val consumer = msgProvider.getConsumer( + private val topic = s"invoker${instance.toInt}" + private val maximumContainers = config.invokerNumCore.toInt * config.invokerCoreShare.toInt + private val msgProvider = SpiLoader.get[MessagingProvider] + private val consumer = msgProvider.getConsumer( config, topic, topic, maximumContainers, maxPollInterval = TimeLimit.MAX_DURATION + 1.minute) - val activationFeed = actorSystem.actorOf(Props { + private val activationFeed = actorSystem.actorOf(Props { new MessageFeed("activation", logging, consumer, maximumContainers, 500.milliseconds, processActivationMessage) }) /** Sends an active-ack. */ - val ack = (tid: TransactionId, - activationResult: WhiskActivation, - blockingInvoke: Boolean, - controllerInstance: InstanceId) => { - implicit val transid = tid + private val ack = (tid: TransactionId, + activationResult: WhiskActivation, + blockingInvoke: Boolean, + controllerInstance: InstanceId) => { + implicit val transid: TransactionId = tid def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = { val msg = CompletionMessage(transid, res, instance) @@ -129,8 +129,8 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa } /** Stores an activation in the database. */ - val store = (tid: TransactionId, activation: WhiskActivation) => { - implicit val transid = tid + private val store = (tid: TransactionId, activation: WhiskActivation) => { + implicit val transid: TransactionId = tid logging.debug(this, "recording the activation result to the data store") WhiskActivation.put(activationStore, activation)(tid, notifier = None).andThen { case Success(id) => logging.debug(this, s"recorded activation") @@ -139,18 +139,16 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa } /** Creates a ContainerProxy Actor when being called. */ - val childFactory = (f: ActorRefFactory) => + private val childFactory = (f: ActorRefFactory) => f.actorOf(ContainerProxy.props(containerFactory.createContainer, ack, store, logsProvider.collectLogs, instance)) - val prewarmKind = "nodejs:6" - val prewarmExec = ExecManifest.runtimesManifest + private val prewarmKind = "nodejs:6" + private val prewarmExec = ExecManifest.runtimesManifest .resolveDefaultRuntime(prewarmKind) - .map { manifest => - new CodeExecAsString(manifest, "", None) - } + .map(manifest => CodeExecAsString(manifest, "", None)) .get - val pool = actorSystem.actorOf( + private val pool = actorSystem.actorOf( ContainerPool.props( childFactory, maximumContainers, @@ -163,92 +161,99 @@ class InvokerReactive(config: WhiskConfig, instance: InstanceId, producer: Messa Future(ActivationMessage.parse(new String(bytes, StandardCharsets.UTF_8))) .flatMap(Future.fromTry) .flatMap { msg => + // The message has been parsed correctly, thus the following code needs to *always* produce at least an + // active-ack. + + implicit val transid: TransactionId = msg.transid + if (!namespaceBlacklist.isBlacklisted(msg.user)) { - Future.successful(msg) - } else { - Future.failed(NamespaceBlacklistedException(msg.user.namespace.name)) - } - } - .filter(_.action.version.isDefined) - .flatMap { msg => - implicit val transid = msg.transid + val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, logLevel = InfoLevel) + val namespace = msg.action.path + val name = msg.action.name + val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision) + val subject = msg.user.subject - val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, logLevel = InfoLevel) - val namespace = msg.action.path - val name = msg.action.name - val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision) - val subject = msg.user.subject + logging.debug(this, s"${actionid.id} $subject ${msg.activationId}") - logging.debug(this, s"${actionid.id} $subject ${msg.activationId}") + // caching is enabled since actions have revision id and an updated + // action will not hit in the cache due to change in the revision id; + // if the doc revision is missing, then bypass cache + if (actionid.rev == DocRevision.empty) logging.warn(this, s"revision was not provided for ${actionid.id}") - // caching is enabled since actions have revision id and an updated - // action will not hit in the cache due to change in the revision id; - // if the doc revision is missing, then bypass cache - if (actionid.rev == DocRevision.empty) { - logging.warn(this, s"revision was not provided for ${actionid.id}") - } + WhiskAction + .get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty) + .flatMap { action => + action.toExecutableWhiskAction match { + case Some(executable) => + pool ! Run(executable, msg) + Future.successful(()) + case None => + logging.error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}") + Future.failed(new IllegalStateException("non-executable action reached the invoker")) + } + } + .recoverWith { + case t => + // If the action cannot be found, the user has concurrently deleted it, + // making this an application error. All other errors are considered system + // errors and should cause the invoker to be considered unhealthy. + val response = t match { + case _: NoDocumentException => + ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking) + case _: DocumentTypeMismatchException | _: DocumentUnreadable => + ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking) + case _ => + ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking) + } - WhiskAction - .get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty) - .flatMap { action => - action.toExecutableWhiskAction match { - case Some(executable) => - pool ! Run(executable, msg) + val activation = generateFallbackActivation(msg, response) + activationFeed ! MessageFeed.Processed + ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex) + store(msg.transid, activation) Future.successful(()) - case None => - logging.error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}") - Future.failed(new IllegalStateException("non-executable action reached the invoker")) } - } - .recoverWith { - case t => - // If the action cannot be found, the user has concurrently deleted it, - // making this an application error. All other errors are considered system - // errors and should cause the invoker to be considered unhealthy. - val response = t match { - case _: NoDocumentException => - ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking) - case _: DocumentTypeMismatchException | _: DocumentUnreadable => - ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking) - case _ => - ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking) - } - val now = Instant.now - val causedBy = if (msg.causedBySequence) { - Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE))) - } else None - val activation = WhiskActivation( - activationId = msg.activationId, - namespace = msg.user.namespace.toPath, - subject = msg.user.subject, - cause = msg.cause, - name = msg.action.name, - version = msg.action.version.getOrElse(SemVer()), - start = now, - end = now, - duration = Some(0), - response = response, - annotations = { - Parameters(WhiskActivation.pathAnnotation, JsString(msg.action.asString)) ++ causedBy - }) - - activationFeed ! MessageFeed.Processed - ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex) - store(msg.transid, activation) - Future.successful(()) - } + } else { + // Iff the current namespace is blacklisted, an active-ack is only produced to keep the loadbalancer protocol + // Due to the protective nature of the blacklist, a database entry is not written. + activationFeed ! MessageFeed.Processed + val activation = + generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted)) + ack(msg.transid, activation, false, msg.rootControllerIndex) + logging.warn(this, s"namespace ${msg.user.namespace} was blocked in invoker.") + Future.successful(()) + } } .recoverWith { case t => // Iff everything above failed, we have a terminal error at hand. Either the message failed // to deserialize, or something threw an error where it is not expected to throw. activationFeed ! MessageFeed.Processed - t match { - case nse: NamespaceBlacklistedException => logging.warn(this, nse.getMessage) - case _ => logging.error(this, s"terminal failure while processing message: $t") - } + logging.error(this, s"terminal failure while processing message: $t") Future.successful(()) } } + /** Generates an activation with zero runtime. Usually used for error cases */ + private def generateFallbackActivation(msg: ActivationMessage, response: ActivationResponse): WhiskActivation = { + val now = Instant.now + val causedBy = if (msg.causedBySequence) { + Some(Parameters(WhiskActivation.causedByAnnotation, JsString(Exec.SEQUENCE))) + } else None + + WhiskActivation( + activationId = msg.activationId, + namespace = msg.user.namespace.toPath, + subject = msg.user.subject, + cause = msg.cause, + name = msg.action.name, + version = msg.action.version.getOrElse(SemVer()), + start = now, + end = now, + duration = Some(0), + response = response, + annotations = { + Parameters(WhiskActivation.pathAnnotation, JsString(msg.action.asString)) ++ causedBy + }) + } + } diff --git a/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala b/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala index 9909c82dfd..4f4336e319 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala @@ -73,6 +73,3 @@ object NamespaceBlacklist { /** Configuration relevant to the namespace blacklist */ case class NamespaceBlacklistConfig(pollInterval: FiniteDuration) - -/** Indicates the activation was stopped due to a blacklisted identity */ -case class NamespaceBlacklistedException(ns: String) extends Exception(s"Namespace $ns was blocked in invoker.") ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services