Github user JoshRosen commented on the pull request:
https://github.com/apache/spark/pull/5868#issuecomment-101954962
If you've been watching the Spark master Maven build, you may have
noticed that it started failing after this patch went in. I spent a bunch of time
investigating this and narrowed it down to JavaAPISuite failing a pair of its
sort tests whenever it runs immediately after
UnsafeShuffleWriterSuite. It turns out that this failure was **not** caused by
this patch but was merely exposed by its impact on timing / scheduling, so
**please don't revert this!**
It looks like there's a bug in snappy-java that can lead to stream
corruption, and the timing changes in the tests were sufficient to expose
it: https://github.com/xerial/snappy-java/issues/107. The same failures can be
reproduced by adding a small Thread.sleep call in
SnappyCompressionCodec.compressedOutputStream:
```diff
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 0756cdb..cc2a912 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -154,6 +154,7 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
+    Thread.sleep(10)
     new SnappyOutputStream(s, blockSize)
```
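As an aside, a quick way to confirm that a corruption failure like this is coming from the Snappy path (rather than the shuffle code itself) is to force a different codec via `spark.io.compression.codec` and re-run the failing workload. The sketch below is illustrative only and not part of this patch; the config key is real, but the sort job is just a stand-in for the actual JavaAPISuite tests:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: run a sort-heavy job with the codec switched from snappy to lzf.
// If this passes while the default-codec run fails, the Snappy path is implicated.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("codec-bisect")
  .set("spark.io.compression.codec", "lzf")

val sc = new SparkContext(conf)
try {
  val sorted = sc.parallelize(1 to 100000)
    .map(i => (i % 1000, i))
    .sortByKey()
    .collect()
  // Keys should come back in non-decreasing order if nothing was corrupted in the shuffle.
  assert(sorted.map(_._1).sameElements(sorted.map(_._1).sorted))
} finally {
  sc.stop()
}
```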
I have a patch to fix this issue upstream in snappy-java. In the meantime,
if we want to unbreak the Maven tests, we can simply modify
SnappyCompressionCodec to use DefaultBufferAllocator instead of the caching
one:
```diff
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 0756cdb..e26db56 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -21,6 +21,7 @@ import java.io.{InputStream, OutputStream}
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
+import org.xerial.snappy.buffer.DefaultBufferAllocator
 import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream}
 import org.apache.spark.SparkConf
@@ -154,7 +155,7 @@ class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
   override def compressedOutputStream(s: OutputStream): OutputStream = {
     val blockSize = conf.getSizeAsBytes("spark.io.compression.snappy.blockSize", "32k").toInt
-    new SnappyOutputStream(s, blockSize)
+    new SnappyOutputStream(s, blockSize, DefaultBufferAllocator.factory)
```
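DefaultBufferAllocator just allocates a fresh buffer per stream instead of recycling pooled ones, so this trades a little allocation overhead for correctness until the upstream fix lands. For completeness, here's a minimal round-trip check (a sketch, not part of this patch) that exercises only the public compressedOutputStream / compressedInputStream API shown above; if buffers were being corrupted, the final comparison would fail:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.SparkConf
import org.apache.spark.io.SnappyCompressionCodec

// Sketch: compress and decompress ~1 MB through the codec and check the bytes survive.
val codec = new SnappyCompressionCodec(new SparkConf())
val original = Array.tabulate[Byte](1 << 20)(i => (i % 251).toByte)

val sink = new ByteArrayOutputStream()
val out = codec.compressedOutputStream(sink)
out.write(original)
out.close()

val in = codec.compressedInputStream(new ByteArrayInputStream(sink.toByteArray))
val recovered = new ByteArrayOutputStream()
val buf = new Array[Byte](8192)
var n = in.read(buf)
while (n != -1) {
  recovered.write(buf, 0, n)
  n = in.read(buf)
}
in.close()

assert(java.util.Arrays.equals(original, recovered.toByteArray),
  "Snappy round trip corrupted the data")
```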