[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4624: Combines active ack and slot release when both are available.

2019-09-19 Thread GitBox
chetanmeh commented on a change in pull request #4624: Combines active ack and 
slot release when both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r326466033
 
 

 ##
 File path: 
core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
 ##
 @@ -49,14 +49,21 @@ object InvokerReactive extends InvokerProvider {
* are either completion messages for an activation to indicate a resource 
slot is free, or result-forwarding
* messages for continuations (e.g., sequences and conductor actions).
*
-   * @param TransactionId the transaction id for the activation
-   * @param WhiskActivaiton is the activation result
-   * @param Boolean is true iff the activation was a blocking request
-   * @param ControllerInstanceId the originating controller/loadbalancer id
-   * @param UUID is the UUID for the namespace owning the activation
-   * @param Boolean is true this is resource free message and false if this is 
a result forwarding message
+   * @param tid the transaction id for the activation
+   * @param activaiton is the activation result
+   * @param blockingInvoke is true iff the activation was a blocking request
+   * @param controllerInstance the originating controller/loadbalancer id
+   * @param userId is the UUID for the namespace owning the activation
+   * @param acknowledegment the acknowledgement message to send
*/
-  type ActiveAck = (TransactionId, WhiskActivation, Boolean, 
ControllerInstanceId, UUID, Boolean) => Future[Any]
+  trait ActiveAck {
+def apply(tid: TransactionId,
 
 Review comment:
   Nice use of `apply`! Minimizes the impact of switch


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4624: Combines active ack and slot release when both are available.

2019-09-19 Thread GitBox
chetanmeh commented on a change in pull request #4624: Combines active ack and 
slot release when both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r326205999
 
 

 ##
 File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
 ##
 @@ -64,100 +64,165 @@ case class ActivationMessage(override val transid: 
TransactionId,
   def causedBySequence: Boolean = cause.isDefined
 }
 
-object ActivationMessage extends DefaultJsonProtocol {
-
-  def parse(msg: String) = Try(serdes.read(msg.parseJson))
-
-  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat11(ActivationMessage.apply)
-}
-
 /**
  * Message that is sent from the invoker to the controller after action is 
completed or after slot is free again for
  * new actions.
  */
 abstract class AcknowledegmentMessage(private val tid: TransactionId) extends 
Message {
   override val transid: TransactionId = tid
-  override def serialize: String = {
-AcknowledegmentMessage.serdes.write(this).compactPrint
-  }
+  override def serialize: String = 
AcknowledegmentMessage.serdes.write(this).compactPrint
+
+  /** Pithy descriptor for logging. */
+  def name: String
+
+  /** Does message indicate slot is free? */
+  def isSlotFree: Option[InvokerInstanceId]
+
+  /** Does message contain a result? */
+  def result: Option[Either[ActivationId, WhiskActivation]]
+
+  /**
+   * Is the acknowledgement for an activation that failed internally?
+   * For some message, this is not relevant and the result is None.
+   */
+  def isSystemError: Option[Boolean]
+
+  def activationId: ActivationId
+
+  /** Serializes the message to JSON. */
+  def toJson: JsValue
+
+  /**
+   * Converts the message to a more compact form if it cannot cross the 
message bus as is or some of its details are not necessary.
+   */
+  def shrink: AcknowledegmentMessage
 }
 
 /**
- * This message is sent from the invoker to the controller, after the slot of 
an invoker that has been used by the
- * current action, is free again (after log collection)
+ * This message is sent from an invoker to the controller in situtations when 
the resource slot and the action
+ * result are available at the same time, and so the split-phase notification 
is not necessary. Instead the message
+ * combines the `CompletionMessage` and `ResultMessage`. The `response` may be 
an `ActivationId` to allow for failures
+ * to send the activation result because of event-bus size limitations.
  */
-case class CompletionMessage(override val transid: TransactionId,
- activationId: ActivationId,
- isSystemError: Boolean,
- invoker: InvokerInstanceId)
+case class CombinedCompletionAndResultMessage(override val transid: 
TransactionId,
+  response: Either[ActivationId, 
WhiskActivation],
+  override val isSystemError: 
Option[Boolean],
+  invoker: InvokerInstanceId)
 extends AcknowledegmentMessage(transid) {
-
-  override def toString = {
-activationId.asString
-  }
+  override def name = "combined"
+  override def result = Some(response)
+  override def isSlotFree = Some(invoker)
+  override def activationId = response.fold(identity, _.activationId)
+  override def toJson = CombinedCompletionAndResultMessage.serdes.write(this)
+  override def shrink = copy(response = response.flatMap(a => 
Left(a.activationId)))
+  override def toString = response.fold(identity, _.activationId).asString
 }
 
-object CompletionMessage extends DefaultJsonProtocol {
-  def parse(msg: String): Try[CompletionMessage] = 
Try(serdes.read(msg.parseJson))
-  implicit val serdes = jsonFormat4(CompletionMessage.apply)
+/**
+ * This message is sent from an invoker to the controller, once the resource 
slot in the invoker (used by the
+ * corresponding activation) free again (i.e., after log collection). The 
`CompletionMessage` is part of a split
+ * phase notification to the load balancer where an invoker first sends a 
`ResultMessage` and later sends the
+ * `CompletionMessage`.
+ */
+case class CompletionMessage(override val transid: TransactionId,
+ override val activationId: ActivationId,
+ override val isSystemError: Option[Boolean],
+ invoker: InvokerInstanceId)
+extends AcknowledegmentMessage(transid) {
+  override def name = "completion"
+  override def result = None
+  override def isSlotFree = Some(invoker)
+  override def toJson = CompletionMessage.serdes.write(this)
+  override def shrink = this
+  override def toString = activationId.asString
 }
 
 /**
- * That message will be sent from the invoker to the controller after action 
completion if the user wants to have
- * the result immediately (blocking 

[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4624: Combines active ack and slot release when both are available.

2019-09-19 Thread GitBox
chetanmeh commented on a change in pull request #4624: Combines active ack and 
slot release when both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r326202534
 
 

 ##
 File path: 
core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
 ##
 @@ -628,8 +637,15 @@ class ContainerProxy(
 // completion message which frees a load balancer slot is sent after the 
active ack future
 // completes to ensure proper ordering.
 val sendResult = if (job.msg.blocking) {
-  activation.map(
-sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, 
job.msg.user.namespace.uuid, false))
+  activation.map { result =>
+sendActiveAck(
+  tid,
+  result,
+  job.msg.blocking,
+  job.msg.rootControllerIndex,
+  job.msg.user.namespace.uuid,
+  ResultMessage(tid, result))
 
 Review comment:
   Opened #4635 to track this


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4624: Combines active ack and slot release when both are available.

2019-09-19 Thread GitBox
chetanmeh commented on a change in pull request #4624: Combines active ack and 
slot release when both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r326054019
 
 

 ##
 File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
 ##
 @@ -64,100 +64,165 @@ case class ActivationMessage(override val transid: 
TransactionId,
   def causedBySequence: Boolean = cause.isDefined
 }
 
-object ActivationMessage extends DefaultJsonProtocol {
-
-  def parse(msg: String) = Try(serdes.read(msg.parseJson))
-
-  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat11(ActivationMessage.apply)
-}
-
 /**
  * Message that is sent from the invoker to the controller after action is 
completed or after slot is free again for
  * new actions.
  */
 abstract class AcknowledegmentMessage(private val tid: TransactionId) extends 
Message {
   override val transid: TransactionId = tid
-  override def serialize: String = {
-AcknowledegmentMessage.serdes.write(this).compactPrint
-  }
+  override def serialize: String = 
AcknowledegmentMessage.serdes.write(this).compactPrint
+
+  /** Pithy descriptor for logging. */
+  def name: String
+
+  /** Does message indicate slot is free? */
+  def isSlotFree: Option[InvokerInstanceId]
+
+  /** Does message contain a result? */
+  def result: Option[Either[ActivationId, WhiskActivation]]
+
+  /**
+   * Is the acknowledgement for an activation that failed internally?
+   * For some message, this is not relevant and the result is None.
+   */
+  def isSystemError: Option[Boolean]
+
+  def activationId: ActivationId
+
+  /** Serializes the message to JSON. */
+  def toJson: JsValue
+
+  /**
+   * Converts the message to a more compact form if it cannot cross the 
message bus as is or some of its details are not necessary.
+   */
+  def shrink: AcknowledegmentMessage
 }
 
 /**
- * This message is sent from the invoker to the controller, after the slot of 
an invoker that has been used by the
- * current action, is free again (after log collection)
+ * This message is sent from an invoker to the controller in situtations when 
the resource slot and the action
+ * result are available at the same time, and so the split-phase notification 
is not necessary. Instead the message
+ * combines the `CompletionMessage` and `ResultMessage`. The `response` may be 
an `ActivationId` to allow for failures
+ * to send the activation result because of event-bus size limitations.
  */
-case class CompletionMessage(override val transid: TransactionId,
- activationId: ActivationId,
- isSystemError: Boolean,
- invoker: InvokerInstanceId)
+case class CombinedCompletionAndResultMessage(override val transid: 
TransactionId,
+  response: Either[ActivationId, 
WhiskActivation],
+  override val isSystemError: 
Option[Boolean],
+  invoker: InvokerInstanceId)
 extends AcknowledegmentMessage(transid) {
-
-  override def toString = {
-activationId.asString
-  }
+  override def name = "combined"
+  override def result = Some(response)
+  override def isSlotFree = Some(invoker)
+  override def activationId = response.fold(identity, _.activationId)
+  override def toJson = CombinedCompletionAndResultMessage.serdes.write(this)
+  override def shrink = copy(response = response.flatMap(a => 
Left(a.activationId)))
+  override def toString = response.fold(identity, _.activationId).asString
 }
 
-object CompletionMessage extends DefaultJsonProtocol {
-  def parse(msg: String): Try[CompletionMessage] = 
Try(serdes.read(msg.parseJson))
-  implicit val serdes = jsonFormat4(CompletionMessage.apply)
+/**
+ * This message is sent from an invoker to the controller, once the resource 
slot in the invoker (used by the
+ * corresponding activation) free again (i.e., after log collection). The 
`CompletionMessage` is part of a split
+ * phase notification to the load balancer where an invoker first sends a 
`ResultMessage` and later sends the
+ * `CompletionMessage`.
+ */
+case class CompletionMessage(override val transid: TransactionId,
+ override val activationId: ActivationId,
+ override val isSystemError: Option[Boolean],
+ invoker: InvokerInstanceId)
+extends AcknowledegmentMessage(transid) {
+  override def name = "completion"
+  override def result = None
+  override def isSlotFree = Some(invoker)
+  override def toJson = CompletionMessage.serdes.write(this)
+  override def shrink = this
+  override def toString = activationId.asString
 }
 
 /**
- * That message will be sent from the invoker to the controller after action 
completion if the user wants to have
- * the result immediately (blocking 

[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4624: Combines active ack and slot release when both are available.

2019-09-19 Thread GitBox
chetanmeh commented on a change in pull request #4624: Combines active ack and 
slot release when both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r325989534
 
 

 ##
 File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
 ##
 @@ -64,100 +64,165 @@ case class ActivationMessage(override val transid: 
TransactionId,
   def causedBySequence: Boolean = cause.isDefined
 }
 
-object ActivationMessage extends DefaultJsonProtocol {
-
-  def parse(msg: String) = Try(serdes.read(msg.parseJson))
-
-  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat11(ActivationMessage.apply)
-}
-
 /**
  * Message that is sent from the invoker to the controller after action is 
completed or after slot is free again for
  * new actions.
  */
 abstract class AcknowledegmentMessage(private val tid: TransactionId) extends 
Message {
   override val transid: TransactionId = tid
-  override def serialize: String = {
-AcknowledegmentMessage.serdes.write(this).compactPrint
-  }
+  override def serialize: String = 
AcknowledegmentMessage.serdes.write(this).compactPrint
+
+  /** Pithy descriptor for logging. */
+  def name: String
+
+  /** Does message indicate slot is free? */
+  def isSlotFree: Option[InvokerInstanceId]
+
+  /** Does message contain a result? */
+  def result: Option[Either[ActivationId, WhiskActivation]]
+
+  /**
+   * Is the acknowledgement for an activation that failed internally?
+   * For some message, this is not relevant and the result is None.
+   */
+  def isSystemError: Option[Boolean]
+
+  def activationId: ActivationId
+
+  /** Serializes the message to JSON. */
+  def toJson: JsValue
+
+  /**
+   * Converts the message to a more compact form if it cannot cross the 
message bus as is or some of its details are not necessary.
+   */
+  def shrink: AcknowledegmentMessage
 }
 
 /**
- * This message is sent from the invoker to the controller, after the slot of 
an invoker that has been used by the
- * current action, is free again (after log collection)
+ * This message is sent from an invoker to the controller in situtations when 
the resource slot and the action
+ * result are available at the same time, and so the split-phase notification 
is not necessary. Instead the message
+ * combines the `CompletionMessage` and `ResultMessage`. The `response` may be 
an `ActivationId` to allow for failures
+ * to send the activation result because of event-bus size limitations.
  */
-case class CompletionMessage(override val transid: TransactionId,
- activationId: ActivationId,
- isSystemError: Boolean,
- invoker: InvokerInstanceId)
+case class CombinedCompletionAndResultMessage(override val transid: 
TransactionId,
+  response: Either[ActivationId, 
WhiskActivation],
+  override val isSystemError: 
Option[Boolean],
+  invoker: InvokerInstanceId)
 extends AcknowledegmentMessage(transid) {
-
-  override def toString = {
-activationId.asString
-  }
+  override def name = "combined"
+  override def result = Some(response)
+  override def isSlotFree = Some(invoker)
+  override def activationId = response.fold(identity, _.activationId)
+  override def toJson = CombinedCompletionAndResultMessage.serdes.write(this)
+  override def shrink = copy(response = response.flatMap(a => 
Left(a.activationId)))
+  override def toString = response.fold(identity, _.activationId).asString
 
 Review comment:
   Nit: Reuse `activationId.asString`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4624: Combines active ack and slot release when both are available.

2019-09-19 Thread GitBox
chetanmeh commented on a change in pull request #4624: Combines active ack and 
slot release when both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r325993909
 
 

 ##
 File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
 ##
 @@ -64,100 +64,165 @@ case class ActivationMessage(override val transid: 
TransactionId,
   def causedBySequence: Boolean = cause.isDefined
 }
 
-object ActivationMessage extends DefaultJsonProtocol {
-
-  def parse(msg: String) = Try(serdes.read(msg.parseJson))
-
-  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat11(ActivationMessage.apply)
-}
-
 /**
  * Message that is sent from the invoker to the controller after action is 
completed or after slot is free again for
  * new actions.
  */
 abstract class AcknowledegmentMessage(private val tid: TransactionId) extends 
Message {
   override val transid: TransactionId = tid
-  override def serialize: String = {
-AcknowledegmentMessage.serdes.write(this).compactPrint
-  }
+  override def serialize: String = 
AcknowledegmentMessage.serdes.write(this).compactPrint
+
+  /** Pithy descriptor for logging. */
+  def name: String
+
+  /** Does message indicate slot is free? */
+  def isSlotFree: Option[InvokerInstanceId]
+
+  /** Does message contain a result? */
+  def result: Option[Either[ActivationId, WhiskActivation]]
+
+  /**
+   * Is the acknowledgement for an activation that failed internally?
+   * For some message, this is not relevant and the result is None.
+   */
+  def isSystemError: Option[Boolean]
+
+  def activationId: ActivationId
+
+  /** Serializes the message to JSON. */
+  def toJson: JsValue
+
+  /**
+   * Converts the message to a more compact form if it cannot cross the 
message bus as is or some of its details are not necessary.
+   */
+  def shrink: AcknowledegmentMessage
 }
 
 /**
- * This message is sent from the invoker to the controller, after the slot of 
an invoker that has been used by the
- * current action, is free again (after log collection)
+ * This message is sent from an invoker to the controller in situtations when 
the resource slot and the action
+ * result are available at the same time, and so the split-phase notification 
is not necessary. Instead the message
+ * combines the `CompletionMessage` and `ResultMessage`. The `response` may be 
an `ActivationId` to allow for failures
+ * to send the activation result because of event-bus size limitations.
  */
-case class CompletionMessage(override val transid: TransactionId,
- activationId: ActivationId,
- isSystemError: Boolean,
- invoker: InvokerInstanceId)
+case class CombinedCompletionAndResultMessage(override val transid: 
TransactionId,
+  response: Either[ActivationId, 
WhiskActivation],
+  override val isSystemError: 
Option[Boolean],
+  invoker: InvokerInstanceId)
 extends AcknowledegmentMessage(transid) {
-
-  override def toString = {
-activationId.asString
-  }
+  override def name = "combined"
+  override def result = Some(response)
+  override def isSlotFree = Some(invoker)
+  override def activationId = response.fold(identity, _.activationId)
+  override def toJson = CombinedCompletionAndResultMessage.serdes.write(this)
+  override def shrink = copy(response = response.flatMap(a => 
Left(a.activationId)))
+  override def toString = response.fold(identity, _.activationId).asString
 }
 
-object CompletionMessage extends DefaultJsonProtocol {
-  def parse(msg: String): Try[CompletionMessage] = 
Try(serdes.read(msg.parseJson))
-  implicit val serdes = jsonFormat4(CompletionMessage.apply)
+/**
+ * This message is sent from an invoker to the controller, once the resource 
slot in the invoker (used by the
+ * corresponding activation) free again (i.e., after log collection). The 
`CompletionMessage` is part of a split
+ * phase notification to the load balancer where an invoker first sends a 
`ResultMessage` and later sends the
+ * `CompletionMessage`.
+ */
+case class CompletionMessage(override val transid: TransactionId,
+ override val activationId: ActivationId,
+ override val isSystemError: Option[Boolean],
+ invoker: InvokerInstanceId)
+extends AcknowledegmentMessage(transid) {
+  override def name = "completion"
+  override def result = None
+  override def isSlotFree = Some(invoker)
+  override def toJson = CompletionMessage.serdes.write(this)
+  override def shrink = this
+  override def toString = activationId.asString
 }
 
 /**
- * That message will be sent from the invoker to the controller after action 
completion if the user wants to have
- * the result immediately (blocking 

[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4624: Combines active ack and slot release when both are available.

2019-09-19 Thread GitBox
chetanmeh commented on a change in pull request #4624: Combines active ack and 
slot release when both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r325988736
 
 

 ##
 File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
 ##
 @@ -64,100 +64,165 @@ case class ActivationMessage(override val transid: 
TransactionId,
   def causedBySequence: Boolean = cause.isDefined
 }
 
-object ActivationMessage extends DefaultJsonProtocol {
-
-  def parse(msg: String) = Try(serdes.read(msg.parseJson))
-
-  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat11(ActivationMessage.apply)
-}
-
 /**
  * Message that is sent from the invoker to the controller after action is 
completed or after slot is free again for
  * new actions.
  */
 abstract class AcknowledegmentMessage(private val tid: TransactionId) extends 
Message {
   override val transid: TransactionId = tid
-  override def serialize: String = {
-AcknowledegmentMessage.serdes.write(this).compactPrint
-  }
+  override def serialize: String = 
AcknowledegmentMessage.serdes.write(this).compactPrint
+
+  /** Pithy descriptor for logging. */
+  def name: String
 
 Review comment:
   Message type makes more sense compared to message name. So may be rename to 
`messageType`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4624: Combines active ack and slot release when both are available.

2019-09-19 Thread GitBox
chetanmeh commented on a change in pull request #4624: Combines active ack and 
slot release when both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r325996077
 
 

 ##
 File path: 
core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala
 ##
 @@ -628,8 +637,15 @@ class ContainerProxy(
 // completion message which frees a load balancer slot is sent after the 
active ack future
 // completes to ensure proper ordering.
 val sendResult = if (job.msg.blocking) {
-  activation.map(
-sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, 
job.msg.user.namespace.uuid, false))
+  activation.map { result =>
+sendActiveAck(
+  tid,
+  result,
+  job.msg.blocking,
+  job.msg.rootControllerIndex,
+  job.msg.user.namespace.uuid,
+  ResultMessage(tid, result))
 
 Review comment:
   Later optimization: Here we can also send 
`CombinedCompletionAndResultMessage` if we know that logs are not to be 
collected for 
   
   1. `job.action.limits.logs.asMegaBytes == 0.MB`
   2. OR `LogDriverLogStore` is being used
   
   Specially in `LogDriverLogStore` based setup then `ResultMessage` would not 
be used then


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4624: Combines active ack and slot release when both are available.

2019-09-19 Thread GitBox
chetanmeh commented on a change in pull request #4624: Combines active ack and 
slot release when both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r325988193
 
 

 ##
 File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
 ##
 @@ -64,100 +64,165 @@ case class ActivationMessage(override val transid: 
TransactionId,
   def causedBySequence: Boolean = cause.isDefined
 }
 
-object ActivationMessage extends DefaultJsonProtocol {
-
-  def parse(msg: String) = Try(serdes.read(msg.parseJson))
-
-  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat11(ActivationMessage.apply)
-}
-
 /**
  * Message that is sent from the invoker to the controller after action is 
completed or after slot is free again for
  * new actions.
  */
 abstract class AcknowledegmentMessage(private val tid: TransactionId) extends 
Message {
   override val transid: TransactionId = tid
-  override def serialize: String = {
-AcknowledegmentMessage.serdes.write(this).compactPrint
-  }
+  override def serialize: String = 
AcknowledegmentMessage.serdes.write(this).compactPrint
+
+  /** Pithy descriptor for logging. */
+  def name: String
+
+  /** Does message indicate slot is free? */
+  def isSlotFree: Option[InvokerInstanceId]
+
+  /** Does message contain a result? */
+  def result: Option[Either[ActivationId, WhiskActivation]]
+
+  /**
+   * Is the acknowledgement for an activation that failed internally?
+   * For some message, this is not relevant and the result is None.
+   */
+  def isSystemError: Option[Boolean]
+
+  def activationId: ActivationId
+
+  /** Serializes the message to JSON. */
+  def toJson: JsValue
+
+  /**
+   * Converts the message to a more compact form if it cannot cross the 
message bus as is or some of its details are not necessary.
+   */
+  def shrink: AcknowledegmentMessage
 }
 
 /**
- * This message is sent from the invoker to the controller, after the slot of 
an invoker that has been used by the
- * current action, is free again (after log collection)
+ * This message is sent from an invoker to the controller in situtations when 
the resource slot and the action
 
 Review comment:
   Nit: `situtations` -> `situations`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4624: Combines active ack and slot release when both are available.

2019-09-19 Thread GitBox
chetanmeh commented on a change in pull request #4624: Combines active ack and 
slot release when both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r325991407
 
 

 ##
 File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
 ##
 @@ -64,100 +64,165 @@ case class ActivationMessage(override val transid: 
TransactionId,
   def causedBySequence: Boolean = cause.isDefined
 }
 
-object ActivationMessage extends DefaultJsonProtocol {
-
-  def parse(msg: String) = Try(serdes.read(msg.parseJson))
-
-  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat11(ActivationMessage.apply)
-}
-
 /**
  * Message that is sent from the invoker to the controller after action is 
completed or after slot is free again for
  * new actions.
  */
 abstract class AcknowledegmentMessage(private val tid: TransactionId) extends 
Message {
   override val transid: TransactionId = tid
-  override def serialize: String = {
-AcknowledegmentMessage.serdes.write(this).compactPrint
-  }
+  override def serialize: String = 
AcknowledegmentMessage.serdes.write(this).compactPrint
+
+  /** Pithy descriptor for logging. */
+  def name: String
+
+  /** Does message indicate slot is free? */
+  def isSlotFree: Option[InvokerInstanceId]
+
+  /** Does message contain a result? */
+  def result: Option[Either[ActivationId, WhiskActivation]]
+
+  /**
+   * Is the acknowledgement for an activation that failed internally?
+   * For some message, this is not relevant and the result is None.
+   */
+  def isSystemError: Option[Boolean]
+
+  def activationId: ActivationId
+
+  /** Serializes the message to JSON. */
+  def toJson: JsValue
+
+  /**
+   * Converts the message to a more compact form if it cannot cross the 
message bus as is or some of its details are not necessary.
+   */
+  def shrink: AcknowledegmentMessage
 }
 
 /**
- * This message is sent from the invoker to the controller, after the slot of 
an invoker that has been used by the
- * current action, is free again (after log collection)
+ * This message is sent from an invoker to the controller in situtations when 
the resource slot and the action
+ * result are available at the same time, and so the split-phase notification 
is not necessary. Instead the message
+ * combines the `CompletionMessage` and `ResultMessage`. The `response` may be 
an `ActivationId` to allow for failures
+ * to send the activation result because of event-bus size limitations.
  */
-case class CompletionMessage(override val transid: TransactionId,
- activationId: ActivationId,
- isSystemError: Boolean,
- invoker: InvokerInstanceId)
+case class CombinedCompletionAndResultMessage(override val transid: 
TransactionId,
+  response: Either[ActivationId, 
WhiskActivation],
+  override val isSystemError: 
Option[Boolean],
+  invoker: InvokerInstanceId)
 extends AcknowledegmentMessage(transid) {
-
-  override def toString = {
-activationId.asString
-  }
+  override def name = "combined"
+  override def result = Some(response)
+  override def isSlotFree = Some(invoker)
+  override def activationId = response.fold(identity, _.activationId)
+  override def toJson = CombinedCompletionAndResultMessage.serdes.write(this)
+  override def shrink = copy(response = response.flatMap(a => 
Left(a.activationId)))
+  override def toString = response.fold(identity, _.activationId).asString
 }
 
-object CompletionMessage extends DefaultJsonProtocol {
-  def parse(msg: String): Try[CompletionMessage] = 
Try(serdes.read(msg.parseJson))
-  implicit val serdes = jsonFormat4(CompletionMessage.apply)
+/**
+ * This message is sent from an invoker to the controller, once the resource 
slot in the invoker (used by the
+ * corresponding activation) free again (i.e., after log collection). The 
`CompletionMessage` is part of a split
+ * phase notification to the load balancer where an invoker first sends a 
`ResultMessage` and later sends the
+ * `CompletionMessage`.
+ */
+case class CompletionMessage(override val transid: TransactionId,
+ override val activationId: ActivationId,
+ override val isSystemError: Option[Boolean],
+ invoker: InvokerInstanceId)
+extends AcknowledegmentMessage(transid) {
+  override def name = "completion"
+  override def result = None
+  override def isSlotFree = Some(invoker)
+  override def toJson = CompletionMessage.serdes.write(this)
+  override def shrink = this
+  override def toString = activationId.asString
 }
 
 /**
- * That message will be sent from the invoker to the controller after action 
completion if the user wants to have
- * the result immediately (blocking 

[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4624: Combines active ack and slot release when both are available.

2019-09-19 Thread GitBox
chetanmeh commented on a change in pull request #4624: Combines active ack and 
slot release when both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r325990234
 
 

 ##
 File path: 
core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala
 ##
 @@ -54,9 +54,10 @@ object InvokerReactive extends InvokerProvider {
* @param Boolean is true iff the activation was a blocking request
* @param ControllerInstanceId the originating controller/loadbalancer id
* @param UUID is the UUID for the namespace owning the activation
-   * @param Boolean is true this is resource free message and false if this is 
a result forwarding message
+   * @param AcknowledegmentMessage the acknowledgement message to send
*/
-  type ActiveAck = (TransactionId, WhiskActivation, Boolean, 
ControllerInstanceId, UUID, Boolean) => Future[Any]
+  type ActiveAck =
 
 Review comment:
   Minor observation (or Rant!) - I  always struggle in IDE whenever I want to 
see impl of parameters which are specified as function signature instead of 
trait (like in `ActiveAck`). Here things are bit simpler as we specify a `type` 
alias which narrows down the search. Otherwise one need to check the whole call 
hierarchy to understand where is. the actual impl
   
Having a trait enables easier checking of possible implementations to 
understand the code flow. This is pre existing stuff ... but may be later we 
refactor it and use a proper trait for such critical flows


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4624: Combines active ack and slot release when both are available.

2019-09-18 Thread GitBox
chetanmeh commented on a change in pull request #4624: Combines active ack and 
slot release when both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r325677965
 
 

 ##
 File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
 ##
 @@ -64,43 +64,34 @@ case class ActivationMessage(override val transid: 
TransactionId,
   def causedBySequence: Boolean = cause.isDefined
 }
 
-object ActivationMessage extends DefaultJsonProtocol {
-
-  def parse(msg: String) = Try(serdes.read(msg.parseJson))
-
-  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat11(ActivationMessage.apply)
-}
-
 /**
  * Message that is sent from the invoker to the controller after action is 
completed or after slot is free again for
  * new actions.
  */
 abstract class AcknowledegmentMessage(private val tid: TransactionId) extends 
Message {
   override val transid: TransactionId = tid
-  override def serialize: String = {
-AcknowledegmentMessage.serdes.write(this).compactPrint
-  }
+  override def serialize: String = 
AcknowledegmentMessage.serdes.write(this).compactPrint
+  def toJson: JsValue
 }
 
 /**
- * This message is sent from the invoker to the controller, after the slot of 
an invoker that has been used by the
- * current action, is free again (after log collection)
+ * This message is sent from an invoker to the controller, after the resource 
slot in the invoke, used by the
+ * corresponding activation, is free again (i.e., after log collection). In 
some cases, the activation result is
+ * ready and the slot is freed at the same time. In such cases, the completion 
message carries the result as well.
+ * This is reflected by the of a Right() `response` and the param `result` is 
set to true.
+ * In some cases the `result` is true but the response is Left() if the 
message was too large for the message bus.
  */
 case class CompletionMessage(override val transid: TransactionId,
- activationId: ActivationId,
- isSystemError: Boolean,
+ response: Either[ActivationId, WhiskActivation],
+ result: Boolean, // true iff the message is a 
combined active ack and slot released
 
 Review comment:
   @rabbah As you change the current design I would also like to highlight a 
potential change I need for my activation persister service work (#4632)
   
   Per my current understand the `sendActiveAck` flow is like
   
   - `isSlotFree` false - active ack/ResultMessage
   - blocking - `WhiskActivation`
   - non-blocking - NONE
   - `isSlotFree` true  - result/CompletionMessage
   - blocking - `CompletionMessage`
   - non-blocking - `CompletionMessage`
   
   Later I would like to have a configurable support for sending 
`WhiskActivation` for non blocking call as well to a custom topic. Just wanted 
to make you aware on that 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4624: Combines active ack and slot release when both are available.

2019-09-18 Thread GitBox
chetanmeh commented on a change in pull request #4624: Combines active ack and 
slot release when both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r325677965
 
 

 ##
 File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
 ##
 @@ -64,43 +64,34 @@ case class ActivationMessage(override val transid: 
TransactionId,
   def causedBySequence: Boolean = cause.isDefined
 }
 
-object ActivationMessage extends DefaultJsonProtocol {
-
-  def parse(msg: String) = Try(serdes.read(msg.parseJson))
-
-  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat11(ActivationMessage.apply)
-}
-
 /**
  * Message that is sent from the invoker to the controller after action is 
completed or after slot is free again for
  * new actions.
  */
 abstract class AcknowledegmentMessage(private val tid: TransactionId) extends 
Message {
   override val transid: TransactionId = tid
-  override def serialize: String = {
-AcknowledegmentMessage.serdes.write(this).compactPrint
-  }
+  override def serialize: String = 
AcknowledegmentMessage.serdes.write(this).compactPrint
+  def toJson: JsValue
 }
 
 /**
- * This message is sent from the invoker to the controller, after the slot of 
an invoker that has been used by the
- * current action, is free again (after log collection)
+ * This message is sent from an invoker to the controller, after the resource 
slot in the invoke, used by the
+ * corresponding activation, is free again (i.e., after log collection). In 
some cases, the activation result is
+ * ready and the slot is freed at the same time. In such cases, the completion 
message carries the result as well.
+ * This is reflected by the of a Right() `response` and the param `result` is 
set to true.
+ * In some cases the `result` is true but the response is Left() if the 
message was too large for the message bus.
  */
 case class CompletionMessage(override val transid: TransactionId,
- activationId: ActivationId,
- isSystemError: Boolean,
+ response: Either[ActivationId, WhiskActivation],
+ result: Boolean, // true iff the message is a 
combined active ack and slot released
 
 Review comment:
   @rabbah As you change the current design I would also like to highlight a 
potential change I need for my activation persister service work
   
   Per my current understand the `sendActiveAck` flow is like
   
   - `isSlotFree` false - active ack/ResultMessage
   - blocking - `WhiskActivation`
   - non-blocking - NONE
   - `isSlotFree` true  - result/CompletionMessage
   - blocking - `CompletionMessage`
   - non-blocking - `CompletionMessage`
   
   Later I would like to have a configurable support for sending 
`WhiskActivation` for non blocking call as well to a custom topic. Just wanted 
to make you aware on that 


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4624: Combines active ack and slot release when both are available.

2019-09-16 Thread GitBox
chetanmeh commented on a change in pull request #4624: Combines active ack and 
slot release when both are available.
URL: https://github.com/apache/openwhisk/pull/4624#discussion_r324789458
 
 

 ##
 File path: 
common/scala/src/main/scala/org/apache/openwhisk/core/connector/Message.scala
 ##
 @@ -64,43 +64,34 @@ case class ActivationMessage(override val transid: 
TransactionId,
   def causedBySequence: Boolean = cause.isDefined
 }
 
-object ActivationMessage extends DefaultJsonProtocol {
-
-  def parse(msg: String) = Try(serdes.read(msg.parseJson))
-
-  private implicit val fqnSerdes = FullyQualifiedEntityName.serdes
-  implicit val serdes = jsonFormat11(ActivationMessage.apply)
-}
-
 /**
  * Message that is sent from the invoker to the controller after action is 
completed or after slot is free again for
  * new actions.
  */
 abstract class AcknowledegmentMessage(private val tid: TransactionId) extends 
Message {
   override val transid: TransactionId = tid
-  override def serialize: String = {
-AcknowledegmentMessage.serdes.write(this).compactPrint
-  }
+  override def serialize: String = 
AcknowledegmentMessage.serdes.write(this).compactPrint
+  def toJson: JsValue
 }
 
 /**
- * This message is sent from the invoker to the controller, after the slot of 
an invoker that has been used by the
- * current action, is free again (after log collection)
+ * This message is sent from an invoker to the controller, after the resource 
slot in the invoke, used by the
+ * corresponding activation, is free again (i.e., after log collection). In 
some cases, the activation result is
+ * ready and the slot is freed at the same time. In such cases, the completion 
message carries the result as well.
+ * This is reflected by the of a Right() `response` and the param `result` is 
set to true.
+ * In some cases the `result` is true but the response is Left() if the 
message was too large for the message bus.
  */
 case class CompletionMessage(override val transid: TransactionId,
- activationId: ActivationId,
- isSystemError: Boolean,
+ response: Either[ActivationId, WhiskActivation],
+ result: Boolean, // true iff the message is a 
combined active ack and slot released
 
 Review comment:
   Would this change cause some compatibility issue when there would be a 
gradual roll out of new invokers in an existing cluster.
   
   So for we do not have any comparability contract for messages exchange via 
Kafka


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services