cbickel closed pull request #3424: Send active-ack in any case of a parseable 
message.
URL: https://github.com/apache/incubator-openwhisk/pull/3424
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would otherwise not display it after the merge:

diff --git a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala 
b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
index 0f466d3246..97d2008db9 100644
--- a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
+++ b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala
@@ -205,6 +205,8 @@ object Messages {
     }
   }
 
+  val namespacesBlacklisted = "The action was not invoked due to a blacklisted 
namespace."
+
   val actionRemovedWhileInvoking = "Action could not be found or may have been 
deleted."
   val actionMismatchWhileInvoking = "Action version is not compatible and 
cannot be invoked."
   val actionFetchErrorWhileInvoking = "Action could not be fetched."
diff --git 
a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala 
b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
index afe0c899e2..0729103a6f 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala
@@ -37,7 +37,7 @@ import whisk.core.entity.size._
 import whisk.http.Messages
 import whisk.spi.SpiLoader
 
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContext, Future}
 import scala.concurrent.duration._
 import scala.util.{Failure, Success}
 
@@ -46,8 +46,8 @@ class InvokerReactive(config: WhiskConfig, instance: 
InstanceId, producer: Messa
   logging: Logging) {
 
   implicit val materializer: ActorMaterializer = ActorMaterializer()
-  implicit val ec = actorSystem.dispatcher
-  implicit val cfg = config
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  implicit val cfg: WhiskConfig = config
 
   private val logsProvider = 
SpiLoader.get[LogStoreProvider].logStore(actorSystem)
   logging.info(this, s"LogStoreProvider: ${logsProvider.getClass}")
@@ -59,7 +59,7 @@ class InvokerReactive(config: WhiskConfig, instance: 
InstanceId, producer: Messa
    * task or actor because further operation does not make sense if something
    * goes wrong here. Initialization will throw an exception upon failure.
    */
-  val containerFactory =
+  private val containerFactory =
     SpiLoader
       .get[ContainerFactoryProvider]
       .getContainerFactory(
@@ -90,26 +90,26 @@ class InvokerReactive(config: WhiskConfig, instance: 
InstanceId, producer: Messa
   }
 
   /** Initialize message consumers */
-  val topic = s"invoker${instance.toInt}"
-  val maximumContainers = config.invokerNumCore.toInt * 
config.invokerCoreShare.toInt
-  val msgProvider = SpiLoader.get[MessagingProvider]
-  val consumer = msgProvider.getConsumer(
+  private val topic = s"invoker${instance.toInt}"
+  private val maximumContainers = config.invokerNumCore.toInt * 
config.invokerCoreShare.toInt
+  private val msgProvider = SpiLoader.get[MessagingProvider]
+  private val consumer = msgProvider.getConsumer(
     config,
     topic,
     topic,
     maximumContainers,
     maxPollInterval = TimeLimit.MAX_DURATION + 1.minute)
 
-  val activationFeed = actorSystem.actorOf(Props {
+  private val activationFeed = actorSystem.actorOf(Props {
     new MessageFeed("activation", logging, consumer, maximumContainers, 
500.milliseconds, processActivationMessage)
   })
 
   /** Sends an active-ack. */
-  val ack = (tid: TransactionId,
-             activationResult: WhiskActivation,
-             blockingInvoke: Boolean,
-             controllerInstance: InstanceId) => {
-    implicit val transid = tid
+  private val ack = (tid: TransactionId,
+                     activationResult: WhiskActivation,
+                     blockingInvoke: Boolean,
+                     controllerInstance: InstanceId) => {
+    implicit val transid: TransactionId = tid
 
     def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = 
false) = {
       val msg = CompletionMessage(transid, res, instance)
@@ -129,8 +129,8 @@ class InvokerReactive(config: WhiskConfig, instance: 
InstanceId, producer: Messa
   }
 
   /** Stores an activation in the database. */
-  val store = (tid: TransactionId, activation: WhiskActivation) => {
-    implicit val transid = tid
+  private val store = (tid: TransactionId, activation: WhiskActivation) => {
+    implicit val transid: TransactionId = tid
     logging.debug(this, "recording the activation result to the data store")
     WhiskActivation.put(activationStore, activation)(tid, notifier = 
None).andThen {
       case Success(id) => logging.debug(this, s"recorded activation")
@@ -139,18 +139,16 @@ class InvokerReactive(config: WhiskConfig, instance: 
InstanceId, producer: Messa
   }
 
   /** Creates a ContainerProxy Actor when being called. */
-  val childFactory = (f: ActorRefFactory) =>
+  private val childFactory = (f: ActorRefFactory) =>
     f.actorOf(ContainerProxy.props(containerFactory.createContainer, ack, 
store, logsProvider.collectLogs, instance))
 
-  val prewarmKind = "nodejs:6"
-  val prewarmExec = ExecManifest.runtimesManifest
+  private val prewarmKind = "nodejs:6"
+  private val prewarmExec = ExecManifest.runtimesManifest
     .resolveDefaultRuntime(prewarmKind)
-    .map { manifest =>
-      new CodeExecAsString(manifest, "", None)
-    }
+    .map(manifest => CodeExecAsString(manifest, "", None))
     .get
 
-  val pool = actorSystem.actorOf(
+  private val pool = actorSystem.actorOf(
     ContainerPool.props(
       childFactory,
       maximumContainers,
@@ -163,92 +161,99 @@ class InvokerReactive(config: WhiskConfig, instance: 
InstanceId, producer: Messa
     Future(ActivationMessage.parse(new String(bytes, StandardCharsets.UTF_8)))
       .flatMap(Future.fromTry)
       .flatMap { msg =>
+        // The message has been parsed correctly, thus the following code 
needs to *always* produce at least an
+        // active-ack.
+
+        implicit val transid: TransactionId = msg.transid
+
         if (!namespaceBlacklist.isBlacklisted(msg.user)) {
-          Future.successful(msg)
-        } else {
-          Future.failed(NamespaceBlacklistedException(msg.user.namespace.name))
-        }
-      }
-      .filter(_.action.version.isDefined)
-      .flatMap { msg =>
-        implicit val transid = msg.transid
+          val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, 
logLevel = InfoLevel)
+          val namespace = msg.action.path
+          val name = msg.action.name
+          val actionid = FullyQualifiedEntityName(namespace, 
name).toDocId.asDocInfo(msg.revision)
+          val subject = msg.user.subject
 
-        val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, 
logLevel = InfoLevel)
-        val namespace = msg.action.path
-        val name = msg.action.name
-        val actionid = FullyQualifiedEntityName(namespace, 
name).toDocId.asDocInfo(msg.revision)
-        val subject = msg.user.subject
+          logging.debug(this, s"${actionid.id} $subject ${msg.activationId}")
 
-        logging.debug(this, s"${actionid.id} $subject ${msg.activationId}")
+          // caching is enabled since actions have revision id and an updated
+          // action will not hit in the cache due to change in the revision id;
+          // if the doc revision is missing, then bypass cache
+          if (actionid.rev == DocRevision.empty) logging.warn(this, s"revision 
was not provided for ${actionid.id}")
 
-        // caching is enabled since actions have revision id and an updated
-        // action will not hit in the cache due to change in the revision id;
-        // if the doc revision is missing, then bypass cache
-        if (actionid.rev == DocRevision.empty) {
-          logging.warn(this, s"revision was not provided for ${actionid.id}")
-        }
+          WhiskAction
+            .get(entityStore, actionid.id, actionid.rev, fromCache = 
actionid.rev != DocRevision.empty)
+            .flatMap { action =>
+              action.toExecutableWhiskAction match {
+                case Some(executable) =>
+                  pool ! Run(executable, msg)
+                  Future.successful(())
+                case None =>
+                  logging.error(this, s"non-executable action reached the 
invoker ${action.fullyQualifiedName(false)}")
+                  Future.failed(new IllegalStateException("non-executable 
action reached the invoker"))
+              }
+            }
+            .recoverWith {
+              case t =>
+                // If the action cannot be found, the user has concurrently 
deleted it,
+                // making this an application error. All other errors are 
considered system
+                // errors and should cause the invoker to be considered 
unhealthy.
+                val response = t match {
+                  case _: NoDocumentException =>
+                    
ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking)
+                  case _: DocumentTypeMismatchException | _: 
DocumentUnreadable =>
+                    
ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking)
+                  case _ =>
+                    
ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking)
+                }
 
-        WhiskAction
-          .get(entityStore, actionid.id, actionid.rev, fromCache = 
actionid.rev != DocRevision.empty)
-          .flatMap { action =>
-            action.toExecutableWhiskAction match {
-              case Some(executable) =>
-                pool ! Run(executable, msg)
+                val activation = generateFallbackActivation(msg, response)
+                activationFeed ! MessageFeed.Processed
+                ack(msg.transid, activation, msg.blocking, 
msg.rootControllerIndex)
+                store(msg.transid, activation)
                 Future.successful(())
-              case None =>
-                logging.error(this, s"non-executable action reached the 
invoker ${action.fullyQualifiedName(false)}")
-                Future.failed(new IllegalStateException("non-executable action 
reached the invoker"))
             }
-          }
-          .recoverWith {
-            case t =>
-              // If the action cannot be found, the user has concurrently 
deleted it,
-              // making this an application error. All other errors are 
considered system
-              // errors and should cause the invoker to be considered 
unhealthy.
-              val response = t match {
-                case _: NoDocumentException =>
-                  
ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking)
-                case _: DocumentTypeMismatchException | _: DocumentUnreadable 
=>
-                  
ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking)
-                case _ =>
-                  
ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking)
-              }
-              val now = Instant.now
-              val causedBy = if (msg.causedBySequence) {
-                Some(Parameters(WhiskActivation.causedByAnnotation, 
JsString(Exec.SEQUENCE)))
-              } else None
-              val activation = WhiskActivation(
-                activationId = msg.activationId,
-                namespace = msg.user.namespace.toPath,
-                subject = msg.user.subject,
-                cause = msg.cause,
-                name = msg.action.name,
-                version = msg.action.version.getOrElse(SemVer()),
-                start = now,
-                end = now,
-                duration = Some(0),
-                response = response,
-                annotations = {
-                  Parameters(WhiskActivation.pathAnnotation, 
JsString(msg.action.asString)) ++ causedBy
-                })
-
-              activationFeed ! MessageFeed.Processed
-              ack(msg.transid, activation, msg.blocking, 
msg.rootControllerIndex)
-              store(msg.transid, activation)
-              Future.successful(())
-          }
+        } else {
+          // Iff the current namespace is blacklisted, an active-ack is only 
produced to keep the loadbalancer protocol
+          // Due to the protective nature of the blacklist, a database entry 
is not written.
+          activationFeed ! MessageFeed.Processed
+          val activation =
+            generateFallbackActivation(msg, 
ActivationResponse.applicationError(Messages.namespacesBlacklisted))
+          ack(msg.transid, activation, false, msg.rootControllerIndex)
+          logging.warn(this, s"namespace ${msg.user.namespace} was blocked in 
invoker.")
+          Future.successful(())
+        }
       }
       .recoverWith {
         case t =>
           // Iff everything above failed, we have a terminal error at hand. 
Either the message failed
           // to deserialize, or something threw an error where it is not 
expected to throw.
           activationFeed ! MessageFeed.Processed
-          t match {
-            case nse: NamespaceBlacklistedException => logging.warn(this, 
nse.getMessage)
-            case _                                  => logging.error(this, 
s"terminal failure while processing message: $t")
-          }
+          logging.error(this, s"terminal failure while processing message: $t")
           Future.successful(())
       }
   }
 
+  /** Generates an activation with zero runtime. Usually used for error cases 
*/
+  private def generateFallbackActivation(msg: ActivationMessage, response: 
ActivationResponse): WhiskActivation = {
+    val now = Instant.now
+    val causedBy = if (msg.causedBySequence) {
+      Some(Parameters(WhiskActivation.causedByAnnotation, 
JsString(Exec.SEQUENCE)))
+    } else None
+
+    WhiskActivation(
+      activationId = msg.activationId,
+      namespace = msg.user.namespace.toPath,
+      subject = msg.user.subject,
+      cause = msg.cause,
+      name = msg.action.name,
+      version = msg.action.version.getOrElse(SemVer()),
+      start = now,
+      end = now,
+      duration = Some(0),
+      response = response,
+      annotations = {
+        Parameters(WhiskActivation.pathAnnotation, 
JsString(msg.action.asString)) ++ causedBy
+      })
+  }
+
 }
diff --git 
a/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala 
b/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala
index 9909c82dfd..4f4336e319 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/NamespaceBlacklist.scala
@@ -73,6 +73,3 @@ object NamespaceBlacklist {
 
 /** Configuration relevant to the namespace blacklist */
 case class NamespaceBlacklistConfig(pollInterval: FiniteDuration)
-
-/** Indicates the activation was stopped due to a blacklisted identity */
-case class NamespaceBlacklistedException(ns: String) extends 
Exception(s"Namespace $ns was blocked in invoker.")


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to