Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/6096#discussion_r30179125
--- Diff:
streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
---
@@ -418,76 +418,16 @@ class StreamingContextSuite extends FunSuite with
BeforeAndAfter with Timeouts w
ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction
_)
assert(ssc != null, "no context created")
assert(!newContextCreated, "old context not recovered")
- assert(ssc.conf.get("someKey") === "someValue")
+ assert(ssc.conf.get("someKey") === "someValue", "checkpointed config
not recovered")
}
- }
-
- test("getOrCreate with existing SparkContext") {
- val conf = new SparkConf().setMaster(master).setAppName(appName)
- sc = new SparkContext(conf)
- // Function to create StreamingContext that has a config to identify
it to be new context
- var newContextCreated = false
- def creatingFunction(sparkContext: SparkContext): StreamingContext = {
- newContextCreated = true
- new StreamingContext(sparkContext, batchDuration)
- }
-
- // Call ssc.stop(stopSparkContext = false) after a body of cody
- def testGetOrCreate(body: => Unit): Unit = {
- newContextCreated = false
- try {
- body
- } finally {
- if (ssc != null) {
- ssc.stop(stopSparkContext = false)
- }
- ssc = null
- }
- }
-
- val emptyPath = Utils.createTempDir().getAbsolutePath()
-
- // getOrCreate should create new context with empty path
+ // getOrCreate should recover StreamingContext with existing
SparkContext
testGetOrCreate {
- ssc = StreamingContext.getOrCreate(emptyPath, creatingFunction _,
sc, createOnError = true)
- assert(ssc != null, "no context created")
- assert(newContextCreated, "new context not created")
- assert(ssc.sparkContext === sc, "new StreamingContext does not use
existing SparkContext")
- }
-
- val corrutedCheckpointPath = createCorruptedCheckpoint()
-
- // getOrCreate should throw exception with fake checkpoint file and
createOnError = false
- intercept[Exception] {
- ssc = StreamingContext.getOrCreate(corrutedCheckpointPath,
creatingFunction _, sc)
- }
-
- // getOrCreate should throw exception with fake checkpoint file
- intercept[Exception] {
- ssc = StreamingContext.getOrCreate(
- corrutedCheckpointPath, creatingFunction _, sc, createOnError =
false)
- }
-
- // getOrCreate should create new context with fake checkpoint file and
createOnError = true
- testGetOrCreate {
- ssc = StreamingContext.getOrCreate(
- corrutedCheckpointPath, creatingFunction _, sc, createOnError =
true)
- assert(ssc != null, "no context created")
- assert(newContextCreated, "new context not created")
- assert(ssc.sparkContext === sc, "new StreamingContext does not use
existing SparkContext")
- }
-
- val checkpointPath = createValidCheckpoint()
-
- // StreamingContext.getOrCreate should recover context with checkpoint
path
- testGetOrCreate {
- ssc = StreamingContext.getOrCreate(checkpointPath, creatingFunction
_, sc)
+ sc = new SparkContext(conf)
--- End diff --
Here the whole unit test to test the deleted API has been replaced by a
sub-unit-test that tests older API's ability to use an existing SparkContext.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]