[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #776: PARQUET-1229: Parquet MR encryption
gszadovszky commented on a change in pull request #776: URL: https://github.com/apache/parquet-mr/pull/776#discussion_r428673721 ## File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java ## @@ -90,6 +102,10 @@ // Update last two bytes with new page ordinal (instead of creating new page AAD from scratch) public static void quickUpdatePageAAD(byte[] pageAAD, short newPageOrdinal) { +if (newPageOrdinal < 0) { Review comment: I agree that validating the arguments is important. But the proper exception to be thrown for a `null` is a `NullPointerException` that would be thrown at line 134 anyway. If you really want to validate the argument for `null` at the first line of the method I would suggest using `java.util.Objects.requireNonNull(Object)`. ## File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java ## @@ -78,11 +78,22 @@ if (rowGroupOrdinal < 0) { throw new IllegalArgumentException("Wrong row group ordinal: " + rowGroupOrdinal); } -byte[] rowGroupOrdinalBytes = shortToBytesLE(rowGroupOrdinal); +short shortRGOrdinal = (short) rowGroupOrdinal; +if (shortRGOrdinal != rowGroupOrdinal) { + throw new ParquetCryptoRuntimeException("Encrypted parquet files can't have " + + "more than Short.MAX_VALUE row groups: " + rowGroupOrdinal); Review comment: I think writing the actual value instead of referencing a java constant is more informative. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #776: PARQUET-1229: Parquet MR encryption
gszadovszky commented on a change in pull request #776: URL: https://github.com/apache/parquet-mr/pull/776#discussion_r428562074 ## File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java ## @@ -1131,7 +1329,19 @@ public OffsetIndex readOffsetIndex(ColumnChunkMetaData column) throws IOExceptio return null; } f.seek(ref.getOffset()); -return ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(f)); + +column.decryptIfNeededed(); Review comment: Same as above. ## File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java ## @@ -1035,20 +1154,89 @@ private static void serializeBloomFilters( long offset = out.getPos(); column.setBloomFilterOffset(offset); - Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter), out); -bloomFilter.writeTo(out); + +BlockCipher.Encryptor bloomFilterEncryptor = null; +byte[] bloomFilterHeaderAAD = null; +byte[] bloomFilterBitsetAAD = null; +if (null != fileEncryptor) { + InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, (short) cIndex); + if (columnEncryptionSetup.isEncrypted()) { +bloomFilterEncryptor = columnEncryptionSetup.getMetaDataEncryptor(); +short columnOrdinal = columnEncryptionSetup.getOrdinal(); +bloomFilterHeaderAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterHeader, +block.getOrdinal(), columnOrdinal, (short)-1); +bloomFilterBitsetAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterBitset, +block.getOrdinal(), columnOrdinal, (short)-1); + } +} + + Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter), out, +bloomFilterEncryptor, bloomFilterHeaderAAD); + +ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream(); +bloomFilter.writeTo(tempOutStream); +byte[] serializedBitset = tempOutStream.toByteArray(); +if (null != bloomFilterEncryptor) { + serializedBitset 
= bloomFilterEncryptor.encrypt(serializedBitset, bloomFilterBitsetAAD); +} +out.write(serializedBitset); } } } - - private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException { -long footerIndex = out.getPos(); + + private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out, + InternalFileEncryptor fileEncryptor) throws IOException { + ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter(); -org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer); -writeFileMetaData(parquetMetadata, out); -LOG.debug("{}: footer length = {}" , out.getPos(), (out.getPos() - footerIndex)); -BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex)); -out.write(MAGIC); + +// Unencrypted file +if (null == fileEncryptor) { + long footerIndex = out.getPos(); + org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer); + writeFileMetaData(parquetMetadata, out); + LOG.debug("{}: footer length = {}" , out.getPos(), (out.getPos() - footerIndex)); + BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex)); + out.write(MAGIC); + return; +} + +org.apache.parquet.format.FileMetaData parquetMetadata = +metadataConverter.toParquetMetadata(CURRENT_VERSION, footer, fileEncryptor); + +// Encrypted file with plaintext footer +if (!fileEncryptor.isFooterEncrypted()) { Review comment: I don't know why we think that the footer length is an important information but this is the only case where we do not log it. We might want to add it here as well. 
## File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java ## @@ -1071,12 +1229,31 @@ public BloomFilterReader getBloomFilterDataReader(BlockMetaData block) { */ public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException { long bloomFilterOffset = meta.getBloomFilterOffset(); + +if (0 == bloomFilterOffset) { // TODO Junjie - is there a better way to handle this? Review comment: @chenjunjiedada, could you please check this? ## File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java ## @@ -1114,7 +1299,20 @@ public ColumnIndex readColumnIndex(ColumnChunkMetaData column) throws IOExceptio return null; }
[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #776: PARQUET-1229: Parquet MR encryption
gszadovszky commented on a change in pull request #776: URL: https://github.com/apache/parquet-mr/pull/776#discussion_r428537220 ## File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java ## @@ -705,22 +797,39 @@ public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer) paths.put(ColumnPath.get(col.getPath()), col); } this.crc = options.usePageChecksumVerification() ? new CRC32() : null; +this.fileDecryptor = fileMetaData.getFileDecryptor(); } public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException { +this(file, options, null); + } + + public ParquetFileReader(InputFile file, ParquetReadOptions options, + FileDecryptionProperties fileDecryptionProperties) throws IOException { this.converter = new ParquetMetadataConverter(options); this.file = file; this.f = file.newStream(); this.options = options; +if ((null == fileDecryptionProperties) && (file instanceof HadoopInputFile)) { + HadoopInputFile hadoopFile = (HadoopInputFile) file; + fileDecryptionProperties = getDecryptionProperties(hadoopFile.getPath(), hadoopFile.getConfiguration()); Review comment: `ParquetReadOptions` was created to carry all the required properties for reading a parquet file. If the `Path` is necessary for the decryption then we might add it to the options as well. If we decide to not to add it still, I would use the options object to carry any other decryption properties. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #776: PARQUET-1229: Parquet MR encryption
gszadovszky commented on a change in pull request #776: URL: https://github.com/apache/parquet-mr/pull/776#discussion_r428535767 ## File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java ## @@ -442,7 +460,13 @@ static ParquetMetadata readSummaryMetadata(Configuration configuration, Path bas */ @Deprecated public static final ParquetMetadata readFooter(Configuration configuration, Path file) throws IOException { -return readFooter(configuration, file, NO_FILTER); +return readFooter(configuration, file, getDecryptionProperties(file, configuration)); + } + + @Deprecated + public static final ParquetMetadata readFooter(Configuration configuration, Path file, Review comment: So, all of these new methods are used inside parquet-mr? If not, then I don't think we need them. If yes, then please, try to refactor the caller part to use the non-deprecated ones instead. If it does not require too much effort. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #776: PARQUET-1229: Parquet MR encryption
gszadovszky commented on a change in pull request #776: URL: https://github.com/apache/parquet-mr/pull/776#discussion_r428008353 ## File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java ## @@ -499,55 +528,100 @@ public static final ParquetMetadata readFooter(Configuration configuration, File */ @Deprecated public static final ParquetMetadata readFooter(InputFile file, MetadataFilter filter) throws IOException { +return readFooter(file, filter, null); + } + + @Deprecated + public static final ParquetMetadata readFooter(InputFile file, MetadataFilter filter, Review comment: Why do we introduce new public methods that are deprecated already? ## File path: parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java ## @@ -1465,23 +1674,38 @@ public void writeDataPageV2Header( dataEncoding, rlByteLength, dlByteLength), to); } - + Review comment: Please check your code to not introduce any trailing whitespace. ## File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java ## @@ -442,7 +460,13 @@ static ParquetMetadata readSummaryMetadata(Configuration configuration, Path bas */ @Deprecated public static final ParquetMetadata readFooter(Configuration configuration, Path file) throws IOException { -return readFooter(configuration, file, NO_FILTER); +return readFooter(configuration, file, getDecryptionProperties(file, configuration)); + } + + @Deprecated + public static final ParquetMetadata readFooter(Configuration configuration, Path file, Review comment: Why do we introduce new public methods that are deprecated already? 
## File path: parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java ## @@ -1185,70 +1275,189 @@ static long getOffset(ColumnChunk columnChunk) { return offset; } + private static void verifyFooterIntegrity(InputStream from, InternalFileDecryptor fileDecryptor, + int combinedFooterLength) throws IOException { + +byte[] nonce = new byte[AesCipher.NONCE_LENGTH]; +from.read(nonce); +byte[] gcmTag = new byte[AesCipher.GCM_TAG_LENGTH]; +from.read(gcmTag); + +AesGcmEncryptor footerSigner = fileDecryptor.createSignedFooterEncryptor(); + +byte[] footerAndSignature = ((ByteBufferInputStream) from).slice(0).array(); +int footerSignatureLength = AesCipher.NONCE_LENGTH + AesCipher.GCM_TAG_LENGTH; +byte[] serializedFooter = new byte[combinedFooterLength - footerSignatureLength]; +System.arraycopy(footerAndSignature, 0, serializedFooter, 0, serializedFooter.length); + +byte[] signedFooterAAD = AesCipher.createFooterAAD(fileDecryptor.getFileAAD()); +byte[] encryptedFooterBytes = footerSigner.encrypt(false, serializedFooter, nonce, signedFooterAAD); +byte[] calculatedTag = new byte[AesCipher.GCM_TAG_LENGTH]; +System.arraycopy(encryptedFooterBytes, encryptedFooterBytes.length - AesCipher.GCM_TAG_LENGTH, +calculatedTag, 0, AesCipher.GCM_TAG_LENGTH); +if (!Arrays.equals(gcmTag, calculatedTag)) { + throw new TagVerificationException("Signature mismatch in plaintext footer"); +} + } + public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException { +return readParquetMetadata(from, filter, null, false, 0); + } + + public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter, + final InternalFileDecryptor fileDecryptor, final boolean encryptedFooter, + final int combinedFooterLength) throws IOException { + +final BlockCipher.Decryptor footerDecryptor = (encryptedFooter? fileDecryptor.fetchFooterDecryptor() : null); +final byte[] encryptedFooterAAD = (encryptedFooter? 
AesCipher.createFooterAAD(fileDecryptor.getFileAAD()) : null); + FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor() { @Override public FileMetaData visit(NoFilter filter) throws IOException { -return readFileMetaData(from); +return readFileMetaData(from, footerDecryptor, encryptedFooterAAD); } @Override public FileMetaData visit(SkipMetadataFilter filter) throws IOException { -return readFileMetaData(from, true); +return readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD); } @Override public FileMetaData visit(OffsetMetadataFilter filter) throws IOException { -return filterFileMetaDataByStart(readFileMetaData(from), filter); +return filterFileMetaDataByStart(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter); } @Override public FileMetaData visit(RangeMetadataFilter filter) throws IOException { -return
[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #776: PARQUET-1229: Parquet MR encryption
gszadovszky commented on a change in pull request #776: URL: https://github.com/apache/parquet-mr/pull/776#discussion_r427934346 ## File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java ## @@ -68,19 +67,32 @@ public static byte[] createModuleAAD(byte[] fileAAD, ModuleType moduleType, short rowGroupOrdinal, short columnOrdinal, short pageOrdinal) { Review comment: Sounds good to me. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #776: PARQUET-1229: Parquet MR encryption
gszadovszky commented on a change in pull request #776: URL: https://github.com/apache/parquet-mr/pull/776#discussion_r427925111 ## File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java ## @@ -68,19 +67,32 @@ public static byte[] createModuleAAD(byte[] fileAAD, ModuleType moduleType, short rowGroupOrdinal, short columnOrdinal, short pageOrdinal) { Review comment: I understand that large numbers of pages/row groups or columns would lead to significant performance drawbacks but it should not limit what the spec allows otherwise. Since it is discussed and approved already, I am fine with using `short` values for these. What I would suggest adding though is to have the conversion from `int` to `short` centralized and have specific error messages so it is clear that the limit reached is a hard limit for the encryption feature. Also, if we will publish any description/example for the encryption feature these limitations shall be listed there. One more thing: the check of `intValue > Short.MAX_VALUE` is not complete. If `intValue` is negative the cast may result in a valid positive `short` value. I would suggest using `(short) intValue != intValue` instead. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #776: PARQUET-1229: Parquet MR encryption
gszadovszky commented on a change in pull request #776: URL: https://github.com/apache/parquet-mr/pull/776#discussion_r427284527 ## File path: parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java ## @@ -49,6 +49,13 @@ * @return the index of the first row in the page */ public long getFirstRowIndex(int pageIndex); + + /** + * @param pageIndex + * the index of the page + * @return the original ordinal of the page in the column chunk + */ + public short getPageOrdinal(int pageIndex); Review comment: Why do we need it as a `short` instead of keeping it as an `int`? As per the parquet.thrift spec we never say that we cannot have more pages than `32767` even if it is unlikely to have that many. ## File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java ## @@ -68,19 +67,32 @@ public static byte[] createModuleAAD(byte[] fileAAD, ModuleType moduleType, short rowGroupOrdinal, short columnOrdinal, short pageOrdinal) { Review comment: Theoretically we don't give hard limits for the number of row groups, number of columns or the number of pages in the spec. There is a de facto limit that we use thrift lists where the size is an i32 meaning that we should allow java int values here. Also, there was a post-commit discussion in a [related PR](https://github.com/apache/parquet-format/pull/142#issuecomment-600294754). It is unfortunate that by that time parquet-format was already released so I don't know if there is a way to properly fix this issue in the format. Anyway, I would not restrict these values to a `short`. 
## File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java ## @@ -98,16 +104,21 @@ ((lengthBuffer[0] & 0xff)); if (ciphertextLength < 1) { - throw new IOException("Wrong length of encrypted metadata: " + ciphertextLength); + throw new ParquetCryptoRuntimeException("Wrong length of encrypted metadata: " + ciphertextLength); } byte[] ciphertextBuffer = new byte[ciphertextLength]; gotBytes = 0; // Read the encrypted structure contents while (gotBytes < ciphertextLength) { - int n = from.read(ciphertextBuffer, gotBytes, ciphertextLength - gotBytes); + int n; + try { +n = from.read(ciphertextBuffer, gotBytes, ciphertextLength - gotBytes); + } catch (IOException e) { Review comment: We should let the `IOException` be thrown out. ## File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java ## @@ -70,23 +69,30 @@ if (null != AAD) cipher.updateAAD(AAD); cipher.doFinal(ciphertext, inputOffset, inputLength, plainText, outputOffset); -} catch (GeneralSecurityException e) { - throw new IOException("Failed to decrypt", e); +} catch (AEADBadTagException e) { + throw new TagVerificationException("GCM tag check failed", e); +} catch (GeneralSecurityException e) { + throw new ParquetCryptoRuntimeException("Failed to decrypt", e); } return plainText; } @Override - public byte[] decrypt(InputStream from, byte[] AAD) throws IOException { + public byte[] decrypt(InputStream from, byte[] AAD) { byte[] lengthBuffer = new byte[SIZE_LENGTH]; int gotBytes = 0; // Read the length of encrypted Thrift structure while (gotBytes < SIZE_LENGTH) { - int n = from.read(lengthBuffer, gotBytes, SIZE_LENGTH - gotBytes); + int n; + try { +n = from.read(lengthBuffer, gotBytes, SIZE_LENGTH - gotBytes); + } catch (IOException e) { Review comment: We should let the `IOException` be thrown out. 
## File path: parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java ## @@ -1185,70 +1275,189 @@ static long getOffset(ColumnChunk columnChunk) { return offset; } + private static void verifyFooterIntegrity(InputStream from, InternalFileDecryptor fileDecryptor, + int combinedFooterLength) throws IOException { + +byte[] nonce = new byte[AesCipher.NONCE_LENGTH]; +from.read(nonce); +byte[] gcmTag = new byte[AesCipher.GCM_TAG_LENGTH]; +from.read(gcmTag); + +AesGcmEncryptor footerSigner = fileDecryptor.createSignedFooterEncryptor(); + +byte[] footerAndSignature = ((ByteBufferInputStream) from).slice(0).array(); +int footerSignatureLength = AesCipher.NONCE_LENGTH + AesCipher.GCM_TAG_LENGTH; +byte[] serializedFooter = new byte[combinedFooterLength - footerSignatureLength]; +System.arraycopy(footerAndSignature, 0, serializedFooter, 0, serializedFooter.length); + +byte[] signedFooterAAD = AesCipher.createFooterAAD(fileDecryptor.getFileAAD()); +byte[] encryptedFooterBytes = footerSigner.encrypt(false, serializedFooter, nonce, signedFooterAAD); +byte[]