[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16661079#comment-16661079
 ] 

ASF GitHub Bot commented on NIFI-5706:
--

Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/3079


> Processor ConvertAvroToParquet 
> ---
>
> Key: NIFI-5706
> URL: https://issues.apache.org/jira/browse/NIFI-5706
> Project: Apache NiFi
>  Issue Type: New Feature
>  Components: Extensions
>Affects Versions: 1.7.1
>Reporter: Mohit
>Assignee: Bryan Bende
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.9.0
>
>
> *Why*?
> PutParquet support is limited to HDFS.
> PutParquet bypasses the _flowfile_ implementation and writes the file
> directly to the sink.
> We need a processor for Parquet that works like _ConvertAvroToOrc_.
> *What*?
> _ConvertAvroToParquet_ will convert the incoming Avro flowfile to a Parquet
> flowfile. Unlike PutParquet, which writes to the HDFS file system, the
> ConvertAvroToParquet processor would write into the flowfile, which can be
> pipelined to other sinks, like _local_, _S3, Azure Data Lake_, etc.
>
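As a rough illustration of the conversion the description sketches (this is not the processor code itself), an Avro data file can be rewritten as a Parquet file with parquet-avro's AvroParquetWriter. The file paths and the SNAPPY codec below are illustrative assumptions; the real processor streams between FlowFile contents instead of local files:

```java
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.FileInputStream;
import java.io.InputStream;

public class AvroToParquetSketch {
    public static void main(String[] args) throws Exception {
        // Read the Avro container file and reuse its embedded schema.
        try (InputStream in = new FileInputStream("input.avro");
             DataFileStream<GenericRecord> reader =
                     new DataFileStream<>(in, new GenericDatumReader<>())) {
            Schema schema = reader.getSchema();
            // Write each record out through an Avro-aware Parquet writer.
            try (ParquetWriter<GenericRecord> writer =
                         AvroParquetWriter.<GenericRecord>builder(new Path("output.parquet"))
                                 .withSchema(schema)
                                 .withCompressionCodec(CompressionCodecName.SNAPPY)
                                 .build()) {
                for (GenericRecord record : reader) {
                    writer.write(record);
                }
            }
        }
    }
}
```

The FlowFile-based processor in the pull request does essentially this, but against the session's input and output streams rather than the local file system.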



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-23 Thread ASF subversion and git services (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16661077#comment-16661077
 ] 

ASF subversion and git services commented on NIFI-5706:
---

Commit e45584d0fb8e19f6913fb25a8db3ce8303b73cd0 in nifi's branch 
refs/heads/master from [~mgyaar]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=e45584d ]

NIFI-5706 Added ConvertAvroToParquet processor
- Refactored code : ParquetBuilderProperties merged in ParquetUtils

This closes #3079.

Signed-off-by: Bryan Bende 




[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-23 Thread Bryan Bende (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16661075#comment-16661075
 ] 

Bryan Bende commented on NIFI-5706:
---

LGTM +1, going to merge and then I'll look into using this approach for a 
record writer



[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-23 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16660707#comment-16660707
 ] 

ASF GitHub Bot commented on NIFI-5706:
--

Github user mohitgargk commented on the issue:

https://github.com/apache/nifi/pull/3079
  
@bbende Makes sense! I have merged ParquetBuilderProperties into 
ParquetUtils in the latest commit.




[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16659681#comment-16659681
 ] 

ASF GitHub Bot commented on NIFI-5706:
--

Github user bbende commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3079#discussion_r227125269
  
--- Diff: 
nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetBuilderProperties.java
 ---
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet.utils;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public interface ParquetBuilderProperties {
--- End diff --

I know this is a little bit nit-picky, but instead of having the processors 
implement this interface, could we have the processors reference the properties 
statically? We could even move these into ParquetUtils so there is just one 
utility class for all common code. I think that would follow more of the 
existing patterns in other processors.
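
A hypothetical sketch of the statically-referenced pattern being suggested, assuming the NiFi API on the classpath. The class and property names here are illustrative, not the actual code; in the merged commit the shared code ended up in ParquetUtils:

```java
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.processor.util.StandardValidators;

// Illustrative utility class: shared property descriptors live as static
// constants, so processors reference them rather than implement an interface.
public final class ParquetCommonProperties {

    private ParquetCommonProperties() {
        // no instances; everything is referenced statically
    }

    public static final PropertyDescriptor ROW_GROUP_SIZE = new PropertyDescriptor.Builder()
            .name("row-group-size")
            .displayName("Row Group Size")
            .description("The row group size used by the Parquet writer.")
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .build();
}

// A processor then adds the shared descriptor to its own property list:
//     properties.add(ParquetCommonProperties.ROW_GROUP_SIZE);
```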




[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-22 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16659399#comment-16659399
 ] 

ASF GitHub Bot commented on NIFI-5706:
--

Github user bbende commented on the issue:

https://github.com/apache/nifi/pull/3079
  
Busy with some other things at the moment, but will take a look when I get 
a chance.




[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-21 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16658133#comment-16658133
 ] 

ASF GitHub Bot commented on NIFI-5706:
--

Github user mohitgargk commented on the issue:

https://github.com/apache/nifi/pull/3079
  
@bbende I have made the suggested changes. Please review.




[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-19 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16656333#comment-16656333
 ] 

ASF GitHub Bot commented on NIFI-5706:
--

Github user mohitgargk commented on the issue:

https://github.com/apache/nifi/pull/3079
  
> What is the use case for having Parquet files flow through NiFi?
I have a use case where I am writing Parquet files into a data lake (in 
Azure) and also need to write them to a network storage device. The intent here 
is to keep the Parquet data in a flowfile, so that it can flow into any sink 
later on. If we don't have Convert*ToParquet (or similar processors), we would 
require a hop (an I/O endpoint) that may not give good performance.






[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-18 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16655455#comment-16655455
 ] 

ASF GitHub Bot commented on NIFI-5706:
--

Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/3079
  
What is the use case for having Parquet files flow through NiFi? I wrote 
ConvertAvroToORC and every time I saw it used, it was immediately followed by 
PutHDFS. So I followed the PutParquet lead and wrote a PutORC for the Hive 3 
NAR. We can't currently do anything with a Parquet file (or ORC for that 
matter) in NiFi, so just curious as to how you envision it being used.

Also I wonder if a ParquetRecordWriter might be a better idea? It would do 
the same thing as a processor but the record processors can read in something 
in any format, not just Avro. This was another impetus for having PutORC 
instead of ConvertAvroToORC.




[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653978#comment-16653978
 ] 

ASF GitHub Bot commented on NIFI-5706:
--

Github user mohitgargk commented on the issue:

https://github.com/apache/nifi/pull/3079
  
Thanks @bbende for the feedback, will work on the fix tomorrow.




[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653904#comment-16653904
 ] 

ASF GitHub Bot commented on NIFI-5706:
--

Github user bbende commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3079#discussion_r226018942
  
--- Diff: 
nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java
 ---
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet;
+
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.parquet.stream.NifiParquetOutputFile;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"avro", "parquet", "convert"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Converts Avro records into Parquet file format. The incoming FlowFile should be a valid avro file. "
++ "If an incoming FlowFile does not contain any records, an empty parquet file is the output. NOTE: Many Avro datatypes "
++ "(collections, primitives, and unions of primitives, e.g.) can be converted to parquet, but unions of collections and "
++ "other complex datatypes may not be able to be converted to Parquet.")
+@WritesAttributes({
+@WritesAttribute(attribute = "filename", description = "Sets the filename to the existing filename with the extension replaced by / added to by .parquet"),
+@WritesAttribute(attribute = "record.count", description = "Sets the number of records in the parquet file.")
+})
+public class ConvertAvroToParquet extends AbstractProcessor {
+
+// Attributes
+public static final String FILE_NAME_ATTRIBUTE = "file.name";
+public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
+
+public static final List<AllowableValue> COMPRESSION_TYPES;
+static {
+final List<AllowableValue> compressionTypes = new ArrayList<>();
+for 

[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653908#comment-16653908
 ] 

ASF GitHub Bot commented on NIFI-5706:
--

Github user bbende commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3079#discussion_r226020300
  

[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16653906#comment-16653906
 ] 

ASF GitHub Bot commented on NIFI-5706:
--

Github user bbende commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3079#discussion_r226019319
  

[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653903#comment-16653903
 ] 

ASF GitHub Bot commented on NIFI-5706:
--

Github user bbende commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3079#discussion_r226017678
  
--- Diff: 
nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java
 ---
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet;
+
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.parquet.stream.NifiParquetOutputFile;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"avro", "parquet", "convert"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Converts Avro records into Parquet file format. The incoming FlowFile should be a valid Avro file. If an incoming FlowFile does "
++ "not contain any records, an empty Parquet file is the output. NOTE: Many Avro datatypes (e.g. collections, primitives, and unions of primitives) can "
++ "be converted to Parquet, but unions of collections and other complex datatypes may not be able to be converted to Parquet.")
+@WritesAttributes({
+@WritesAttribute(attribute = "filename", description = "Sets the filename to the existing filename with the extension replaced by / added to by .parquet"),
+@WritesAttribute(attribute = "record.count", description = "Sets the number of records in the parquet file.")
+})
+public class ConvertAvroToParquet extends AbstractProcessor {
+
+// Attributes
+public static final String FILE_NAME_ATTRIBUTE = "file.name";
--- End diff --

This appears to be unused.



[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653905#comment-16653905
 ] 

ASF GitHub Bot commented on NIFI-5706:
--

Github user bbende commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3079#discussion_r226017591
  
--- Diff: nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java ---
@@ -0,0 +1,374 @@
+public class ConvertAvroToParquet extends AbstractProcessor {
+
+// Attributes
+public static final String FILE_NAME_ATTRIBUTE = "file.name";
+public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
+
+public static final List<AllowableValue> COMPRESSION_TYPES;
+static {
+final List<AllowableValue> compressionTypes = new ArrayList<>();
+for 

[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653907#comment-16653907
 ] 

ASF GitHub Bot commented on NIFI-5706:
--

Github user bbende commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3079#discussion_r226021329
  
--- Diff: nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java ---
@@ -0,0 +1,374 @@
+public class ConvertAvroToParquet extends AbstractProcessor {
+
+// Attributes
+public static final String FILE_NAME_ATTRIBUTE = "file.name";
+public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
+
+public static final List<AllowableValue> COMPRESSION_TYPES;
+static {
+final List<AllowableValue> compressionTypes = new ArrayList<>();
+for 

[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653556#comment-16653556
 ] 

ASF GitHub Bot commented on NIFI-5706:
--

Github user bbende commented on the issue:

https://github.com/apache/nifi/pull/3079
  
@MikeThomsen the advantage is not having to write the parquet out to local 
disk somewhere, and then have a disconnected flow where another part reads it 
back in. With this approach the data stays in NiFi's repositories and the 
ConvertAvroToParquet can be directly connected to the next processor.

I will try to give this a review in the coming days... Since this processor 
is already developed, the best approach here may be to commit it as is 
(assuming the processor works as expected) and then afterwards refactor the 
utility classes into a nifi-parquet-utils module under 
nifi-nar-bundles/nifi-extension-utils, so the code can be shared by 
this processor and the eventual record writer.
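The in-repository streaming pattern described above can be sketched with a stand-in for NiFi's StreamCallback (the real interface, org.apache.nifi.processor.io.StreamCallback, has this process(InputStream, OutputStream) shape; the byte-array "repository" and the toy transform below are illustrative assumptions, not the processor's actual code):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Stand-in for org.apache.nifi.processor.io.StreamCallback.
interface StreamCallback {
    void process(InputStream in, OutputStream out) throws IOException;
}

public class InRepositoryConversion {

    // Stand-in for ProcessSession.write(flowFile, callback): hands the
    // callback the flowfile content as streams; byte arrays play the
    // role of NiFi's content repository here.
    static byte[] write(byte[] content, StreamCallback callback) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        callback.process(new ByteArrayInputStream(content), out);
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] flowFileContent = "records".getBytes(StandardCharsets.UTF_8);

        // The real ConvertAvroToParquet would read Avro records from `in`
        // and feed an AvroParquetWriter wrapped around `out`; the
        // upper-casing below is only a placeholder transform showing the
        // streaming shape of the callback.
        byte[] converted = write(flowFileContent, (in, out) -> {
            int b;
            while ((b = in.read()) != -1) {
                out.write(Character.toUpperCase(b));
            }
        });

        System.out.println(new String(converted, StandardCharsets.UTF_8));
    }
}
```

Because the conversion runs inside the session's write callback, the output content never leaves NiFi's repositories and the processor's success relationship can feed any downstream processor directly.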


> Processor ConvertAvroToParquet 
> ---
>
> Key: NIFI-5706
> URL: https://issues.apache.org/jira/browse/NIFI-5706
> Project: Apache NiFi
>  Issue Type: New Feature
>  Components: Extensions
>Affects Versions: 1.7.1
>Reporter: Mohit
>Priority: Major
>  Labels: pull-request-available
>
> *Why*?
> PutParquet support is limited to HDFS. 
> PutParquet bypasses the _flowfile_ implementation and writes the file 
> directly to sink. 
> We need a processor for parquet that works like _ConvertAvroToOrc_.
> *What*?
> _ConvertAvroToParquet_ will convert the incoming avro flowfile to a parquet 
> flowfile. Unlike PutParquet, which writes to the hdfs file system, processor 
> ConvertAvroToParquet would write into the flowfile, which can be pipelined to 
> put into other sinks, like _local_, _S3, Azure data lake_ etc.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-17 Thread Ed Berezitsky (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653417#comment-16653417
 ] 

Ed Berezitsky commented on NIFI-5706:
-

+1 for Parquet Record Writer

Pros: applicable to any format that has a provided (or scripted) record reader. 



[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16653391#comment-16653391
 ] 

ASF GitHub Bot commented on NIFI-5706:
--

Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/3079
  
From Jira:

> PutParquet support is limited to HDFS. 

It's actually not. You can write it to the local file system by supplying a 
Hadoop configuration that tells it to use that.

It's not clear to me that this differs fundamentally from PutParquet other 
than the output target. Am I missing something?
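As a hedged illustration of that point, a minimal core-site.xml (file name and wiring assumed here, referenced from PutParquet's Hadoop Configuration Resources property) that makes the Hadoop FileSystem API resolve paths against the local disk could look like:

```xml
<!-- Minimal sketch: fs.defaultFS of file:/// tells Hadoop's FileSystem
     API to use the local filesystem instead of HDFS. -->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>file:///</value>
  </property>
</configuration>
```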




[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-16 Thread Mohit (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16652223#comment-16652223
 ] 

Mohit commented on NIFI-5706:
-

[~pvillard] [~bende], to be honest I did not look at ConvertRecord as a 
possible home for a Parquet writer. I thought a ConvertAvroToParquet-like 
operator would make sense; it is heavily inspired by ConvertAvroToOrc.

It's true that PositionOutputStream (the way to write Parquet to an OutputStream) is a fairly 
recent feature added by Parquet. I found a similar implementation in the [Apache 
Beam|https://github.com/apache/beam/blob/master/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java]
 project as well. 

I have been using ConvertAvroToParquet in my deployment for over a month and 
found it quite handy and straightforward, so I thought of contributing it.

Let me know if you think this processor would be best implemented as part of 
ConvertRecord; I will start exploring that. 

 



[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-16 Thread Bryan Bende (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16652138#comment-16652138
 ] 

Bryan Bende commented on NIFI-5706:
---

I originally implemented PutParquet, which allowed a record reader but then 
required writing to HDFS, because I didn't think it was possible 
to write Parquet to the OutputStream of a flow file, since Parquet's whole API 
is based on the Hadoop Filesystem object.

However, the approach in this PR is quite interesting and shows that it might 
actually be possible! Nice work.

I do agree with Pierre's comment, that if we can take this approach and instead 
implement a Parquet record writer, we can then use ConvertRecord to convert any 
format to Parquet.
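The key enabler, in a hedged sketch: recent Parquet versions let a writer target org.apache.parquet.io.OutputFile, whose main requirement is a PositionOutputStream, i.e. an OutputStream that can report its current byte position. That can be layered over any stream, including a flow file's. The stdlib-only class below (the class name is hypothetical and the Parquet types are elided; this is not NiFi's actual NifiParquetOutputFile) shows the position-tracking core:

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch: a custom org.apache.parquet.io.OutputFile
// implementation must hand Parquet a PositionOutputStream. Tracking
// the byte position over an arbitrary OutputStream is all it takes.
public class PositionTrackingOutputStream extends FilterOutputStream {

    private long pos = 0;

    public PositionTrackingOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(int b) throws IOException {
        out.write(b);
        pos++;
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len); // write to the delegate directly so bytes are counted once
        pos += len;
    }

    // In a real OutputFile this would back PositionOutputStream.getPos().
    public long getPos() {
        return pos;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        PositionTrackingOutputStream stream = new PositionTrackingOutputStream(sink);
        stream.write(new byte[]{1, 2, 3});
        stream.write(4);
        System.out.println(stream.getPos()); // 4 bytes written so far
    }
}
```

Parquet calls getPos() to record column-chunk offsets in the file footer, which is why a plain OutputStream alone is not enough.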



[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-16 Thread Pierre Villard (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651573#comment-16651573
 ] 

Pierre Villard commented on NIFI-5706:
--

Didn't look at the code so my question could be dumb: isn't it possible to 
create a Record Writer for Parquet? It'd be much more flexible than forcing 
Avro-to-Parquet conversion only.



[jira] [Commented] (NIFI-5706) Processor ConvertAvroToParquet

2018-10-16 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/NIFI-5706?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651214#comment-16651214
 ] 

ASF GitHub Bot commented on NIFI-5706:
--

GitHub user mohitgargk opened a pull request:

https://github.com/apache/nifi/pull/3079

NIFI-5706 : Added ConvertAvroToParquet processor

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?

- [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mohitgargk/nifi NIFI-5706

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/3079.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3079


commit 18fa9bf070fda41fc78f5399757eb622b4049776
Author: Mohit Garg 
Date:   2018-10-16T06:10:14Z

Added ConvertAvroToParquet processor




> Processor ConvertAvroToParquet 
> ---
>
> Key: NIFI-5706
> URL: https://issues.apache.org/jira/browse/NIFI-5706
> Project: Apache NiFi
>  Issue Type: New Feature
>  Components: Extensions
>Affects Versions: 1.7.1
>Reporter: Mohit
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>
> *Why*?
> PutParquet support is limited to HDFS. 
> PutParquet bypasses the _flowfile_ implementation and writes the file 
> directly to sink. 
> We need a processor for parquet that works like _ConvertAvroToOrc_.
> *What*?
> _ConvertAvroToParquet_ will convert the incoming avro flowfile to a parquet 
> flowfile. Unlike PutParquet, which writes to the hdfs file system, processor 
> ConvertAvroToParquet would write into the flowfile, which can be pipelined to 
> put into other sinks, like _local_, _S3, Azure data lake_ etc.
>  


