[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2022-01-25 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r792333746



##
File path: 
java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >=

[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-03-17 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r595805905



##
File path: 
java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;

Review comment:
   I guess not for now. Maybe we can add one in the future, so that we can remove this dependency on Netty (along with the other Netty dependencies).
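
For illustration, here is a minimal sketch (not part of the PR) of how the heap copies could be written against `ArrowBuf`'s own `getBytes`/`setBytes` accessors instead of Netty's `PlatformDependent`. The helper class name is hypothetical, and it assumes the buffer length fits in an `int`:

```
import org.apache.arrow.memory.ArrowBuf;

final class HeapCopies {

  private HeapCopies() {}

  // Copy the readable bytes [0, writerIndex) of an ArrowBuf into a new heap array.
  static byte[] toHeapArray(ArrowBuf buf) {
    int length = (int) buf.writerIndex();  // assumed to fit in an int
    byte[] bytes = new byte[length];
    buf.getBytes(0, bytes, 0, length);     // bounds-checked copy provided by ArrowBuf
    return bytes;
  }

  // Copy a heap array into an ArrowBuf starting at the given offset.
  static void fromHeapArray(byte[] src, ArrowBuf dst, long dstOffset) {
    dst.setBytes(dstOffset, src, 0, src.length);
  }
}
```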





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-03-11 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r592197707



##
File path: 
java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java
##
@@ -47,14 +57,21 @@ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec)
   }
 
   /**
-   * Creates the {@link CompressionCodec} given the compression type.
+   * Process compression by compressing the buffer as is.
    */
-  public static CompressionCodec createCodec(byte compressionType) {
-    switch (compressionType) {
-      case NoCompressionCodec.COMPRESSION_TYPE:
-        return NoCompressionCodec.INSTANCE;
-      default:
-        throw new IllegalArgumentException("Compression type not supported: " + compressionType);
-    }
+  public static ArrowBuf compressRawBuffer(BufferAllocator allocator, ArrowBuf inputBuffer) {
+    ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex());
+    compressedBuffer.setLong(0, NO_COMPRESSION_LENGTH);
+    compressedBuffer.setBytes(SIZE_OF_UNCOMPRESSED_LENGTH, inputBuffer, 0, inputBuffer.writerIndex());
+    compressedBuffer.writerIndex(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex());
+    return compressedBuffer;
+  }
+
+  /**
+   * Process decompression by decompressing the buffer as is.

Review comment:
   I chose `extractUncompressedBuffer`. Thanks for the good suggestion. 
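
For readers following the naming discussion: the renamed method is the counterpart of `compressRawBuffer` above. A rough sketch of its shape, shown here only to illustrate the buffer layout (an 8-byte uncompressed-length prefix followed by the payload), not as the exact code merged in the PR:

```
  /**
   * Sketch: strips the 8-byte length prefix written by compressRawBuffer and
   * returns a view over the raw payload that follows it.
   */
  public static ArrowBuf extractUncompressedBuffer(ArrowBuf inputBuffer) {
    long payloadLength = inputBuffer.writerIndex() - SIZE_OF_UNCOMPRESSED_LENGTH;
    // slice() returns a view over the same memory, so no copy is made here
    return inputBuffer.slice(SIZE_OF_UNCOMPRESSED_LENGTH, payloadLength);
  }
```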









[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-03-11 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r592197440



##
File path: java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
##
@@ -79,7 +105,12 @@ private void loadBuffers(
     List<ArrowBuf> ownBuffers = new ArrayList<>(bufferLayoutCount);
     for (int j = 0; j < bufferLayoutCount; j++) {
       ArrowBuf nextBuf = buffers.next();
-      ownBuffers.add(codec.decompress(vector.getAllocator(), nextBuf));
+      // for vectors without nulls, the buffer is empty, so there is no need to decompress it.
+      ArrowBuf bufferToAdd = nextBuf.writerIndex() > 0 ? codec.decompress(vector.getAllocator(), nextBuf) : nextBuf;
+      ownBuffers.add(bufferToAdd);
+      if (decompressionPerformed) {
+        decompressedBuffers.add(bufferToAdd);
+      }
     }
     try {
       vector.loadFieldBuffers(fieldNode, ownBuffers);

Review comment:
   Good suggestion. This way, we do not need to change the public interface 
of `VectorLoader`. 
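
As a usage illustration (assuming the `VectorLoader` constructor overload that accepts a `CompressionCodec.Factory`, which is what this PR introduces), loading a possibly compressed record batch could look roughly like this:

```
import org.apache.arrow.compression.CommonsCompressionFactory;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;

final class CompressedBatchLoading {

  private CompressedBatchLoading() {}

  // Loads a record batch (possibly LZ4-compressed) into a freshly created root.
  static VectorSchemaRoot load(BufferAllocator allocator, Schema schema, ArrowRecordBatch batch) {
    VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator);
    // The factory is consulted only for batches that declare a body compression type.
    VectorLoader loader = new VectorLoader(root, CommonsCompressionFactory.INSTANCE);
    loader.load(batch);
    return root;
  }
}
```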









[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-03-11 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r592196563



##
File path: 
java/vector/src/main/java/org/apache/arrow/vector/compression/CompressionUtil.java
##
@@ -47,14 +57,21 @@ public static ArrowBodyCompression createBodyCompression(CompressionCodec codec)
   }
 
   /**
-   * Creates the {@link CompressionCodec} given the compression type.
+   * Process compression by compressing the buffer as is.
    */
-  public static CompressionCodec createCodec(byte compressionType) {
-    switch (compressionType) {
-      case NoCompressionCodec.COMPRESSION_TYPE:
-        return NoCompressionCodec.INSTANCE;
-      default:
-        throw new IllegalArgumentException("Compression type not supported: " + compressionType);
-    }
+  public static ArrowBuf compressRawBuffer(BufferAllocator allocator, ArrowBuf inputBuffer) {
+    ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_UNCOMPRESSED_LENGTH + inputBuffer.writerIndex());

Review comment:
   This is a good point. It could be useful in many scenarios.
   
   However, so far I do not have a way to solve it without changing our APIs significantly.
   
   The fundamental reason is that the compression feature is only used in `VectorLoader`, which requires a single compressed buffer for each input buffer. In addition, we do not have an efficient way to combine two `ArrowBuf` objects.









[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-03-11 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r592192366



##
File path: 
java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+if (uncompressedBuffer.writerIndex() == 0L) {
+  // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+  uncompressedBuffer.close();
+  return compressedBuffer;
+}
+
+try {
+  ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+  if (compressedLength > uncompressedBuffer.writerIndex()) {
+// compressed buffer is larger, send the raw buffer
+compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+  }
+
+  uncompressedBuffer.close();
+  return compressedBuffer;
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+ByteArrayOutputStream baos = new ByteArrayOutputStream();
+try (InputStream in = new ByteArrayInputStream(inBytes);
+ OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+  IOUtils.copy(in, out);
+}
+
+byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+long uncompressedLength = uncompressedBuffer.writerIndex();
+if (!MemoryUtil.LITTLE_ENDIAN) {
+  uncompressedLength = Long.reverseBytes(uncompressedLength);
+}
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.

Review comment:
   Revised. Thanks for your kind reminder. 
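
The convention being referenced: the 8-byte prefix must be little-endian on the wire to match the C++ IPC implementation, while `ArrowBuf#setLong` writes in native byte order. A small sketch of that logic in isolation (the helper name is illustrative):

```
import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.util.MemoryUtil;

final class LengthPrefix {

  private LengthPrefix() {}

  // Writes the uncompressed length into the first 8 bytes in little-endian order.
  static void writeUncompressedLength(ArrowBuf buffer, long uncompressedLength) {
    long wireValue = MemoryUtil.LITTLE_ENDIAN
        ? uncompressedLength
        : Long.reverseBytes(uncompressedLength);  // byte-swap on big-endian JVMs
    buffer.setLong(0, wireValue);
  }
}
```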









[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-03-11 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r592192114



##
File path: 
java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+if (uncompressedBuffer.writerIndex() == 0L) {
+  // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+  uncompressedBuffer.close();
+  return compressedBuffer;
+}
+
+try {
+  ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+  if (compressedLength > uncompressedBuffer.writerIndex()) {
+// compressed buffer is larger, send the raw buffer
+compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);

Review comment:
   Sure. It sounds better. I have changed the name accordingly. 









[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-03-11 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r592190683



##
File path: 
java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
+
+/**
+ * A factory implementation based on Apache Commons library.
+ */
+public class CommonsCompressionFactory implements CompressionCodec.Factory {
+
+  public static final CommonsCompressionFactory INSTANCE = new CommonsCompressionFactory();
+
+  @Override
+  public CompressionCodec createCodec(byte codecType) {
+switch (codecType) {
+  case NoCompressionCodec.COMPRESSION_TYPE:

Review comment:
   Sounds reasonable. I have revised the code accordingly. 
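
A minimal sketch of the factory shape under discussion, assuming the byte-based `createCodec` signature shown in the quoted diff (a later revision of the PR moved to an enum-based lookup, discussed further down the thread):

```
import org.apache.arrow.flatbuf.CompressionType;
import org.apache.arrow.vector.compression.CompressionCodec;
import org.apache.arrow.vector.compression.NoCompressionCodec;

public class CommonsCompressionFactory implements CompressionCodec.Factory {

  public static final CommonsCompressionFactory INSTANCE = new CommonsCompressionFactory();

  @Override
  public CompressionCodec createCodec(byte codecType) {
    switch (codecType) {
      case NoCompressionCodec.COMPRESSION_TYPE:
        // uncompressed batches fall through to the no-op codec
        return NoCompressionCodec.INSTANCE;
      case CompressionType.LZ4_FRAME:
        return new Lz4CompressionCodec();
      default:
        throw new IllegalArgumentException("Compression type not supported: " + codecType);
    }
  }
}
```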









[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-03-11 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r592190402



##
File path: 
java/compression/src/main/java/org/apache/arrow/compression/CommonsCompressionFactory.java
##
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.NoCompressionCodec;
+
+/**
+ * A factory implementation based on Apache Commons library.
+ */
+public class CommonsCompressionFactory implements CompressionCodec.Factory {
+
+  public static final CommonsCompressionFactory INSTANCE = new CommonsCompressionFactory();
+
+  @Override
+  public CompressionCodec createCodec(byte codecType) {

Review comment:
   Sounds reasonable. I have extracted an enum `CompressionUtil#CodecType` 
for this purpose. It also makes some other operations easier. 
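
A rough sketch of the `CompressionUtil#CodecType` idea mentioned here: an enum pairing each flatbuffer compression-type byte with a named constant. Member and method names are illustrative, not necessarily those that were committed:

```
import org.apache.arrow.flatbuf.CompressionType;
import org.apache.arrow.vector.compression.NoCompressionCodec;

public enum CodecType {
  NO_COMPRESSION(NoCompressionCodec.COMPRESSION_TYPE),
  LZ4_FRAME(CompressionType.LZ4_FRAME),
  ZSTD(CompressionType.ZSTD);

  private final byte type;

  CodecType(byte type) {
    this.type = type;
  }

  public byte getType() {
    return type;
  }

  // Maps the type byte carried in the IPC message back to an enum constant.
  public static CodecType fromCompressionType(byte type) {
    for (CodecType codecType : values()) {
      if (codecType.type == type) {
        return codecType;
      }
    }
    return NO_COMPRESSION;
  }
}
```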









[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-03-07 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r589202945



##
File path: 
java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+if (uncompressedBuffer.writerIndex() == 0L) {
+  // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+  uncompressedBuffer.close();
+  return compressedBuffer;
+}
+
+try {
+  ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+  if (compressedLength > uncompressedBuffer.writerIndex()) {
+// compressed buffer is larger, send the raw buffer
+compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+  }
+
+  uncompressedBuffer.close();
+  return compressedBuffer;
+} catch (IOException e) {
+  throw new RuntimeException(e);
+}
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];

Review comment:
   Sure. I have opened ARROW-11901 to track it.
   
   I also want to share some thoughts briefly: to use the native memory directly, we need a way to wrap `ByteBuffer`s as input/output streams (in Java, the only "standard" way to access off-heap memory is through a `DirectByteBuffer`).
   
   We would need a third-party library to achieve that. We would also need to evaluate the performance afterwards, because the Commons Compress library uses on-heap data extensively, so the copies between on-heap and off-heap memory may be difficult to avoid.
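
To make the idea concrete, here is a minimal sketch of wrapping a (direct) `ByteBuffer` as an `InputStream`, which is the kind of adapter that would let a compressor read off-heap memory without the intermediate `byte[]`. It is illustrative only and does not remove the on-heap buffering inside Commons Compress itself:

```
import java.io.InputStream;
import java.nio.ByteBuffer;

final class ByteBufferInputStream extends InputStream {

  private final ByteBuffer buffer;  // e.g. a direct buffer viewing off-heap memory

  ByteBufferInputStream(ByteBuffer buffer) {
    this.buffer = buffer;
  }

  @Override
  public int read() {
    return buffer.hasRemaining() ? (buffer.get() & 0xFF) : -1;
  }

  @Override
  public int read(byte[] dst, int off, int len) {
    if (!buffer.hasRemaining()) {
      return -1;
    }
    int toRead = Math.min(len, buffer.remaining());
    buffer.get(dst, off, toRead);
    return toRead;
  }
}
```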
   
   









[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-03-07 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r589171170



##
File path: 
java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##

[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-03-07 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r589169372



##
File path: 
java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {

Review comment:
   Good suggestion. I have opened ARROW-11899 to track it. 









[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-03-04 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r588067931



##
File path: java/compression/pom.xml
##
@@ -0,0 +1,56 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.arrow</groupId>
+    <artifactId>arrow-java-root</artifactId>
+    <version>4.0.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>arrow-compression</artifactId>
+  <name>Arrow Compression</name>
+  <description>(Experimental/Contrib) A library for working with the compression/decompression of Arrow data.</description>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-format</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-vector</artifactId>
+      <version>${project.version}</version>
+      <classifier>${arrow.vector.classifier}</classifier>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-unsafe</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-compress</artifactId>
+      <version>1.20</version>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>

Review comment:
   The `vector` module requires this library. If we do not add it to the dependency list, we get an error when building with Maven:
   
   ```
   Used undeclared dependencies found: io.netty:netty-common:jar:4.1.48.Final:compile
   ```









[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-03-02 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r586185940



##
File path: 
java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##

[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-03-01 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r585290009



##
File path: 
java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##

[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-03-01 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r585195875



##
File path: 
java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##

[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-03-01 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r585195604



##
File path: 
java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.compression;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.compression.CompressionCodec;
+import org.apache.arrow.vector.compression.CompressionUtil;
+import 
org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
+import 
org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorOutputStream;
+import org.apache.commons.compress.utils.IOUtils;
+
+import io.netty.util.internal.PlatformDependent;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    try {
+      ArrowBuf compressedBuffer = doCompress(allocator, uncompressedBuffer);
+      long compressedLength = compressedBuffer.writerIndex() - CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH;
+      if (compressedLength > uncompressedBuffer.writerIndex()) {
+        // compressed buffer is larger, send the raw buffer
+        compressedBuffer.close();
+        compressedBuffer = CompressionUtil.compressRawBuffer(allocator, uncompressedBuffer);
+      }
+
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ArrowBuf doCompress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) throws IOException {
+    byte[] inBytes = new byte[(int) uncompressedBuffer.writerIndex()];
+    PlatformDependent.copyMemory(uncompressedBuffer.memoryAddress(), inBytes, 0, uncompressedBuffer.writerIndex());
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try (InputStream in = new ByteArrayInputStream(inBytes);
+         OutputStream out = new FramedLZ4CompressorOutputStream(baos)) {
+      IOUtils.copy(in, out);
+    }
+
+    byte[] outBytes = baos.toByteArray();
+
+    ArrowBuf compressedBuffer = allocator.buffer(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!MemoryUtil.LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    PlatformDependent.copyMemory(
+        outBytes, 0, compressedBuffer.memoryAddress() + CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH, outBytes.length);
+    compressedBuffer.writerIndex(CompressionUtil.SIZE_OF_UNCOMPRESSED_LENGTH + outBytes.length);
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= 
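
The quoted hunk is cut off at this point. For reference, a minimal sketch of how the layout produced above ([8-byte little-endian uncompressed length][LZ4 frame]) can be decoded with the same Commons Compress framed LZ4 classes imported in this file; the class and method names below are illustrative and not the PR's actual decompress implementation:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

import org.apache.commons.compress.compressors.lz4.FramedLZ4CompressorInputStream;
import org.apache.commons.compress.utils.IOUtils;

final class Lz4FrameSketch {
  /** Decodes a buffer laid out as [8-byte little-endian uncompressed length][LZ4 frame]. */
  static byte[] decode(byte[] compressedWithPrefix) throws IOException {
    long uncompressedLength = ByteBuffer.wrap(compressedWithPrefix, 0, 8)
        .order(ByteOrder.LITTLE_ENDIAN).getLong();
    if (uncompressedLength == 0L) {
      return new byte[0];  // mirrors the empty-buffer shortcut in compress()
    }
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (InputStream in = new FramedLZ4CompressorInputStream(
        new ByteArrayInputStream(compressedWithPrefix, 8, compressedWithPrefix.length - 8))) {
      IOUtils.copy(in, out);  // inflate the LZ4 frame that follows the prefix
    }
    return out.toByteArray();
  }
}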

[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-03-01 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r585195259



##
File path: java/compression/src/main/java/org/apache/arrow/compression/Lz4CompressionCodec.java
##

[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-02-12 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r575091316



##
File path: java/vector/pom.xml
##
@@ -74,6 +74,11 @@
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
 
+

Review comment:
   @emkornfield Sounds reasonable. I will try to revise the PR accordingly. Thanks for the good suggestion.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-01-15 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r558083064



##
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  private static final long SIZE_OF_MESSAGE_LENGTH = 8L;
+
+  private final LZ4Factory factory;
+
+  private LZ4Compressor compressor;
+
+  private LZ4FastDecompressor decompressor;
+
+  public Lz4CompressionCodec() {
+    factory = LZ4Factory.fastestInstance();
+  }
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf uncompressedBuffer) {
+    Preconditions.checkArgument(uncompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    if (uncompressedBuffer.writerIndex() == 0L) {
+      // shortcut for empty buffer
+      ArrowBuf compressedBuffer = allocator.buffer(SIZE_OF_MESSAGE_LENGTH);
+      compressedBuffer.setLong(0, 0);
+      compressedBuffer.writerIndex(SIZE_OF_MESSAGE_LENGTH);
+      uncompressedBuffer.close();
+      return compressedBuffer;
+    }
+
+    // create compressor lazily
+    if (compressor == null) {
+      compressor = factory.fastCompressor();
+    }
+
+    int maxCompressedLength = compressor.maxCompressedLength((int) uncompressedBuffer.writerIndex());
+
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_MESSAGE_LENGTH);
+    long uncompressedLength = uncompressedBuffer.writerIndex();
+    if (!LITTLE_ENDIAN) {
+      uncompressedLength = Long.reverseBytes(uncompressedLength);
+    }
+    compressedBuffer.setLong(0, uncompressedLength);
+
+    ByteBuffer uncompressed =
+        MemoryUtil.directBuffer(uncompressedBuffer.memoryAddress(), (int) uncompressedBuffer.writerIndex());
+    ByteBuffer compressed =
+        MemoryUtil.directBuffer(compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, maxCompressedLength);
+
+    int compressedLength = compressor.compress(
+        uncompressed, 0, (int) uncompressedBuffer.writerIndex(), compressed, 0, maxCompressedLength);
+    compressedBuffer.writerIndex(compressedLength + SIZE_OF_MESSAGE_LENGTH);
+
+    uncompressedBuffer.close();
+    return compressedBuffer;
+  }
+
+  @Override
+  public ArrowBuf decompress(BufferAllocator allocator, ArrowBuf compressedBuffer) {
+    Preconditions.checkArgument(compressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The compressed buffer size exceeds the integer limit");
+
+    Preconditions.checkArgument(compressedBuffer.writerIndex() >= SIZE_OF_MESSAGE_LENGTH,
+        "Not enough data to decompress.");
+
+    long decompressedLength = compressedBuffer.getLong(0);
+    if (!LITTLE_ENDIAN) {
+      decompressedLength = Long.reverseBytes(decompressedLength);
+    }
+
+    if (decompressedLength == 0L) {
+      // shortcut for empty buffer
+      compressedBuffer.close();
+      return allocator.getEmpty();
+    }
+
+    // create decompressor lazily
+    if (decompressor == null) {
+      decompressor = factory.fastDecompressor();
+    }
+
+    ByteBuffer compressed = MemoryUtil.directBuffer(
+        compressedBuffer.memoryAddress() + SIZE_OF_MESSAGE_LENGTH, (int) compressedBuffer.writerIndex());

Review comment:
   Nice catch. Thank you @stczwd





This is an automated message from the Apache Git Service.

[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-01-05 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r552335719



##
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  private static final long SIZE_OF_MESSAGE_LENGTH = 8L;
+
+  private final LZ4Factory factory;
+
+  private LZ4Compressor compressor;
+
+  private LZ4FastDecompressor decompressor;
+
+  public Lz4CompressionCodec() {
+    factory = LZ4Factory.fastestInstance();
+  }
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) {
+    Preconditions.checkArgument(unCompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    // create compressor lazily

Review comment:
For some scenarios (e.g. flight sender), we only need the compressor, while for others (e.g. flight receiver), we only need the decompressor. So there is no need to create both eagerly.
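
A minimal sketch of the lazy-initialization idea described above, assuming the lz4-java API (`LZ4Factory`, `LZ4Compressor`, `LZ4FastDecompressor`) already imported in the quoted hunk; the class and method names below are illustrative, not part of the PR:

import net.jpountz.lz4.LZ4Compressor;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4FastDecompressor;

class LazyLz4CodecSketch {
  private final LZ4Factory factory = LZ4Factory.fastestInstance();
  private LZ4Compressor compressor;           // created only if compress() is ever called
  private LZ4FastDecompressor decompressor;   // created only if decompress() is ever called

  byte[] compress(byte[] input) {
    if (compressor == null) {
      compressor = factory.fastCompressor();  // a pure sender never builds the decompressor
    }
    return compressor.compress(input);
  }

  byte[] decompress(byte[] compressed, int uncompressedLength) {
    if (decompressor == null) {
      decompressor = factory.fastDecompressor();  // a pure receiver never builds the compressor
    }
    return decompressor.decompress(compressed, uncompressedLength);
  }
}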





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-01-05 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r552335378



##
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  private static final long SIZE_OF_MESSAGE_LENGTH = 8L;
+
+  private final LZ4Factory factory;
+
+  private LZ4Compressor compressor;
+
+  private LZ4FastDecompressor decompressor;
+
+  public Lz4CompressionCodec() {
+    factory = LZ4Factory.fastestInstance();
+  }
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) {

Review comment:
   Nice catch. Thank you!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2021-01-05 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r552332372



##
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import static org.apache.arrow.memory.util.MemoryUtil.LITTLE_ENDIAN;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;

Review comment:
   @kiszk You are right. I chose this library because our C++ implementation also depends on this repo (https://github.com/lz4/lz4).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [arrow] liyafan82 commented on a change in pull request #8949: ARROW-10880: [Java] Support compressing RecordBatch IPC buffers by LZ4

2020-12-20 Thread GitBox


liyafan82 commented on a change in pull request #8949:
URL: https://github.com/apache/arrow/pull/8949#discussion_r546486677



##
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  private static final long SIZE_OF_MESSAGE_LENGTH = 8L;
+
+  private final LZ4Factory factory;
+
+  private LZ4Compressor compressor;
+
+  private LZ4FastDecompressor decompressor;
+
+  public Lz4CompressionCodec() {
+    factory = LZ4Factory.fastestInstance();
+  }
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) {
+    Preconditions.checkArgument(unCompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    // create compressor lazily
+    if (compressor == null) {
+      compressor = factory.fastCompressor();
+    }
+
+    int maxCompressedLength = compressor.maxCompressedLength((int) unCompressedBuffer.writerIndex());
+
+    // first 8 bytes reserved for uncompressed length, to be consistent with the
+    // C++ implementation.
+    ArrowBuf compressedBuffer = allocator.buffer(maxCompressedLength + SIZE_OF_MESSAGE_LENGTH);
+    compressedBuffer.setLong(0, unCompressedBuffer.writerIndex());

Review comment:
   Revised accordingly. Thanks for your kind reminder.
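
The quoted `compressedBuffer.setLong(0, unCompressedBuffer.writerIndex())` stores the length prefix in the platform's native byte order; the revision mentioned here is presumably the byte swap that appears in the later version of this file, which tests the host's endianness and reverses the bytes on big-endian machines so the 8-byte prefix is always little-endian, as the C++ reader expects. A minimal sketch of that conversion (the class and method names are illustrative):

import java.nio.ByteOrder;

final class LittleEndianPrefixSketch {
  /** Value to pass to setLong(0, ...) so the serialized 8-byte prefix is little-endian. */
  static long lengthPrefix(long uncompressedLength) {
    if (ByteOrder.nativeOrder() == ByteOrder.LITTLE_ENDIAN) {
      return uncompressedLength;                   // already little-endian in memory
    }
    return Long.reverseBytes(uncompressedLength);  // swap on big-endian hosts
  }
}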

##
File path: java/vector/src/main/java/org/apache/arrow/vector/compression/Lz4CompressionCodec.java
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.vector.compression;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.flatbuf.CompressionType;
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.util.MemoryUtil;
+import org.apache.arrow.util.Preconditions;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+import net.jpountz.lz4.LZ4FastDecompressor;
+
+/**
+ * Compression codec for the LZ4 algorithm.
+ */
+public class Lz4CompressionCodec implements CompressionCodec {
+
+  private static final long SIZE_OF_MESSAGE_LENGTH = 8L;
+
+  private final LZ4Factory factory;
+
+  private LZ4Compressor compressor;
+
+  private LZ4FastDecompressor decompressor;
+
+  public Lz4CompressionCodec() {
+    factory = LZ4Factory.fastestInstance();
+  }
+
+  @Override
+  public ArrowBuf compress(BufferAllocator allocator, ArrowBuf unCompressedBuffer) {
+    Preconditions.checkArgument(unCompressedBuffer.writerIndex() <= Integer.MAX_VALUE,
+        "The uncompressed buffer size exceeds the integer limit");
+
+    // create compressor lazily
+    if (compressor == null) {
+      compressor = factory.fastCompressor();
+    }
+
+    int maxCompressedLength = compressor.maxCompressedLength((int)