gaborgsomogyi commented on a change in pull request #26397: [SPARK-29755][CORE]
Provide @JsonDeserialize for Option[Long] in LogInfo & AttemptInfoWrapper
URL: https://github.com/apache/spark/pull/26397#discussion_r344110822
##########
File path:
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
##########
@@ -1283,6 +1283,56 @@ class FsHistoryProviderSuite extends SparkFunSuite with
Matchers with Logging {
assert(deserializedOldObj.isComplete === false)
}
+ test("SPARK-29755 LogInfo should be serialized/deserialized by jackson
properly") {
+ def assertSerDe(serializer: KVStoreScalaSerializer, info: LogInfo): Unit =
{
+ val infoAfterSerDe = serializer.deserialize(serializer.serialize(info),
classOf[LogInfo])
+ assert(infoAfterSerDe === info)
+ assertOptionAfterSerde(infoAfterSerDe.lastIndex, info.lastIndex)
+ }
+
+ val serializer = new KVStoreScalaSerializer()
+ val logInfoWithIndexAsNone = LogInfo("dummy", 0, LogType.EventLogs,
Some("appId"),
+ Some("attemptId"), 100, None, false)
+ assertSerDe(serializer, logInfoWithIndexAsNone)
+
+ val logInfoWithIndex = LogInfo("dummy", 0, LogType.EventLogs,
Some("appId"),
+ Some("attemptId"), 100, Some(3), false)
+ assertSerDe(serializer, logInfoWithIndex)
+ }
+
+ test("SPARK-29755 AttemptInfoWrapper should be serialized/deserialized by
jackson properly") {
+ def assertSerDe(serializer: KVStoreScalaSerializer, attempt:
AttemptInfoWrapper): Unit = {
+ val attemptAfterSerDe =
serializer.deserialize(serializer.serialize(attempt),
+ classOf[AttemptInfoWrapper])
+ assert(attemptAfterSerDe.info === attempt.info)
+ // skip comparing some fields, as they've not triggered SPARK-29755
+ assertOptionAfterSerde(attemptAfterSerDe.lastIndex, attempt.lastIndex)
+ }
+
+ val serializer = new KVStoreScalaSerializer()
+ val appInfo = new ApplicationAttemptInfo(None, new Date(1), new Date(1),
new Date(1),
+ 10, "spark", false, "dummy")
+ val attemptInfoWithIndexAsNone = new AttemptInfoWrapper(appInfo,
"dummyPath", 10, None,
+ None, None, None, None)
+ assertSerDe(serializer, attemptInfoWithIndexAsNone)
+
+ val attemptInfoWithIndex = new AttemptInfoWrapper(appInfo, "dummyPath",
10, Some(1),
+ None, None, None, None)
+ assertSerDe(serializer, attemptInfoWithIndex)
+ }
+
+ private def assertOptionAfterSerde(opt: Option[Long], expected:
Option[Long]): Unit = {
+ if (expected.isEmpty) {
+ assert(opt.isEmpty)
+ } else {
+ // The issue happens only the value in Option is being unboxed. Here we
ensure unboxing
Review comment:
s/happens only the/happens only when the
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]