[ https://issues.apache.org/jira/browse/SPARK-8086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Josh Rosen resolved SPARK-8086.
-------------------------------
    Resolution: Invalid

I'm closing this as "Invalid" for now, since it turns out that I wasn't paying close enough attention to where the exception was being thrown; it turns out that `saveAsTextFile` wasn't actually throwing here, which explains why the addition of that method affected things. Sorry for being sloppy :(

> InputOutputMetricsSuite should not call side-effecting getFSBytesWrittenOnThreadCallback to detect whether we're running on Hadoop 2.5+
> ---------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-8086
>                 URL: https://issues.apache.org/jira/browse/SPARK-8086
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Josh Rosen
>            Priority: Critical
>
> (This JIRA is a spinoff from SPARK-8062.)
> While working to reproduce SPARK-8062, I noticed something rather curious in {{InputOutputMetricsSuite}}: the output metrics tests are guarded by {{if}} statements that check whether the bytesWrittenOnThreadCallback is defined:
> {code}
> test("output metrics when writing text file") {
>   val fs = FileSystem.getLocal(new Configuration())
>   val outPath = new Path(fs.getWorkingDirectory, "outdir")
>   if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf).isDefined) {
>     // ... body of test case ...
>   }
> }
> {code}
> AFAIK this check was introduced in order to prevent the test's assertions from failing under pre-Hadoop-2.5 versions of Hadoop.
> Now, take a look at the regression test that I added to try to reproduce this bug:
> {code}
> test("exceptions while getting IO thread statistics should not fail tasks / jobs (SPARK-8062)") {
>   FileSystem.getStatistics(null, classOf[FileSystem])
>   val fs = FileSystem.getLocal(new Configuration())
>   val outPath = new Path(fs.getWorkingDirectory, "outdir")
>   // This test passes unless the following line is commented out. The following line therefore
>   // has some side-effects that are impacting the system under test:
>   SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf).isDefined
>   val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
>   try {
>     rdd.saveAsTextFile(outPath.toString)
>   } finally {
>     fs.delete(outPath, true)
>   }
> }
> {code}
> In this test, I try to pollute the global FileSystem statistics registry by storing a statistics entry for a filesystem with a null URI. All I care about here is that Spark doesn't crash, so I didn't add the {{if}} check (I don't need to worry about assertions failing on pre-Hadoop-2.5 versions, since this test has no assertions that check the metrics).
> Surprisingly, though, my test was unable to fail until I added a
> {code}
> SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback(outPath, fs.getConf).isDefined
> {code}
> call outside of an {{if}} statement. This implies that this method has side effects which influence whether other metrics-retrieval code is called. I worry that this may mean that our other InputOutputMetrics code could be broken for real production jobs.
> I'd like to investigate and fix this issue, while also hardening this code: I think we should be performing significantly more null checks on the inputs and outputs of Hadoop methods, and we should be using a pure function to determine whether our Hadoop version supports these metrics, rather than calling a method that might have side effects (I think we can do this purely via reflection, without actually creating any objects or calling any methods).

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
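The "pure function via reflection" idea above can be sketched roughly as follows. This is a sketch under assumptions, not Spark's actual fix; the Hadoop class and method names in the comments are hypothetical placeholders, and the demonstration probes JDK classes so it runs without Hadoop on the classpath. The key point is that {{Class.forName}} (with initialization disabled) and {{getMethods}} only read class metadata, so the capability check cannot pollute the FileSystem statistics registry:

```java
import java.lang.reflect.Method;

public final class MetricsSupportCheck {

    /**
     * Returns true iff the named class is on the classpath and exposes a
     * public method with the given name. Purely observational: we pass
     * initialize=false to Class.forName so no static initializers run, no
     * objects are created, and no methods on the target class are invoked.
     */
    static boolean hasPublicMethod(String className, String methodName) {
        try {
            Class<?> cls = Class.forName(
                className, /* initialize = */ false,
                MetricsSupportCheck.class.getClassLoader());
            for (Method m : cls.getMethods()) {
                if (m.getName().equals(methodName)) {
                    return true;
                }
            }
            return false;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // In Spark this would probe whichever Hadoop class/method marks
        // 2.5+ metrics support (hypothetical placeholder names). Shown here
        // against JDK classes so the sketch is self-contained:
        System.out.println(hasPublicMethod("java.lang.String", "isEmpty"));      // true
        System.out.println(hasPublicMethod("java.lang.String", "noSuchMethod")); // false
        System.out.println(hasPublicMethod("no.such.Class", "anything"));        // false
    }
}
```

A check like this can be evaluated once, cached in a {{lazy val}}, and consulted by both the production metrics code and the test guards, so no side-effecting callback lookup is needed just to detect the Hadoop version.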