Re: Parquet - 41

2020-04-20 Thread Junjie Chen
As far as I know, it is not implemented yet. The Thrift definition is up to date now;
would you like to contribute?

Things we need are:
1. an xxHash C++ implementation
2. a reader and writer for the bloom filter
3. filtering logic for row groups

Implementing the reader would be a good start.
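
For illustration only, a minimal Java sketch of item 3, the row-group filtering logic; the BloomFilter interface and selectRowGroups helper are hypothetical stand-ins, not the actual parquet-cpp or parquet-mr API:

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical reader-side view of a deserialized per-row-group bloom filter.
interface BloomFilter {
  boolean mightContain(long valueHash);
}

class RowGroupFilterSketch {
  // Keep only the row groups whose bloom filter cannot rule the value out.
  // A missing filter gives no information, so that row group must be kept;
  // a false positive only means a row group is read unnecessarily.
  static List<Integer> selectRowGroups(List<BloomFilter> filtersPerRowGroup, long valueHash) {
    List<Integer> keep = new ArrayList<>();
    for (int i = 0; i < filtersPerRowGroup.size(); i++) {
      BloomFilter filter = filtersPerRowGroup.get(i);
      if (filter == null || filter.mightContain(valueHash)) {
        keep.add(i);
      }
    }
    return keep;
  }
}
{code}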

On Tue, Apr 21, 2020 at 8:52 AM  wrote:

> Hi
>
> Is the C++ version of the bloom filter implemented in Arrow Parquet C++?
>
> https://issues.apache.org/jira/browse/PARQUET-41
> Regards
>


-- 
Best Regards


Re: Parquet - 41

2020-04-20 Thread ARL122
Hi

Is the C++ version of the bloom filter implemented in Arrow Parquet C++?

https://issues.apache.org/jira/browse/PARQUET-41
Regards


[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-04-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088145#comment-17088145
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411741293



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
##
@@ -919,6 +895,59 @@ public void appendRowGroup(SeekableInputStream from, 
BlockMetaData rowGroup,
 endBlock();
   }
 
+  /**
+   * Merges adjacent row groups in the supplied files while ensuring that 
the new groups are no larger than the specified
+   * maxRowGroupSize
+   * @param inputFiles input files to merge
+   * @param maxRowGroupSize the maximum size in bytes the newly created groups 
can be
+   * @param useV2Writer whether to use a V2 encoding based writer when 
rewriting dictionary encoded pages
+   * @param compression compression to use when writing
+   * @throws IOException
+   */
+  public void mergeRowGroups(List<InputFile> inputFiles, long maxRowGroupSize, 
boolean useV2Writer, CompressionCodecName compression) throws IOException {

Review comment:
   That is a good question. My thinking is that this method is similar to (an 
improvement on) _appendRowGroup_, which is a public API. In most use cases the 
user of _appendRowGroup_ actually wants the functionality provided by this 
method. But it boils down to whether _appendRowGroup_ really belonged here in the 
first place. I have no strong position on this; do you think I should not 
expose this new method?
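
For context, a rough caller-side sketch (not part of the PR) of how the proposed method might sit next to the existing API, assuming the signature shown in the diff above; the class name, paths and sizes are placeholders:

{code:java}
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.schema.MessageType;

class MergeRowGroupsUsageSketch {
  // inputFiles, schema and outputPath are assumed to be prepared by the caller.
  static void mergeSmallRowGroups(Configuration conf, MessageType schema,
                                  Path outputPath, List<InputFile> inputFiles) throws IOException {
    ParquetFileWriter writer =
        new ParquetFileWriter(conf, schema, outputPath, ParquetFileWriter.Mode.CREATE);
    writer.start();
    // Proposed API: coalesce adjacent row groups up to 128 MB instead of
    // copying each one as-is (which is what appendRowGroup does today).
    writer.mergeRowGroups(inputFiles, 128L * 1024 * 1024,
        /* useV2Writer */ false, CompressionCodecName.SNAPPY);
    writer.end(Collections.emptyMap());
  }
}
{code}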





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add merge blocks command to parquet-tools
> -
>
> Key: PARQUET-1381
> URL: https://issues.apache.org/jira/browse/PARQUET-1381
> Project: Parquet
>  Issue Type: New Feature
>  Components: parquet-mr
>Affects Versions: 1.10.0
>Reporter: Ekaterina Galieva
>Assignee: Ekaterina Galieva
>Priority: Major
>  Labels: pull-request-available
>
> The current implementation of the merge command in parquet-tools doesn't merge row 
> groups, it just places one after the other. Add an API and command option to be 
> able to merge small blocks into larger ones up to a specified size limit.
> h6. Implementation details:
> Blocks are not reordered, so as not to break possible initial predicate pushdown 
> optimizations.
> Blocks are not split to fit the upper bound exactly. 
> This is an intentional performance optimization. 
> This gives an opportunity to form new blocks by copying the full content of 
> smaller blocks by column, not by row.
> h6. Examples:
>  # Input files with blocks sizes:
> {code:java}
> [128 | 35], [128 | 40], [120]{code}
> Expected output file blocks sizes:
> {{merge }}
> {code:java}
> [128 | 35 | 128 | 40 | 120]
> {code}
> {{merge -b}}
> {code:java}
> [128 | 35 | 128 | 40 | 120]
> {code}
> {{merge -b -l 256 }}
> {code:java}
> [163 | 168 | 120]
> {code}
>  # Input files with blocks sizes:
> {code:java}
> [128 | 35], [40], [120], [6] {code}
> Expected output file blocks sizes:
> {{merge}}
> {code:java}
> [128 | 35 | 40 | 120 | 6] 
> {code}
> {{merge -b}}
> {code:java}
> [128 | 75 | 126] 
> {code}
> {{merge -b -l 256}}
> {code:java}
> [203 | 126]{code}
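
To make the -b examples above easy to verify, here is a small illustrative Java sketch (not the parquet-tools implementation) of the greedy grouping they imply: blocks are taken in order, a group is closed once the next block would push it past the limit, and blocks are never split.

{code:java}
import java.util.ArrayList;
import java.util.List;

class BlockGroupingSketch {
  // Greedily group block sizes without reordering or splitting them.
  static List<Long> groupBlocks(List<Long> blockSizes, long maxRowGroupSize) {
    List<Long> merged = new ArrayList<>();
    long current = 0;
    for (long size : blockSizes) {
      if (current > 0 && current + size > maxRowGroupSize) {
        merged.add(current);   // close the current group before it overflows
        current = 0;
      }
      current += size;
    }
    if (current > 0) {
      merged.add(current);
    }
    return merged;
  }
  // groupBlocks([128, 35, 128, 40, 120], 256) -> [163, 168, 120]
  // groupBlocks([128, 35, 40, 120, 6], 256)   -> [203, 126]
}
{code}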



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [parquet-mr] brimzi commented on a change in pull request #775: PARQUET-1381: add parquet block merging feature

2020-04-20 Thread GitBox


brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411741293



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
##
@@ -919,6 +895,59 @@ public void appendRowGroup(SeekableInputStream from, 
BlockMetaData rowGroup,
 endBlock();
   }
 
+  /**
+   * Merges adjacent row groups in the supplied files while maintaining that 
the new groups is no more than the specified
+   * maxRowGroupSize
+   * @param inputFiles input files to merge
+   * @param maxRowGroupSize the maximum size in bytes the newly created groups 
can be
+   * @param useV2Writer whether to use a V2 encoding based writer when 
rewriting dictionary encoded pages
+   * @param compression compression to use when writing
+   * @throws IOException
+   */
+  public void mergeRowGroups(List<InputFile> inputFiles, long maxRowGroupSize, 
boolean useV2Writer, CompressionCodecName compression) throws IOException {

Review comment:
   That is a good question. My thinking is that this method is similar to (an 
improvement on) _appendRowGroup_, which is a public API. In most use cases the 
user of _appendRowGroup_ actually wants the functionality provided by this 
method. But it boils down to whether _appendRowGroup_ really belonged here in the 
first place. I have no strong position on this; do you think I should not 
expose this new method?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-04-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088105#comment-17088105
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411711834



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##
@@ -0,0 +1,634 @@
+/*

Review comment:
   Same issue here as above; the functional interface forces us to do so

##
File path: 
parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
##
@@ -63,28 +104,70 @@ public MergeCommand() {
   @Override
   public String getCommandDescription() {
 return "Merges multiple Parquet files into one. " +
-  "The command doesn't merge row groups, just places one after the other. 
" +
+  "Without -b option the command doesn't merge row groups, just places one 
after the other. " +
   "When used to merge many small files, the resulting file will still 
contain small row groups, " +
-  "which usually leads to bad query performance.";
+  "which usually leads to bad query performance. " +
+  "To have adjacent blocks(row groups) merged together use -b option. " +
+  "Blocks will be grouped into larger one until the upper bound is 
reached. " +
+  "Default block upper bound 128 MB and default compression SNAPPY can be 
customized using -l and -c options";
   }
 
   @Override
   public void execute(CommandLine options) throws Exception {
+super.execute(options);
+
+boolean mergeBlocks = options.hasOption('b');
+
 // Prepare arguments
 List<String> args = options.getArgList();
-List<Path> inputFiles = getInputFiles(args.subList(0, args.size() - 1));
+List<Path> files = getInputFiles(args.subList(0, args.size() - 1));
 Path outputFile = new Path(args.get(args.size() - 1));
-
 // Merge schema and extraMeta
-FileMetaData mergedMeta = mergedMetadata(inputFiles);
-PrintWriter out = new PrintWriter(Main.out, true);
-
-// Merge data
+ParquetMetadata parquetMetadata = mergedMetadata(files);
 ParquetFileWriter writer = new ParquetFileWriter(conf,
-mergedMeta.getSchema(), outputFile, ParquetFileWriter.Mode.CREATE);
+  parquetMetadata.getFileMetaData().getSchema(), outputFile, 
ParquetFileWriter.Mode.CREATE);
+PrintWriter stdOut = new PrintWriter(Main.out, true);
+
+if (mergeBlocks) {
+  long maxRowGroupSize = options.hasOption('l')? 
Long.parseLong(options.getOptionValue('l')) * 1024 * 1024 : DEFAULT_BLOCK_SIZE;
+  CompressionCodecName compression = options.hasOption('c') ?
+CompressionCodecName.valueOf(options.getOptionValue('c')) : 
CompressionCodecName.SNAPPY;
+
+  stdOut.println("Merging files and row-groups using " + 
compression.name() + " for compression and " + maxRowGroupSize
++ " bytes as the upper bound for new row groups . ");
+  mergeRowGroups(files, parquetMetadata, writer, maxRowGroupSize, 
compression);
+} else {
+  appendRowGroups(files, parquetMetadata.getFileMetaData(), writer, 
stdOut);
+}
+  }
+
+  private void mergeRowGroups(List<Path> files, ParquetMetadata 
parquetMetadata, ParquetFileWriter writer,
+  long maxRowGroupSize, CompressionCodecName 
compression) throws IOException {
+
+boolean v2EncodingHint = parquetMetadata.getBlocks().stream()
+  .flatMap(b -> b.getColumns().stream())
+  .anyMatch(chunk -> {
+EncodingStats stats = chunk.getEncodingStats();
+return stats != null && stats.usesV2Pages();
+  });
+
+List<InputFile> inputFiles = files.stream().map(f -> {
+  try {
+return HadoopInputFile.fromPath(f, conf);
+  } catch (IOException e) {
+throw new UncheckedIOException(e);

Review comment:
   Same issue here as above; the functional interface forces us to do so
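
As a standalone illustration of the point above (hypothetical names, not project code): java.util.function.Function declares no checked exceptions, so an IOException thrown inside Stream.map has to be rethrown unchecked, e.g. wrapped in UncheckedIOException.

{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.stream.Collectors;

class UncheckedIoSketch {
  // A checked-exception API we want to call from a lambda.
  static String load(String path) throws IOException {
    if (path.isEmpty()) throw new IOException("empty path");
    return path.toUpperCase();
  }

  static List<String> loadAll(List<String> paths) {
    return paths.stream()
        .map(p -> {
          try {
            return load(p);
          } catch (IOException e) {
            // Function#apply cannot declare IOException, so rethrow unchecked.
            throw new UncheckedIOException(e);
          }
        })
        .collect(Collectors.toList());
  }
}
{code}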





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add merge blocks command to parquet-tools
> -
>
> Key: PARQUET-1381
> URL: https://issues.apache.org/jira/browse/PARQUET-1381
> Project: Parquet
>  Issue Type: New Feature
>  Components: parquet-mr
>Affects Versions: 1.10.0
>Reporter: Ekaterina Galieva
>Assignee: Ekaterina Galieva
>Priority: Major
>  Labels: pull-request-available
>
> The current implementation of the merge command in parquet-tools doesn't merge row 
> groups, it just places one after the other. Add an API and command option to be 
> able to merge small blocks into 

[GitHub] [parquet-mr] brimzi commented on a change in pull request #775: PARQUET-1381: add parquet block merging feature

2020-04-20 Thread GitBox


brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411711834



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##
@@ -0,0 +1,634 @@
+/*

Review comment:
   Same issue here as above; the functional interface forces us to do so

##
File path: 
parquet-tools/src/main/java/org/apache/parquet/tools/command/MergeCommand.java
##
@@ -63,28 +104,70 @@ public MergeCommand() {
   @Override
   public String getCommandDescription() {
 return "Merges multiple Parquet files into one. " +
-  "The command doesn't merge row groups, just places one after the other. 
" +
+  "Without -b option the command doesn't merge row groups, just places one 
after the other. " +
   "When used to merge many small files, the resulting file will still 
contain small row groups, " +
-  "which usually leads to bad query performance.";
+  "which usually leads to bad query performance. " +
+  "To have adjacent blocks(row groups) merged together use -b option. " +
+  "Blocks will be grouped into larger one until the upper bound is 
reached. " +
+  "Default block upper bound 128 MB and default compression SNAPPY can be 
customized using -l and -c options";
   }
 
   @Override
   public void execute(CommandLine options) throws Exception {
+super.execute(options);
+
+boolean mergeBlocks = options.hasOption('b');
+
 // Prepare arguments
 List<String> args = options.getArgList();
-List<Path> inputFiles = getInputFiles(args.subList(0, args.size() - 1));
+List<Path> files = getInputFiles(args.subList(0, args.size() - 1));
 Path outputFile = new Path(args.get(args.size() - 1));
-
 // Merge schema and extraMeta
-FileMetaData mergedMeta = mergedMetadata(inputFiles);
-PrintWriter out = new PrintWriter(Main.out, true);
-
-// Merge data
+ParquetMetadata parquetMetadata = mergedMetadata(files);
 ParquetFileWriter writer = new ParquetFileWriter(conf,
-mergedMeta.getSchema(), outputFile, ParquetFileWriter.Mode.CREATE);
+  parquetMetadata.getFileMetaData().getSchema(), outputFile, 
ParquetFileWriter.Mode.CREATE);
+PrintWriter stdOut = new PrintWriter(Main.out, true);
+
+if (mergeBlocks) {
+  long maxRowGroupSize = options.hasOption('l')? 
Long.parseLong(options.getOptionValue('l')) * 1024 * 1024 : DEFAULT_BLOCK_SIZE;
+  CompressionCodecName compression = options.hasOption('c') ?
+CompressionCodecName.valueOf(options.getOptionValue('c')) : 
CompressionCodecName.SNAPPY;
+
+  stdOut.println("Merging files and row-groups using " + 
compression.name() + " for compression and " + maxRowGroupSize
++ " bytes as the upper bound for new row groups . ");
+  mergeRowGroups(files, parquetMetadata, writer, maxRowGroupSize, 
compression);
+} else {
+  appendRowGroups(files, parquetMetadata.getFileMetaData(), writer, 
stdOut);
+}
+  }
+
+  private void mergeRowGroups(List<Path> files, ParquetMetadata 
parquetMetadata, ParquetFileWriter writer,
+  long maxRowGroupSize, CompressionCodecName 
compression) throws IOException {
+
+boolean v2EncodingHint = parquetMetadata.getBlocks().stream()
+  .flatMap(b -> b.getColumns().stream())
+  .anyMatch(chunk -> {
+EncodingStats stats = chunk.getEncodingStats();
+return stats != null && stats.usesV2Pages();
+  });
+
+List<InputFile> inputFiles = files.stream().map(f -> {
+  try {
+return HadoopInputFile.fromPath(f, conf);
+  } catch (IOException e) {
+throw new UncheckedIOException(e);

Review comment:
   Same issue here as above; the functional interface forces us to do so





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-04-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088102#comment-17088102
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411711834



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##
@@ -0,0 +1,634 @@
+/*

Review comment:
   Same issue here as above; the functional interface forces us to do so





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add merge blocks command to parquet-tools
> -
>
> Key: PARQUET-1381
> URL: https://issues.apache.org/jira/browse/PARQUET-1381
> Project: Parquet
>  Issue Type: New Feature
>  Components: parquet-mr
>Affects Versions: 1.10.0
>Reporter: Ekaterina Galieva
>Assignee: Ekaterina Galieva
>Priority: Major
>  Labels: pull-request-available
>
> The current implementation of the merge command in parquet-tools doesn't merge row 
> groups, it just places one after the other. Add an API and command option to be 
> able to merge small blocks into larger ones up to a specified size limit.
> h6. Implementation details:
> Blocks are not reordered, so as not to break possible initial predicate pushdown 
> optimizations.
> Blocks are not split to fit the upper bound exactly. 
> This is an intentional performance optimization. 
> This gives an opportunity to form new blocks by copying the full content of 
> smaller blocks by column, not by row.
> h6. Examples:
>  # Input files with blocks sizes:
> {code:java}
> [128 | 35], [128 | 40], [120]{code}
> Expected output file blocks sizes:
> {{merge }}
> {code:java}
> [128 | 35 | 128 | 40 | 120]
> {code}
> {{merge -b}}
> {code:java}
> [128 | 35 | 128 | 40 | 120]
> {code}
> {{merge -b -l 256 }}
> {code:java}
> [163 | 168 | 120]
> {code}
>  # Input files with blocks sizes:
> {code:java}
> [128 | 35], [40], [120], [6] {code}
> Expected output file blocks sizes:
> {{merge}}
> {code:java}
> [128 | 35 | 40 | 120 | 6] 
> {code}
> {{merge -b}}
> {code:java}
> [128 | 75 | 126] 
> {code}
> {{merge -b -l 256}}
> {code:java}
> [203 | 126]{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [parquet-mr] brimzi commented on a change in pull request #775: PARQUET-1381: add parquet block merging feature

2020-04-20 Thread GitBox


brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411711834



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##
@@ -0,0 +1,634 @@
+/*

Review comment:
   Same issue here as above; the functional interface forces us to do so





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [parquet-mr] brimzi commented on a change in pull request #775: PARQUET-1381: add parquet block merging feature

2020-04-20 Thread GitBox


brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411708296



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##
@@ -0,0 +1,634 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.lang.String.format;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+import static org.apache.parquet.hadoop.ParquetFileWriter.getColumnsInOrder;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+class RowGroupMerger {
+
+  private final MessageType schema;
+  private final CodecFactory.BytesInputCompressor compressor;
+  private final ParquetProperties parquetProperties;
+
+  RowGroupMerger(MessageType schema, CompressionCodecName compression, boolean 
useV2ValueWriter) {
+this(schema, new Configuration(), compression, useV2ValueWriter);
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName 
compression, boolean useV2ValueWriter) {
+this(schema, conf, compression, createParquetProperties(useV2ValueWriter));
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName 
compression, ParquetProperties parquetProperties) {
+this.schema = schema;
+this.parquetProperties = parquetProperties;
+this.compressor = new CodecFactory(conf, 
this.parquetProperties.getPageSizeThreshold()).getCompressor(compression);
+  }
+
+  /**
+   * Merges the row groups making sure that new row groups do not exceed the 
supplied maxRowGroupSize
+   *
+   * @param inputFiles  input files to merge
+   * @param maxRowGroupSize the max limit for new blocks
+   * @param writer  writer to write the new blocks to
+   * @throws IOException if an IO error occurs
+   */
+  void merge(List inputFiles, final long maxRowGroupSize, 
ParquetFileWriter writer) throws IOException {
+
+SizeEstimator estimator = new SizeEstimator(compressor.getCodecName() != 
CompressionCodecName.UNCOMPRESSED);
+MutableMergedBlock mergedBlock = null;
+for (InputFile file : inputFiles) {
+  try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+
+for (BlockMetaData blockMeta : reader.getRowGroups()) {
+  PageReadStore group = reader.readNextRowGroup();
+  Preconditions.checkState(group != null,
+"number of groups returned by FileReader does not match metadata");
+
+  if (mergedBlock != null && 

[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-04-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088099#comment-17088099
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411708296



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##
@@ -0,0 +1,634 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.lang.String.format;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+import static org.apache.parquet.hadoop.ParquetFileWriter.getColumnsInOrder;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+class RowGroupMerger {
+
+  private final MessageType schema;
+  private final CodecFactory.BytesInputCompressor compressor;
+  private final ParquetProperties parquetProperties;
+
+  RowGroupMerger(MessageType schema, CompressionCodecName compression, boolean 
useV2ValueWriter) {
+this(schema, new Configuration(), compression, useV2ValueWriter);
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName 
compression, boolean useV2ValueWriter) {
+this(schema, conf, compression, createParquetProperties(useV2ValueWriter));
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName 
compression, ParquetProperties parquetProperties) {
+this.schema = schema;
+this.parquetProperties = parquetProperties;
+this.compressor = new CodecFactory(conf, 
this.parquetProperties.getPageSizeThreshold()).getCompressor(compression);
+  }
+
+  /**
+   * Merges the row groups making sure that new row groups do not exceed the 
supplied maxRowGroupSize
+   *
+   * @param inputFiles  input files to merge
+   * @param maxRowGroupSize the max limit for new blocks
+   * @param writer  writer to write the new blocks to
+   * @throws IOException if an IO error occurs
+   */
+  void merge(List inputFiles, final long maxRowGroupSize, 
ParquetFileWriter writer) throws IOException {
+
+SizeEstimator estimator = new SizeEstimator(compressor.getCodecName() != 
CompressionCodecName.UNCOMPRESSED);
+MutableMergedBlock mergedBlock = null;
+for (InputFile file : inputFiles) {
+  try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+
+for (BlockMetaData blockMeta : 

[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-04-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088088#comment-17088088
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411696939



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##
@@ -0,0 +1,634 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.lang.String.format;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+import static org.apache.parquet.hadoop.ParquetFileWriter.getColumnsInOrder;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+class RowGroupMerger {
+
+  private final MessageType schema;
+  private final CodecFactory.BytesInputCompressor compressor;
+  private final ParquetProperties parquetProperties;
+
+  RowGroupMerger(MessageType schema, CompressionCodecName compression, boolean 
useV2ValueWriter) {
+this(schema, new Configuration(), compression, useV2ValueWriter);
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName 
compression, boolean useV2ValueWriter) {
+this(schema, conf, compression, createParquetProperties(useV2ValueWriter));
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName 
compression, ParquetProperties parquetProperties) {
+this.schema = schema;
+this.parquetProperties = parquetProperties;
+this.compressor = new CodecFactory(conf, 
this.parquetProperties.getPageSizeThreshold()).getCompressor(compression);
+  }
+
+  /**
+   * Merges the row groups making sure that new row groups do not exceed the 
supplied maxRowGroupSize
+   *
+   * @param inputFiles  input files to merge
+   * @param maxRowGroupSize the max limit for new blocks
+   * @param writer  writer to write the new blocks to
+   * @throws IOException if an IO error occurs
+   */
+  void merge(List inputFiles, final long maxRowGroupSize, 
ParquetFileWriter writer) throws IOException {
+
+SizeEstimator estimator = new SizeEstimator(compressor.getCodecName() != 
CompressionCodecName.UNCOMPRESSED);
+MutableMergedBlock mergedBlock = null;
+for (InputFile file : inputFiles) {
+  try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+
+for (BlockMetaData blockMeta : 

[GitHub] [parquet-mr] brimzi commented on a change in pull request #775: PARQUET-1381: add parquet block merging feature

2020-04-20 Thread GitBox


brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411696939



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##
@@ -0,0 +1,634 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.lang.String.format;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+import static org.apache.parquet.hadoop.ParquetFileWriter.getColumnsInOrder;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+class RowGroupMerger {
+
+  private final MessageType schema;
+  private final CodecFactory.BytesInputCompressor compressor;
+  private final ParquetProperties parquetProperties;
+
+  RowGroupMerger(MessageType schema, CompressionCodecName compression, boolean 
useV2ValueWriter) {
+this(schema, new Configuration(), compression, useV2ValueWriter);
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName 
compression, boolean useV2ValueWriter) {
+this(schema, conf, compression, createParquetProperties(useV2ValueWriter));
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName 
compression, ParquetProperties parquetProperties) {
+this.schema = schema;
+this.parquetProperties = parquetProperties;
+this.compressor = new CodecFactory(conf, 
this.parquetProperties.getPageSizeThreshold()).getCompressor(compression);
+  }
+
+  /**
+   * Merges the row groups making sure that new row groups do not exceed the 
supplied maxRowGroupSize
+   *
+   * @param inputFiles  input files to merge
+   * @param maxRowGroupSize the max limit for new blocks
+   * @param writer  writer to write the new blocks to
+   * @throws IOException if an IO error occurs
+   */
+  void merge(List inputFiles, final long maxRowGroupSize, 
ParquetFileWriter writer) throws IOException {
+
+SizeEstimator estimator = new SizeEstimator(compressor.getCodecName() != 
CompressionCodecName.UNCOMPRESSED);
+MutableMergedBlock mergedBlock = null;
+for (InputFile file : inputFiles) {
+  try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+
+for (BlockMetaData blockMeta : reader.getRowGroups()) {
+  PageReadStore group = reader.readNextRowGroup();
+  Preconditions.checkState(group != null,
+"number of groups returned by FileReader does not match metadata");
+
+  if (mergedBlock != null && 

[GitHub] [parquet-mr] brimzi commented on a change in pull request #775: PARQUET-1381: add parquet block merging feature

2020-04-20 Thread GitBox


brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411692398



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##
@@ -0,0 +1,634 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.lang.String.format;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+import static org.apache.parquet.hadoop.ParquetFileWriter.getColumnsInOrder;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+class RowGroupMerger {
+
+  private final MessageType schema;
+  private final CodecFactory.BytesInputCompressor compressor;
+  private final ParquetProperties parquetProperties;
+
+  RowGroupMerger(MessageType schema, CompressionCodecName compression, boolean 
useV2ValueWriter) {
+this(schema, new Configuration(), compression, useV2ValueWriter);
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName 
compression, boolean useV2ValueWriter) {
+this(schema, conf, compression, createParquetProperties(useV2ValueWriter));
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName 
compression, ParquetProperties parquetProperties) {
+this.schema = schema;
+this.parquetProperties = parquetProperties;
+this.compressor = new CodecFactory(conf, 
this.parquetProperties.getPageSizeThreshold()).getCompressor(compression);
+  }
+
+  /**
+   * Merges the row groups making sure that new row groups do not exceed the 
supplied maxRowGroupSize
+   *
+   * @param inputFiles  input files to merge
+   * @param maxRowGroupSize the max limit for new blocks
+   * @param writer  writer to write the new blocks to
+   * @throws IOException if an IO error occurs
+   */
+  void merge(List inputFiles, final long maxRowGroupSize, 
ParquetFileWriter writer) throws IOException {
+
+SizeEstimator estimator = new SizeEstimator(compressor.getCodecName() != 
CompressionCodecName.UNCOMPRESSED);
+MutableMergedBlock mergedBlock = null;
+for (InputFile file : inputFiles) {
+  try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+
+for (BlockMetaData blockMeta : reader.getRowGroups()) {
+  PageReadStore group = reader.readNextRowGroup();
+  Preconditions.checkState(group != null,
+"number of groups returned by FileReader does not match metadata");
+
+  if (mergedBlock != null && 

[jira] [Commented] (PARQUET-1381) Add merge blocks command to parquet-tools

2020-04-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088085#comment-17088085
 ] 

ASF GitHub Bot commented on PARQUET-1381:
-

brimzi commented on a change in pull request #775:
URL: https://github.com/apache/parquet-mr/pull/775#discussion_r411692398



##
File path: 
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowGroupMerger.java
##
@@ -0,0 +1,634 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static java.lang.String.format;
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+import static org.apache.parquet.hadoop.ParquetFileWriter.getColumnsInOrder;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.compression.CompressionCodecFactory;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+
+class RowGroupMerger {
+
+  private final MessageType schema;
+  private final CodecFactory.BytesInputCompressor compressor;
+  private final ParquetProperties parquetProperties;
+
+  RowGroupMerger(MessageType schema, CompressionCodecName compression, boolean 
useV2ValueWriter) {
+this(schema, new Configuration(), compression, useV2ValueWriter);
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName 
compression, boolean useV2ValueWriter) {
+this(schema, conf, compression, createParquetProperties(useV2ValueWriter));
+  }
+
+  RowGroupMerger(MessageType schema, Configuration conf, CompressionCodecName 
compression, ParquetProperties parquetProperties) {
+this.schema = schema;
+this.parquetProperties = parquetProperties;
+this.compressor = new CodecFactory(conf, 
this.parquetProperties.getPageSizeThreshold()).getCompressor(compression);
+  }
+
+  /**
+   * Merges the row groups making sure that new row groups do not exceed the 
supplied maxRowGroupSize
+   *
+   * @param inputFiles  input files to merge
+   * @param maxRowGroupSize the max limit for new blocks
+   * @param writer  writer to write the new blocks to
+   * @throws IOException if an IO error occurs
+   */
+  void merge(List inputFiles, final long maxRowGroupSize, 
ParquetFileWriter writer) throws IOException {
+
+SizeEstimator estimator = new SizeEstimator(compressor.getCodecName() != 
CompressionCodecName.UNCOMPRESSED);
+MutableMergedBlock mergedBlock = null;
+for (InputFile file : inputFiles) {
+  try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+
+for (BlockMetaData blockMeta : 

[GitHub] [parquet-mr] shangxinli commented on issue #776: PARQUET-1229: Parquet MR encryption

2020-04-20 Thread GitBox


shangxinli commented on issue #776:
URL: https://github.com/apache/parquet-mr/pull/776#issuecomment-616599564


   Is this ready for review? Since there are no comments yet, can you squash it into 
a single commit to make the review easier? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (PARQUET-1229) parquet-mr code changes for encryption support

2020-04-20 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17087801#comment-17087801
 ] 

ASF GitHub Bot commented on PARQUET-1229:
-

shangxinli commented on issue #776:
URL: https://github.com/apache/parquet-mr/pull/776#issuecomment-616599564


   Is this ready for review? Since there are no comments yet, can you squash it into 
a single commit to make the review easier? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> parquet-mr code changes for encryption support
> --
>
> Key: PARQUET-1229
> URL: https://issues.apache.org/jira/browse/PARQUET-1229
> Project: Parquet
>  Issue Type: Sub-task
>  Components: parquet-mr
>Reporter: Gidon Gershinsky
>Assignee: Gidon Gershinsky
>Priority: Major
>  Labels: pull-request-available
>
> Addition of encryption/decryption support to the existing Parquet classes and 
> APIs



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (PARQUET-1699) Could not resolve org.apache.yetus:audience-annotations:0.11.0

2020-04-20 Thread Gabor Szadovszky (Jira)


 [ 
https://issues.apache.org/jira/browse/PARQUET-1699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Szadovszky resolved PARQUET-1699.
---
Resolution: Fixed

> Could not resolve org.apache.yetus:audience-annotations:0.11.0
> --
>
> Key: PARQUET-1699
> URL: https://issues.apache.org/jira/browse/PARQUET-1699
> Project: Parquet
>  Issue Type: Bug
>Affects Versions: 1.11.0
>Reporter: Priyank Bagrecha
>Assignee: Priyank Bagrecha
>Priority: Major
>
> I'm trying to use parquet-protobuf and I get this via parquet-common. I'm using 
> the latest on the master branch.
> {code:java}
> > Could not resolve org.apache.yetus:audience-annotations:0.11.0.
>   Required by:
>   project : > org.apache.parquet:parquet-common:1.11.0-SNAPSHOT
>> Could not resolve org.apache.yetus:audience-annotations:0.11.0.
>   > Could not parse POM 
> /Users/pbagrecha/.m2/repository/org/apache/yetus/audience-annotations/0.11.0/audience-annotations-0.11.0.pom
>  > Unable to resolve version for dependency 'jdk.tools:jdk.tools:jar'
>> Could not resolve org.apache.yetus:audience-annotations:0.11.0.
>   > Could not parse POM 
> https://repo1.maven.org/maven2/org/apache/yetus/audience-annotations/0.11.0/audience-annotations-0.11.0.pom
>  > Unable to resolve version for dependency 'jdk.tools:jdk.tools:jar'
>> Could not resolve org.apache.yetus:audience-annotations:0.11.0.
>   > Could not parse POM 
> https://jcenter.bintray.com/org/apache/yetus/audience-annotations/0.11.0/audience-annotations-0.11.0.pom
>  > Unable to resolve version for dependency 
> 'jdk.tools:jdk.tools:jar'{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (PARQUET-1699) Could not resolve org.apache.yetus:audience-annotations:0.11.0

2020-04-20 Thread Gabor Szadovszky (Jira)


 [ 
https://issues.apache.org/jira/browse/PARQUET-1699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Szadovszky reassigned PARQUET-1699:
-

Assignee: Priyank Bagrecha

> Could not resolve org.apache.yetus:audience-annotations:0.11.0
> --
>
> Key: PARQUET-1699
> URL: https://issues.apache.org/jira/browse/PARQUET-1699
> Project: Parquet
>  Issue Type: Bug
>Affects Versions: 1.11.0
>Reporter: Priyank Bagrecha
>Assignee: Priyank Bagrecha
>Priority: Major
>
> I'm trying to use parquet-protobuf and I get this via parquet-common. I'm using 
> the latest on the master branch.
> {code:java}
> > Could not resolve org.apache.yetus:audience-annotations:0.11.0.
>   Required by:
>   project : > org.apache.parquet:parquet-common:1.11.0-SNAPSHOT
>> Could not resolve org.apache.yetus:audience-annotations:0.11.0.
>   > Could not parse POM 
> /Users/pbagrecha/.m2/repository/org/apache/yetus/audience-annotations/0.11.0/audience-annotations-0.11.0.pom
>  > Unable to resolve version for dependency 'jdk.tools:jdk.tools:jar'
>> Could not resolve org.apache.yetus:audience-annotations:0.11.0.
>   > Could not parse POM 
> https://repo1.maven.org/maven2/org/apache/yetus/audience-annotations/0.11.0/audience-annotations-0.11.0.pom
>  > Unable to resolve version for dependency 'jdk.tools:jdk.tools:jar'
>> Could not resolve org.apache.yetus:audience-annotations:0.11.0.
>   > Could not parse POM 
> https://jcenter.bintray.com/org/apache/yetus/audience-annotations/0.11.0/audience-annotations-0.11.0.pom
>  > Unable to resolve version for dependency 
> 'jdk.tools:jdk.tools:jar'{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Filtering GitBox e-mails out of dev@?

2020-04-20 Thread Wes McKinney
Infra made some changes to ensure that GitHub notifications are
archived, but that has resulted in new e-mails being sent to dev@

In Arrow, we didn't want these, so we have:

* https://issues.apache.org/jira/browse/INFRA-20149
* https://issues.apache.org/jira/browse/ARROW-8520
* Final solution:
https://github.com/apache/arrow/commit/aa55967e6b9cf6fc8b4d2f6ac9ec75f8c28c80f5

You may want to implement the same thing for apache/parquet-mr

- Wes


[jira] [Commented] (PARQUET-1841) [C++] Experiment to see if using SIMD shuffle operations for DecodeSpaced improves performance

2020-04-20 Thread Frank Du (Jira)


[ 
https://issues.apache.org/jira/browse/PARQUET-1841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17087475#comment-17087475
 ] 

Frank Du commented on PARQUET-1841:
---

I wrote a draft implementation of the AVX512 int32_t/int64_t path using 
mask_expand_32/mask_expand_64; it seems to be working, as it passes all existing 
unit tests. But I can't find a benchmark based on the decode-spaced API; can you 
point me to one? I will then work on the SSE part.
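
For readers new to this API, a scalar Java sketch of what spacing decoded values means (parquet-cpp's DecodeSpaced works on raw buffers and a validity bitmap; the boolean[] here is a simplification). This per-value loop is what the masked-expand SIMD path aims to replace, and the null slots it leaves untouched are what the memset discussed in the issue below covers.

{code:java}
class DecodeSpacedSketch {
  // dense holds the non-null values in order; valid marks which output slots are non-null.
  static int[] space(int[] dense, boolean[] valid) {
    int[] spaced = new int[valid.length];
    int next = 0;                       // index of the next dense value to place
    for (int i = 0; i < valid.length; i++) {
      if (valid[i]) {
        spaced[i] = dense[next++];
      }
      // else: spaced[i] stays 0 -- the memset covers these null slots
    }
    return spaced;
  }
}
{code}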

> [C++] Experiment to see if using SIMD shuffle operations for DecodeSpaced 
> improves performance
> --
>
> Key: PARQUET-1841
> URL: https://issues.apache.org/jira/browse/PARQUET-1841
> Project: Parquet
>  Issue Type: Improvement
>  Components: parquet-cpp
>Reporter: Micah Kornfield
>Assignee: Micah Kornfield
>Priority: Major
> Attachments: image-2020-04-14-15-01-48-222.png
>
>
> Follow-up from PARQUET-1840: for current benchmarks it seems that 
> removing the memset somehow either has no impact or is slightly worse.  We 
> should investigate using SIMD operations to speed up spacing. 
>  
> As part of this we can see if moving the memset to only cover uninitialized 
> values after moving all required values provides any speedup.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)