Github user juanrh commented on a diff in the pull request:
https://github.com/apache/spark/pull/19583#discussion_r148075569
--- Diff: core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---
@@ -51,7 +51,26 @@ private case class ExecutorRegistered(executorId: String)
private case class ExecutorRemoved(executorId: String)
-private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
+private[spark] case class UpdatedEpoch(epoch: Long)
+
+private[spark] object HeartbeatResponse {
+  def apply(reregisterBlockManager: Boolean,
+      updatedEpoch: Option[Long] = None): HeartbeatResponse =
+    updatedEpoch.fold[HeartbeatResponse](BasicHeartbeatResponse(reregisterBlockManager)) {
+      epoch => HeartbeatResponseWithEpoch(reregisterBlockManager, Some(epoch))
+    }
+}
+
+private[spark] sealed trait HeartbeatResponse {
+  def reregisterBlockManager: Boolean
+  def updatedEpoch: Option[Long] = None
+}
+
+private[spark] case class BasicHeartbeatResponse(reregisterBlockManager: Boolean)
+  extends HeartbeatResponse
+private[spark] case class HeartbeatResponseWithEpoch(reregisterBlockManager: Boolean,
+    override val updatedEpoch: Option[Long])
+  extends HeartbeatResponse
--- End diff --
The idea was to save a bit of space when serializing: `BasicHeartbeatResponse`
doesn't have an `updatedEpoch` field at all, and instead encodes the `None` in
its type. Since `BasicHeartbeatResponse` is what we send most of the time, we
only pay for the additional optional field when we actually have an updated
epoch to send. But this might be a tiny optimization that is not worth the added
complexity.