[GitHub] [parquet-mr] parthchandra commented on a diff in pull request #1008: PARQUET-2212: Add ByteBuffer api for decryptors to allow direct memory to be decrypted
parthchandra commented on code in PR #1008: URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1069182893

## parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java:
@@ -51,17 +52,26 @@
  * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
  * @return plaintext - starts at offset 0 of the output value, and fills up the entire byte array.
  */
-public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
+byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);

+/**
+ * Convenience decryption method that reads the length and ciphertext from a ByteBuffer
+ *
+ * @param from ByteBuffer with length and ciphertext.
+ * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
+ * @return plaintext - starts at offset 0 of the output, and fills up the entire byte buffer.

Review Comment: Done

## parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java:
@@ -36,11 +37,11 @@
  * The ciphertext includes the nonce and (in case of GCM cipher) the tag, as detailed in the
  * Parquet Modular Encryption specification.
  */
-public byte[] encrypt(byte[] plaintext, byte[] AAD);
+byte[] encrypt(byte[] plaintext, byte[] AAD);

Review Comment: Probably. This PR only focuses on the read path, though. I can add the encryptor API when I look at the write path.
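The ByteBuffer decrypt() overload discussed above can be layered on top of the existing byte[] API. Below is a minimal, self-contained sketch with illustrative names only; the real `BlockCipher.Decryptor` in parquet-mr differs from this simplified stand-in.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: a ByteBuffer decrypt() convenience method delegating to
// the byte[] path. Interface and method names are illustrative, not parquet-mr's.
public class ByteBufferDecryptSketch {

  interface Decryptor {
    byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);

    // Convenience overload: drain the remaining bytes of the ByteBuffer
    // (position..limit) into an array and delegate to the byte[] method.
    default ByteBuffer decrypt(ByteBuffer from, byte[] AAD) {
      byte[] input = new byte[from.remaining()];
      from.get(input);
      return ByteBuffer.wrap(decrypt(input, AAD));
    }
  }

  public static void main(String[] args) {
    // A no-op "decryptor" that echoes its input, just to exercise the wrapper.
    Decryptor identity = (lengthAndCiphertext, aad) -> lengthAndCiphertext;
    ByteBuffer out = identity.decrypt(ByteBuffer.wrap(new byte[] {1, 2, 3}), null);
    System.out.println(out.remaining()); // prints 3
  }
}
```

A real implementation would instead decrypt directly into an output buffer to avoid the copy; this wrapper only shows how the two API shapes relate.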
## parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java:
@@ -86,6 +87,48 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
   return plainText;
 }

+ public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {
+

Review Comment: Done

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java:
@@ -144,6 +144,11 @@
  */
 public static final String BLOOM_FILTERING_ENABLED = "parquet.filter.bloom.enabled";

+ /**
+ * Key to configure if off-heap buffer should be used for decryption
+ */
+ public static final String OFF_HEAP_DECRYPT_BUFFER_ENABLED = "parquet.decrypt.off-heap.buffer.enabled";

Review Comment: Done

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
@@ -133,11 +135,36 @@ public DataPage readPage() {
 public DataPage visit(DataPageV1 dataPageV1) {

Review Comment: Actually, it does. Added it for V2 and updated the unit tests as well.

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java:
@@ -242,6 +245,12 @@ public Builder withFilter(Filter filter) {
   return this;
 }

+public Builder withAllocator(ByteBufferAllocator allocator) {

Review Comment: Well, yes. ParquetReader has an instance of `ParquetReadOptions`, and this builder simply passes the allocator down to that instance. Adding it here mirrors the other options being set in `ParquetReader.Builder`. It is used in the unit tests.
## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
@@ -133,11 +135,36 @@ public DataPage readPage() {
 public DataPage visit(DataPageV1 dataPageV1) {
   try {
     BytesInput bytes = dataPageV1.getBytes();
-    if (null != blockDecryptor) {
-      bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+    BytesInput decompressed;
+
+    if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) {
+      ByteBuffer byteBuffer = bytes.toByteBuffer();
+      if (!byteBuffer.isDirect()) {

Review Comment: I think there was a previous discussion on this, and we simplified the code to throw an error. Basically, we expect that a user of the ParquetReader will be using either a direct buffer or a heap buffer all the way through, so encountering both a direct buffer and a heap buffer is an error.

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java:
@@ -187,6 +189,7 @@ public static Builder builder(ReadSupport readSupport, Path path) {
 private final InputFile file;
 private final Path path;
 private Filter filter = null;
+private ByteBufferAllocator allocator = new HeapByteBufferAllocator();

Review Comment: Done

## parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java:
@@ -78,6 +79,38 @@ public byte[] decrypt(byte[] ciphertext, int cipherTextOffset, int cipherTextLen
   return plainText;
 }

+ public ByteBuffer decrypt(ByteBuffer ciphertext, byte[] AAD) {

Review Comment: Done

## parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCtrDecryptor.java:
@@ -86,6 +87,48 @@ public byte[] decrypt(byte[] ciphertext,
parthchandra commented on code in PR #1008: URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1039958501

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
@@ -133,11 +135,36 @@ public DataPage readPage() {
 public DataPage visit(DataPageV1 dataPageV1) {
   try {
     BytesInput bytes = dataPageV1.getBytes();
-    if (null != blockDecryptor) {
-      bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+    BytesInput decompressed;
+
+    if (options.getAllocator().isDirect() && options.useOffHeapDecryptBuffer()) {
+      ByteBuffer byteBuffer = bytes.toByteBuffer();
+      if (!byteBuffer.isDirect()) {
+        throw new ParquetDecodingException("Expected a direct buffer");
+      }
+      if (blockDecryptor != null) {
+        byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
+      }
+      long compressedSize = byteBuffer.limit();
+
+      ByteBuffer decompressedBuffer =
+          options.getAllocator().allocate(dataPageV1.getUncompressedSize());
+      decompressor.decompress(byteBuffer, (int) compressedSize, decompressedBuffer,
+          dataPageV1.getUncompressedSize());
+
+      // HACKY: sometimes we need to do `flip` because the position of output bytebuffer is

Review Comment: The output buffer is filled, but its position is not reset after the call by some direct buffer decompressors. (It is not clear to me where in the direct decompression this happens; it might be worth looking into.) It is safe (and not expensive) to call flip.

## parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java:
@@ -44,6 +44,8 @@ public class ParquetReadOptions {
 private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
 private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false;
 private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true;
+ // Default to true if JDK 17 or newer.

Review Comment: Oops. The comment got left behind from the original. I changed the initialization after some review comments.
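The `flip` discussed above is needed because a decompressor that writes into a ByteBuffer leaves the position at the end of what it wrote; flipping sets the limit to that position and rewinds to zero so a reader sees exactly the written bytes. A small standalone demonstration:

```java
import java.nio.ByteBuffer;

// Demonstrates why flip() is needed after filling a buffer: without it, a
// reader would start at the write position and see only trailing garbage.
public class FlipDemo {
  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocateDirect(16);
    buf.put(new byte[] {10, 20, 30}); // position = 3, limit = 16
    // Without flip, reading would begin at position 3 (13 undefined bytes).
    buf.flip();                       // position = 0, limit = 3
    System.out.println(buf.remaining()); // prints 3
  }
}
```

flip() only swaps two integer fields on the buffer, which is why the comment notes it is safe and cheap to call even when the decompressor has already reset the position.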
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
parthchandra commented on code in PR #1008: URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1023355113

## parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java:
@@ -44,6 +46,9 @@ public class ParquetReadOptions {
 private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
 private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false;
 private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true;
+ // Default to true if JDK 17 or newer.
+ private static final boolean USE_OFF_HEAP_DECRYPT_BUFFER_DEFAULT =
+     SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_17);

Review Comment: Done
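The diff above gates the default on the running JDK via commons-lang3's `SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_17)`. A dependency-free sketch of the same idea, using the standard `Runtime.version()` API (available since JDK 10); the class and field names here are stand-ins, not parquet-mr's:

```java
// Sketch: choose an off-heap-decryption default based on the running JDK,
// since ByteBuffer AES paths are only hardware-accelerated from JDK 17 on.
public class OffHeapDefault {
  static final boolean USE_OFF_HEAP_DECRYPT_BUFFER_DEFAULT =
      Runtime.version().feature() >= 17; // feature() is the major version

  public static void main(String[] args) {
    System.out.println(USE_OFF_HEAP_DECRYPT_BUFFER_DEFAULT);
  }
}
```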
parthchandra commented on code in PR #1008: URL: https://github.com/apache/parquet-mr/pull/1008#discussion_r1021927002

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
@@ -133,11 +135,33 @@ public DataPage readPage() {
 public DataPage visit(DataPageV1 dataPageV1) {
   try {
     BytesInput bytes = dataPageV1.getBytes();
-    if (null != blockDecryptor) {
-      bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+    ByteBuffer byteBuffer = bytes.toByteBuffer();
+    BytesInput decompressed;
+
+    if (byteBuffer.isDirect() && options.useOffHeapDecryptBuffer()) {
+      if (blockDecryptor != null) {
+        byteBuffer = blockDecryptor.decrypt(byteBuffer, dataPageAAD);
+      }
+      long compressedSize = byteBuffer.limit();
+
+      ByteBuffer decompressedBuffer =
+          ByteBuffer.allocateDirect(dataPageV1.getUncompressedSize());
+      decompressor.decompress(byteBuffer, (int) compressedSize, decompressedBuffer,

Review Comment: Not really adding direct buffer support for decompression. That is already implemented (see `FullDirectDecompressor`). For this path only (i.e., the input is a direct buffer and direct buffer decryption is enabled), the buffer returned by decrypt is a direct byte buffer. In this case, it is efficient to call the direct byte buffer decompress API, which is already available.

## parquet-hadoop/src/test/java/org/apache/parquet/crypto/TestPropertiesDrivenEncryption.java:
@@ -196,13 +205,13 @@ public static Collection data() {
   .append(COLUMN_MASTER_KEY_IDS[5]).append(": ").append(SingleRow.FIXED_LENGTH_BINARY_FIELD_NAME)
   .toString();

- private static final int NUM_THREADS = 4;
+ private static final int NUM_THREADS = 1;

Review Comment: Oops. My mistake.
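The branch being reviewed keeps page bytes off-heap end to end: if the page buffer is direct and off-heap decryption is enabled, decrypt returns a direct buffer that can feed the direct-buffer decompress API; otherwise the data is copied to a heap array for the byte[] path. A simplified, hypothetical sketch of that control flow (method and class names are illustrative, not parquet-mr's):

```java
import java.nio.ByteBuffer;

// Illustrative sketch of the read-path branching discussed above.
public class DirectPathSketch {

  static ByteBuffer process(ByteBuffer page, boolean offHeapEnabled) {
    if (page.isDirect() && offHeapEnabled) {
      // Stay off-heap: a real implementation would decrypt into another
      // direct buffer and hand it to the direct decompress API
      // (e.g. FullDirectDecompressor). Placeholder here.
      return page;
    }
    // Fallback: copy to a heap array and use the byte[] code path.
    byte[] onHeap = new byte[page.remaining()];
    page.get(onHeap);
    return ByteBuffer.wrap(onHeap);
  }

  public static void main(String[] args) {
    ByteBuffer direct = ByteBuffer.allocateDirect(8);
    System.out.println(process(direct.duplicate(), true).isDirect());  // true
    System.out.println(process(direct.duplicate(), false).isDirect()); // false
  }
}
```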
## parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java:
@@ -44,6 +46,9 @@ public class ParquetReadOptions {
 private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
 private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false;
 private static final boolean BLOOM_FILTER_ENABLED_DEFAULT = true;
+ // Default to true if JDK 17 or newer.
+ private static final boolean USE_OFF_HEAP_DECRYPT_BUFFER_DEFAULT =
+     SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_17);

Review Comment: It looks like ByteBuffers do not have hardware acceleration enabled until JDK 17. So, if and only if the file read buffer is a direct byte buffer and the JDK is 17 or newer, decryption will be as fast as with byte arrays, but we will avoid the copy from a direct byte buffer to a byte array in `ColumnChunkPageReadStore`. I can default to false if you prefer.

## parquet-format-structures/src/main/java/org/apache/parquet/format/BlockCipher.java:
@@ -51,17 +52,26 @@
  * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
  * @return plaintext - starts at offset 0 of the output value, and fills up the entire byte array.
  */
-public byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);
+byte[] decrypt(byte[] lengthAndCiphertext, byte[] AAD);

+/**
+ * Convenience decryption method that reads the length and ciphertext from a ByteBuffer
+ *
+ * @param from ByteBuffer with length and ciphertext.
+ * @param AAD - Additional Authenticated Data for the decryption (ignored in case of CTR cipher)
+ * @return plaintext - starts at offset 0 of the output, and fills up the entire byte array.
Review Comment: Done

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
@@ -133,11 +135,33 @@ public DataPage readPage() {
 public DataPage visit(DataPageV1 dataPageV1) {
   try {
     BytesInput bytes = dataPageV1.getBytes();
-    if (null != blockDecryptor) {
-      bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+    ByteBuffer byteBuffer = bytes.toByteBuffer();

Review Comment: Done.

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java:
@@ -133,11 +135,33 @@ public DataPage readPage() {
 public DataPage visit(DataPageV1 dataPageV1) {
   try {
     BytesInput bytes = dataPageV1.getBytes();
-    if (null != blockDecryptor) {
-      bytes = BytesInput.from(blockDecryptor.decrypt(bytes.toByteArray(), dataPageAAD));
+    ByteBuffer byteBuffer = bytes.toByteBuffer();
+    BytesInput decompressed;
+
+    if (byteBuffer.isDirect() && options.useOffHeapDecryptBuffer()) {
+      if (blockDecryptor != null) {
+        byteBuffer = blockDecryptor.decrypt(byteBuffer,