[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #776: PARQUET-1229: Parquet MR encryption

2020-05-21 Thread GitBox


gszadovszky commented on a change in pull request #776:
URL: https://github.com/apache/parquet-mr/pull/776#discussion_r428673721



##
File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java
##
@@ -90,6 +102,10 @@
 
  // Update last two bytes with new page ordinal (instead of creating new page AAD from scratch)
   public static void quickUpdatePageAAD(byte[] pageAAD, short newPageOrdinal) {
+if (newPageOrdinal < 0) {

Review comment:
   I agree that validating the arguments is important. But the proper exception to throw for a `null` is a `NullPointerException`, which would be thrown at line 134 anyway. If you really want to validate the argument for `null` at the first line of the method, I would suggest using `java.util.Objects.requireNonNull(Object)`.
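A minimal sketch of the suggested validation (a hypothetical stand-in class, not the actual parquet-mr code; the little-endian placement of the page ordinal in the last two AAD bytes is an assumption based on the `shortToBytesLE` helper referenced in this file):

```java
import java.util.Objects;

public class AadUpdateSketch {
    // Hypothetical stand-in for AesCipher.quickUpdatePageAAD: overwrite the
    // last two bytes of the module AAD with the new page ordinal.
    public static void quickUpdatePageAAD(byte[] pageAAD, short newPageOrdinal) {
        // Fail fast with a NullPointerException, as suggested in the review
        Objects.requireNonNull(pageAAD, "pageAAD must not be null");
        if (newPageOrdinal < 0) {
            throw new IllegalArgumentException("Wrong page ordinal: " + newPageOrdinal);
        }
        // Little-endian: low byte first (assumption, mirroring shortToBytesLE)
        pageAAD[pageAAD.length - 2] = (byte) (newPageOrdinal & 0xFF);
        pageAAD[pageAAD.length - 1] = (byte) ((newPageOrdinal >> 8) & 0xFF);
    }
}
```

With `requireNonNull` the failure happens at the method entry with a clear message, instead of deeper inside the byte manipulation.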

##
File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java
##
@@ -78,11 +78,22 @@
 if (rowGroupOrdinal < 0) {
   throw new IllegalArgumentException("Wrong row group ordinal: " + rowGroupOrdinal);
 }
-byte[] rowGroupOrdinalBytes = shortToBytesLE(rowGroupOrdinal);
+short shortRGOrdinal = (short) rowGroupOrdinal;
+if (shortRGOrdinal != rowGroupOrdinal) {
+  throw new ParquetCryptoRuntimeException("Encrypted parquet files can't have "
+  + "more than Short.MAX_VALUE row groups: " + rowGroupOrdinal);

Review comment:
   I think writing the actual value instead of referencing a Java constant is more informative.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #776: PARQUET-1229: Parquet MR encryption

2020-05-21 Thread GitBox


gszadovszky commented on a change in pull request #776:
URL: https://github.com/apache/parquet-mr/pull/776#discussion_r428562074



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##
@@ -1131,7 +1329,19 @@ public OffsetIndex readOffsetIndex(ColumnChunkMetaData column) throws IOExceptio
   return null;
 }
 f.seek(ref.getOffset());
-return ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(f));
+
+column.decryptIfNeeded();

Review comment:
   Same as above.

##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
##
@@ -1035,20 +1154,89 @@ private static void serializeBloomFilters(
 
 long offset = out.getPos();
 column.setBloomFilterOffset(offset);
-Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter), out);
-bloomFilter.writeTo(out);
+
+BlockCipher.Encryptor bloomFilterEncryptor = null;
+byte[] bloomFilterHeaderAAD = null;
+byte[] bloomFilterBitsetAAD = null;
+if (null != fileEncryptor) {
+  InternalColumnEncryptionSetup columnEncryptionSetup = fileEncryptor.getColumnSetup(column.getPath(), false, (short) cIndex);
+  if (columnEncryptionSetup.isEncrypted()) {
+    bloomFilterEncryptor = columnEncryptionSetup.getMetaDataEncryptor();
+    short columnOrdinal = columnEncryptionSetup.getOrdinal();
+    bloomFilterHeaderAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterHeader, block.getOrdinal(), columnOrdinal, (short)-1);
+    bloomFilterBitsetAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterBitset, block.getOrdinal(), columnOrdinal, (short)-1);
+  }
+}
+
+Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(bloomFilter), out, bloomFilterEncryptor, bloomFilterHeaderAAD);
+
+ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
+bloomFilter.writeTo(tempOutStream);
+byte[] serializedBitset = tempOutStream.toByteArray();
+if (null != bloomFilterEncryptor) {
+  serializedBitset = bloomFilterEncryptor.encrypt(serializedBitset, bloomFilterBitsetAAD);
+}
+out.write(serializedBitset);
   }
 }
   }
-
-  private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out) throws IOException {
-long footerIndex = out.getPos();
+
+  private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out,
+  InternalFileEncryptor fileEncryptor) throws IOException {
+
 ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
-org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
-writeFileMetaData(parquetMetadata, out);
-LOG.debug("{}: footer length = {}", out.getPos(), (out.getPos() - footerIndex));
-BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
-out.write(MAGIC);
+
+// Unencrypted file
+if (null == fileEncryptor) {
+  long footerIndex = out.getPos();
+  org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
+  writeFileMetaData(parquetMetadata, out);
+  LOG.debug("{}: footer length = {}", out.getPos(), (out.getPos() - footerIndex));
+  BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
+  out.write(MAGIC);
+  return;
+}
+
+org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer, fileEncryptor);
+
+// Encrypted file with plaintext footer
+if (!fileEncryptor.isFooterEncrypted()) {

Review comment:
   I don't know why we think that the footer length is important information, but this is the only case where we do not log it. We might want to add it here as well.

##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##
@@ -1071,12 +1229,31 @@ public BloomFilterReader getBloomFilterDataReader(BlockMetaData block) {
    */
   public BloomFilter readBloomFilter(ColumnChunkMetaData meta) throws IOException {
 long bloomFilterOffset = meta.getBloomFilterOffset();
+
+if (0 == bloomFilterOffset) { // TODO Junjie - is there a better way to handle this?

Review comment:
   @chenjunjiedada, could you please check this?

##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##
@@ -1114,7 +1299,20 @@ public ColumnIndex readColumnIndex(ColumnChunkMetaData column) throws IOExceptio
   return null;
 }
 

[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #776: PARQUET-1229: Parquet MR encryption

2020-05-21 Thread GitBox


gszadovszky commented on a change in pull request #776:
URL: https://github.com/apache/parquet-mr/pull/776#discussion_r428537220



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##
@@ -705,22 +797,39 @@ public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer)
   paths.put(ColumnPath.get(col.getPath()), col);
 }
 this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
+this.fileDecryptor = fileMetaData.getFileDecryptor();
   }
 
   public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
+this(file, options, null);
+  }
+
+  public ParquetFileReader(InputFile file, ParquetReadOptions options,
+  FileDecryptionProperties fileDecryptionProperties) throws IOException {
 this.converter = new ParquetMetadataConverter(options);
 this.file = file;
 this.f = file.newStream();
 this.options = options;
+if ((null == fileDecryptionProperties) && (file instanceof HadoopInputFile)) {
+  HadoopInputFile hadoopFile = (HadoopInputFile) file;
+  fileDecryptionProperties = getDecryptionProperties(hadoopFile.getPath(), hadoopFile.getConfiguration());

Review comment:
   `ParquetReadOptions` was created to carry all the required properties for reading a parquet file. If the `Path` is necessary for the decryption, then we might add it to the options as well. Even if we decide not to add it, I would still use the options object to carry any other decryption properties.









[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #776: PARQUET-1229: Parquet MR encryption

2020-05-21 Thread GitBox


gszadovszky commented on a change in pull request #776:
URL: https://github.com/apache/parquet-mr/pull/776#discussion_r428535767



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##
@@ -442,7 +460,13 @@ static ParquetMetadata readSummaryMetadata(Configuration configuration, Path bas
    */
   @Deprecated
   public static final ParquetMetadata readFooter(Configuration configuration, Path file) throws IOException {
-return readFooter(configuration, file, NO_FILTER);
+return readFooter(configuration, file, getDecryptionProperties(file, configuration));
+  }
+
+  @Deprecated
+  public static final ParquetMetadata readFooter(Configuration configuration, Path file,

Review comment:
   So, all of these new methods are used inside parquet-mr? If not, then I don't think we need them. If yes, then please try to refactor the caller part to use the non-deprecated ones instead, if it does not require too much effort.









[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #776: PARQUET-1229: Parquet MR encryption

2020-05-20 Thread GitBox


gszadovszky commented on a change in pull request #776:
URL: https://github.com/apache/parquet-mr/pull/776#discussion_r428008353



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##
@@ -499,55 +528,100 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
    */
   @Deprecated
   public static final ParquetMetadata readFooter(InputFile file, MetadataFilter filter) throws IOException {
+return readFooter(file, filter, null);
+  }
+
+  @Deprecated
+  public static final ParquetMetadata readFooter(InputFile file, MetadataFilter filter,

Review comment:
   Why do we introduce new public methods that are deprecated already?

##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
##
@@ -1465,23 +1674,38 @@ public void writeDataPageV2Header(
 dataEncoding,
 rlByteLength, dlByteLength), to);
   }
-
+  

Review comment:
   Please, check your code to not to introduce any trailing whitespaces.

##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
##
@@ -442,7 +460,13 @@ static ParquetMetadata readSummaryMetadata(Configuration configuration, Path bas
    */
   @Deprecated
   public static final ParquetMetadata readFooter(Configuration configuration, Path file) throws IOException {
-return readFooter(configuration, file, NO_FILTER);
+return readFooter(configuration, file, getDecryptionProperties(file, configuration));
+  }
+
+  @Deprecated
+  public static final ParquetMetadata readFooter(Configuration configuration, Path file,

Review comment:
   Why do we introduce new public methods that are deprecated already?

##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
##
@@ -1185,70 +1275,189 @@ static long getOffset(ColumnChunk columnChunk) {
 return offset;
   }
 
+  private static void verifyFooterIntegrity(InputStream from, InternalFileDecryptor fileDecryptor,
+  int combinedFooterLength) throws IOException {
+
+byte[] nonce = new byte[AesCipher.NONCE_LENGTH];
+from.read(nonce);
+byte[] gcmTag = new byte[AesCipher.GCM_TAG_LENGTH];
+from.read(gcmTag);
+
+AesGcmEncryptor footerSigner = fileDecryptor.createSignedFooterEncryptor();
+
+byte[] footerAndSignature = ((ByteBufferInputStream) from).slice(0).array();
+int footerSignatureLength = AesCipher.NONCE_LENGTH + AesCipher.GCM_TAG_LENGTH;
+byte[] serializedFooter = new byte[combinedFooterLength - footerSignatureLength];
+System.arraycopy(footerAndSignature, 0, serializedFooter, 0, serializedFooter.length);
+
+byte[] signedFooterAAD = AesCipher.createFooterAAD(fileDecryptor.getFileAAD());
+byte[] encryptedFooterBytes = footerSigner.encrypt(false, serializedFooter, nonce, signedFooterAAD);
+byte[] calculatedTag = new byte[AesCipher.GCM_TAG_LENGTH];
+System.arraycopy(encryptedFooterBytes, encryptedFooterBytes.length - AesCipher.GCM_TAG_LENGTH, calculatedTag, 0, AesCipher.GCM_TAG_LENGTH);
+if (!Arrays.equals(gcmTag, calculatedTag)) {
+  throw new TagVerificationException("Signature mismatch in plaintext footer");
+}
+  }
+
   public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter) throws IOException {
+return readParquetMetadata(from, filter, null, false, 0);
+  }
+
+  public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter,
+  final InternalFileDecryptor fileDecryptor, final boolean encryptedFooter,
+  final int combinedFooterLength) throws IOException {
+
+final BlockCipher.Decryptor footerDecryptor = (encryptedFooter ? fileDecryptor.fetchFooterDecryptor() : null);
+final byte[] encryptedFooterAAD = (encryptedFooter ? AesCipher.createFooterAAD(fileDecryptor.getFileAAD()) : null);
+
 FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor() {
   @Override
   public FileMetaData visit(NoFilter filter) throws IOException {
-return readFileMetaData(from);
+return readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
   }
 
   @Override
   public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
-return readFileMetaData(from, true);
+return readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
   }
 
   @Override
   public FileMetaData visit(OffsetMetadataFilter filter) throws IOException {
-return filterFileMetaDataByStart(readFileMetaData(from), filter);
+return filterFileMetaDataByStart(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter);
   }
 
   @Override
   public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
-return 

[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #776: PARQUET-1229: Parquet MR encryption

2020-05-20 Thread GitBox


gszadovszky commented on a change in pull request #776:
URL: https://github.com/apache/parquet-mr/pull/776#discussion_r427934346



##
File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java
##
@@ -68,19 +67,32 @@
 
   public static byte[] createModuleAAD(byte[] fileAAD, ModuleType moduleType, 
   short rowGroupOrdinal, short columnOrdinal, short pageOrdinal) {

Review comment:
   Sounds good to me.









[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #776: PARQUET-1229: Parquet MR encryption

2020-05-20 Thread GitBox


gszadovszky commented on a change in pull request #776:
URL: https://github.com/apache/parquet-mr/pull/776#discussion_r427925111



##
File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java
##
@@ -68,19 +67,32 @@
 
   public static byte[] createModuleAAD(byte[] fileAAD, ModuleType moduleType, 
   short rowGroupOrdinal, short columnOrdinal, short pageOrdinal) {

Review comment:
   I understand that large numbers of pages/row groups or columns would lead to significant performance drawbacks, but that should not limit what the spec otherwise allows.
   
   Since it is discussed and approved already, I am fine with using `short` values for these. What I would suggest, though, is to centralize the conversion from `int` to `short` and to have specific error messages, so it is clear that the limit reached is a hard limit of the encryption feature. Also, if we publish any description/example for the encryption feature, these limitations should be listed there.
   
   One more thing: the check `intValue > Short.MAX_VALUE` is not complete. If `intValue` is negative, the cast may result in a valid positive `short` value. I would suggest using `(short) intValue != intValue` instead.
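The gap in the upper-bound-only check can be demonstrated with a small helper (illustrative only, not parquet-mr code):

```java
public class ShortRangeCheck {
    // Naive check: only guards the upper bound, so a negative int whose low
    // 16 bits form a small positive value slips through the cast silently.
    public static boolean naiveFits(int value) {
        return value <= Short.MAX_VALUE;
    }

    // Round-trip check suggested in the review: the narrowing cast is
    // lossless exactly when casting back reproduces the original value.
    public static boolean fitsInShort(int value) {
        return (short) value == value;
    }
}
```

For example, `-65535` passes the naive check but narrows to `(short) 1`, which would be silently treated as a valid ordinal.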









[GitHub] [parquet-mr] gszadovszky commented on a change in pull request #776: PARQUET-1229: Parquet MR encryption

2020-05-19 Thread GitBox


gszadovszky commented on a change in pull request #776:
URL: https://github.com/apache/parquet-mr/pull/776#discussion_r427284527



##
File path: 
parquet-column/src/main/java/org/apache/parquet/internal/column/columnindex/OffsetIndex.java
##
@@ -49,6 +49,13 @@
* @return the index of the first row in the page
*/
   public long getFirstRowIndex(int pageIndex);
+  
+  /**
+   * @param pageIndex
+   * the index of the page
+   * @return the original ordinal of the page in the column chunk
+   */
+  public short getPageOrdinal(int pageIndex);

Review comment:
   Why do we need it as a `short` instead of keeping it as an `int`? As per the parquet.thrift spec, we never say that we cannot have more than `32767` pages, even if it is unlikely to have that many.

##
File path: parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesCipher.java
##
@@ -68,19 +67,32 @@
 
   public static byte[] createModuleAAD(byte[] fileAAD, ModuleType moduleType, 
   short rowGroupOrdinal, short columnOrdinal, short pageOrdinal) {

Review comment:
   Theoretically we don't give hard limits in the spec for the number of row groups, the number of columns, or the number of pages. There is a de facto limit in that we use thrift lists, whose size is an i32, meaning that we should allow Java int values here.
   Also, there was a post-commit discussion in a [related PR](https://github.com/apache/parquet-format/pull/142#issuecomment-600294754). It is unfortunate that by that time parquet-format had already been released, so I don't know if there is a way to properly fix this issue in the format. Anyway, I would not restrict these values to a `short`.

##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
##
@@ -98,16 +104,21 @@
 ((lengthBuffer[0] & 0xff));
 
 if (ciphertextLength < 1) {
-  throw new IOException("Wrong length of encrypted metadata: " + ciphertextLength);
+  throw new ParquetCryptoRuntimeException("Wrong length of encrypted metadata: " + ciphertextLength);
 }
 
 byte[] ciphertextBuffer = new byte[ciphertextLength];
 gotBytes = 0;
 // Read the encrypted structure contents
 while (gotBytes < ciphertextLength) {
-  int n = from.read(ciphertextBuffer, gotBytes, ciphertextLength - gotBytes);
+  int n;
+  try {
+n = from.read(ciphertextBuffer, gotBytes, ciphertextLength - gotBytes);
+  } catch (IOException e) {

Review comment:
   We should let the `IOException` be thrown out.
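One way to follow this is to keep `throws IOException` on the read loop and let the caller handle the failure; a hedged sketch with invented names, not the actual `AesGcmDecryptor` code:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadFully {
    // Read exactly n bytes from the stream. The IOException from read() is
    // declared in the signature and propagates to the caller unchanged,
    // instead of being caught and wrapped in a runtime exception.
    public static byte[] readFully(InputStream from, int n) throws IOException {
        byte[] buf = new byte[n];
        int got = 0;
        while (got < n) {
            int r = from.read(buf, got, n - got);
            if (r < 0) {
                throw new IOException("Stream ended after " + got + " of " + n + " bytes");
            }
            got += r;
        }
        return buf;
    }
}
```

Keeping the checked exception in the signature preserves the original stack trace and lets callers distinguish genuine I/O failures from crypto failures.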

##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/crypto/AesGcmDecryptor.java
##
@@ -70,23 +69,30 @@
   if (null != AAD) cipher.updateAAD(AAD);
 
   cipher.doFinal(ciphertext, inputOffset, inputLength, plainText, outputOffset);
-}  catch (GeneralSecurityException e) {
-  throw new IOException("Failed to decrypt", e);
+}  catch (AEADBadTagException e) {
+  throw new TagVerificationException("GCM tag check failed", e);
+} catch (GeneralSecurityException e) {
+  throw new ParquetCryptoRuntimeException("Failed to decrypt", e);
 }
 
 return plainText;
   }
 
   @Override
-  public byte[] decrypt(InputStream from, byte[] AAD) throws IOException {
+  public byte[] decrypt(InputStream from, byte[] AAD) {
 byte[] lengthBuffer = new byte[SIZE_LENGTH];
 int gotBytes = 0;
 
 // Read the length of encrypted Thrift structure
 while (gotBytes < SIZE_LENGTH) {
-  int n = from.read(lengthBuffer, gotBytes, SIZE_LENGTH - gotBytes);
+  int n;
+  try {
+n = from.read(lengthBuffer, gotBytes, SIZE_LENGTH - gotBytes);
+  } catch (IOException e) {

Review comment:
   We should let the `IOException` be thrown out.

##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
##
@@ -1185,70 +1275,189 @@ static long getOffset(ColumnChunk columnChunk) {
 return offset;
   }
 
+  private static void verifyFooterIntegrity(InputStream from, InternalFileDecryptor fileDecryptor,
+  int combinedFooterLength) throws IOException {
+
+byte[] nonce = new byte[AesCipher.NONCE_LENGTH];
+from.read(nonce);
+byte[] gcmTag = new byte[AesCipher.GCM_TAG_LENGTH];
+from.read(gcmTag);
+
+AesGcmEncryptor footerSigner = fileDecryptor.createSignedFooterEncryptor();
+
+byte[] footerAndSignature = ((ByteBufferInputStream) from).slice(0).array();
+int footerSignatureLength = AesCipher.NONCE_LENGTH + AesCipher.GCM_TAG_LENGTH;
+byte[] serializedFooter = new byte[combinedFooterLength - footerSignatureLength];
+System.arraycopy(footerAndSignature, 0, serializedFooter, 0, serializedFooter.length);
+
+byte[] signedFooterAAD = AesCipher.createFooterAAD(fileDecryptor.getFileAAD());
+byte[] encryptedFooterBytes = footerSigner.encrypt(false, serializedFooter, nonce, signedFooterAAD);
+byte[]