Repository: spark Updated Branches: refs/heads/master 83fe3b5e1 -> b78cf13bf
[SPARK-21276][CORE] Update lz4-java to the latest (v1.4.0) ## What changes were proposed in this pull request? This PR updates `lz4-java` to the latest release (v1.4.0) and removes the custom `LZ4BlockInputStream`. We currently use a custom `LZ4BlockInputStream` to read concatenated byte streams in shuffle. However, this functionality has been implemented in the latest lz4-java (https://github.com/lz4/lz4-java/pull/105), so we can update to the latest release and remove the custom `LZ4BlockInputStream`. Major diffs between the latest release and v1.3.0 in the master are as follows (https://github.com/lz4/lz4-java/compare/62f7547abb0819d1ca1e669645ee1a9d26cd60b0...6d4693f56253fcddfad7b441bb8d917b182efa2d): - fixed NPE in XXHashFactory similarly - Don't place resources in default package to support shading - Fixes ByteBuffer methods failing to apply arrayOffset() for array-backed - Try to load lz4-java from java.library.path, then fallback to bundled - Add ppc64le binary - Add s390x JNI binding - Add basic LZ4 Frame v1.5.0 support - enable aarch64 support for lz4-java - Allow unsafeInstance() for ppc64le architecture - Add unsafeInstance support for AArch64 - Support 64-bit JNI build on Solaris - Avoid over-allocating a buffer - Allow EndMark to be incompressible for LZ4FrameInputStream. - Concat byte stream ## How was this patch tested? Existing tests. Author: Takeshi Yamamuro <yamam...@apache.org> Closes #18883 from maropu/SPARK-21276. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b78cf13b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b78cf13b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b78cf13b Branch: refs/heads/master Commit: b78cf13bf05f0eadd7ae97df84b6e1505dc5ff9f Parents: 83fe3b5 Author: Takeshi Yamamuro <yamam...@apache.org> Authored: Wed Aug 9 17:31:52 2017 +0200 Committer: Sean Owen <so...@cloudera.com> Committed: Wed Aug 9 17:31:52 2017 +0200 ---------------------------------------------------------------------- core/pom.xml | 4 +- .../apache/spark/io/LZ4BlockInputStream.java | 260 ------------------- .../org/apache/spark/io/CompressionCodec.scala | 7 +- dev/deps/spark-deps-hadoop-2.6 | 2 +- dev/deps/spark-deps-hadoop-2.7 | 2 +- external/kafka-0-10-assembly/pom.xml | 4 +- external/kafka-0-8-assembly/pom.xml | 4 +- pom.xml | 6 +- project/MimaExcludes.scala | 5 +- 9 files changed, 20 insertions(+), 274 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/core/pom.xml ---------------------------------------------------------------------- diff --git a/core/pom.xml b/core/pom.xml index bc6b1c4..431967e 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -190,8 +190,8 @@ <artifactId>snappy-java</artifactId> </dependency> <dependency> - <groupId>net.jpountz.lz4</groupId> - <artifactId>lz4</artifactId> + <groupId>org.lz4</groupId> + <artifactId>lz4-java</artifactId> </dependency> <dependency> <groupId>org.roaringbitmap</groupId> http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/core/src/main/java/org/apache/spark/io/LZ4BlockInputStream.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/io/LZ4BlockInputStream.java b/core/src/main/java/org/apache/spark/io/LZ4BlockInputStream.java deleted file mode 100644 index 9d6f06e..0000000 --- 
a/core/src/main/java/org/apache/spark/io/LZ4BlockInputStream.java +++ /dev/null @@ -1,260 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.io; - -import java.io.EOFException; -import java.io.FilterInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.util.zip.Checksum; - -import net.jpountz.lz4.LZ4Exception; -import net.jpountz.lz4.LZ4Factory; -import net.jpountz.lz4.LZ4FastDecompressor; -import net.jpountz.util.SafeUtils; -import net.jpountz.xxhash.XXHashFactory; - -/** - * {@link InputStream} implementation to decode data written with - * {@link net.jpountz.lz4.LZ4BlockOutputStream}. This class is not thread-safe and does not - * support {@link #mark(int)}/{@link #reset()}. 
- * @see net.jpountz.lz4.LZ4BlockOutputStream - * - * This is based on net.jpountz.lz4.LZ4BlockInputStream - * - * changes: https://github.com/davies/lz4-java/commit/cc1fa940ac57cc66a0b937300f805d37e2bf8411 - * - * TODO: merge this into upstream - */ -public final class LZ4BlockInputStream extends FilterInputStream { - - // Copied from net.jpountz.lz4.LZ4BlockOutputStream - static final byte[] MAGIC = new byte[] { 'L', 'Z', '4', 'B', 'l', 'o', 'c', 'k' }; - static final int MAGIC_LENGTH = MAGIC.length; - - static final int HEADER_LENGTH = - MAGIC_LENGTH // magic bytes - + 1 // token - + 4 // compressed length - + 4 // decompressed length - + 4; // checksum - - static final int COMPRESSION_LEVEL_BASE = 10; - - static final int COMPRESSION_METHOD_RAW = 0x10; - static final int COMPRESSION_METHOD_LZ4 = 0x20; - - static final int DEFAULT_SEED = 0x9747b28c; - - private final LZ4FastDecompressor decompressor; - private final Checksum checksum; - private byte[] buffer; - private byte[] compressedBuffer; - private int originalLen; - private int o; - private boolean finished; - - /** - * Create a new {@link InputStream}. - * - * @param in the {@link InputStream} to poll - * @param decompressor the {@link LZ4FastDecompressor decompressor} instance to - * use - * @param checksum the {@link Checksum} instance to use, must be - * equivalent to the instance which has been used to - * write the stream - */ - public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor, Checksum checksum) { - super(in); - this.decompressor = decompressor; - this.checksum = checksum; - this.buffer = new byte[0]; - this.compressedBuffer = new byte[HEADER_LENGTH]; - o = originalLen = 0; - finished = false; - } - - /** - * Create a new instance using {@link net.jpountz.xxhash.XXHash32} for checksuming. 
- * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor, Checksum) - * @see net.jpountz.xxhash.StreamingXXHash32#asChecksum() - */ - public LZ4BlockInputStream(InputStream in, LZ4FastDecompressor decompressor) { - this(in, decompressor, - XXHashFactory.fastestInstance().newStreamingHash32(DEFAULT_SEED).asChecksum()); - } - - /** - * Create a new instance which uses the fastest {@link LZ4FastDecompressor} available. - * @see LZ4Factory#fastestInstance() - * @see #LZ4BlockInputStream(InputStream, LZ4FastDecompressor) - */ - public LZ4BlockInputStream(InputStream in) { - this(in, LZ4Factory.fastestInstance().fastDecompressor()); - } - - @Override - public int available() throws IOException { - refill(); - return originalLen - o; - } - - @Override - public int read() throws IOException { - refill(); - if (finished) { - return -1; - } - return buffer[o++] & 0xFF; - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - SafeUtils.checkRange(b, off, len); - refill(); - if (finished) { - return -1; - } - len = Math.min(len, originalLen - o); - System.arraycopy(buffer, o, b, off, len); - o += len; - return len; - } - - @Override - public int read(byte[] b) throws IOException { - return read(b, 0, b.length); - } - - @Override - public long skip(long n) throws IOException { - refill(); - if (finished) { - return -1; - } - final int skipped = (int) Math.min(n, originalLen - o); - o += skipped; - return skipped; - } - - private void refill() throws IOException { - if (finished || o < originalLen) { - return; - } - try { - readFully(compressedBuffer, HEADER_LENGTH); - } catch (EOFException e) { - finished = true; - return; - } - for (int i = 0; i < MAGIC_LENGTH; ++i) { - if (compressedBuffer[i] != MAGIC[i]) { - throw new IOException("Stream is corrupted"); - } - } - final int token = compressedBuffer[MAGIC_LENGTH] & 0xFF; - final int compressionMethod = token & 0xF0; - final int compressionLevel = COMPRESSION_LEVEL_BASE + (token & 0x0F); - if 
(compressionMethod != COMPRESSION_METHOD_RAW && compressionMethod != COMPRESSION_METHOD_LZ4) - { - throw new IOException("Stream is corrupted"); - } - final int compressedLen = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 1); - originalLen = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 5); - final int check = SafeUtils.readIntLE(compressedBuffer, MAGIC_LENGTH + 9); - assert HEADER_LENGTH == MAGIC_LENGTH + 13; - if (originalLen > 1 << compressionLevel - || originalLen < 0 - || compressedLen < 0 - || (originalLen == 0 && compressedLen != 0) - || (originalLen != 0 && compressedLen == 0) - || (compressionMethod == COMPRESSION_METHOD_RAW && originalLen != compressedLen)) { - throw new IOException("Stream is corrupted"); - } - if (originalLen == 0 && compressedLen == 0) { - if (check != 0) { - throw new IOException("Stream is corrupted"); - } - refill(); - return; - } - if (buffer.length < originalLen) { - buffer = new byte[Math.max(originalLen, buffer.length * 3 / 2)]; - } - switch (compressionMethod) { - case COMPRESSION_METHOD_RAW: - readFully(buffer, originalLen); - break; - case COMPRESSION_METHOD_LZ4: - if (compressedBuffer.length < originalLen) { - compressedBuffer = new byte[Math.max(compressedLen, compressedBuffer.length * 3 / 2)]; - } - readFully(compressedBuffer, compressedLen); - try { - final int compressedLen2 = - decompressor.decompress(compressedBuffer, 0, buffer, 0, originalLen); - if (compressedLen != compressedLen2) { - throw new IOException("Stream is corrupted"); - } - } catch (LZ4Exception e) { - throw new IOException("Stream is corrupted", e); - } - break; - default: - throw new AssertionError(); - } - checksum.reset(); - checksum.update(buffer, 0, originalLen); - if ((int) checksum.getValue() != check) { - throw new IOException("Stream is corrupted"); - } - o = 0; - } - - private void readFully(byte[] b, int len) throws IOException { - int read = 0; - while (read < len) { - final int r = in.read(b, read, len - read); - if (r < 0) { 
- throw new EOFException("Stream ended prematurely"); - } - read += r; - } - assert len == read; - } - - @Override - public boolean markSupported() { - return false; - } - - @SuppressWarnings("sync-override") - @Override - public void mark(int readlimit) { - // unsupported - } - - @SuppressWarnings("sync-override") - @Override - public void reset() throws IOException { - throw new IOException("mark/reset not supported"); - } - - @Override - public String toString() { - return getClass().getSimpleName() + "(in=" + in - + ", decompressor=" + decompressor + ", checksum=" + checksum + ")"; - } - -} http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index 0cb16f0..27f2e42 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -21,7 +21,7 @@ import java.io._ import java.util.Locale import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream} -import net.jpountz.lz4.LZ4BlockOutputStream +import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream} import org.xerial.snappy.{Snappy, SnappyInputStream, SnappyOutputStream} import org.apache.spark.SparkConf @@ -115,7 +115,10 @@ class LZ4CompressionCodec(conf: SparkConf) extends CompressionCodec { new LZ4BlockOutputStream(s, blockSize) } - override def compressedInputStream(s: InputStream): InputStream = new LZ4BlockInputStream(s) + override def compressedInputStream(s: InputStream): InputStream = { + val disableConcatenationOfByteStream = false + new LZ4BlockInputStream(s, disableConcatenationOfByteStream) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/dev/deps/spark-deps-hadoop-2.6 
---------------------------------------------------------------------- diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6 index d7587fb..83070a9 100644 --- a/dev/deps/spark-deps-hadoop-2.6 +++ b/dev/deps/spark-deps-hadoop-2.6 @@ -133,7 +133,7 @@ leveldbjni-all-1.8.jar libfb303-0.9.3.jar libthrift-0.9.3.jar log4j-1.2.17.jar -lz4-1.3.0.jar +lz4-java-1.4.0.jar machinist_2.11-0.6.1.jar macro-compat_2.11-1.1.1.jar mail-1.4.7.jar http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/dev/deps/spark-deps-hadoop-2.7 ---------------------------------------------------------------------- diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7 index 887eeca..5481e25 100644 --- a/dev/deps/spark-deps-hadoop-2.7 +++ b/dev/deps/spark-deps-hadoop-2.7 @@ -134,7 +134,7 @@ leveldbjni-all-1.8.jar libfb303-0.9.3.jar libthrift-0.9.3.jar log4j-1.2.17.jar -lz4-1.3.0.jar +lz4-java-1.4.0.jar machinist_2.11-0.6.1.jar macro-compat_2.11-1.1.1.jar mail-1.4.7.jar http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/external/kafka-0-10-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-assembly/pom.xml b/external/kafka-0-10-assembly/pom.xml index 75df886..d6f9731 100644 --- a/external/kafka-0-10-assembly/pom.xml +++ b/external/kafka-0-10-assembly/pom.xml @@ -65,8 +65,8 @@ <scope>provided</scope> </dependency> <dependency> - <groupId>net.jpountz.lz4</groupId> - <artifactId>lz4</artifactId> + <groupId>org.lz4</groupId> + <artifactId>lz4-java</artifactId> <scope>provided</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/external/kafka-0-8-assembly/pom.xml ---------------------------------------------------------------------- diff --git a/external/kafka-0-8-assembly/pom.xml b/external/kafka-0-8-assembly/pom.xml index f9c2dcb..7863494 100644 --- a/external/kafka-0-8-assembly/pom.xml +++ b/external/kafka-0-8-assembly/pom.xml @@ -65,8 
+65,8 @@ <scope>provided</scope> </dependency> <dependency> - <groupId>net.jpountz.lz4</groupId> - <artifactId>lz4</artifactId> + <groupId>org.lz4</groupId> + <artifactId>lz4-java</artifactId> <scope>provided</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 500fa1c..9616f6d 100644 --- a/pom.xml +++ b/pom.xml @@ -531,9 +531,9 @@ <scope>${hadoop.deps.scope}</scope> </dependency> <dependency> - <groupId>net.jpountz.lz4</groupId> - <artifactId>lz4</artifactId> - <version>1.3.0</version> + <groupId>org.lz4</groupId> + <artifactId>lz4-java</artifactId> + <version>1.4.0</version> </dependency> <dependency> <groupId>com.clearspring.analytics</groupId> http://git-wip-us.apache.org/repos/asf/spark/blob/b78cf13b/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 1793da0..7ba85bd 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -41,7 +41,10 @@ object MimaExcludes { // [SPARK-19937] Add remote bytes read to disk. ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetrics.this"), - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this") + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ShuffleReadMetricDistributions.this"), + + // [SPARK-21276] Update lz4-java to the latest (v1.4.0) + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.io.LZ4BlockInputStream") ) // Exclude rules for 2.2.x --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org