GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/3832#discussion_r22447244
--- Diff: streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---
@@ -255,6 +255,45 @@ class CheckpointSuite extends TestSuiteBase {
}
}
+ test("recovery with saveAsHadoopFile inside transform operation") {
+ // Regression test for SPARK-4835.
+ //
+ // In that issue, the problem was that `saveAsHadoopFile(s)` would fail when the last batch
+ // was restarted from a checkpoint since the output directory would already exist. However,
+ // the other saveAsHadoopFile* tests couldn't catch this because they only tested whether the
+ // output matched correctly and not whether the post-restart batch had successfully finished
+ // without throwing any errors. The following test reproduces the same bug with a test that
+ // actually fails because the error in saveAsHadoopFile causes transform() to fail, which
+ // prevents the expected output from being written to the output stream.
+ //
+ // This is not actually a valid use of transform, but it's being used here so that we can test
+ // the fix for SPARK-4835 independently of additional test cleanup.
+ //
+ // After SPARK-5079 is addressed, should be able to remove this test since a strengthened
+ // version of the other saveAsHadoopFile* tests would prevent regressions for this issue.
+ val tempDir = Files.createTempDir()
+ try {
+ testCheckpointedOperation(
+ Seq(Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq()),
+ (s: DStream[String]) => {
+ s.transform { (rdd, time) =>
+ val output = rdd.map(x => (x, 1)).reduceByKey(_ + _)
+ output.saveAsHadoopFile(
+ new File(tempDir, "result-" + time.milliseconds).getAbsolutePath,
+ classOf[Text],
--- End diff --
No, it isn't, sorry.