[GitHub] [parquet-mr] ggershinsky commented on a diff in pull request #1014: PARQUET-2075: Implement unified file rewriter
ggershinsky commented on code in PR #1014:
URL: https://github.com/apache/parquet-mr/pull/1014#discussion_r1067893929

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java: ##
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.crypto.FileEncryptionProperties;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.util.List;
+import java.util.Map;
+
+// A set of options to create a ParquetRewriter.
+public class RewriteOptions {
+
+  final Configuration conf;
+  final Path inputFile;
+  final Path outputFile;
+  final List<String> pruneColumns;
+  final CompressionCodecName newCodecName;
+  final Map<String, MaskMode> maskColumns;
+  final List<String> encryptColumns;
+  final FileEncryptionProperties fileEncryptionProperties;
+
+  private RewriteOptions(Configuration conf,
+                         Path inputFile,
+                         Path outputFile,
+                         List<String> pruneColumns,
+                         CompressionCodecName newCodecName,
+                         Map<String, MaskMode> maskColumns,
+                         List<String> encryptColumns,
+                         FileEncryptionProperties fileEncryptionProperties) {
+    this.conf = conf;
+    this.inputFile = inputFile;
+    this.outputFile = outputFile;
+    this.pruneColumns = pruneColumns;
+    this.newCodecName = newCodecName;
+    this.maskColumns = maskColumns;
+    this.encryptColumns = encryptColumns;
+    this.fileEncryptionProperties = fileEncryptionProperties;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public Path getInputFile() {
+    return inputFile;
+  }
+
+  public Path getOutputFile() {
+    return outputFile;
+  }
+
+  public List<String> getPruneColumns() {
+    return pruneColumns;
+  }
+
+  public CompressionCodecName getNewCodecName() {
+    return newCodecName;
+  }
+
+  public Map<String, MaskMode> getMaskColumns() {
+    return maskColumns;
+  }
+
+  public List<String> getEncryptColumns() {
+    return encryptColumns;
+  }
+
+  public FileEncryptionProperties getFileEncryptionProperties() {
+    return fileEncryptionProperties;
+  }
+
+  // Builder to create a RewriterOptions.
+  public static class Builder {
+    private Configuration conf;
+    private Path inputFile;
+    private Path outputFile;
+    private List<String> pruneColumns;
+    private CompressionCodecName newCodecName;
+    private Map<String, MaskMode> maskColumns;
+    private List<String> encryptColumns;
+    private FileEncryptionProperties fileEncryptionProperties;
+
+    public Builder(Configuration conf, Path inputFile, Path outputFile) {
+      this.conf = conf;
+      this.inputFile = inputFile;
+      this.outputFile = outputFile;
+    }
+
+    public Builder prune(List<String> columns) {
+      this.pruneColumns = columns;
+      return this;
+    }
+
+    public Builder transform(CompressionCodecName newCodecName) {
+      this.newCodecName = newCodecName;
+      return this;
+    }
+
+    public Builder mask(Map<String, MaskMode> maskColumns) {
+      this.maskColumns = maskColumns;
+      return this;
+    }
+
+    public Builder encrypt(List<String> encryptColumns) {
+      this.encryptColumns = encryptColumns;
+      return this;
+    }
+
+    public Builder encryptionProperties(FileEncryptionProperties fileEncryptionProperties) {
+      this.fileEncryptionProperties = fileEncryptionProperties;
+      return this;
+    }
+
+    public RewriteOptions build() {
+      Preconditions.checkArgument(inputFile != null, "Input file is required");
+      Preconditions.checkArgument(outputFile != null, "Output file is required");
+
+      if (pruneColumns != null) {
+        if (maskColumns != null) {
+          for (String pruneColumn : pruneColumns) {
+            Preconditions.checkArgument(!maskColumns.containsKey(pruneColumn),
+                    "Cannot prune and mask same column");
+          }
+        }
+
+        if (encryptColumns != null) {
+          for (String pruneColumn : pruneColumns) {
+            Preconditions.checkArgument(!encryptColumns.contains(pruneColumn),
+
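For orientation, here is a minimal sketch of how a caller might put the Builder above together. The paths, column names, and codec choice are hypothetical and used only for illustration:

```java
import java.util.Arrays;
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.rewrite.MaskMode;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class RewriteOptionsExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Hypothetical input/output paths and column names.
    RewriteOptions options = new RewriteOptions.Builder(
            conf, new Path("/tmp/in.parquet"), new Path("/tmp/out.parquet"))
        .prune(Arrays.asList("debug_payload"))                    // drop this column from the output
        .mask(Collections.singletonMap("ssn", MaskMode.NULLIFY))  // keep the column, null out its values
        .transform(CompressionCodecName.ZSTD)                     // translate chunks to a new codec
        .build();                                                  // validates the option combination
  }
}
```

Note that build() is where the option combination is validated: pruning a column while also masking or encrypting it is rejected by the Preconditions checks in the hunk above.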
[GitHub] [parquet-mr] ggershinsky commented on a diff in pull request #1014: PARQUET-2075: Implement unified file rewriter
ggershinsky commented on code in PR #1014:
URL: https://github.com/apache/parquet-mr/pull/1014#discussion_r1064348254

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RewriteOptions.java: ##
[GitHub] [parquet-mr] ggershinsky commented on a diff in pull request #1014: PARQUET-2075: Implement unified file rewriter
ggershinsky commented on code in PR #1014:
URL: https://github.com/apache/parquet-mr/pull/1014#discussion_r105961

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java: ##
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ColumnChunkPageWriteStore;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.CompressionConverter.TransParquetFileReader;
+import org.apache.parquet.hadoop.util.HadoopCodecs;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.internal.column.columnindex.ColumnIndex;
+import org.apache.parquet.internal.column.columnindex.OffsetIndex;
+import org.apache.parquet.io.ParquetEncodingException;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
+import static org.apache.parquet.column.ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH;
+import static org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
+
+public class ParquetRewriter implements Closeable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ParquetRewriter.class);
+  private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
+  private final byte[] pageBuffer = new byte[pageBufferSize];
+  private TransParquetFileReader reader;
+  private ParquetFileWriter writer;
+  private ParquetMetadata meta;
+  private MessageType schema;
+  private String createdBy;
+  private CompressionCodecName codecName =
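For context, a rough sketch of how such a rewriter might be driven end to end. The single-argument constructor and the processBlocks() entry point are assumptions based on the fields above, not confirmed by this excerpt:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class RewriteDriverSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical: input path as args[0], output path as args[1].
    RewriteOptions options = new RewriteOptions.Builder(
        new Configuration(), new Path(args[0]), new Path(args[1])).build();
    // ParquetRewriter implements Closeable, so try-with-resources closes reader and writer.
    try (ParquetRewriter rewriter = new ParquetRewriter(options)) {  // assumed constructor
      rewriter.processBlocks();  // assumed entry point: copy/translate each row group
    }
  }
}
```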
[GitHub] [parquet-mr] ggershinsky commented on a diff in pull request #1014: PARQUET-2075: Implement unified file rewriter
ggershinsky commented on code in PR #1014:
URL: https://github.com/apache/parquet-mr/pull/1014#discussion_r1057472437

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/MaskMode.java: ##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+public enum MaskMode {
+  NULLIFY("nullify"),
+  HASH("hash"),
+  REDACT("redact");
+
+  private String mode;
+
+  MaskMode(String text) {
+    this.mode = text;
+  }
+
+  public String getMode() {
+    return this.mode;
+  }
+
+  public static MaskMode fromString(String mode) {

Review Comment:
   I think you can use the `MaskMode.valueOf` method

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/MaskMode.java: ##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+public enum MaskMode {
+  NULLIFY("nullify"),
+  HASH("hash"),
+  REDACT("redact");
+
+  private String mode;
+
+  MaskMode(String text) {
+    this.mode = text;

Review Comment:
   string is not checked. Also, you can use the `MaskMode.valueOf` method

## parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java: ##
@@ -0,0 +1,733 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ColumnReader;
+import org.apache.parquet.column.ColumnWriteStore;
+import org.apache.parquet.column.ColumnWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.impl.ColumnReadStoreImpl;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.column.values.bloomfilter.BloomFilter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.crypto.AesCipher;
+import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
+import org.apache.parquet.crypto.InternalFileEncryptor;
+import org.apache.parquet.format.BlockCipher;
+import org.apache.parquet.format.DataPageHeader;
+import org.apache.parquet.format.DataPageHeaderV2;
+import org.apache.parquet.format.DictionaryPageHeader;
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import
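Returning to the MaskMode comments above, a sketch of what the suggested `MaskMode.valueOf` approach could look like; this is not the PR's code, and the explicit null/unknown-value handling shown here is an assumption:

```java
// Sketch of the reviewer's suggestion: derive the constant via the built-in Enum.valueOf
// instead of a hand-written lookup, and fail loudly on null or unknown input.
public enum MaskMode {
  NULLIFY, HASH, REDACT;

  public static MaskMode fromString(String mode) {
    if (mode == null) {
      throw new IllegalArgumentException("Mask mode is required");
    }
    // valueOf throws IllegalArgumentException for any name that is not a declared constant
    return MaskMode.valueOf(mode.toUpperCase(java.util.Locale.ROOT));
  }
}
```

With this shape, the lower-case mode field arguably becomes redundant, since name().toLowerCase() can serve as the external string form.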