[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2023-09-29 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17770343#comment-17770343
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

ConeyLiu commented on PR #1121:
URL: https://github.com/apache/parquet-mr/pull/1121#issuecomment-1740550838

   OK, I will dig deeper into it.




> Add merge blocks command to parquet-tools
> -----------------------------------------
>
>                 Key: PARQUET-1381
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1381
>             Project: Parquet
>          Issue Type: New Feature
>          Components: parquet-mr
>    Affects Versions: 1.10.0
>            Reporter: Ekaterina Galieva
>            Assignee: Ekaterina Galieva
>            Priority: Major
>              Labels: pull-request-available
>
> Current implementation of the merge command in parquet-tools doesn't merge row 
> groups; it just places one after the other. Add an API and a command option to 
> be able to merge small blocks into larger ones up to a specified size limit.
> h6. Implementation details:
> Blocks are not reordered, so as not to break possible initial predicate 
> pushdown optimizations.
> Blocks are not divided to fit the upper bound perfectly. 
> This is an intentional performance optimization. 
> This gives an opportunity to form new blocks by copying the full content of 
> smaller blocks by column, not by row.
> h6. Examples:
>  # Input files with block sizes:
> {code:java}
> [128 | 35], [128 | 40], [120]{code}
> Expected output file block sizes:
> {{merge }}
> {code:java}
> [128 | 35 | 128 | 40 | 120]
> {code}
> {{merge -b}}
> {code:java}
> [128 | 35 | 128 | 40 | 120]
> {code}
> {{merge -b -l 256 }}
> {code:java}
> [163 | 168 | 120]
> {code}
>  # Input files with block sizes:
> {code:java}
> [128 | 35], [40], [120], [6] {code}
> Expected output file block sizes:
> {{merge}}
> {code:java}
> [128 | 35 | 40 | 120 | 6] 
> {code}
> {{merge -b}}
> {code:java}
> [128 | 75 | 126] 
> {code}
> {{merge -b -l 256}}
> {code:java}
> [203 | 126]{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2023-09-28 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17770122#comment-17770122
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

MaheshGPai commented on PR #1121:
URL: https://github.com/apache/parquet-mr/pull/1121#issuecomment-1739538710

   @ConeyLiu Please feel free to continue. I'll not be able to look at this for 
another week or so.






[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2023-09-28 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17770044#comment-17770044
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

ConeyLiu commented on PR #1121:
URL: https://github.com/apache/parquet-mr/pull/1121#issuecomment-1739107132

   Hi @MaheshGPai, thanks for the contribution. If you don't have time to work 
on this, I can continue with it.






[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2023-09-23 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17768240#comment-17768240
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

MaheshGPai commented on PR #1121:
URL: https://github.com/apache/parquet-mr/pull/1121#issuecomment-1732238952

   > This is a great initiative. Do you still plan to address the feedback @MaheshGPai?
   
   @shangxinli I do plan to work on it. But I have not had time to get to this.






[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2023-09-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17767786#comment-17767786
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

shangxinli commented on PR #1121:
URL: https://github.com/apache/parquet-mr/pull/1121#issuecomment-1730734276

   This is a great initiative. Do you still plan to address the feedback, @MaheshGPai?






[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2023-07-22 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17745963#comment-17745963
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

wgtmac commented on code in PR #1121:
URL: https://github.com/apache/parquet-mr/pull/1121#discussion_r1271309683


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RowGroupMerger.java:
##########
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import static java.lang.String.format;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map.Entry;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+class RowGroupMerger {
+
+  private final MessageType schema;
+  private final CompressionCodecFactory.BytesInputCompressor compressor;
+  private final ParquetProperties parquetProperties;
+
+  public RowGroupMerger(MessageType schema, CompressionCodecName compression, boolean useV2ValueWriter) {
+    this(schema, new Configuration(), compression, useV2ValueWriter);
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName compression, boolean useV2ValueWriter) {
+    this(schema, conf, compression, createParquetProperties(useV2ValueWriter));
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName compression, ParquetProperties parquetProperties) {
+    this.schema = schema;
+    this.parquetProperties = parquetProperties;
+    this.compressor = new CodecFactory(conf, this.parquetProperties.getPageSizeThreshold()).getCompressor(compression);
+  }
+
+  /**
+   * Merges the row groups, making sure that new row groups do not exceed the supplied maxRowGroupSize
+   *
+   * @param inputFiles      input files to merge
+   * @param maxRowGroupSize the max limit for new blocks
+   * @param writer          writer to write the new blocks to
+   * @throws IOException if an IO error occurs
+   */
+  public void merge(List<ParquetFileReader> inputFiles, final long maxRowGroupSize,
+                    ParquetFileWriter writer) throws IOException {
+
+    SizeEstimator estimator = new SizeEstimator(compressor.getCodecName() != CompressionCodecName.UNCOMPRESSED);
+    MutableMergedBlock mergedBlock = null;
+    for (ParquetFileReader reader : inputFiles) {

[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2023-07-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744984#comment-17744984
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

advancedxy commented on code in PR #1121:
URL: https://github.com/apache/parquet-mr/pull/1121#discussion_r1269155063


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -751,6 +764,27 @@ public GroupConverter asGroupConverter() {
     }
   }
 
+  private void mergeRowGroups() throws IOException {
+    if (null == reader) {
+      return;
+    }
+
+    boolean v2EncodingHint = meta.getBlocks().stream()
+      .flatMap(b -> b.getColumns().stream())
+      .anyMatch(chunk -> {
+        EncodingStats stats = chunk.getEncodingStats();
+        return stats != null && stats.usesV2Pages();
+      });
+
+    List<ParquetFileReader> readers = new ArrayList<>();
+    do {
+      readers.add(reader);
+      initNextReader();

Review Comment:
   Looks like `v2EncodingHint` only checks the first Parquet file.
   
   Should all of the files be checked?
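One way to cover every input, sketched under the assumption that the readers have already been collected into a list (the class and helper below are hypothetical, not code from the PR):
{code:java}
import java.util.List;

import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.hadoop.ParquetFileReader;

class V2HintCheck {

  // Scans the footers of *all* input files for V2 pages, not just the
  // first file described by `meta`.
  static boolean anyInputUsesV2Pages(List<ParquetFileReader> readers) {
    return readers.stream()
      .flatMap(reader -> reader.getFooter().getBlocks().stream())
      .flatMap(block -> block.getColumns().stream())
      .anyMatch(chunk -> {
        EncodingStats stats = chunk.getEncodingStats();
        return stats != null && stats.usesV2Pages();
      });
  }
}
{code}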



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/RowGroupMerger.java:
##########
@@ -0,0 +1,657 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop.rewrite;
+
+import static java.lang.String.format;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Collections;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map.Entry;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.CodecFactory;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+class RowGroupMerger {
+
+  private final MessageType schema;
+  private final CompressionCodecFactory.BytesInputCompressor compressor;
+  private final ParquetProperties parquetProperties;
+
+  public RowGroupMerger(MessageType schema, CompressionCodecName compression, boolean useV2ValueWriter) {
+    this(schema, new Configuration(), compression, useV2ValueWriter);
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName compression, boolean useV2ValueWriter) {
+    this(schema, conf, compression, createParquetProperties(useV2ValueWriter));
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName compression, ParquetProperties parquetProperties) {
+    this.schema = schema;
+    this.parquetProperties = parquetProperties;
+    this.compressor = new

[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2023-07-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744597#comment-17744597
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

MaheshGPai commented on code in PR #1121:
URL: https://github.com/apache/parquet-mr/pull/1121#discussion_r1267997802


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -751,6 +767,27 @@ public GroupConverter asGroupConverter() {
     }
   }
 
+  private void mergeRowGroups() throws IOException {
+    if (null == reader) {
+      return;
+    }
+
+    boolean v2EncodingHint = meta.getBlocks().stream()
+      .flatMap(b -> b.getColumns().stream())
+      .anyMatch(chunk -> {
+        EncodingStats stats = chunk.getEncodingStats();
+        return stats != null && stats.usesV2Pages();
+      });
+
+    List<ParquetFileReader> readers = new ArrayList<>();
+    do {
+      readers.add(reader);
+      initNextReader();
+    }
+    while (reader != null);
+    new RowGroupMerger(schema, newCodecName, v2EncodingHint).merge(readers, maxRowGroupSize, writer);

Review Comment:
   Yes. Underneath, it uses the same instance of ParquetFileWriter which 
handles these operations.







[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2023-07-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744596#comment-17744596
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

MaheshGPai commented on code in PR #1121:
URL: https://github.com/apache/parquet-mr/pull/1121#discussion_r1267996199


##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java:
##########
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.lang.String.format;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map.Entry;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.CompressionConverter;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+public class RowGroupMerger {

Review Comment:
   Done






[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2023-07-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744594#comment-17744594
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

MaheshGPai commented on code in PR #1121:
URL: https://github.com/apache/parquet-mr/pull/1121#discussion_r1267994336


##########
parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java:
##########
@@ -72,6 +72,18 @@ public class RewriteCommand extends BaseCommand {
       required = false)
   String codec;
 
+  @Parameter(
+      names = {"-m", "--merge-rowgroups"},
+      description = "",

Review Comment:
   Done



##########
parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java:
##########
@@ -72,6 +72,18 @@ public class RewriteCommand extends BaseCommand {
       required = false)
   String codec;
 
+  @Parameter(
+      names = {"-m", "--merge-rowgroups"},
+      description = "",
+      required = false)
+  boolean mergeRowGroups;
+
+  @Parameter(
+      names = {"-s", "--max-rowgroup-size"},
+      description = "",

Review Comment:
   Done







[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2023-07-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17744595#comment-17744595
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

MaheshGPai commented on code in PR #1121:
URL: https://github.com/apache/parquet-mr/pull/1121#discussion_r1267995725


##########
parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java:
##########
@@ -108,6 +120,14 @@ private RewriteOptions buildOptionsOrFail() {
       builder.transform(codecName);
     }
 
+    if (mergeRowGroups) {
+      Preconditions.checkArgument(maxRowGroupSize > 0,
+        "If row group merging is enabled, the max row group size must be specified");
+      Preconditions.checkArgument(null != codec,
+        "If row group merging is enabled, a new compression codec must be specified");
+      builder.enableRowGroupMerge();
+      builder.maxRowGroupSize(maxRowGroupSize);

Review Comment:
   I have made changes as per the comment. I'm fine either way. 
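For context, these CLI options end up on the `RewriteOptions` builder; a rough sketch of the resulting call chain (paths and codec are placeholders, and `enableRowGroupMerge()`/`maxRowGroupSize()` are the methods added by this PR, so this is an illustration rather than final API):
{code:java}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class RewriteOptionsSketch {
  public static void main(String[] args) {
    RewriteOptions options = new RewriteOptions.Builder(
        new Configuration(), new Path("/tmp/in.parquet"), new Path("/tmp/out.parquet"))
      .transform(CompressionCodecName.ZSTD) // a codec is required when merging row groups
      .enableRowGroupMerge()                // added by this PR
      .maxRowGroupSize(128L * 1024 * 1024)  // added by this PR
      .build();
  }
}
{code}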



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java:
##########
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.lang.String.format;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.*;

Review Comment:
   Corrected







[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2023-07-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1777#comment-1777
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

wgtmac commented on code in PR #1121:
URL: https://github.com/apache/parquet-mr/pull/1121#discussion_r1267547118


##########
parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java:
##########
@@ -72,6 +72,18 @@ public class RewriteCommand extends BaseCommand {
       required = false)
   String codec;
 
+  @Parameter(
+      names = {"-m", "--merge-rowgroups"},
+      description = "",
+      required = false)
+  boolean mergeRowGroups;
+
+  @Parameter(
+      names = {"-s", "--max-rowgroup-size"},
+      description = "",

Review Comment:
   It would be good to say it is used together with `--merge-rowgroups=true` in 
the description.
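Concretely, the suggestion amounts to wording along these lines (illustrative only; the field's name and type are assumed, since the quoted hunk is truncated before the field declaration):
{code:java}
@Parameter(
    names = {"-s", "--max-rowgroup-size"},
    description = "Maximum size in bytes of the merged row groups; only used together with --merge-rowgroups=true",
    required = false)
long maxRowGroupSize;
{code}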



##########
parquet-cli/src/main/java/org/apache/parquet/cli/commands/RewriteCommand.java:
##########
@@ -72,6 +72,18 @@ public class RewriteCommand extends BaseCommand {
       required = false)
   String codec;
 
+  @Parameter(
+      names = {"-m", "--merge-rowgroups"},
+      description = "",

Review Comment:
   Could you please add a brief description?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/rewrite/ParquetRewriter.java:
##########
@@ -751,6 +767,27 @@ public GroupConverter asGroupConverter() {
     }
   }
 
+  private void mergeRowGroups() throws IOException {
+    if (null == reader) {
+      return;
+    }
+
+    boolean v2EncodingHint = meta.getBlocks().stream()
+      .flatMap(b -> b.getColumns().stream())
+      .anyMatch(chunk -> {
+        EncodingStats stats = chunk.getEncodingStats();
+        return stats != null && stats.usesV2Pages();
+      });
+
+    List<ParquetFileReader> readers = new ArrayList<>();
+    do {
+      readers.add(reader);
+      initNextReader();
+    }
+    while (reader != null);
+    new RowGroupMerger(schema, newCodecName, v2EncodingHint).merge(readers, maxRowGroupSize, writer);

Review Comment:
   I didn't review it in depth. Does it handle encryption or masking properties 
internally? 



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java:
##########
@@ -0,0 +1,652 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.lang.String.format;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Map.Entry;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.CompressionConverter;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+import 

[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2023-07-15 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743379#comment-17743379
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

MaheshGPai commented on PR #1121:
URL: https://github.com/apache/parquet-mr/pull/1121#issuecomment-1636751367

   Taking forward a PR that had remained inactive. Original PR - 
https://github.com/apache/parquet-mr/pull/775






[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2023-07-15 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17743378#comment-17743378
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

MaheshGPai commented on PR #775:
URL: https://github.com/apache/parquet-mr/pull/775#issuecomment-1636751253

   @shangxinli @brimzi Created https://github.com/apache/parquet-mr/pull/1121






[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2023-06-28 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738050#comment-17738050
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

MaheshGPai commented on PR #775:
URL: https://github.com/apache/parquet-mr/pull/775#issuecomment-1611208102

   The tools code has moved over time, but the fundamental change is still valid. Thanks for that. I'll float a new PR with the change. Hope that's fine.






[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2023-06-28 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17738027#comment-17738027
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

brimzi commented on PR #775:
URL: https://github.com/apache/parquet-mr/pull/775#issuecomment-1611085716

   @MaheshGPai, this PR has stalled. You can pick it up if you like, but I am not sure whether it is still valid.






[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2023-06-28 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17737990#comment-17737990
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

MaheshGPai commented on PR #775:
URL: https://github.com/apache/parquet-mr/pull/775#issuecomment-1610887006

   @shangxinli, @brimzi Is this PR still active? I can pick it up and take it forward if required.
   We are looking for the capability to merge row groups as well.






[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-04-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17092880#comment-17092880
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

shangxinli commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r415418003



##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
##########
@@ -919,6 +895,59 @@ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
     endBlock();
   }
 
+  /**
+   * Merges adjacent row groups in the supplied files while ensuring that the new groups are no larger than the specified
+   * maxRowGroupSize
+   * @param inputFiles input files to merge
+   * @param maxRowGroupSize the maximum size in bytes the newly created groups can be
+   * @param useV2Writer whether to use a V2 encoding based writer when rewriting dictionary encoded pages
+   * @param compression compression to use when writing
+   * @throws IOException
+   */
+  public void mergeRowGroups(List inputFiles, long maxRowGroupSize, boolean useV2Writer, CompressionCodecName compression) throws IOException {

Review comment:
   I prefer not to unless you strongly think we should. 







[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-04-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17088145#comment-17088145
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-----------------------------------------

brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411741293



##########
File path: parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
##########
@@ -919,6 +895,59 @@ public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
     endBlock();
   }
 
+  /**
+   * Merges adjacent row groups in the supplied files while ensuring that the new groups are no larger than the specified
+   * maxRowGroupSize
+   * @param inputFiles input files to merge
+   * @param maxRowGroupSize the maximum size in bytes the newly created groups can be
+   * @param useV2Writer whether to use a V2 encoding based writer when rewriting dictionary encoded pages
+   * @param compression compression to use when writing
+   * @throws IOException
+   */
+  public void mergeRowGroups(List inputFiles, long maxRowGroupSize, boolean useV2Writer, CompressionCodecName compression) throws IOException {

Review comment:
   That is a good question. My thinking is that this method is similar to (an 
improvement on) _appendRowGroup_, which is a public API. In most use cases the 
user of _appendRowGroup_ actually wants the functionality provided by this 
method. But it boils down to whether _appendRowGroup_ really belonged here in the 
first place. I have no strong position on this; do you think I should not 
expose this new method?
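For reference, calling the proposed API would look roughly like this (illustrative only; `conf`, `mergedSchema`, `outputPath` and `inputFiles` stand for values built elsewhere, and the `start()`/`end()` framing follows ParquetFileWriter's usual append pattern as an assumption, not code from the PR):
{code:java}
// Sketch, not the PR's code: merge adjacent row groups from the inputs
// into groups of at most 128 MB, recompressing with SNAPPY.
ParquetFileWriter writer = new ParquetFileWriter(
    conf, mergedSchema, outputPath, ParquetFileWriter.Mode.CREATE);
writer.start();
writer.mergeRowGroups(inputFiles, 128L * 1024 * 1024,
    false /* useV2Writer */, CompressionCodecName.SNAPPY);
writer.end(java.util.Collections.emptyMap());
{code}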







[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-04-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088105#comment-17088105
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411711834



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##
@@ -0,0 +1,634 @@
+/*

Review comment:
   Same issue here as above, Functional Interface forces us to do so

##
File path: 
parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
##
@@ -63,28 +104,70 @@ public MergeCommand() {
   @Override
   public String getCommandDescription() {
 return "Merges multiple Parquet files into one. " +
-  "The command doesn't merge row groups, just places one after the other. 
" +
+  "Without -b option the command doesn't merge row groups, just places one 
after the other. " +
   "When used to merge many small files, the resulting file will still 
contain small row groups, " +
-  "which usually leads to bad query performance.";
+  "which usually leads to bad query performance. " +
+  "To have adjacent blocks(row groups) merged together use -b option. " +
+  "Blocks will be grouped into larger one until the upper bound is 
reached. " +
+  "Default block upper bound 128 MB and default compression SNAPPY can be 
customized using -l and -c options";
   }
 
   @Override
   public void execute(CommandLine options) throws Exception {
+super.execute(options);
+
+boolean mergeBlocks = options.hasOption('b');
+
 // Prepare arguments
 List<String> args = options.getArgList();
-List<Path> inputFiles = getInputFiles(args.subList(0, args.size() - 1));
+List<Path> files = getInputFiles(args.subList(0, args.size() - 1));
 Path outputFile = new Path(args.get(args.size() - 1));
-
 // Merge schema and extraMeta
-FileMetaData mergedMeta = mergedMetadata(inputFiles);
-PrintWriter out = new PrintWriter(Main.out, true);
-
-// Merge data
+ParquetMetadata parquetMetadata = mergedMetadata(files);
 ParquetFileWriter writer = new ParquetFileWriter(conf,
-mergedMeta.getSchema(), outputFile, ParquetFileWriter.Mode.CREATE);
+  parquetMetadata.getFileMetaData().getSchema(), outputFile, ParquetFileWriter.Mode.CREATE);
+PrintWriter stdOut = new PrintWriter(Main.out, true);
+
+if (mergeBlocks) {
+  long maxRowGroupSize = options.hasOption('l') ? Long.parseLong(options.getOptionValue('l')) * 1024 * 1024 : DEFAULT_BLOCK_SIZE;
+  CompressionCodecName compression = options.hasOption('c') ?
+    CompressionCodecName.valueOf(options.getOptionValue('c')) : CompressionCodecName.SNAPPY;
+
+  stdOut.println("Merging files and row-groups using " + compression.name() + " for compression and " + maxRowGroupSize
+    + " bytes as the upper bound for new row groups.");
+  mergeRowGroups(files, parquetMetadata, writer, maxRowGroupSize, compression);
+} else {
+  appendRowGroups(files, parquetMetadata.getFileMetaData(), writer, stdOut);
+}
+  }
+
+  private void mergeRowGroups(List<Path> files, ParquetMetadata parquetMetadata, ParquetFileWriter writer,
+                              long maxRowGroupSize, CompressionCodecName compression) throws IOException {
+
+boolean v2EncodingHint = parquetMetadata.getBlocks().stream()
+  .flatMap(b -> b.getColumns().stream())
+  .anyMatch(chunk -> {
+    EncodingStats stats = chunk.getEncodingStats();
+    return stats != null && stats.usesV2Pages();
+  });
+
+List<InputFile> inputFiles = files.stream().map(f -> {
+  try {
+    return HadoopInputFile.fromPath(f, conf);
+  } catch (IOException e) {
+    throw new UncheckedIOException(e);

Review comment:
   Same issue here as above, Functional Interface forces us to do so
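
For readers skimming the thread, this is the pattern the comment refers to: java.util.function interfaces declare no checked exceptions, so an IOException thrown inside a stream lambda has to be rethrown wrapped. A standalone illustration, not tied to the PR's code:

{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;

public class LambdaCheckedException {
  public static void main(String[] args) {
    List<Path> paths = List.of(Path.of("a.txt"), Path.of("b.txt"));
    // Function.apply cannot throw IOException, so wrap it in the unchecked variant;
    // callers may catch UncheckedIOException and unwrap it with getCause() if needed.
    List<String> contents = paths.stream()
        .map(p -> {
          try {
            return Files.readString(p);
          } catch (IOException e) {
            throw new UncheckedIOException(e);
          }
        })
        .collect(Collectors.toList());
    System.out.println(contents.size());
  }
}
{code}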





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-04-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088102#comment-17088102
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411711834



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##
@@ -0,0 +1,634 @@
+/*

Review comment:
   Same issue here as above, Functional Interface forces us to do so





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-04-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088099#comment-17088099
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411708296



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##
@@ -0,0 +1,634 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.lang.String.format;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+import static org.apache.parquet.hadoop.ParquetFileWriter.getColumnsInOrder;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+class RowGroupMerger {
+
+  private final MessageType schema;
+  private final CodecFactory.BytesInputCompressor compressor;
+  private final ParquetProperties parquetProperties;
+
+  RowGroupMerger(MessageType schema, CompressionCodecName compression, boolean useV2ValueWriter) {
+    this(schema, new Configuration(), compression, useV2ValueWriter);
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName compression, boolean useV2ValueWriter) {
+    this(schema, conf, compression, createParquetProperties(useV2ValueWriter));
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName compression, ParquetProperties parquetProperties) {
+    this.schema = schema;
+    this.parquetProperties = parquetProperties;
+    this.compressor = new CodecFactory(conf, this.parquetProperties.getPageSizeThreshold()).getCompressor(compression);
+  }
+
+  /**
+   * Merges the row groups, making sure that new row groups do not exceed the supplied maxRowGroupSize
+   *
+   * @param inputFiles      input files to merge
+   * @param maxRowGroupSize the max limit for new blocks
+   * @param writer          writer to write the new blocks to
+   * @throws IOException if an IO error occurs
+   */
+  void merge(List<InputFile> inputFiles, final long maxRowGroupSize, ParquetFileWriter writer) throws IOException {
+
+    SizeEstimator estimator = new SizeEstimator(compressor.getCodecName() != CompressionCodecName.UNCOMPRESSED);
+    MutableMergedBlock mergedBlock = null;
+    for (InputFile file : inputFiles) {
+      try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+
+        for (BlockMetaData blockMeta : 
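
The excerpt is cut off at the row-group loop. Conceptually it continues roughly as follows; this is only a sketch of the accumulate-and-flush idea, and the method names on mergedBlock and estimator are assumptions, not code from the PR:

{code:java}
// Sketch only; MutableMergedBlock/SizeEstimator method names are hypothetical.
for (BlockMetaData blockMeta : reader.getFooter().getBlocks()) {
  PageReadStore rowGroup = reader.readNextRowGroup();   // actual ParquetFileReader API
  long estimatedSize = estimator.estimate(blockMeta);   // assumed estimator method
  if (mergedBlock != null && mergedBlock.totalSize() + estimatedSize > maxRowGroupSize) {
    mergedBlock.writeTo(writer);                        // flush the accumulated block
    mergedBlock = null;
  }
  if (mergedBlock == null) {
    mergedBlock = new MutableMergedBlock(schema);       // assumed constructor
  }
  mergedBlock.add(rowGroup, blockMeta);                 // copy column by column
}
{code}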

[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-04-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088088#comment-17088088
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411696939



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##

[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-04-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088085#comment-17088085
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411692398



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##

[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-04-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17087282#comment-17087282
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

shangxinli commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411033348



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##

[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-04-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17087280#comment-17087280
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

shangxinli commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411031080



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##

[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-04-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17087276#comment-17087276
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

shangxinli commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411030497



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##

[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-04-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17087267#comment-17087267
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

shangxinli commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411026823



##
File path: 
parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
##
@@ -18,27 +18,37 @@
  */
 package org.apache.parquet.tools.command;
 
+import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
 import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.column.EncodingStats;
 import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.metadata.FileMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HiddenFileFilter;
+import org.apache.parquet.io.InputFile;
 import org.apache.parquet.tools.Main;
 
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.List;
-
 public class MergeCommand extends ArgsOnlyCommand {

Review comment:
   Add tests for it





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-03-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17062971#comment-17062971
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

brimzi commented on pull request #775: PARQUET-1381: add parquet block merging 
feature
URL: https://github.com/apache/parquet-mr/pull/775
 
 
   This PR addresses 
[PARQUET-1381](https://issues.apache.org/jira/projects/PARQUET/issues/PARQUET-1381).
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-02-24 Thread Gabor Szadovszky (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17043505#comment-17043505
 ] 

Gabor Szadovszky commented on PARQUET-1381:
---

I don't think anyone is working on it. Feel free to open a PR and I'm happy to 
review.
However, when I had to revert the previous implementation I was thinking about a 
correct solution. It turned out to be very complicated to make it optimal in 
performance yet correct from a statistics point of view. Currently, I am not sure 
it is worth the potential effort to implement this feature, as you can get 
similar functionality by rewriting a whole table of parquet files in a 
distributed way using a query engine (e.g. Hive/Impala/Spark). You can also do 
it as a background process using the Hive compaction feature.
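
For example, the distributed rewrite mentioned here can be a few lines in Spark (an illustrative sketch; paths and the partition count are hypothetical):

{code:java}
import org.apache.spark.sql.SparkSession;

public class CompactParquetTable {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("compact-parquet").getOrCreate();
    // Read the whole table and rewrite it with fewer, larger files,
    // which also yields fewer, larger row groups.
    spark.read().parquet("hdfs:///warehouse/events")
        .repartition(8)
        .write().mode("overwrite").parquet("hdfs:///warehouse/events_compacted");
    spark.stop();
  }
}
{code}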




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-02-23 Thread Brian Mwambazi (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17042992#comment-17042992
 ] 

Brian Mwambazi commented on PARQUET-1381:
-

Is anyone working on this issue? I have an implementation I would like to 
contribute. [~Katya]  [~gszadovszky]




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2019-02-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16772081#comment-16772081
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

gszadovszky commented on pull request #621: Revert "PARQUET-1381: Add merge 
blocks command to parquet-tools (#512)"
URL: https://github.com/apache/parquet-mr/pull/621
 
 
   This reverts commit 863a081850e5638d7b68b478a3bd40779723.
   
   The design of this feature has conceptual problems and also works 
incorrectly. See PARQUET-1381 for more details.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2019-02-19 Thread Gabor Szadovszky (JIRA)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16772080#comment-16772080
 ] 

Gabor Szadovszky commented on PARQUET-1381:
---

The design of this feature has conceptual problems and also works incorrectly 
(see details below). I propose reverting this change from master (and from the 
release 1.11.0). If someone would like to have this feature in a later release, 
feel free to work on it, but I think the design concept might need to be 
changed. I am happy to help with both the design and the implementation.

The current parquet-mr API is designed for writing values in a row-by-row 
manner and is therefore not suitable for this feature, which tries to copy the 
column chunks value by value. The current implementation misuses the 
ColumnWriteStore API by writing only one column at a time and signaling end of 
records. This leads to writing empty pages and other serious issues caused by 
the different record counts calculated at the different API levels. To make 
this concept work, some internal logic would have to be reimplemented at the 
page-writing level (closing the page, creating the dictionary, page size 
calculation, etc.), or a completely separate API would have to be created.
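
To make this concrete, below is a minimal hypothetical sketch of the 
per-column copy pattern described above (illustrative only, not the actual 
parquet-tools code; it assumes a single INT32 column, a ColumnReadStoreImpl 
over the source row group and a ColumnWriteStore over the target):

{code:java}
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.impl.ColumnReadStoreImpl;

class PerColumnCopySketch {
  // Copies one INT32 column value-by-value. This is the pattern being
  // criticized, not a recommended approach.
  static void copyColumn(ColumnReadStoreImpl readStore,
                         ColumnWriteStore writeStore,
                         ColumnDescriptor column) {
    ColumnReader reader = readStore.getColumnReader(column);
    ColumnWriter writer = writeStore.getColumnWriter(column);
    for (long i = 0, n = reader.getTotalValueCount(); i < n; i++) {
      int rl = reader.getCurrentRepetitionLevel();
      int dl = reader.getCurrentDefinitionLevel();
      if (dl < column.getMaxDefinitionLevel()) {
        writer.writeNull(rl, dl);                  // value absent at this level
      } else {
        writer.write(reader.getInteger(), rl, dl); // INT32 assumed
      }
      reader.consume();
      if (rl == 0) {
        // endRecord() is signaled once per record *per column* here; with
        // several columns this no longer lines up with actual rows.
        writeStore.endRecord();
      }
    }
  }
}
{code}

With more than one column, endRecord() ends up being signaled once per column 
instead of once per row, so the record counts tracked at the different API 
levels drift apart, which matches the empty pages and mismatched counts 
described above.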



[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2018-09-11 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610495#comment-16610495
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

zivanfi closed pull request #512: PARQUET-1381: Add merge blocks command to 
parquet-tools
URL: https://github.com/apache/parquet-mr/pull/512
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
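
For context, the ParquetFileReader.java hunk below extracts the existing 
buffer-allocation loop into a new readBlocks() helper. A standalone sketch of 
that splitting arithmetic (illustrative only, not the actual parquet-mr code; 
class and method names are made up):

{code:java}
import java.util.ArrayList;
import java.util.List;

public class AllocationSplitSketch {
  // Splits `length` bytes into buffer sizes capped at `maxAllocationSize`,
  // mirroring the fullAllocations / lastAllocationSize arithmetic in the diff.
  static List<Integer> split(int length, int maxAllocationSize) {
    int fullAllocations = length / maxAllocationSize;
    int lastAllocationSize = length % maxAllocationSize;
    List<Integer> sizes = new ArrayList<>();
    for (int i = 0; i < fullAllocations; i++) {
      sizes.add(maxAllocationSize);
    }
    if (lastAllocationSize > 0) {
      sizes.add(lastAllocationSize);
    }
    return sizes;
  }

  public static void main(String[] args) {
    System.out.println(split(2300, 1000)); // prints [1000, 1000, 300]
  }
}
{code}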

diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
index 37845961a..e582908ca 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReadStoreImpl.java
@@ -75,7 +75,7 @@ public ColumnReader getColumnReader(ColumnDescriptor path) {
     return newMemColumnReader(path, pageReadStore.getPageReader(path));
   }
 
-  private ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
+  public ColumnReaderImpl newMemColumnReader(ColumnDescriptor path, PageReader pageReader) {
     PrimitiveConverter converter = getPrimitiveConverter(path);
     return new ColumnReaderImpl(path, pageReader, converter, writerVersion);
   }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index 82c288fe4..5349dc28a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -262,4 +262,9 @@ public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
     }
   }
 
+  void flushToFileWriter(ColumnDescriptor path, ParquetFileWriter writer) throws IOException {
+    ColumnChunkPageWriter pageWriter = writers.get(path);
+    pageWriter.writeToFileWriter(writer);
+  }
+
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 15fe592db..527c8313b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -40,6 +40,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -56,6 +57,7 @@
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.DictionaryPageReadStore;
+import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputDecompressor;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.compat.RowGroupFilter;
@@ -1160,27 +1162,8 @@ public void addChunk(ChunkDescriptor descriptor) {
      * @throws IOException if there is an error while reading from the stream
      */
     public List<Chunk> readAll(SeekableInputStream f) throws IOException {
-      List<Chunk> result = new ArrayList<Chunk>(chunks.size());
-      f.seek(offset);
-
-      int fullAllocations = length / options.getMaxAllocationSize();
-      int lastAllocationSize = length % options.getMaxAllocationSize();
-
-      int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
-      List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
-
-      for (int i = 0; i < fullAllocations; i += 1) {
-        buffers.add(options.getAllocator().allocate(options.getMaxAllocationSize()));
-      }
-
-      if (lastAllocationSize > 0) {
-        buffers.add(options.getAllocator().allocate(lastAllocationSize));
-      }
-
-      for (ByteBuffer buffer : buffers) {
-        f.readFully(buffer);
-        buffer.flip();
-      }
+      List<Chunk> result = new ArrayList<>(chunks.size());
+      List<ByteBuffer> buffers = readBlocks(f, offset, length);
 
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
@@ -1206,4 +1189,72 @@ public long endPos() {
 
   }
 
+  /**
+   * @param f file to read the blocks from
+   * @return the ByteBuffer blocks
+   * @throws IOException if there is an error while reading from the stream
+   */
+  List<ByteBuffer> readBlocks(SeekableInputStream f, long offset, int length) throws IOException {
+    f.seek(offset);
+
+    int fullAllocations = length / options.getMaxAllocationSize();
+    int lastAllocationSize = length %