[GitHub] [parquet-mr] parthchandra commented on a diff in pull request #1008: PARQUET-2212: Add ByteBuffer api for decryptors to allow direct memory to be decrypted

2023-01-13 Thread GitBox


parthchandra commented on code in PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1069182893


##
parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java:
##
@@ -51,17 +52,26 @@
  * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
  * @return plaintext - starts at offset 0 of the output value, and fills up the entire byte array.
  */
-public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
+byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
 
 /**
+ * Convenience decryption method that reads the length and ciphertext from a ByteBuffer
+ *
+ * @param from ByteBuffer with length and ciphertext.
+ * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
+ * @return plaintext - starts at offset 0 of the output, and fills up the entire byte buffer.

Review Comment:
   Done



##
parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java:
##
@@ -36,11 +37,11 @@
  * The ciphertext includes the nonce and (in case of GCM cipher) the tag, as detailed in the
  * Parquet Modular Encryption specification.
  */
-public byte[] encrypt(byte[] plaintext, byte[] AAD);
+byte[] encrypt(byte[] plaintext, byte[] AAD);

Review Comment:
   Probably. This PR only focuses on the read path, though; I can add the encryptor API when I look at the write path.
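   For illustration of the symmetry being deferred here, a write-path counterpart could eventually mirror the new `decrypt(ByteBuffer, byte[])`. The signature below is a hypothetical sketch only and is not part of this PR.

```java
// Hypothetical (not in this PR): a ByteBuffer variant of encrypt for the write path,
// mirroring the decrypt(ByteBuffer from, byte[] AAD) method added on the read path.
ByteBuffer encrypt(ByteBuffer plaintext, byte[] AAD);
```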



##
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java:
##
@@ -86,6 +87,48 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
 
 return plainText;
   }
+  public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
+

Review Comment:
   Done



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java:
##
@@ -144,6 +144,11 @@
*/
   public static final String BLOOM_FILTERING_ENABLED = "parquet.filter.bloom.enabled";
 
+  /**
+* Key to configure if off-heap buffer should be used for decryption
+*/
+  public static final String OFF_HEAP_DECRYPT_BUFFER_ENABLED = "parquet.decrypt.off-heap.buffer.enabled";

Review Comment:
   Done



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##
@@ -133,11 +135,36 @@ public DataPage readPage() {
 public DataPage visit(DataPageV1 dataPageV1) {

Review Comment:
   Actually, it does. Added it for V2 and updated the unit tests as well. 



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java:
##
@@ -242,6 +245,12 @@ public Builder withFilter(Filter filter) {
   return this;
 }
 
+public Builder withAllocator(ByteBufferAllocator allocator) {

Review Comment:
   Well, yes. ParquetReader has an instance of `ParquetReadOptions`, and this builder simply passes the allocator down to that instance. Adding this here mirrors the other options being set in `ParquetReader.Builder`. It is used in the unit tests.
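   As a hedged sketch of that delegation: the builder keeps the allocator (defaulting to a heap allocator, as in the diff) and hands it to `ParquetReadOptions` when the reader is built. The `buildOptions()` wiring below is an assumption for illustration, not the PR's exact code.

```java
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.HeapByteBufferAllocator;

// Sketch only: mirrors how ParquetReader.Builder.withAllocator could forward the
// allocator down to the ParquetReadOptions instance the reader actually consults.
class ReaderBuilderSketch {
  private ByteBufferAllocator allocator = new HeapByteBufferAllocator(); // default, as in the diff

  public ReaderBuilderSketch withAllocator(ByteBufferAllocator allocator) {
    this.allocator = allocator;
    return this;
  }

  ParquetReadOptions buildOptions() {
    // assumed forwarding: the reader's options end up carrying the chosen allocator
    return ParquetReadOptions.builder().withAllocator(allocator).build();
  }
}
```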



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##
@@ -133,11 +135,36 @@ public DataPage readPage() {
 public DataPage visit(DataPageV1 dataPageV1) {
   try {
 BytesInput bytes = dataPageV1.getBytes();
-if (null != blockDecryptor) {
-  bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+BytesInput decompressed;
+
+if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) {
+  ByteBuffer byteBuffer = bytes.toByteBuffer();
+  if (!byteBuffer.isDirect()) {

Review Comment:
   I think there was a previous discussion on this and we simplified the code to throw an error. Basically, we expect that a user of the ParquetReader will be using either a direct buffer or a heap buffer all the way through, so encountering both a direct buffer and a heap buffer is an error.
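   A hedged usage sketch of that expectation: configure a direct allocator together with the off-heap decrypt key added in this PR, rather than mixing heap and direct buffers. That `HadoopReadOptions` picks the key up like the other `parquet.*` read keys is an assumption here.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.bytes.DirectByteBufferAllocator;

// Usage sketch: keep the allocator and the off-heap decrypt setting consistent so the
// reader sees direct buffers end to end; a heap page on this path throws
// ParquetDecodingException, as in the diff above.
class OffHeapDecryptReadOptionsSketch {
  static ParquetReadOptions directReadOptions() {
    Configuration conf = new Configuration();
    conf.setBoolean("parquet.decrypt.off-heap.buffer.enabled", true); // key added by this PR
    return HadoopReadOptions.builder(conf)
        .withAllocator(new DirectByteBufferAllocator()) // direct buffers all the way through
        .build();
  }
}
```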



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java:
##
@@ -187,6 +189,7 @@ public static <T> Builder<T> builder(ReadSupport<T> readSupport, Path path) {
 private final InputFile file;
 private final Path path;
 private Filter filter = null;
+private ByteBufferAllocator  allocator = new HeapByteBufferAllocator();

Review Comment:
   Done



##
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java:
##
@@ -78,6 +79,38 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
 return plainText;
   }
 
+  public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {

Review Comment:
   Done



##
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java:
##
@@ -86,6 +87,48 @@ public byte[] decrypt(byte[] ciphertext, 

[GitHub] [parquet-mr] parthchandra commented on a diff in pull request #1008: PARQUET-2212: Add ByteBuffer api for decryptors to allow direct memory to be decrypted

2022-12-05 Thread GitBox


parthchandra commented on code in PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1039958501


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##
@@ -133,11 +135,36 @@ public DataPage readPage() {
 public DataPage visit(DataPageV1 dataPageV1) {
   try {
 BytesInput bytes = dataPageV1.getBytes();
-if (null != blockDecryptor) {
-  bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+BytesInput decompressed;
+
+if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) {
+  ByteBuffer byteBuffer = bytes.toByteBuffer();
+  if (!byteBuffer.isDirect()) {
+throw new ParquetDecodingException("Expected a direct buffer");
+  }
+  if (blockDecryptor != null) {
+byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
+  }
+  long compressedSize = byteBuffer.limit();
+
+  ByteBuffer decompressedBuffer =
+  options.getAllocator().allocate(dataPageV1.getUncompressedSize());
+  decompressor.decompress(byteBuffer, (int) compressedSize, decompressedBuffer,
+  dataPageV1.getUncompressedSize());
+
+  // HACKY: sometimes we need to do `flip` because the position of output bytebuffer is

Review Comment:
   The output buffer is filled, but its position is not reset after the call by some of the direct buffer decompressors. (It is not clear to me where in the direct decompression this happens; it might be worth looking into.) It is safe (and not expensive) to call flip.
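   As a hedged fragment of what that looks like in context (names follow the diff; the position check is illustrative rather than the exact committed code):

```java
// Some direct decompressors leave the output buffer's position at the end of the data
// they wrote instead of resetting it, so flip the buffer before use when that happens.
decompressor.decompress(byteBuffer, (int) compressedSize, decompressedBuffer,
    dataPageV1.getUncompressedSize());
if (decompressedBuffer.position() != 0) {
  decompressedBuffer.flip(); // move position back to 0 so the plaintext is read from the start
}
```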



##
parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java:
##
@@ -44,6 +44,8 @@ public class ParquetReadOptions {
   private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
   private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false;
   private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true;
+  // Default to true if JDK 17 or newer.

Review Comment:
   Oops. The comment got left behind from the original; I changed the initialization after earlier review comments.






[GitHub] [parquet-mr] parthchandra commented on a diff in pull request #1008: PARQUET-2212: Add ByteBuffer api for decryptors to allow direct memory to be decrypted

2022-11-15 Thread GitBox


parthchandra commented on code in PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1023355113


##
parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java:
##
@@ -44,6 +46,9 @@ public class ParquetReadOptions {
   private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
   private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false;
   private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true;
+  // Default to true if JDK 17 or newer.
+  private static final boolean USE_OFF_HEAP_DECRYPT_BUFFER_DEFAULT =
+SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_17);

Review Comment:
   Done






[GitHub] [parquet-mr] parthchandra commented on a diff in pull request #1008: PARQUET-2212: Add ByteBuffer api for decryptors to allow direct memory to be decrypted

2022-11-14 Thread GitBox


parthchandra commented on code in PR #1008:
URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1021927002


##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##
@@ -133,11 +135,33 @@ public DataPage readPage() {
 public DataPage visit(DataPageV1 dataPageV1) {
   try {
 BytesInput bytes = dataPageV1.getBytes();
-if (null != blockDecryptor) {
-  bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+ByteBuffer byteBuffer = bytes.toByteBuffer();
+BytesInput decompressed;
+
+if (byteBuffer.isDirect() && options.useOffHeapDecryptBuffer()) {
+  if (blockDecryptor != null) {
+byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
+  }
+  long compressedSize = byteBuffer.limit();
+
+  ByteBuffer decompressedBuffer =
+  ByteBuffer.allocateDirect(dataPageV1.getUncompressedSize());
+  decompressor.decompress(byteBuffer, (int) compressedSize, decompressedBuffer,

Review Comment:
   Not really adding direct buffer support for decompression. That is already implemented (see `FullDirectDecompressor`).
   For this path only (i.e. direct buffer and direct buffer decryption is enabled), the buffer returned by decrypt is a direct byte buffer. In this case, it is efficient to call the direct byte buffer decompress API, which is already available.
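   Restated as a hedged fragment (names from the diff; the heap branch shows the pre-existing byte[]-based call for contrast):

```java
// No new decompression support is needed: when decryption has produced a direct buffer,
// the existing ByteBuffer decompress entry point is used; otherwise the original
// BytesInput-based path is kept.
if (byteBuffer.isDirect() && options.useOffHeapDecryptBuffer()) {
  decompressor.decompress(byteBuffer, (int) compressedSize, decompressedBuffer,
      dataPageV1.getUncompressedSize()); // buffer-to-buffer, no copy back to byte[]
} else {
  decompressed = decompressor.decompress(bytes, dataPageV1.getUncompressedSize()); // unchanged heap path
}
```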



##
parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java:
##
@@ -196,13 +205,13 @@ public static Collection data() {
 .append(COLUMN_MASTER_KEY_IDS[5]).append(": ").append(SingleRow.FIXED_LENGTH_BINARY_FIELD_NAME)
 .toString();
 
-  private static final int NUM_THREADS = 4;
+  private static final int NUM_THREADS = 1;

Review Comment:
   Oops. My mistake. 



##
parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java:
##
@@ -44,6 +46,9 @@ public class ParquetReadOptions {
   private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
   private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false;
   private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true;
+  // Default to true if JDK 17 or newer.
+  private static final boolean USE_OFF_HEAP_DECRYPT_BUFFER_DEFAULT =
+SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_17);

Review Comment:
   It looks like ByteBuffer-based decryption does not get hardware acceleration until JDK 17. So, if and only if the file read buffer is a direct byte buffer and the JDK is 17 or newer, decryption will be as fast as with byte arrays, but we will avoid the copy from a direct byte buffer to a byte array in `ColumnChunkPageReadStore`.
   I can default to false if you prefer.
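   For completeness, a hedged sketch of opting out through the key added in this PR (the job/config wiring is illustrative):

```java
import org.apache.hadoop.conf.Configuration;

// Usage sketch: force off-heap decrypt buffers off (e.g. on a pre-17 JDK, or to keep the
// byte[] path) regardless of the JDK-17-based default. The key name comes from this PR.
class OffHeapDecryptOptOut {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setBoolean("parquet.decrypt.off-heap.buffer.enabled", false);
    System.out.println(conf.get("parquet.decrypt.off-heap.buffer.enabled"));
  }
}
```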
   



##
parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java:
##
@@ -51,17 +52,26 @@
  * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
  * @return plaintext - starts at offset 0 of the output value, and fills up the entire byte array.
  */
-public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
+byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
 
 /**
+ * Convenience decryption method that reads the length and ciphertext from a ByteBuffer
+ *
+ * @param from ByteBuffer with length and ciphertext.
+ * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
+ * @return plaintext - starts at offset 0 of the output, and fills up the entire byte array.

Review Comment:
   Done



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##
@@ -133,11 +135,33 @@ public DataPage readPage() {
 public DataPage visit(DataPageV1 dataPageV1) {
   try {
 BytesInput bytes = dataPageV1.getBytes();
-if (null != blockDecryptor) {
-  bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+ByteBuffer byteBuffer = bytes.toByteBuffer();

Review Comment:
   Done.



##
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
##
@@ -133,11 +135,33 @@ public DataPage readPage() {
 public DataPage visit(DataPageV1 dataPageV1) {
   try {
 BytesInput bytes = dataPageV1.getBytes();
-if (null != blockDecryptor) {
-  bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+ByteBuffer byteBuffer = bytes.toByteBuffer();
+BytesInput decompressed;
+
+if (byteBuffer.isDirect() && options.useOffHeapDecryptBuffer()) {
+  if (blockDecryptor != null) {
+byteBuffer = blockDecryptor.decrypt(byteBuffer,