Repository: spark
Updated Branches:
  refs/heads/master fec10f0c6 -> 031d7d414


[SPARK-6304] [STREAMING] Fix checkpointing doesn't retain driver port issue.

Author: jerryshao <saisai.s...@intel.com>
Author: Saisai Shao <saisai.s...@intel.com>

Closes #5060 from jerryshao/SPARK-6304 and squashes the following commits:

89b01f5 [jerryshao] Update the unit test to add more cases
275d252 [jerryshao] Address the comments
7cc146d [jerryshao] Address the comments
2624723 [jerryshao] Fix rebase conflict
45befaa [Saisai Shao] Update the unit test
bbc1c9c [Saisai Shao] Fix checkpointing doesn't retain driver port issue


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/031d7d41
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/031d7d41
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/031d7d41

Branch: refs/heads/master
Commit: 031d7d41430ec1f3c3353e33eab4821a9bcd58a5
Parents: fec10f0
Author: jerryshao <saisai.s...@intel.com>
Authored: Thu Jul 16 16:55:46 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Thu Jul 16 16:55:46 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/streaming/Checkpoint.scala |  2 +
 .../spark/streaming/CheckpointSuite.scala       | 45 +++++++++++++++++++-
 2 files changed, 46 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/031d7d41/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 5279331..65d4e93 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -48,6 +48,8 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
     // Reload properties for the checkpoint application since user wants to set a reload property
     // or spark had changed its value and user wants to set it back.
     val propertiesToReload = List(
+      "spark.driver.host",
+      "spark.driver.port",
       "spark.master",
       "spark.yarn.keytab",
       "spark.yarn.principal")

http://git-wip-us.apache.org/repos/asf/spark/blob/031d7d41/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 6a94928..d308ac0 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -191,8 +191,51 @@ class CheckpointSuite extends TestSuiteBase {
     }
   }
 
+  // This tests that if "spark.driver.host" and "spark.driver.port" are set by the user, they can
+  // be recovered with the correct values.
+  test("get correct spark.driver.[host|port] from checkpoint") {
+    val conf = Map("spark.driver.host" -> "localhost", "spark.driver.port" -> 
"9999")
+    conf.foreach(kv => System.setProperty(kv._1, kv._2))
+    ssc = new StreamingContext(master, framework, batchDuration)
+    val originalConf = ssc.conf
+    assert(originalConf.get("spark.driver.host") === "localhost")
+    assert(originalConf.get("spark.driver.port") === "9999")
+
+    val cp = new Checkpoint(ssc, Time(1000))
+    ssc.stop()
+
+    // Serialize/deserialize to simulate write to storage and reading it back
+    val newCp = Utils.deserialize[Checkpoint](Utils.serialize(cp))
+
+    val newCpConf = newCp.createSparkConf()
+    assert(newCpConf.contains("spark.driver.host"))
+    assert(newCpConf.contains("spark.driver.port"))
+    assert(newCpConf.get("spark.driver.host") === "localhost")
+    assert(newCpConf.get("spark.driver.port") === "9999")
+
+    // Check if all the parameters have been restored
+    ssc = new StreamingContext(null, newCp, null)
+    val restoredConf = ssc.conf
+    assert(restoredConf.get("spark.driver.host") === "localhost")
+    assert(restoredConf.get("spark.driver.port") === "9999")
+    ssc.stop()
+
+    // If spark.driver.host and spark.driver.port are not set in the system properties, these two
+    // parameters should not be present in the newly recovered conf.
+    conf.foreach(kv => System.clearProperty(kv._1))
+    val newCpConf1 = newCp.createSparkConf()
+    assert(!newCpConf1.contains("spark.driver.host"))
+    assert(!newCpConf1.contains("spark.driver.port"))
+
+    // Spark itself will pick a random, unused port for spark.driver.port if it is not set
+    // explicitly.
+    ssc = new StreamingContext(null, newCp, null)
+    val restoredConf1 = ssc.conf
+    assert(restoredConf1.get("spark.driver.host") === "localhost")
+    assert(restoredConf1.get("spark.driver.port") !== "9999")
+  }
 
-  // This tests whether the systm can recover from a master failure with simple
+  // This tests whether the system can recover from a master failure with simple
   // non-stateful operations. This assumes a reliable, replayable input
   // source - TestInputDStream.
   test("recovery with map and reduceByKey operations") {

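
For reference, the scenario this fix affects is checkpoint-based driver recovery through StreamingContext.getOrCreate. A minimal sketch of such an application follows; the checkpoint directory, app name, and batch interval are placeholders, and the DStream logic is elided.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RecoverableAppSketch {
  // Placeholder checkpoint directory; a real deployment would use a reliable
  // filesystem such as HDFS.
  val checkpointDir = "/tmp/streaming-checkpoint"

  // Builds a fresh context on the first run; on restart the checkpoint is used instead.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("RecoverableAppSketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    // DStream definitions would go here.
    ssc
  }

  def main(args: Array[String]): Unit = {
    // With this fix, a context recovered from the checkpoint picks up
    // spark.driver.host and spark.driver.port from the restarted driver's
    // environment rather than from the stale checkpointed values.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}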
