wgtmac commented on code in PR #1015:
URL: https://github.com/apache/parquet-mr/pull/1015#discussion_r1062474157


##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/ColumnDecryptionProperties.java:
##########
@@ -1,104 +1,109 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.parquet.crypto;
-
-import org.apache.parquet.hadoop.metadata.ColumnPath;
-
-/**
- * This class is only required for setting explicit column decryption keys -
- * to override key retriever (or to provide keys when key metadata and/or
- * key retriever are not available)
- */
-public class ColumnDecryptionProperties {
-
-  private final ColumnPath columnPath;
-  private final byte[] keyBytes;
-
-  private ColumnDecryptionProperties(ColumnPath columnPath, byte[] keyBytes) {
-    if (null == columnPath) {
-      throw new IllegalArgumentException("Null column path");
-    }
-    if (null == keyBytes) {
-      throw new IllegalArgumentException("Null key for column " + columnPath);
-    }
-    if (!(keyBytes.length == 16 || keyBytes.length == 24 || keyBytes.length == 
32)) {
-      throw new IllegalArgumentException("Wrong key length: " + 
keyBytes.length + 
-          " on column: " + columnPath);
-    }
-
-    this.columnPath = columnPath;
-    this.keyBytes = keyBytes;
-  }
-
-  /**
-   * Convenience builder for regular (not nested) columns.
-   * 
-   * @param name Flat column name
-   * @return Builder
-   */
-  public static Builder builder(String name) {
-    return builder(ColumnPath.get(name));
-  }
-
-  public static Builder builder(ColumnPath path) {
-    return new Builder(path);
-  }
-
-  public static class Builder {
-    private final ColumnPath columnPath;
-    private byte[] keyBytes;
-
-    private Builder(ColumnPath path) {
-      this.columnPath = path;
-    }
-
-    /**
-     * Set an explicit column key. 
-     * If applied on a file that contains key metadata for this column - 
-     * the metadata will be ignored, the column will be decrypted with this 
key.
-     * However, if the column was encrypted with the footer key, it will also 
be decrypted with the
-     * footer key, and the column key passed in this method will be ignored.
-     * 
-     * @param columnKey Key length must be either 16, 24 or 32 bytes.
-     * @return Builder
-     */
-    public Builder withKey(byte[] columnKey) {
-      if (null != this.keyBytes) {
-        throw new IllegalStateException("Key already set on column: " + 
columnPath);
-      }
-      this.keyBytes = new byte[columnKey.length];
-      System.arraycopy(columnKey, 0, this.keyBytes, 0, columnKey.length);
-
-      return this;
-    }
-
-    public ColumnDecryptionProperties build() {
-      return new ColumnDecryptionProperties(columnPath, keyBytes);
-    }
-  }
-
-  public ColumnPath getPath() {
-    return columnPath;
-  }
-
-  public byte[] getKeyBytes() {
-    return keyBytes;
-  }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * This class is only required for setting explicit column decryption keys -
+ * to override key retriever (or to provide keys when key metadata and/or
+ * key retriever are not available)
+ */
+public class ColumnDecryptionProperties {
+
+  private final ColumnPath columnPath;
+  private final byte[] keyBytes;
+
+  private ColumnDecryptionProperties(ColumnPath columnPath, byte[] keyBytes) {
+    if (null == columnPath) {
+      throw new IllegalArgumentException("Null column path");
+    }
+    if (null == keyBytes) {
+      throw new IllegalArgumentException("Null key for column " + columnPath);
+    }
+    if (!(keyBytes.length == 16 || keyBytes.length == 24 || keyBytes.length == 
32)) {
+      throw new IllegalArgumentException("Wrong key length: " + 
keyBytes.length + 
+          " on column: " + columnPath);
+    }
+
+    this.columnPath = columnPath;
+    this.keyBytes = keyBytes;
+  }
+
+  /**
+   * Convenience builder for regular (not nested) columns.
+   * 
+   * @param name Flat column name
+   * @return Builder
+   */
+  public static Builder builder(String name) {
+    return builder(ColumnPath.get(name));
+  }
+
+  public static Builder builder(ColumnPath path) {
+    return new Builder(path);
+  }
+
+  public static class Builder {
+    private final ColumnPath columnPath;
+    private byte[] keyBytes;
+
+    private Builder(ColumnPath path) {
+      this.columnPath = path;
+    }
+
+    /**
+     * Set an explicit column key. 
+     * If applied on a file that contains key metadata for this column - 
+     * the metadata will be ignored, the column will be decrypted with this 
key.
+     * However, if the column was encrypted with the footer key, it will also 
be decrypted with the
+     * footer key, and the column key passed in this method will be ignored.
+     * 
+     * @param columnKey Key length must be either 16, 24 or 32 bytes.
+     * @return Builder
+     */
+    public Builder withKey(byte[] columnKey) {
+      if (null != this.keyBytes) {
+        throw new IllegalStateException("Key already set on column: " + 
columnPath);
+      }
+      this.keyBytes = new byte[columnKey.length];
+      System.arraycopy(columnKey, 0, this.keyBytes, 0, columnKey.length);
+
+      return this;
+    }
+
+    public ColumnDecryptionProperties build() {
+      return new ColumnDecryptionProperties(columnPath, keyBytes);
+    }
+  }
+
+  public ColumnPath getPath() {
+    return columnPath;
+  }
+
+  public byte[] getKeyBytes() {
+    return keyBytes;
+  }
+
+  ColumnDecryptionProperties deepClone() {

Review Comment:
   It seems that this file only adds a new method, but the formatter 
accidentally reformatted the whole file. Can you please revert the 
formatting-only changes?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java:
##########
@@ -136,12 +142,88 @@ public byte[] getDictPageAAD() {
     }
   }
 
+  private static class DecryptorRunTime {
+    private final BlockCipher.Decryptor headerBlockDecryptor;
+    private final BlockCipher.Decryptor dataDecryptor;
+    private final byte[] fileAAD;
+    private final Short columnOrdinal;
+
+    public DecryptorRunTime(InternalFileDecryptor fileDecryptor, 
ColumnChunkMetaData chunk) throws IOException  {
+      if (fileDecryptor == null) {

Review Comment:
   Should we throw if fileDecryptor is null? When will this branch be hit? 



##########
parquet-hadoop/src/main/java/org/apache/parquet/crypto/FileDecryptionProperties.java:
##########
@@ -1,254 +1,280 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.parquet.crypto;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.parquet.hadoop.metadata.ColumnPath;
-
-public class FileDecryptionProperties {
-
-  private static final boolean CHECK_SIGNATURE = true;
-  private static final boolean ALLOW_PLAINTEXT_FILES = false;
-
-  private final byte[] footerKey;
-  private final DecryptionKeyRetriever keyRetriever;
-  private final byte[] aadPrefix;
-  private final AADPrefixVerifier aadPrefixVerifier;
-  private final Map<ColumnPath, ColumnDecryptionProperties> columnPropertyMap;
-  private final boolean checkPlaintextFooterIntegrity;
-  private final boolean allowPlaintextFiles;
-
-  private FileDecryptionProperties(byte[] footerKey, DecryptionKeyRetriever 
keyRetriever,
-      boolean checkPlaintextFooterIntegrity,  byte[] aadPrefix, 
AADPrefixVerifier aadPrefixVerifier,
-      Map<ColumnPath, ColumnDecryptionProperties> columnPropertyMap, boolean 
allowPlaintextFiles) {
-
-    if ((null == footerKey) && (null == keyRetriever) && (null == 
columnPropertyMap)) {
-      throw new IllegalArgumentException("No decryption properties are 
specified");
-    }
-    if ((null != footerKey) && 
-        !(footerKey.length == 16 || footerKey.length == 24 || footerKey.length 
== 32)) {
-      throw new IllegalArgumentException("Wrong footer key length " + 
footerKey.length);
-    }
-    if ((null == footerKey) && checkPlaintextFooterIntegrity && (null == 
keyRetriever)) {
-      throw new IllegalArgumentException("Can't check footer integrity with 
null footer key and null key retriever");
-    }
-
-    this.footerKey = footerKey;
-    this.checkPlaintextFooterIntegrity = checkPlaintextFooterIntegrity;
-    this.keyRetriever = keyRetriever;
-    this.aadPrefix = aadPrefix;
-    this.columnPropertyMap = columnPropertyMap;
-    this.aadPrefixVerifier = aadPrefixVerifier;
-    this.allowPlaintextFiles = allowPlaintextFiles;
-  }
-
-  public static Builder builder() {
-    return new Builder();
-  }
-
-  public static class Builder {
-    private byte[] footerKeyBytes;
-    private DecryptionKeyRetriever keyRetriever;
-    private byte[] aadPrefixBytes;
-    private AADPrefixVerifier aadPrefixVerifier;
-    private Map<ColumnPath, ColumnDecryptionProperties> columnPropertyMap;
-    private boolean checkPlaintextFooterIntegrity;
-    private boolean plaintextFilesAllowed;
-
-    private Builder() {
-      this.checkPlaintextFooterIntegrity = CHECK_SIGNATURE;
-      this.plaintextFilesAllowed = ALLOW_PLAINTEXT_FILES;
-    }
-
-    /**
-     * Set an explicit footer key. If applied on a file that contains footer 
key metadata - 
-     * the metadata will be ignored, the footer will be decrypted/verified 
with this key.
-     * If explicit key is not set, footer key will be fetched from key 
retriever.
-     * 
-     * @param footerKey Key length must be either 16, 24 or 32 bytes.
-     * @return Builder 
-     */
-    public Builder withFooterKey(byte[] footerKey) {
-      if (null == footerKey) {
-        return this;
-      }
-      if (null != this.footerKeyBytes) {
-        throw new IllegalStateException("Footer key already set");
-      }
-      this.footerKeyBytes = new byte[footerKey.length];
-      System.arraycopy(footerKey, 0, this.footerKeyBytes, 0, footerKey.length);
-
-      return this;
-    }
-
-    /**
-     * Set explicit column keys (decryption properties).
-     * Its also possible to set a key retriever on this file decryption 
properties object. 
-     * Upon reading, availability of explicit keys is checked before 
invocation of the retriever callback.
-     * If an explicit key is available for a footer or a column, its key 
metadata will be ignored.
-     * 
-     * @param columnProperties Explicit column decryption keys
-     * @return Builder
-     */
-    public Builder withColumnKeys(Map<ColumnPath, ColumnDecryptionProperties> 
columnProperties) {
-      if (null == columnProperties) {
-        return this;
-      }
-      if (null != this.columnPropertyMap) {
-        throw new IllegalStateException("Column properties already set");
-      }
-      // Copy the map to make column properties immutable
-      this.columnPropertyMap = new HashMap<ColumnPath, 
ColumnDecryptionProperties>(columnProperties);
-
-      return this;
-    }
-
-    /**
-     * Set a key retriever callback. It is also possible to
-     * set explicit footer or column keys on this file property object. Upon 
file decryption, 
-     * availability of explicit keys is checked before invocation of the 
retriever callback.
-     * If an explicit key is available for a footer or a column, its key 
metadata will
-     * be ignored. 
-     * 
-     * @param keyRetriever Key retriever object
-     * @return Builder
-     */
-    public Builder withKeyRetriever(DecryptionKeyRetriever keyRetriever) {
-      if (null == keyRetriever) {
-        return this;
-      }
-      if (null != this.keyRetriever) {
-        throw new IllegalStateException("Key retriever already set");
-      }
-      this.keyRetriever = keyRetriever;
-
-      return this;
-    }
-
-    /**
-     * Skip integrity verification of plaintext footers.
-     * If not called, integrity of plaintext footers will be checked in 
runtime, and an exception will 
-     * be thrown in the following situations:
-     * - footer signing key is not available (not passed, or not found by key 
retriever)
-     * - footer content doesn't match the signature
-     * 
-     * @return Builder
-     */
-    public Builder withoutFooterSignatureVerification() {
-      this.checkPlaintextFooterIntegrity = false;
-      return this;
-    }
-
-    /**
-     * Explicitly supply the file AAD prefix.
-     * A must when a prefix is used for file encryption, but not stored in 
file.
-     * If AAD prefix is stored in file, it will be compared to the explicitly 
supplied value 
-     * and an exception will be thrown if they differ.
-     * 
-     * @param aadPrefixBytes AAD Prefix
-     * @return Builder
-     */
-    public Builder withAADPrefix(byte[] aadPrefixBytes) {
-      if (null == aadPrefixBytes) {
-        return this;
-      }
-      if (null != this.aadPrefixBytes) {
-        throw new IllegalStateException("AAD Prefix already set");
-      }
-      this.aadPrefixBytes = aadPrefixBytes;
-
-      return this;
-    }
-
-    /**
-     * Set callback for verification of AAD Prefixes stored in file.
-     * 
-     * @param aadPrefixVerifier AAD prefix verification object
-     * @return Builder
-     */
-    public Builder withAADPrefixVerifier(AADPrefixVerifier aadPrefixVerifier) {
-      if (null == aadPrefixVerifier) {
-        return this;
-      }
-      if (null != this.aadPrefixVerifier) {
-        throw new IllegalStateException("AAD Prefix verifier already set");
-      }
-      this.aadPrefixVerifier = aadPrefixVerifier;
-
-      return this;
-    }
-
-    /**
-     * By default, reading plaintext (unencrypted) files is not allowed when 
using a decryptor 
-     * - in order to detect files that were not encrypted by mistake. 
-     * However, the default behavior can be overriden by calling this method.
-     * The caller should use then a different method to ensure encryption of 
files with sensitive data.
-     * 
-     * @return Builder
-     */
-    public Builder withPlaintextFilesAllowed() {
-      this.plaintextFilesAllowed  = true;
-      return this;
-    }
-
-    public FileDecryptionProperties build() {
-      return new FileDecryptionProperties(footerKeyBytes, keyRetriever, 
checkPlaintextFooterIntegrity, 
-          aadPrefixBytes, aadPrefixVerifier, columnPropertyMap, 
plaintextFilesAllowed);
-    }
-  }
-
-  public byte[] getFooterKey() {
-    return footerKey;
-  }
-
-  public byte[] getColumnKey(ColumnPath path) {
-    if (null == columnPropertyMap) {
-      return null;
-    }
-    ColumnDecryptionProperties columnDecryptionProperties = 
columnPropertyMap.get(path);
-    if (null == columnDecryptionProperties) {
-      return null;
-    }
-
-    return columnDecryptionProperties.getKeyBytes();
-  }
-
-  public DecryptionKeyRetriever getKeyRetriever() {
-    return keyRetriever;
-  }
-
-  public byte[] getAADPrefix() {
-    return aadPrefix;
-  }
-
-  public boolean checkFooterIntegrity() {
-    return checkPlaintextFooterIntegrity;
-  }
-
-  boolean plaintextFilesAllowed() {
-    return allowPlaintextFiles;
-  }
-
-  AADPrefixVerifier getAADPrefixVerifier() {
-    return aadPrefixVerifier;
-  }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.crypto;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+public class FileDecryptionProperties {
+
+  private static final boolean CHECK_SIGNATURE = true;
+  private static final boolean ALLOW_PLAINTEXT_FILES = false;
+
+  private final byte[] footerKey;
+  private final DecryptionKeyRetriever keyRetriever;
+  private final byte[] aadPrefix;
+  private final AADPrefixVerifier aadPrefixVerifier;
+  private final Map<ColumnPath, ColumnDecryptionProperties> columnPropertyMap;
+  private final boolean checkPlaintextFooterIntegrity;
+  private final boolean allowPlaintextFiles;
+
+  private FileDecryptionProperties(byte[] footerKey, DecryptionKeyRetriever 
keyRetriever,
+      boolean checkPlaintextFooterIntegrity,  byte[] aadPrefix, 
AADPrefixVerifier aadPrefixVerifier,
+      Map<ColumnPath, ColumnDecryptionProperties> columnPropertyMap, boolean 
allowPlaintextFiles) {
+
+    if ((null == footerKey) && (null == keyRetriever) && (null == 
columnPropertyMap)) {
+      throw new IllegalArgumentException("No decryption properties are 
specified");
+    }
+    if ((null != footerKey) && 
+        !(footerKey.length == 16 || footerKey.length == 24 || footerKey.length 
== 32)) {
+      throw new IllegalArgumentException("Wrong footer key length " + 
footerKey.length);
+    }
+    if ((null == footerKey) && checkPlaintextFooterIntegrity && (null == 
keyRetriever)) {
+      throw new IllegalArgumentException("Can't check footer integrity with 
null footer key and null key retriever");
+    }
+
+    this.footerKey = footerKey;
+    this.checkPlaintextFooterIntegrity = checkPlaintextFooterIntegrity;
+    this.keyRetriever = keyRetriever;
+    this.aadPrefix = aadPrefix;
+    this.columnPropertyMap = columnPropertyMap;
+    this.aadPrefixVerifier = aadPrefixVerifier;
+    this.allowPlaintextFiles = allowPlaintextFiles;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private byte[] footerKeyBytes;
+    private DecryptionKeyRetriever keyRetriever;
+    private byte[] aadPrefixBytes;
+    private AADPrefixVerifier aadPrefixVerifier;
+    private Map<ColumnPath, ColumnDecryptionProperties> columnPropertyMap;
+    private boolean checkPlaintextFooterIntegrity;
+    private boolean plaintextFilesAllowed;
+
+    private Builder() {
+      this.checkPlaintextFooterIntegrity = CHECK_SIGNATURE;
+      this.plaintextFilesAllowed = ALLOW_PLAINTEXT_FILES;
+    }
+
+    /**
+     * Set an explicit footer key. If applied on a file that contains footer 
key metadata - 
+     * the metadata will be ignored, the footer will be decrypted/verified 
with this key.
+     * If explicit key is not set, footer key will be fetched from key 
retriever.
+     * 
+     * @param footerKey Key length must be either 16, 24 or 32 bytes.
+     * @return Builder 
+     */
+    public Builder withFooterKey(byte[] footerKey) {
+      if (null == footerKey) {
+        return this;
+      }
+      if (null != this.footerKeyBytes) {
+        throw new IllegalStateException("Footer key already set");
+      }
+      this.footerKeyBytes = new byte[footerKey.length];
+      System.arraycopy(footerKey, 0, this.footerKeyBytes, 0, footerKey.length);
+
+      return this;
+    }
+
+    /**
+     * Set explicit column keys (decryption properties).
+     * Its also possible to set a key retriever on this file decryption 
properties object. 
+     * Upon reading, availability of explicit keys is checked before 
invocation of the retriever callback.
+     * If an explicit key is available for a footer or a column, its key 
metadata will be ignored.
+     * 
+     * @param columnProperties Explicit column decryption keys
+     * @return Builder
+     */
+    public Builder withColumnKeys(Map<ColumnPath, ColumnDecryptionProperties> 
columnProperties) {
+      if (null == columnProperties) {
+        return this;
+      }
+      if (null != this.columnPropertyMap) {
+        throw new IllegalStateException("Column properties already set");
+      }
+      // Copy the map to make column properties immutable
+      this.columnPropertyMap = new HashMap<ColumnPath, 
ColumnDecryptionProperties>(columnProperties);
+
+      return this;
+    }
+
+    /**
+     * Set a key retriever callback. It is also possible to
+     * set explicit footer or column keys on this file property object. Upon 
file decryption, 
+     * availability of explicit keys is checked before invocation of the 
retriever callback.
+     * If an explicit key is available for a footer or a column, its key 
metadata will
+     * be ignored. 
+     * 
+     * @param keyRetriever Key retriever object
+     * @return Builder
+     */
+    public Builder withKeyRetriever(DecryptionKeyRetriever keyRetriever) {
+      if (null == keyRetriever) {
+        return this;
+      }
+      if (null != this.keyRetriever) {
+        throw new IllegalStateException("Key retriever already set");
+      }
+      this.keyRetriever = keyRetriever;
+
+      return this;
+    }
+
+    /**
+     * Skip integrity verification of plaintext footers.
+     * If not called, integrity of plaintext footers will be checked in 
runtime, and an exception will 
+     * be thrown in the following situations:
+     * - footer signing key is not available (not passed, or not found by key 
retriever)
+     * - footer content doesn't match the signature
+     * 
+     * @return Builder
+     */
+    public Builder withoutFooterSignatureVerification() {
+      this.checkPlaintextFooterIntegrity = false;
+      return this;
+    }
+
+    /**
+     * Explicitly supply the file AAD prefix.
+     * A must when a prefix is used for file encryption, but not stored in 
file.
+     * If AAD prefix is stored in file, it will be compared to the explicitly 
supplied value 
+     * and an exception will be thrown if they differ.
+     * 
+     * @param aadPrefixBytes AAD Prefix
+     * @return Builder
+     */
+    public Builder withAADPrefix(byte[] aadPrefixBytes) {
+      if (null == aadPrefixBytes) {
+        return this;
+      }
+      if (null != this.aadPrefixBytes) {
+        throw new IllegalStateException("AAD Prefix already set");
+      }
+      this.aadPrefixBytes = aadPrefixBytes;
+
+      return this;
+    }
+
+    /**
+     * Set callback for verification of AAD Prefixes stored in file.
+     * 
+     * @param aadPrefixVerifier AAD prefix verification object
+     * @return Builder
+     */
+    public Builder withAADPrefixVerifier(AADPrefixVerifier aadPrefixVerifier) {
+      if (null == aadPrefixVerifier) {
+        return this;
+      }
+      if (null != this.aadPrefixVerifier) {
+        throw new IllegalStateException("AAD Prefix verifier already set");
+      }
+      this.aadPrefixVerifier = aadPrefixVerifier;
+
+      return this;
+    }
+
+    /**
+     * By default, reading plaintext (unencrypted) files is not allowed when 
using a decryptor 
+     * - in order to detect files that were not encrypted by mistake. 
+     * However, the default behavior can be overriden by calling this method.
+     * The caller should use then a different method to ensure encryption of 
files with sensitive data.
+     * 
+     * @return Builder
+     */
+    public Builder withPlaintextFilesAllowed() {
+      this.plaintextFilesAllowed  = true;
+      return this;
+    }
+
+    public FileDecryptionProperties build() {
+      return new FileDecryptionProperties(footerKeyBytes, keyRetriever, 
checkPlaintextFooterIntegrity, 
+          aadPrefixBytes, aadPrefixVerifier, columnPropertyMap, 
plaintextFilesAllowed);
+    }
+  }
+
+  public byte[] getFooterKey() {
+    return footerKey;
+  }
+
+  public byte[] getColumnKey(ColumnPath path) {
+    if (null == columnPropertyMap) {
+      return null;
+    }
+    ColumnDecryptionProperties columnDecryptionProperties = 
columnPropertyMap.get(path);
+    if (null == columnDecryptionProperties) {
+      return null;
+    }
+
+    return columnDecryptionProperties.getKeyBytes();
+  }
+
+  public DecryptionKeyRetriever getKeyRetriever() {
+    return keyRetriever;
+  }
+
+  public byte[] getAADPrefix() {
+    return aadPrefix;
+  }
+
+  public boolean checkFooterIntegrity() {
+    return checkPlaintextFooterIntegrity;
+  }
+
+  boolean plaintextFilesAllowed() {
+    return allowPlaintextFiles;
+  }
+
+  AADPrefixVerifier getAADPrefixVerifier() {
+    return aadPrefixVerifier;
+  }
+
+
+  /** DecryptionProperties object can be used for reading one file only.

Review Comment:
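   Same for this file: the formatter appears to have reformatted the whole 
file. Can you please revert the formatting-only changes?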



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java:
##########
@@ -52,8 +52,6 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.stream.Collectors;

Review Comment:
   Why are these imports removed?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java:
##########
@@ -151,26 +233,34 @@ public ColumnEncryptor(Configuration conf) {
    * @param fileEncryptionProperties FileEncryptionProperties of the file
    * @throws IOException
    */
-  public void encryptColumns(String inputFile, String outputFile, List<String> 
paths, FileEncryptionProperties fileEncryptionProperties) throws IOException {
+  public void encryptColumns(String inputFile, String outputFile, List<String> 
paths, FileEncryptionProperties fileEncryptionProperties,
+                             FileDecryptionProperties 
fileDecryptionProperties) throws IOException {
     Path inPath = new Path(inputFile);
     Path outPath = new Path(outputFile);
 
-    ParquetMetadata metaData = ParquetFileReader.readFooter(conf, inPath, 
NO_FILTER);
+    ParquetMetadata metaData = readFooter(inputFile, NO_FILTER, 
fileDecryptionProperties, conf);
     MessageType schema = metaData.getFileMetaData().getSchema();
 
+    Set<ColumnPath> alreadyEncrColumnPaths = encryptedColumnPaths(inputFile, 
fileDecryptionProperties.getFooterKey(), conf);
+
+    FileDecryptionProperties fdp = fileDecryptionProperties.deepClone(null);
+    InternalFileDecryptor internalFileDecryptor = 
getFileDecryptorOrNull(inPath, fdp, conf);
+
     ParquetFileWriter writer = new 
ParquetFileWriter(HadoopOutputFile.fromPath(outPath, conf), schema, 
ParquetFileWriter.Mode.OVERWRITE,
       DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT, 
DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH, DEFAULT_STATISTICS_TRUNCATE_LENGTH,
       ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED, 
fileEncryptionProperties);
     writer.start();
 
-    try (TransParquetFileReader reader = new 
TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf), 
HadoopReadOptions.builder(conf).build())) {
-      processBlocks(reader, writer, metaData, schema, paths);
+    try (TransParquetFileReader reader = new 
TransParquetFileReader(HadoopInputFile.fromPath(inPath, conf)
+      , HadoopReadOptions.builder(conf).withDecryption(fdp).build())) {
+      processBlocks(reader, writer, metaData, schema, paths, 
alreadyEncrColumnPaths, internalFileDecryptor);
     }
     writer.end(metaData.getFileMetaData().getKeyValueMetaData());
   }
 
   private void processBlocks(TransParquetFileReader reader, ParquetFileWriter 
writer, ParquetMetadata meta,
-                            MessageType schema, List<String> encryptPaths) 
throws IOException {
+                             MessageType schema, List<String> encryptPaths, 
Set<ColumnPath> alreadyEncrColumnPaths,

Review Comment:
   ```suggestion
                                MessageType schema, List<String> encryptPaths, 
Set<ColumnPath> encryptedPaths,
   ```



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/ColumnEncryptor.java:
##########
@@ -136,12 +142,88 @@ public byte[] getDictPageAAD() {
     }
   }
 
+  private static class DecryptorRunTime {
+    private final BlockCipher.Decryptor headerBlockDecryptor;
+    private final BlockCipher.Decryptor dataDecryptor;
+    private final byte[] fileAAD;
+    private final Short columnOrdinal;
+
+    public DecryptorRunTime(InternalFileDecryptor fileDecryptor, 
ColumnChunkMetaData chunk) throws IOException  {
+      if (fileDecryptor == null) {
+        this.headerBlockDecryptor = null;
+        this.dataDecryptor = null;
+        this.fileAAD = null;
+        this.columnOrdinal = null;
+      } else {
+        InternalColumnDecryptionSetup columnDecryptionSetup = 
fileDecryptor.getColumnSetup(chunk.getPath());
+        this.headerBlockDecryptor = 
columnDecryptionSetup.getMetaDataDecryptor();
+        this.dataDecryptor = columnDecryptionSetup.getDataDecryptor();
+        this.fileAAD = fileDecryptor.getFileAAD();
+        this.columnOrdinal = columnDecryptionSetup.getOrdinal();
+      }
+    }
+
+    public byte[] getFileAAD() {
+      return this.fileAAD;
+    }
+
+    public short getColumnOrdinal() {
+      return this.columnOrdinal.shortValue();
+    }
+
+    public BlockCipher.Decryptor getHeaderBlockDecryptor() {
+      return this.headerBlockDecryptor;
+    }
+
+    public BlockCipher.Decryptor getDataDecryptor() {
+      return this.dataDecryptor;
+    }
+  }
+
   private Configuration conf;
 
   public ColumnEncryptor(Configuration conf) {
     this.conf = conf;
   }
 
+  public static ParquetMetadata readFooter(String inputFile, 
ParquetMetadataConverter.MetadataFilter filter,

Review Comment:
   `filter` is not used here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@parquet.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to