Repository: spark
Updated Branches:
  refs/heads/master 11973a7bd -> de8eefa80


[SPARK-1385] Use existing code for JSON de/serialization of BlockId

`BlockId.scala` already offers a way to reconstruct a BlockId from a string
through regex matching. `util/JsonProtocol.scala` duplicates this
functionality by explicitly matching on the BlockId type. With this PR,
BlockId de/serialization goes through the first (older) code path: a
BlockId is written as its string name and parsed back with `BlockId.apply`.

(Most of the line changes in this PR involve changing `==` to `===` in 
`JsonProtocolSuite.scala`)

Author: Andrew Or <andrewo...@gmail.com>

Closes #289 from andrewor14/blockid-json and squashes the following commits:

409d226 [Andrew Or] Simplify JSON de/serialization for BlockId


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de8eefa8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de8eefa8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de8eefa8

Branch: refs/heads/master
Commit: de8eefa804e229635eaa29a78b9e9ce161ac58e1
Parents: 11973a7
Author: Andrew Or <andrewo...@gmail.com>
Authored: Wed Apr 2 10:43:09 2014 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Wed Apr 2 10:43:09 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/util/JsonProtocol.scala    |  77 +---------
 .../apache/spark/util/JsonProtocolSuite.scala   | 141 +++++++++----------
 2 files changed, 72 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/de8eefa8/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 346f2b7..d9a6af6 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -195,7 +195,7 @@ private[spark] object JsonProtocol {
       taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing)
     val updatedBlocks = taskMetrics.updatedBlocks.map { blocks =>
         JArray(blocks.toList.map { case (id, status) =>
-          ("Block ID" -> blockIdToJson(id)) ~
+          ("Block ID" -> id.toString) ~
           ("Status" -> blockStatusToJson(status))
         })
       }.getOrElse(JNothing)
@@ -284,35 +284,6 @@ private[spark] object JsonProtocol {
     ("Replication" -> storageLevel.replication)
   }
 
-  def blockIdToJson(blockId: BlockId): JValue = {
-    val blockType = Utils.getFormattedClassName(blockId)
-    val json: JObject = blockId match {
-      case rddBlockId: RDDBlockId =>
-        ("RDD ID" -> rddBlockId.rddId) ~
-        ("Split Index" -> rddBlockId.splitIndex)
-      case shuffleBlockId: ShuffleBlockId =>
-        ("Shuffle ID" -> shuffleBlockId.shuffleId) ~
-        ("Map ID" -> shuffleBlockId.mapId) ~
-        ("Reduce ID" -> shuffleBlockId.reduceId)
-      case broadcastBlockId: BroadcastBlockId =>
-        "Broadcast ID" -> broadcastBlockId.broadcastId
-      case broadcastHelperBlockId: BroadcastHelperBlockId =>
-        ("Broadcast Block ID" -> 
blockIdToJson(broadcastHelperBlockId.broadcastId)) ~
-        ("Helper Type" -> broadcastHelperBlockId.hType)
-      case taskResultBlockId: TaskResultBlockId =>
-        "Task ID" -> taskResultBlockId.taskId
-      case streamBlockId: StreamBlockId =>
-        ("Stream ID" -> streamBlockId.streamId) ~
-        ("Unique ID" -> streamBlockId.uniqueId)
-      case tempBlockId: TempBlockId =>
-        val uuid = UUIDToJson(tempBlockId.id)
-        "Temp ID" -> uuid
-      case testBlockId: TestBlockId =>
-        "Test ID" -> testBlockId.id
-    }
-    ("Type" -> blockType) ~ json
-  }
-
   def blockStatusToJson(blockStatus: BlockStatus): JValue = {
     val storageLevel = storageLevelToJson(blockStatus.storageLevel)
     ("Storage Level" -> storageLevel) ~
@@ -513,7 +484,7 @@ private[spark] object JsonProtocol {
       Utils.jsonOption(json \ "Shuffle Write 
Metrics").map(shuffleWriteMetricsFromJson)
     metrics.updatedBlocks = Utils.jsonOption(json \ "Updated Blocks").map { value =>
       value.extract[List[JValue]].map { block =>
-        val id = blockIdFromJson(block \ "Block ID")
+        val id = BlockId((block \ "Block ID").extract[String])
         val status = blockStatusFromJson(block \ "Status")
         (id, status)
       }
@@ -616,50 +587,6 @@ private[spark] object JsonProtocol {
     StorageLevel(useDisk, useMemory, deserialized, replication)
   }
 
-  def blockIdFromJson(json: JValue): BlockId = {
-    val rddBlockId = Utils.getFormattedClassName(RDDBlockId)
-    val shuffleBlockId = Utils.getFormattedClassName(ShuffleBlockId)
-    val broadcastBlockId = Utils.getFormattedClassName(BroadcastBlockId)
-    val broadcastHelperBlockId = Utils.getFormattedClassName(BroadcastHelperBlockId)
-    val taskResultBlockId = Utils.getFormattedClassName(TaskResultBlockId)
-    val streamBlockId = Utils.getFormattedClassName(StreamBlockId)
-    val tempBlockId = Utils.getFormattedClassName(TempBlockId)
-    val testBlockId = Utils.getFormattedClassName(TestBlockId)
-
-    (json \ "Type").extract[String] match {
-      case `rddBlockId` =>
-        val rddId = (json \ "RDD ID").extract[Int]
-        val splitIndex = (json \ "Split Index").extract[Int]
-        new RDDBlockId(rddId, splitIndex)
-      case `shuffleBlockId` =>
-        val shuffleId = (json \ "Shuffle ID").extract[Int]
-        val mapId = (json \ "Map ID").extract[Int]
-        val reduceId = (json \ "Reduce ID").extract[Int]
-        new ShuffleBlockId(shuffleId, mapId, reduceId)
-      case `broadcastBlockId` =>
-        val broadcastId = (json \ "Broadcast ID").extract[Long]
-        new BroadcastBlockId(broadcastId)
-      case `broadcastHelperBlockId` =>
-        val broadcastBlockId =
-          blockIdFromJson(json \ "Broadcast Block 
ID").asInstanceOf[BroadcastBlockId]
-        val hType = (json \ "Helper Type").extract[String]
-        new BroadcastHelperBlockId(broadcastBlockId, hType)
-      case `taskResultBlockId` =>
-        val taskId = (json \ "Task ID").extract[Long]
-        new TaskResultBlockId(taskId)
-      case `streamBlockId` =>
-        val streamId = (json \ "Stream ID").extract[Int]
-        val uniqueId = (json \ "Unique ID").extract[Long]
-        new StreamBlockId(streamId, uniqueId)
-      case `tempBlockId` =>
-        val tempId = UUIDFromJson(json \ "Temp ID")
-        new TempBlockId(tempId)
-      case `testBlockId` =>
-        val testId = (json \ "Test ID").extract[String]
-        new TestBlockId(testId)
-    }
-  }
-
   def blockStatusFromJson(json: JValue): BlockStatus = {
     val storageLevel = storageLevelFromJson(json \ "Storage Level")
     val memorySize = (json \ "Memory Size").extract[Long]

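For context (not part of the patch): the visible effect on the serialized
form. Previously `blockIdToJson` emitted a nested object; after this change
the id is a plain string. A sketch, assuming the json4s DSL already used by
`JsonProtocol.scala`:

    import org.apache.spark.storage.RDDBlockId
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods.{compact, render}

    val id = RDDBlockId(1, 2)
    // old shape: "Block ID":{"Type":"RDDBlockId","RDD ID":1,"Split Index":2}
    // new shape: "Block ID":"rdd_1_2"
    println(compact(render("Block ID" -> id.toString)))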
http://git-wip-us.apache.org/repos/asf/spark/blob/de8eefa8/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 67c0a43..40c2901 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -112,7 +112,6 @@ class JsonProtocolSuite extends FunSuite {
     testBlockId(BroadcastHelperBlockId(BroadcastBlockId(2L), "Spark"))
     testBlockId(TaskResultBlockId(1L))
     testBlockId(StreamBlockId(1, 2L))
-    testBlockId(TempBlockId(UUID.randomUUID()))
   }
 
 
@@ -168,8 +167,8 @@ class JsonProtocolSuite extends FunSuite {
   }
 
   private def testBlockId(blockId: BlockId) {
-    val newBlockId = JsonProtocol.blockIdFromJson(JsonProtocol.blockIdToJson(blockId))
-    blockId == newBlockId
+    val newBlockId = BlockId(blockId.toString)
+    assert(blockId === newBlockId)
   }
 
 
@@ -180,90 +179,90 @@ class JsonProtocolSuite extends FunSuite {
   private def assertEquals(event1: SparkListenerEvent, event2: SparkListenerEvent) {
     (event1, event2) match {
       case (e1: SparkListenerStageSubmitted, e2: SparkListenerStageSubmitted) =>
-        assert(e1.properties == e2.properties)
+        assert(e1.properties === e2.properties)
         assertEquals(e1.stageInfo, e2.stageInfo)
       case (e1: SparkListenerStageCompleted, e2: SparkListenerStageCompleted) =>
         assertEquals(e1.stageInfo, e2.stageInfo)
       case (e1: SparkListenerTaskStart, e2: SparkListenerTaskStart) =>
-        assert(e1.stageId == e2.stageId)
+        assert(e1.stageId === e2.stageId)
         assertEquals(e1.taskInfo, e2.taskInfo)
       case (e1: SparkListenerTaskGettingResult, e2: SparkListenerTaskGettingResult) =>
         assertEquals(e1.taskInfo, e2.taskInfo)
       case (e1: SparkListenerTaskEnd, e2: SparkListenerTaskEnd) =>
-        assert(e1.stageId == e2.stageId)
-        assert(e1.taskType == e2.taskType)
+        assert(e1.stageId === e2.stageId)
+        assert(e1.taskType === e2.taskType)
         assertEquals(e1.reason, e2.reason)
         assertEquals(e1.taskInfo, e2.taskInfo)
         assertEquals(e1.taskMetrics, e2.taskMetrics)
       case (e1: SparkListenerJobStart, e2: SparkListenerJobStart) =>
-        assert(e1.jobId == e2.jobId)
-        assert(e1.properties == e2.properties)
-        assertSeqEquals(e1.stageIds, e2.stageIds, (i1: Int, i2: Int) => assert(i1 == i2))
+        assert(e1.jobId === e2.jobId)
+        assert(e1.properties === e2.properties)
+        assertSeqEquals(e1.stageIds, e2.stageIds, (i1: Int, i2: Int) => assert(i1 === i2))
       case (e1: SparkListenerJobEnd, e2: SparkListenerJobEnd) =>
-        assert(e1.jobId == e2.jobId)
+        assert(e1.jobId === e2.jobId)
         assertEquals(e1.jobResult, e2.jobResult)
       case (e1: SparkListenerEnvironmentUpdate, e2: SparkListenerEnvironmentUpdate) =>
         assertEquals(e1.environmentDetails, e2.environmentDetails)
       case (e1: SparkListenerBlockManagerAdded, e2: SparkListenerBlockManagerAdded) =>
-        assert(e1.maxMem == e2.maxMem)
+        assert(e1.maxMem === e2.maxMem)
         assertEquals(e1.blockManagerId, e2.blockManagerId)
       case (e1: SparkListenerBlockManagerRemoved, e2: SparkListenerBlockManagerRemoved) =>
         assertEquals(e1.blockManagerId, e2.blockManagerId)
       case (e1: SparkListenerUnpersistRDD, e2: SparkListenerUnpersistRDD) =>
-        assert(e1.rddId == e2.rddId)
+        assert(e1.rddId === e2.rddId)
       case (SparkListenerShutdown, SparkListenerShutdown) =>
       case _ => fail("Events don't match in types!")
     }
   }
 
   private def assertEquals(info1: StageInfo, info2: StageInfo) {
-    assert(info1.stageId == info2.stageId)
-    assert(info1.name == info2.name)
-    assert(info1.numTasks == info2.numTasks)
-    assert(info1.submissionTime == info2.submissionTime)
-    assert(info1.completionTime == info2.completionTime)
-    assert(info1.emittedTaskSizeWarning == info2.emittedTaskSizeWarning)
+    assert(info1.stageId === info2.stageId)
+    assert(info1.name === info2.name)
+    assert(info1.numTasks === info2.numTasks)
+    assert(info1.submissionTime === info2.submissionTime)
+    assert(info1.completionTime === info2.completionTime)
+    assert(info1.emittedTaskSizeWarning === info2.emittedTaskSizeWarning)
     assertEquals(info1.rddInfo, info2.rddInfo)
   }
 
   private def assertEquals(info1: RDDInfo, info2: RDDInfo) {
-    assert(info1.id == info2.id)
-    assert(info1.name == info2.name)
-    assert(info1.numPartitions == info2.numPartitions)
-    assert(info1.numCachedPartitions == info2.numCachedPartitions)
-    assert(info1.memSize == info2.memSize)
-    assert(info1.diskSize == info2.diskSize)
+    assert(info1.id === info2.id)
+    assert(info1.name === info2.name)
+    assert(info1.numPartitions === info2.numPartitions)
+    assert(info1.numCachedPartitions === info2.numCachedPartitions)
+    assert(info1.memSize === info2.memSize)
+    assert(info1.diskSize === info2.diskSize)
     assertEquals(info1.storageLevel, info2.storageLevel)
   }
 
   private def assertEquals(level1: StorageLevel, level2: StorageLevel) {
-    assert(level1.useDisk == level2.useDisk)
-    assert(level1.useMemory == level2.useMemory)
-    assert(level1.deserialized == level2.deserialized)
-    assert(level1.replication == level2.replication)
+    assert(level1.useDisk === level2.useDisk)
+    assert(level1.useMemory === level2.useMemory)
+    assert(level1.deserialized === level2.deserialized)
+    assert(level1.replication === level2.replication)
   }
 
   private def assertEquals(info1: TaskInfo, info2: TaskInfo) {
-    assert(info1.taskId == info2.taskId)
-    assert(info1.index == info2.index)
-    assert(info1.launchTime == info2.launchTime)
-    assert(info1.executorId == info2.executorId)
-    assert(info1.host == info2.host)
-    assert(info1.taskLocality == info2.taskLocality)
-    assert(info1.gettingResultTime == info2.gettingResultTime)
-    assert(info1.finishTime == info2.finishTime)
-    assert(info1.failed == info2.failed)
-    assert(info1.serializedSize == info2.serializedSize)
+    assert(info1.taskId === info2.taskId)
+    assert(info1.index === info2.index)
+    assert(info1.launchTime === info2.launchTime)
+    assert(info1.executorId === info2.executorId)
+    assert(info1.host === info2.host)
+    assert(info1.taskLocality === info2.taskLocality)
+    assert(info1.gettingResultTime === info2.gettingResultTime)
+    assert(info1.finishTime === info2.finishTime)
+    assert(info1.failed === info2.failed)
+    assert(info1.serializedSize === info2.serializedSize)
   }
 
   private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
-    assert(metrics1.hostname == metrics2.hostname)
-    assert(metrics1.executorDeserializeTime == metrics2.executorDeserializeTime)
-    assert(metrics1.resultSize == metrics2.resultSize)
-    assert(metrics1.jvmGCTime == metrics2.jvmGCTime)
-    assert(metrics1.resultSerializationTime == metrics2.resultSerializationTime)
-    assert(metrics1.memoryBytesSpilled == metrics2.memoryBytesSpilled)
-    assert(metrics1.diskBytesSpilled == metrics2.diskBytesSpilled)
+    assert(metrics1.hostname === metrics2.hostname)
+    assert(metrics1.executorDeserializeTime === metrics2.executorDeserializeTime)
+    assert(metrics1.resultSize === metrics2.resultSize)
+    assert(metrics1.jvmGCTime === metrics2.jvmGCTime)
+    assert(metrics1.resultSerializationTime === metrics2.resultSerializationTime)
+    assert(metrics1.memoryBytesSpilled === metrics2.memoryBytesSpilled)
+    assert(metrics1.diskBytesSpilled === metrics2.diskBytesSpilled)
     assertOptionEquals(
       metrics1.shuffleReadMetrics, metrics2.shuffleReadMetrics, assertShuffleReadEquals)
     assertOptionEquals(
@@ -272,31 +271,31 @@ class JsonProtocolSuite extends FunSuite {
   }
 
   private def assertEquals(metrics1: ShuffleReadMetrics, metrics2: ShuffleReadMetrics) {
-    assert(metrics1.shuffleFinishTime == metrics2.shuffleFinishTime)
-    assert(metrics1.totalBlocksFetched == metrics2.totalBlocksFetched)
-    assert(metrics1.remoteBlocksFetched == metrics2.remoteBlocksFetched)
-    assert(metrics1.localBlocksFetched == metrics2.localBlocksFetched)
-    assert(metrics1.fetchWaitTime == metrics2.fetchWaitTime)
-    assert(metrics1.remoteBytesRead == metrics2.remoteBytesRead)
+    assert(metrics1.shuffleFinishTime === metrics2.shuffleFinishTime)
+    assert(metrics1.totalBlocksFetched === metrics2.totalBlocksFetched)
+    assert(metrics1.remoteBlocksFetched === metrics2.remoteBlocksFetched)
+    assert(metrics1.localBlocksFetched === metrics2.localBlocksFetched)
+    assert(metrics1.fetchWaitTime === metrics2.fetchWaitTime)
+    assert(metrics1.remoteBytesRead === metrics2.remoteBytesRead)
   }
 
   private def assertEquals(metrics1: ShuffleWriteMetrics, metrics2: ShuffleWriteMetrics) {
-    assert(metrics1.shuffleBytesWritten == metrics2.shuffleBytesWritten)
-    assert(metrics1.shuffleWriteTime == metrics2.shuffleWriteTime)
+    assert(metrics1.shuffleBytesWritten === metrics2.shuffleBytesWritten)
+    assert(metrics1.shuffleWriteTime === metrics2.shuffleWriteTime)
   }
 
   private def assertEquals(bm1: BlockManagerId, bm2: BlockManagerId) {
-    assert(bm1.executorId == bm2.executorId)
-    assert(bm1.host == bm2.host)
-    assert(bm1.port == bm2.port)
-    assert(bm1.nettyPort == bm2.nettyPort)
+    assert(bm1.executorId === bm2.executorId)
+    assert(bm1.host === bm2.host)
+    assert(bm1.port === bm2.port)
+    assert(bm1.nettyPort === bm2.nettyPort)
   }
 
   private def assertEquals(result1: JobResult, result2: JobResult) {
     (result1, result2) match {
       case (JobSucceeded, JobSucceeded) =>
       case (r1: JobFailed, r2: JobFailed) =>
-        assert(r1.failedStageId == r2.failedStageId)
+        assert(r1.failedStageId === r2.failedStageId)
         assertEquals(r1.exception, r2.exception)
       case _ => fail("Job results don't match in types!")
     }
@@ -307,13 +306,13 @@ class JsonProtocolSuite extends FunSuite {
       case (Success, Success) =>
       case (Resubmitted, Resubmitted) =>
       case (r1: FetchFailed, r2: FetchFailed) =>
-        assert(r1.shuffleId == r2.shuffleId)
-        assert(r1.mapId == r2.mapId)
-        assert(r1.reduceId == r2.reduceId)
+        assert(r1.shuffleId === r2.shuffleId)
+        assert(r1.mapId === r2.mapId)
+        assert(r1.reduceId === r2.reduceId)
         assertEquals(r1.bmAddress, r2.bmAddress)
       case (r1: ExceptionFailure, r2: ExceptionFailure) =>
-        assert(r1.className == r2.className)
-        assert(r1.description == r2.description)
+        assert(r1.className === r2.className)
+        assert(r1.description === r2.description)
         assertSeqEquals(r1.stackTrace, r2.stackTrace, assertStackTraceElementEquals)
         assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals)
       case (TaskResultLost, TaskResultLost) =>
@@ -329,13 +328,13 @@ class JsonProtocolSuite extends FunSuite {
       details2: Map[String, Seq[(String, String)]]) {
     details1.zip(details2).foreach {
       case ((key1, values1: Seq[(String, String)]), (key2, values2: Seq[(String, String)])) =>
-        assert(key1 == key2)
-        values1.zip(values2).foreach { case (v1, v2) => assert(v1 == v2) }
+        assert(key1 === key2)
+        values1.zip(values2).foreach { case (v1, v2) => assert(v1 === v2) }
     }
   }
 
   private def assertEquals(exception1: Exception, exception2: Exception) {
-    assert(exception1.getMessage == exception2.getMessage)
+    assert(exception1.getMessage === exception2.getMessage)
     assertSeqEquals(
       exception1.getStackTrace,
       exception2.getStackTrace,
@@ -344,11 +343,11 @@ class JsonProtocolSuite extends FunSuite {
 
   private def assertJsonStringEquals(json1: String, json2: String) {
     val formatJsonString = (json: String) => json.replaceAll("[\\s|]", "")
-    formatJsonString(json1) == formatJsonString(json2)
+    formatJsonString(json1) === formatJsonString(json2)
   }
 
   private def assertSeqEquals[T](seq1: Seq[T], seq2: Seq[T], assertEquals: (T, T) => Unit) {
-    assert(seq1.length == seq2.length)
+    assert(seq1.length === seq2.length)
     seq1.zip(seq2).foreach { case (t1, t2) =>
       assertEquals(t1, t2)
     }
@@ -389,11 +388,11 @@ class JsonProtocolSuite extends FunSuite {
   }
 
   private def assertBlockEquals(b1: (BlockId, BlockStatus), b2: (BlockId, BlockStatus)) {
-    assert(b1 == b2)
+    assert(b1 === b2)
   }
 
   private def assertStackTraceElementEquals(ste1: StackTraceElement, ste2: StackTraceElement) {
-    assert(ste1 == ste2)
+    assert(ste1 === ste2)
   }
 
 

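A note on the `==` to `===` sweep above: with the ScalaTest version in use
at the time, a bare `assert(a == b)` fails with only "assertion failed",
while `===` (from ScalaTest's `Assertions`) reports both operands. A tiny
sketch inside a FunSuite, with hypothetical values:

    val computed = 1; val expected = 2
    assert(computed == expected)    // fails with just "assertion failed"
    assert(computed === expected)   // fails with "1 did not equal 2"

The old `testBlockId` helper also computed `blockId == newBlockId` without
asserting, so it could never fail; wrapping the comparison in `assert`
fixes that.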