[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-05-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009723#comment-16009723
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/420


> Create QueryFlowFile Processor
> --
>
> Key: NIFI-1280
> URL: https://issues.apache.org/jira/browse/NIFI-1280
> Project: Apache NiFi
>  Issue Type: New Feature
>  Components: Extensions
>Reporter: Mark Payne
>Assignee: Mark Payne
> Fix For: 1.2.0
>
> Attachments: QueryFlowFile_Record_Reader-Writer_Examples.xml
>
>
> We should have a Processor that allows users to easily filter out specific 
> columns from CSV data. For instance, a user would configure two different 
> properties: "Columns of Interest" (a comma-separated list of column indexes) 
> and "Filtering Strategy" (Keep Only These Columns, Remove Only These Columns).
> We can do this today with ReplaceText, but it is far more difficult than it 
> would be with this Processor, as the user has to use Regular Expressions, 
> etc. with ReplaceText.
> Eventually a Custom UI could even be built that allows a user to upload a 
> Sample CSV and choose which columns to use from there, similar to the way that 
> Excel works when importing CSV by dragging and selecting the desired columns. 
> That would certainly be a larger undertaking and would not need to be done for 
> an initial implementation.
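The column filtering described above can be sketched outside NiFi as a plain standalone class. This is an illustrative example of the "Columns of Interest" / "Filtering Strategy" idea only; the class and method names are hypothetical and this is not the actual processor code (the naive split also does not handle quoted fields containing commas):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class CsvColumnFilter {

    // Corresponds to the proposed "Filtering Strategy" property values:
    // "Keep Only These Columns" vs. "Remove Only These Columns".
    public enum Strategy { KEEP, REMOVE }

    // Filters one CSV line by zero-based column index.
    public static String filterLine(String line, Set<Integer> columns, Strategy strategy) {
        String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
        List<String> out = new ArrayList<>();
        for (int i = 0; i < fields.length; i++) {
            boolean listed = columns.contains(i);
            // KEEP retains listed columns; REMOVE retains unlisted ones.
            if ((strategy == Strategy.KEEP) == listed) {
                out.add(fields[i]);
            }
        }
        return String.join(",", out);
    }

    public static void main(String[] args) {
        Set<Integer> cols = new TreeSet<>(Arrays.asList(0, 2));
        System.out.println(filterLine("a,b,c,d", cols, Strategy.KEEP));   // a,c
        System.out.println(filterLine("a,b,c,d", cols, Strategy.REMOVE)); // b,d
    }
}
```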



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15965135#comment-15965135
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/1652




[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-11 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15965137#comment-15965137
 ] 

ASF subversion and git services commented on NIFI-1280:
---

Commit 68c592ea43d30754ec07c42cf10563fe9db185ae in nifi's branch 
refs/heads/master from [~ozhurakousky]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=68c592e ]

NIFI-1280 added support for RecordSchema in SchemaRegistry

Signed-off-by: Mark Payne 
Signed-off-by: Matt Burgess 

NIFI-1280: Updated SimpleKeyValueSchemaRegistry to make use of new CHOICE 
RecordFieldType - Update Record Readers to use SchemaRegistry controller 
service. Moved SchemaRegistry api into its own maven module and added to 
standard-services-api so that we can properly add dependencies on it. Code 
cleanup and bug fixes

Signed-off-by: Matt Burgess 

NIFI-1280: Fixed checkstyle violations and license exclusions for RAT plugin

Signed-off-by: Matt Burgess 

NIFI-1280: Addressed feedback from PR Review

Signed-off-by: Matt Burgess 

NIFI-1280: Additional changes/doc to support QueryFlowFile and Record 
Readers/Writers

This closes #1652







[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15965129#comment-15965129
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user mattyb149 commented on the issue:

https://github.com/apache/nifi/pull/1652
  
+1 LGTM ran a full build with unit tests and contrib-check, tried various 
RecordReaders and Writers and QueryFlowFile queries, great work!  Merging to 
master




[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-11 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15965131#comment-15965131
 ] 

ASF subversion and git services commented on NIFI-1280:
---

Commit a88d3bfa3c53d9cbe375f2b89eaa9248eb92df29 in nifi's branch 
refs/heads/master from [~markap14]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=a88d3bf ]

NIFI-1280: Refactoring to make more generic so that other data types can be 
supported; created InputStreams to content on-demand so that multiple passes 
can be made over FlowFile content if required. Created new Controller Services 
for reading and writing specific data types

Signed-off-by: Matt Burgess 
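The commit above describes creating InputStreams to FlowFile content on demand so that multiple passes can be made. A minimal standalone sketch of that idea, using an in-memory supplier (all names here are hypothetical; this is not the NiFi API):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;

public class MultiPassDemo {

    // Each call returns a fresh stream over the same content, so a caller can
    // make several independent passes without rewinding a shared stream.
    public static Supplier<InputStream> contentSupplier(byte[] content) {
        return () -> new ByteArrayInputStream(content);
    }

    // Consumes a stream fully, returning the number of bytes read.
    public static int countBytes(InputStream in) {
        int n = 0;
        try (InputStream s = in) {
            while (s.read() != -1) {
                n++;
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return n;
    }

    public static void main(String[] args) {
        Supplier<InputStream> supplier = contentSupplier("hello".getBytes());
        // Two independent passes over the same five bytes.
        System.out.println(countBytes(supplier.get())); // 5
        System.out.println(countBytes(supplier.get())); // 5
    }
}
```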





[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962902#comment-15962902
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user markap14 commented on the issue:

https://github.com/apache/nifi/pull/1652
  
ok @olegz @mattyb149 I have submitted a new commit that addresses all of 
the feedback above.




[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962814#comment-15962814
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110647631
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 ---
@@ -2201,7 +2202,10 @@ public int read(final byte[] b, final int off, final int len) throws IOException
 
 @Override
 public void close() throws IOException {
-    StandardProcessSession.this.bytesRead += countingStream.getBytesRead();
+    if (!closed) {
+        StandardProcessSession.this.bytesRead += countingStream.getBytesRead();
+        closed = true;
+    }
--- End diff --

Agreed, it is not thread-safe. But ProcessSession is not intended to be 
thread-safe :)
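The guard being discussed can be shown in isolation. The following is a simplified stand-in, not the actual StandardProcessSession code: the `closed` flag ensures only the first close() contributes the counted bytes to the session total, so a double close() cannot double-count:

```java
public class IdempotentCloseDemo {

    // Mimics the diff in the review comment: add the counted bytes to the
    // session total only on the first close().
    static class CountingStream {
        private final byte[] data;
        private int pos;
        private boolean closed;
        long bytesRead;
        long sessionTotal; // stand-in for StandardProcessSession.this.bytesRead

        CountingStream(byte[] data) {
            this.data = data;
        }

        int read() {
            if (pos >= data.length) {
                return -1;
            }
            bytesRead++;
            return data[pos++];
        }

        void close() {
            if (!closed) {             // the guard added by the diff
                sessionTotal += bytesRead;
                closed = true;
            }
        }
    }

    public static void main(String[] args) {
        CountingStream s = new CountingStream("abc".getBytes());
        while (s.read() != -1) { }
        s.close();
        s.close(); // second close is a no-op for the counter
        System.out.println(s.sessionTotal); // prints 3, not 6
    }
}
```

As Mark notes, the real fix is deliberately not thread-safe, since a ProcessSession is single-threaded by contract; the flag only guards against repeated calls from the same thread.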




[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962811#comment-15962811
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110647404
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java
 ---
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.queryflowfile.FlowFileTable;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.ResultSetRecordSet;
+import org.apache.nifi.util.StopWatch;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"sql", "query", "calcite", "route", "record", "transform", "select", "update", "modify", "etl", "filter", "record", "csv", "json", "logs", "text", "avro", "aggregate"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Evaluates one or more SQL queries against the 
contents of a FlowFile. The result of the "
++ "SQL query then becomes the content of the output FlowFile. This can 
be used, for 

[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962813#comment-15962813
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110647534
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.json;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+import org.codehaus.jackson.JsonNode;
+
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.PathNotFoundException;
+import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
+
+public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
+private static final Configuration STRICT_PROVIDER_CONFIGURATION = Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();
+
+private final LinkedHashMap<String, JsonPath> jsonPaths;
+private final InputStream in;
+private RecordSchema schema;
+private final String dateFormat;
+private final String timeFormat;
+private final String timestampFormat;
+
+public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger,
+final String dateFormat, final String timeFormat, final String timestampFormat)
+throws MalformedRecordException, IOException {
+super(in, logger);
+
+this.dateFormat = dateFormat;
+this.timeFormat = timeFormat;
+this.timestampFormat = timestampFormat;
+
+this.schema = schema;
+this.jsonPaths = jsonPaths;
+this.in = in;
+}
+
+@Override
+public void close() throws IOException {
+in.close();
+}
+
+@Override
+public RecordSchema getSchema() {
+return schema;
+}
+
+@Override
+protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema) throws IOException {
+if (jsonNode == null) {
+return null;
+}
+
+final DocumentContext ctx = JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(jsonNode.toString());
+final Map values = new 
HashMap<>(schema.getFieldCount());
+
+for (final Map.Entry<String, JsonPath> entry : jsonPaths.entrySet()) {
+final String fieldName = entry.getKey();
+final DataType desiredType = schema.getDataType(fieldName).orElse(null);
+if (desiredType == null) {
+continue;
+}
+
+final JsonPath jsonPath = entry.getValue();
+
+

[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962805#comment-15962805
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110646586
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.grok;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import io.thekraken.grok.api.Grok;
+import io.thekraken.grok.api.GrokUtils;
+import io.thekraken.grok.api.Match;
+
+public class GrokRecordReader implements RecordReader {
+private final BufferedReader reader;
+private final Grok grok;
+private RecordSchema schema;
+
+private String nextLine;
+
+static final String STACK_TRACE_COLUMN_NAME = "STACK_TRACE";
+private static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
+"^\\s*(?:(?:|\\t)+at )|"
++ "(?:(?:|\\t)+\\[CIRCULAR REFERENCE\\:)|"
++ "(?:Caused by\\: )|"
++ "(?:Suppressed\\: )|"
++ "(?:\\s+... \\d+ (?:more|common frames? omitted)$)");
+
+private static final FastDateFormat TIME_FORMAT_DATE;
+private static final FastDateFormat TIME_FORMAT_TIME;
+private static final FastDateFormat TIME_FORMAT_TIMESTAMP;
+
+static {
+final TimeZone gmt = TimeZone.getTimeZone("GMT");
+TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt);
+TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt);
+TIME_FORMAT_TIMESTAMP = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt);
+}
+
+public GrokRecordReader(final InputStream in, final Grok grok, final RecordSchema schema) {
+this.reader = new BufferedReader(new InputStreamReader(in));
+this.grok = grok;
+this.schema = schema;
+}
+
+@Override
+public void close() throws IOException {
+reader.close();
+}
+
+@Override
+public Record nextRecord() throws IOException, MalformedRecordException {
+final String line = nextLine == null ? reader.readLine() : nextLine;
+nextLine = null; // ensure that we don't process nextLine again
+if (line == null) {
+return null;
+}
+
+final RecordSchema schema = getSchema();
+
+final Match match = grok.match(line);
+match.captures();
+final Map<String, Object> valueMap = match.toMap();
+if (valueMap.isEmpty()) {   // We were unable to match the pattern so return an empty Object array.
+return new 
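The `STACK_TRACE_PATTERN` heuristic in the diff above can be exercised in isolation. Below is a minimal, self-contained sketch using a simplified variant of the pattern (the exact whitespace alternation in the archived diff is mangled, so the leading-indentation branch here is an assumption, not the PR's verbatim regex):

```java
import java.util.regex.Pattern;

public class StackTraceHeuristic {
    // Simplified stand-in for GrokRecordReader's STACK_TRACE_PATTERN:
    // each alternation branch recognizes one stack-trace line shape.
    private static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
        "^\\s*(?:at )|"
        + "(?:\\s*\\[CIRCULAR REFERENCE:)|"
        + "(?:Caused by: )|"
        + "(?:Suppressed: )|"
        + "(?:\\s+\\.\\.\\. \\d+ (?:more|common frames? omitted)$)");

    // find() rather than matches(): each branch carries its own anchoring.
    static boolean isStackTraceLine(final String line) {
        return STACK_TRACE_PATTERN.matcher(line).find();
    }

    public static void main(String[] args) {
        System.out.println(isStackTraceLine("    at com.example.Foo.bar(Foo.java:42)"));
        System.out.println(isStackTraceLine("Caused by: java.io.IOException"));
        System.out.println(isStackTraceLine("2017-04-10 12:00:00 INFO starting up"));
    }
}
```

A line that hits any branch is folded into the previous record's STACK_TRACE field rather than starting a new record.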

[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962804#comment-15962804
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110646390
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
 ---
@@ -0,0 +1,318 @@

[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962800#comment-15962800
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110646007
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.grok;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.SchemaRegistryRecordReader;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import io.thekraken.grok.api.Grok;
+import io.thekraken.grok.api.exception.GrokException;
+
+@Tags({"grok", "logs", "logfiles", "parse", "unstructured", "text", "record", "reader", "regex", "pattern", "logstash"})
+@CapabilityDescription("Provides a mechanism for reading unstructured text data, such as log files, and structuring the data "
++ "so that it can be processed. The service is configured using Grok patterns. "
++ "The service reads from a stream of data and splits each message that it finds into a separate Record, each containing the fields that are configured. "
++ "If a line in the input does not match the expected message pattern, the line of text is considered to be part of the previous "
++ "message, with the exception of stack traces. A stack trace that is found at the end of a log message is considered to be part "
++ "of the previous message but is added to the 'STACK_TRACE' field of the Record. If a record has no stack trace, it will have a NULL value "
++ "for the STACK_TRACE field. All fields that are parsed are considered to be of type String by default. If there is a need to change the type of a field, "
++ "this can be accomplished by configuring the Schema Registry to use and adding the appropriate schema.")
+public class GrokReader extends SchemaRegistryRecordReader implements RowRecordReaderFactory {
+private volatile Grok grok;
+private volatile boolean useSchemaRegistry;
+
+private static final String DEFAULT_PATTERN_NAME = "/default-grok-patterns.txt";
+
+static final PropertyDescriptor PATTERN_FILE = new PropertyDescriptor.Builder()
+.name("Grok Pattern File")
+.description("Path to a file that contains Grok Patterns to use for parsing logs. If not specified, a built-in default Pattern file "
++ "will be used. If specified, all patterns in the given pattern file will override the default patterns. See the Controller Service's "
++ "Additional Details for a list of pre-defined patterns.")
+.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+.expressionLanguageSupported(true)
+.required(false)
+.build();
+
+static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor.Builder()
+.name("Grok Expression")
+.description("Specifies the format of a log line in Grok format. This allows the Record 
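The line-folding behavior described in the GrokReader CapabilityDescription above, where a line that does not match the message pattern is appended to the previous message, can be sketched independently of Grok itself. This is a hedged illustration: the timestamp-prefix heuristic below is hypothetical, standing in for the configured Grok expression that the real service uses to decide whether a line starts a new record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class LogLineFolder {
    // Hypothetical "start of message" test; GrokReader actually applies the
    // configured Grok expression here.
    private static final Pattern MESSAGE_START = Pattern.compile("^\\d{4}-\\d{2}-\\d{2} ");

    // Fold continuation lines (e.g. stack traces) into the preceding message.
    static List<String> fold(final List<String> lines) {
        final List<String> messages = new ArrayList<>();
        for (final String line : lines) {
            if (messages.isEmpty() || MESSAGE_START.matcher(line).find()) {
                messages.add(line);                               // new record
            } else {
                final int last = messages.size() - 1;             // continuation
                messages.set(last, messages.get(last) + "\n" + line);
            }
        }
        return messages;
    }
}
```

With input of a log line, an indented stack frame, and a second log line, `fold` yields two messages, the first carrying the stack frame.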

[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962797#comment-15962797
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110645869
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+@Tags({"csv", "result", "set", "recordset", "record", "writer", "serializer", "row", "tsv", "tab", "separated", "delimited"})
+@CapabilityDescription("Writes the contents of a RecordSet as CSV data. The first line written "
++ "will be the column names. All subsequent lines will be the values corresponding to those columns.")
+public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory {
+
+private volatile CSVFormat csvFormat;
+
+@Override
+protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+properties.add(CSVUtils.CSV_FORMAT);
+properties.add(CSVUtils.VALUE_SEPARATOR);
+properties.add(CSVUtils.QUOTE_CHAR);
+properties.add(CSVUtils.ESCAPE_CHAR);
+properties.add(CSVUtils.COMMENT_MARKER);
+properties.add(CSVUtils.NULL_STRING);
+properties.add(CSVUtils.TRIM_FIELDS);
+properties.add(CSVUtils.QUOTE_MODE);
+properties.add(CSVUtils.RECORD_SEPARATOR);
+properties.add(CSVUtils.TRAILING_DELIMITER);
+return properties;
+}
--- End diff --

Can do.


> Create QueryFlowFile Processor
> --
>
> Key: NIFI-1280
> URL: https://issues.apache.org/jira/browse/NIFI-1280
> Project: Apache NiFi
>  Issue Type: Task
>  Components: Extensions
>Reporter: Mark Payne
>Assignee: Mark Payne
> Fix For: 1.2.0
>
> Attachments: QueryFlowFile_Record_Reader-Writer_Examples.xml
>
>
> We should have a Processor that allows users to easily filter out specific 
> columns from CSV data. For instance, a user would configure two different 
> properties: "Columns of Interest" (a comma-separated list of column indexes) 
> and "Filtering Strategy" (Keep Only These Columns, Remove Only These Columns).
> We can do this today with ReplaceText, but it is far more difficult than it 
> would be with this Processor, as the user has to use Regular Expressions, 
> etc. with ReplaceText.
> Eventually a Custom UI could even be built that allows a user to upload a 
> Sample CSV and choose which columns from there, similar to the way that Excel 
> works when importing CSV by dragging and selecting the desired columns? That 
> would certainly be a larger undertaking and would not need to be done for an 
> initial implementation.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962793#comment-15962793
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user markap14 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110645816
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore
 ---
@@ -0,0 +1 @@
+/bin/
--- End diff --

Yeah that was added by my IDE. Will fix.




[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962167#comment-15962167
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110541128
  
--- Diff: 
nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
 ---
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.services;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.avro.LogicalType;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+@Tags({ "schema", "registry", "avro", "json", "csv" })
+@CapabilityDescription("Provides a service for registering and accessing schemas. You can register a schema "
++ "as a dynamic property where 'name' represents the schema name and 'value' represents the textual "
++ "representation of the actual schema following the syntax and semantics of Avro's Schema format.")
+public class AvroSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
+
+private final Map<String, String> schemaNameToSchemaMap;
+
+private static final String LOGICAL_TYPE_DATE = "date";
+private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis";
+private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros";
+private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis";
+private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
+
+
+public AvroSchemaRegistry() {
+this.schemaNameToSchemaMap = new HashMap<>();
+}
+
+@OnEnabled
+public void enable(ConfigurationContext configurationContext) throws InitializationException {
+this.schemaNameToSchemaMap.putAll(configurationContext.getProperties().entrySet().stream()
+.filter(propEntry -> propEntry.getKey().isDynamic())
+.collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue())));
+}
+
+@Override
+public String retrieveSchemaText(String schemaName) {
+if (!this.schemaNameToSchemaMap.containsKey(schemaName)) {
+throw new IllegalArgumentException("Failed to find schema; Name: '" + schemaName + "'.");
+} else {
+return this.schemaNameToSchemaMap.get(schemaName);
+}
+}
+
+@Override
+public String retrieveSchemaText(String schemaName, Map<String, String> attributes) {
+throw new UnsupportedOperationException("This version of schema registry does not "
++ "support this operation, since schemas are only identified by name.");
--- End diff --

Perhaps instead of throwing the exception we should just delegate to the 
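The suggestion above is truncated by the archive, but it reads as: have the attribute-aware overload fall back to the name-only lookup instead of throwing. A minimal sketch of that idea follows; the standalone class shape here is a stand-in (the real `SchemaRegistry` interface lives in nifi-registry-service), so treat it as an illustration of the delegation, not the PR's actual code:

```java
import java.util.HashMap;
import java.util.Map;

public class DelegatingSchemaRegistry {
    private final Map<String, String> schemaNameToSchemaMap = new HashMap<>();

    public void register(final String name, final String schemaText) {
        schemaNameToSchemaMap.put(name, schemaText);
    }

    public String retrieveSchemaText(final String schemaName) {
        final String text = schemaNameToSchemaMap.get(schemaName);
        if (text == null) {
            throw new IllegalArgumentException("Failed to find schema; Name: '" + schemaName + "'.");
        }
        return text;
    }

    // Instead of UnsupportedOperationException, ignore the attributes and
    // delegate to the name-only lookup, as the review comment suggests.
    public String retrieveSchemaText(final String schemaName, final Map<String, String> attributes) {
        return retrieveSchemaText(schemaName);
    }
}
```

Callers that pass attributes then get the same behavior as name-only callers rather than a hard failure.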

[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962169#comment-15962169
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110541076
  
--- Diff: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 ---
@@ -2201,7 +2202,10 @@ public int read(final byte[] b, final int off, final int len) throws IOException
 
 @Override
 public void close() throws IOException {
-StandardProcessSession.this.bytesRead += countingStream.getBytesRead();
+if (!closed) {
+StandardProcessSession.this.bytesRead += countingStream.getBytesRead();
+closed = true;
+}
--- End diff --

Technically the above is not thread safe, consider adding some 
synchronization.
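The race olegz points out: two threads entering close() can both observe `closed == false` and double-count `bytesRead`. One way to close that window is a compare-and-set guard. The sketch below uses `AtomicBoolean`; this is an assumption about one possible fix, not necessarily what the PR ultimately did:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class CountingCloser {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AtomicLong bytesRead = new AtomicLong();
    private final long streamBytes;  // stand-in for countingStream.getBytesRead()

    public CountingCloser(final long streamBytes) {
        this.streamBytes = streamBytes;
    }

    // compareAndSet guarantees the byte count is added exactly once even if
    // close() is invoked concurrently from multiple threads.
    public void close() {
        if (closed.compareAndSet(false, true)) {
            bytesRead.addAndGet(streamBytes);
        }
    }

    public long getBytesRead() {
        return bytesRead.get();
    }
}
```

Only the thread whose compareAndSet wins performs the accumulation; every later (or concurrent) close() is a no-op.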




[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962173#comment-15962173
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110541830
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
 ---
@@ -0,0 +1,318 @@

[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962170#comment-15962170
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110541217
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java
 ---
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.queryflowfile.FlowFileTable;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.ResultSetRecordSet;
+import org.apache.nifi.util.StopWatch;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"sql", "query", "calcite", "route", "record", "transform", 
"select", "update", "modify", "etl", "filter", "record", "csv", "json", "logs", 
"text", "avro", "aggregate"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Evaluates one or more SQL queries against the 
contents of a FlowFile. The result of the "
++ "SQL query then becomes the content of the output FlowFile. This can 
be used, for example, "
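The processor evaluates real SQL through Calcite; to see the shape of the behavior without that dependency, here is a hypothetical stream-based analogue of `SELECT name, age FROM FLOWFILE WHERE name <> ''`, treating each record as a field-name-to-value map (the `query` helper and the sample data are invented for illustration):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SelectAnalogue {
    // Each record is a field-name -> value map, much like NiFi's MapRecord.
    static List<Map<String, Object>> query(List<Map<String, Object>> records) {
        return records.stream()
                .filter(r -> !"".equals(r.get("name")))                       // WHERE name <> ''
                .map(r -> Map.of("name", r.get("name"), "age", r.get("age"))) // SELECT name, age
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        final List<Map<String, Object>> in = List.of(
                Map.of("name", (Object) "Tom", "age", 49),
                Map.of("name", (Object) "", "age", 12));
        System.out.println(query(in).size()); // only the "Tom" record survives the WHERE clause
    }
}
```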

[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962168#comment-15962168
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110541764
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
 ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.json;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import 
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+import org.codehaus.jackson.JsonNode;
+
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.PathNotFoundException;
+import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
+
+public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
+private static final Configuration STRICT_PROVIDER_CONFIGURATION = 
Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();
+
+private final LinkedHashMap<String, JsonPath> jsonPaths;
+private final InputStream in;
+private RecordSchema schema;
+private final String dateFormat;
+private final String timeFormat;
+private final String timestampFormat;
+
+public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> 
jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog 
logger,
+final String dateFormat, final String timeFormat, final String 
timestampFormat)
+throws MalformedRecordException, IOException {
+super(in, logger);
+
+this.dateFormat = dateFormat;
+this.timeFormat = timeFormat;
+this.timestampFormat = timestampFormat;
+
+this.schema = schema;
+this.jsonPaths = jsonPaths;
+this.in = in;
+}
+
+@Override
+public void close() throws IOException {
+in.close();
+}
+
+@Override
+public RecordSchema getSchema() {
+return schema;
+}
+
+@Override
+protected Record convertJsonNodeToRecord(final JsonNode jsonNode, 
final RecordSchema schema) throws IOException {
+if (jsonNode == null) {
+return null;
+}
+
+final DocumentContext ctx = 
JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(jsonNode.toString());
+final Map<String, Object> values = new 
HashMap<>(schema.getFieldCount());
+
+for (final Map.Entry<String, JsonPath> entry : 
jsonPaths.entrySet()) {
+final String fieldName = entry.getKey();
+final DataType desiredType = 
schema.getDataType(fieldName).orElse(null);
+if (desiredType == null) {
+continue;
+}
+
+final JsonPath jsonPath = entry.getValue();
+
+Object 
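`JsonPath` here is the Jayway library, not the JDK. The per-field extraction loop above can be sketched with plain `Function` extractors standing in for compiled JsonPaths; the `extract` helper and the field names are invented for illustration.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class FieldExtractorDemo {
    // Each extractor plays the role of a compiled JsonPath: document in, value out.
    static Map<String, Object> extract(
            Map<String, Function<Map<String, Object>, Object>> extractors,
            Map<String, Object> document) {
        final Map<String, Object> values = new HashMap<>(extractors.size());
        for (final Map.Entry<String, Function<Map<String, Object>, Object>> entry : extractors.entrySet()) {
            final Object value = entry.getValue().apply(document);
            if (value != null) {  // like catching PathNotFoundException: skip missing fields
                values.put(entry.getKey(), value);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        final Map<String, Function<Map<String, Object>, Object>> paths = new LinkedHashMap<>();
        paths.put("id", doc -> doc.get("id"));        // stands in for $.id
        paths.put("missing", doc -> doc.get("nope")); // unmatched path: no entry in the result
        System.out.println(extract(paths, Map.of("id", (Object) 42))); // {id=42}
    }
}
```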

[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962166#comment-15962166
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110544739
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
 ---
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.grok;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.SchemaRegistryRecordReader;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import io.thekraken.grok.api.Grok;
+import io.thekraken.grok.api.exception.GrokException;
+
+@Tags({"grok", "logs", "logfiles", "parse", "unstructured", "text", 
"record", "reader", "regex", "pattern", "logstash"})
+@CapabilityDescription("Provides a mechanism for reading unstructured text 
data, such as log files, and structuring the data "
++ "so that it can be processed. The service is configured using Grok 
patterns. "
++ "The service reads from a stream of data and splits each message 
that it finds into a separate Record, each containing the fields that are 
configured. "
++ "If a line in the input does not match the expected message pattern, 
the line of text is considered to be part of the previous "
++ "message, with the exception of stack traces. A stack trace that is 
found at the end of a log message is considered to be part "
++ "of the previous message but is added to the 'STACK_TRACE' field of 
the Record. If a record has no stack trace, it will have a NULL value "
++ "for the STACK_TRACE field. All fields that are parsed are 
considered to be of type String by default. If there is need to change the type 
of a field, "
++ "this can be accomplished by configuring the Schema Registry to use 
and adding the appropriate schema.")
+public class GrokReader extends SchemaRegistryRecordReader implements 
RowRecordReaderFactory {
+private volatile Grok grok;
+private volatile boolean useSchemaRegistry;
+
+private static final String DEFAULT_PATTERN_NAME = 
"/default-grok-patterns.txt";
+
+static final PropertyDescriptor PATTERN_FILE = new 
PropertyDescriptor.Builder()
+.name("Grok Pattern File")
+.description("Path to a file that contains Grok Patterns to use 
for parsing logs. If not specified, a built-in default Pattern file "
++ "will be used. If specified, all patterns in the given 
pattern file will override the default patterns. See the Controller Service's "
++ "Additional Details for a list of pre-defined patterns.")
+.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+.expressionLanguageSupported(true)
+.required(false)
+.build();
+
+static final PropertyDescriptor GROK_EXPRESSION = new 
PropertyDescriptor.Builder()
+.name("Grok Expression")
+.description("Specifies the format of a log line in Grok format. 
This allows the Record 
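The capability description above says that non-matching lines are folded into the previous message. A minimal sketch of that grouping rule follows; the `MESSAGE_START` pattern (a line starts a new message only if it begins with an ISO-style date) is invented for illustration, whereas the real reader uses the configured Grok expression.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class LogLineGrouper {
    // Assumed convention for the sketch: a new message begins with an ISO-style date.
    private static final Pattern MESSAGE_START = Pattern.compile("^\\d{4}-\\d{2}-\\d{2} ");

    static List<String> group(List<String> lines) {
        final List<String> messages = new ArrayList<>();
        for (final String line : lines) {
            if (messages.isEmpty() || MESSAGE_START.matcher(line).find()) {
                messages.add(line); // start of a new message
            } else {
                // continuation (e.g. a stack-trace line): append to the previous message
                messages.set(messages.size() - 1,
                        messages.get(messages.size() - 1) + "\n" + line);
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        final List<String> messages = group(List.of(
                "2017-04-09 12:00:00 ERROR boom",
                "\tat com.example.Foo.run(Foo.java:10)",
                "2017-04-09 12:00:01 INFO ok"));
        System.out.println(messages.size()); // 2: the error (with its trace) and the info line
    }
}
```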

[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962172#comment-15962172
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110541788
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
 ---
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.grok;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import io.thekraken.grok.api.Grok;
+import io.thekraken.grok.api.GrokUtils;
+import io.thekraken.grok.api.Match;
+
+public class GrokRecordReader implements RecordReader {
+private final BufferedReader reader;
+private final Grok grok;
+private RecordSchema schema;
+
+private String nextLine;
+
+static final String STACK_TRACE_COLUMN_NAME = "STACK_TRACE";
+private static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
+"^\\s*(?:(?:    |\\t)+at )|"
++ "(?:(?:    |\\t)+\\[CIRCULAR REFERENCE\\:)|"
++ "(?:Caused by\\: )|"
++ "(?:Suppressed\\: )|"
++ "(?:\\s+... \\d+ (?:more|common frames? omitted)$)");
+
+private static final FastDateFormat TIME_FORMAT_DATE;
+private static final FastDateFormat TIME_FORMAT_TIME;
+private static final FastDateFormat TIME_FORMAT_TIMESTAMP;
+
+static {
+final TimeZone gmt = TimeZone.getTimeZone("GMT");
+TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt);
+TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt);
+TIME_FORMAT_TIMESTAMP = FastDateFormat.getInstance("yyyy-MM-dd 
HH:mm:ss", gmt);
+}
+
+public GrokRecordReader(final InputStream in, final Grok grok, final 
RecordSchema schema) {
+this.reader = new BufferedReader(new InputStreamReader(in));
+this.grok = grok;
+this.schema = schema;
+}
+
+@Override
+public void close() throws IOException {
+reader.close();
+}
+
+@Override
+public Record nextRecord() throws IOException, 
MalformedRecordException {
+final String line = nextLine == null ? reader.readLine() : 
nextLine;
+nextLine = null; // ensure that we don't process nextLine again
+if (line == null) {
+return null;
+}
+
+final RecordSchema schema = getSchema();
+
+final Match match = grok.match(line);
+match.captures();
+final Map<String, Object> valueMap = match.toMap();
+if (valueMap.isEmpty()) {   // We were unable to match the pattern 
so return an empty Object array.
+return new 
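The `STACK_TRACE_PATTERN` heuristic above can be exercised on its own. The sketch below reproduces it with `java.util.regex`; the four-space indentation alternative is an assumption, since the archived diff collapsed the literal spaces inside the pattern.

```java
import java.util.regex.Pattern;

public class StackTraceDetector {
    // Reconstruction of the reader's heuristic; the four-space alternative is
    // an assumption (the archived diff collapsed the literal spaces).
    static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
            "^\\s*(?:(?:    |\\t)+at )|"
                    + "(?:(?:    |\\t)+\\[CIRCULAR REFERENCE\\:)|"
                    + "(?:Caused by\\: )|"
                    + "(?:Suppressed\\: )|"
                    + "(?:\\s+... \\d+ (?:more|common frames? omitted)$)");

    static boolean isStackTraceLine(String line) {
        return STACK_TRACE_PATTERN.matcher(line).find();
    }

    public static void main(String[] args) {
        System.out.println(isStackTraceLine("\tat com.example.Foo.run(Foo.java:10)")); // true
        System.out.println(isStackTraceLine("Caused by: java.io.IOException"));        // true
        System.out.println(isStackTraceLine("plain message with no trace"));           // false
    }
}
```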

[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962174#comment-15962174
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110544779
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.csv.CSVFormat;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+@Tags({"csv", "result", "set", "recordset", "record", "writer", 
"serializer", "row", "tsv", "tab", "separated", "delimited"})
+@CapabilityDescription("Writes the contents of a RecordSet as CSV data. 
The first line written "
++ "will be the column names. All subsequent lines will be the values 
corresponding to those columns.")
+public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter 
implements RecordSetWriterFactory {
+
+private volatile CSVFormat csvFormat;
+
+@Override
+protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+final List<PropertyDescriptor> properties = new 
ArrayList<>(super.getSupportedPropertyDescriptors());
+properties.add(CSVUtils.CSV_FORMAT);
+properties.add(CSVUtils.VALUE_SEPARATOR);
+properties.add(CSVUtils.QUOTE_CHAR);
+properties.add(CSVUtils.ESCAPE_CHAR);
+properties.add(CSVUtils.COMMENT_MARKER);
+properties.add(CSVUtils.NULL_STRING);
+properties.add(CSVUtils.TRIM_FIELDS);
+properties.add(CSVUtils.QUOTE_MODE);
+properties.add(CSVUtils.RECORD_SEPARATOR);
+properties.add(CSVUtils.TRAILING_DELIMITER);
+return properties;
+}
--- End diff --

Similar to the earlier comment: this will be called at least three times, so it 
may be better built once in a static initializer. This pattern appears in many 
places throughout this PR, so consider fixing it everywhere.
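The reviewer's suggestion, sketched with plain `String`s standing in for NiFi's `PropertyDescriptor` (the property names are invented for illustration): build the list once in a static initializer and hand out an immutable view.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class StaticDescriptorList {
    // Built exactly once, instead of re-allocated on every call.
    private static final List<String> PROPERTIES;

    static {
        final List<String> props = new ArrayList<>();
        props.add("CSV Format");
        props.add("Value Separator");
        props.add("Quote Character");
        PROPERTIES = Collections.unmodifiableList(props);
    }

    // NiFi's method is protected; public here so the sketch is easy to call.
    public List<String> getSupportedPropertyDescriptors() {
        return PROPERTIES; // safe to share: the list is unmodifiable
    }

    public static void main(String[] args) {
        System.out.println(new StaticDescriptorList().getSupportedPropertyDescriptors().size()); // 3
    }
}
```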


> Create QueryFlowFile Processor
> --
>
> Key: NIFI-1280
> URL: https://issues.apache.org/jira/browse/NIFI-1280
> Project: Apache NiFi
>  Issue Type: Task
>  Components: Extensions
>Reporter: Mark Payne
>Assignee: Mark Payne
> Fix For: 1.2.0
>
> Attachments: QueryFlowFile_Record_Reader-Writer_Examples.xml
>
>
> We should have a Processor that allows users to easily filter out specific 
> columns from CSV data. For instance, a user would configure two different 
> properties: "Columns of Interest" (a comma-separated list of column indexes) 
> and "Filtering Strategy" (Keep Only These Columns, Remove Only These Columns).
> We can do this today with ReplaceText, but it is far more difficult than it 
> would be with this Processor, as the user has to use Regular Expressions, 
> etc. with ReplaceText.
> Eventually a Custom UI could even be built that allows a user to upload a 
> Sample CSV and choose which columns from there, similar to the way that Excel 
> works when importing CSV by dragging and selecting the desired columns? That 
> would certainly be a larger undertaking and would not need to be done for an 
> initial implementation.



--
This message was sent by Atlassian JIRA

[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962171#comment-15962171
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110544874
  
--- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore
 ---
@@ -0,0 +1 @@
+/bin/
--- End diff --

I think it's covered by the global .gitignore




[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15961299#comment-15961299
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110460868
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestQueryFlowFile {
+
+static {
+System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", 
"info");
+System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", 
"debug");
+
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard.SQLTransform",
 "debug");
+}
+
+private static final String REL_NAME = "success";
+
+@Test
+public void testSimple() throws InitializationException, IOException, 
SQLException {
+final MockRecordParser parser = new MockRecordParser();
+parser.addSchemaField("name", RecordFieldType.STRING);
+parser.addSchemaField("age", RecordFieldType.INT);
+parser.addRecord("Tom", 49);
+
+final MockRecordWriter writer = new 
MockRecordWriter("\"name\",\"points\"");
+
+final TestRunner runner = 
TestRunners.newTestRunner(QueryFlowFile.class);
+runner.addControllerService("parser", parser);
+runner.enableControllerService(parser);
+runner.addControllerService("writer", writer);
+runner.enableControllerService(writer);
+
+runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE 
name <> ''");
+runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
+runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
+
+final int numIterations = 1;
+for (int i = 0; i < numIterations; i++) {
+runner.enqueue(new byte[0]);
+}
+
+runner.setThreadCount(4);
+runner.run(2 * numIterations);
+
+runner.assertTransferCount(REL_NAME, 1);
+final MockFlowFile out = 
runner.getFlowFilesForRelationship(REL_NAME).get(0);
+ 

[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15961298#comment-15961298
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110460794
  
--- Diff: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestQueryFlowFile {
+
+static {
+System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", 
"info");
+System.setProperty("org.slf4j.simpleLogger.showDateTime", "true");
+System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", 
"debug");
+
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard.SQLTransform",
 "debug");
+}
+
+private static final String REL_NAME = "success";
+
+@Test
+public void testSimple() throws InitializationException, IOException, 
SQLException {
+final MockRecordParser parser = new MockRecordParser();
+parser.addSchemaField("name", RecordFieldType.STRING);
+parser.addSchemaField("age", RecordFieldType.INT);
+parser.addRecord("Tom", 49);
+
+final MockRecordWriter writer = new 
MockRecordWriter("\"name\",\"points\"");
+
+final TestRunner runner = 
TestRunners.newTestRunner(QueryFlowFile.class);
+runner.addControllerService("parser", parser);
+runner.enableControllerService(parser);
+runner.addControllerService("writer", writer);
+runner.enableControllerService(writer);
+
+runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE 
name <> ''");
+runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser");
+runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer");
+
+final int numIterations = 1;
+for (int i = 0; i < numIterations; i++) {
+runner.enqueue(new byte[0]);
+}
+
+runner.setThreadCount(4);
+runner.run(2 * numIterations);
+
+runner.assertTransferCount(REL_NAME, 1);
+final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0);
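In the test above, the dynamic property key ("success") is a relationship name and its value is the SQL statement to evaluate. As a minimal, JDK-only illustration of what such a query does to a record set (an analogy only, not the Calcite-backed NiFi implementation; `RecordFilterSketch` and `query` are hypothetical names):

```java
import java.util.*;
import java.util.stream.*;

public class RecordFilterSketch {

    // Rough equivalent of: SELECT name, age FROM FLOWFILE WHERE name <> ''
    // Each record is modeled as a simple field map, standing in for NiFi's Record.
    static List<Map<String, Object>> query(List<Map<String, Object>> records) {
        return records.stream()
                .filter(r -> !"".equals(r.get("name")))          // WHERE name <> ''
                .map(r -> {
                    Map<String, Object> projected = new LinkedHashMap<>();
                    projected.put("name", r.get("name"));        // SELECT name, age
                    projected.put("age", r.get("age"));
                    return projected;
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map<String, Object>> records = List.of(
                Map.of("name", "Tom", "age", 49),
                Map.of("name", "", "age", 12));
        System.out.println(query(records)); // [{name=Tom, age=49}]
    }
}
```

The real processor routes the query result to the relationship named by the property, which is why the test asserts a transfer count of 1 on "success".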

[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15960845#comment-15960845
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/1652#discussion_r110397744
  
--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java ---
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.queryflowfile.FlowFileTable;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.ResultSetRecordSet;
+import org.apache.nifi.util.StopWatch;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"sql", "query", "calcite", "route", "record", "transform", "select", "update", "modify", "etl", "filter", "record", "csv", "json", "logs", "text", "avro", "aggregate"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Evaluates one or more SQL queries against the contents of a FlowFile. The result of the "
++ "SQL query then becomes the content of the output FlowFile. This can be used, for

[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15960775#comment-15960775
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user markap14 commented on the issue:

https://github.com/apache/nifi/pull/1652
  
@olegz thanks for reviewing! Just a heads up - I pushed a new commit to address some RAT exclusions and checkstyle violations; I had forgotten to run the -Pcontrib-check profile before creating the PR.


> Create QueryFlowFile Processor
> --
>
> Key: NIFI-1280
> URL: https://issues.apache.org/jira/browse/NIFI-1280
> Project: Apache NiFi
>  Issue Type: Task
>  Components: Extensions
>Reporter: Mark Payne
>Assignee: Mark Payne
> Fix For: 1.2.0
>
> Attachments: QueryFlowFile_Record_Reader-Writer_Examples.xml
>
>
> We should have a Processor that allows users to easily filter out specific 
> columns from CSV data. For instance, a user would configure two different 
> properties: "Columns of Interest" (a comma-separated list of column indexes) 
> and "Filtering Strategy" (Keep Only These Columns, Remove Only These Columns).
> We can do this today with ReplaceText, but it is far more difficult than it 
> would be with this Processor, as the user has to use Regular Expressions, 
> etc. with ReplaceText.
> Eventually a Custom UI could even be built that allows a user to upload a 
> Sample CSV and choose which columns from there, similar to the way that Excel 
> works when importing CSV by dragging and selecting the desired columns? That 
> would certainly be a larger undertaking and would not need to be done for an 
> initial implementation.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
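The column-filtering behavior described in the ticket above ("Columns of Interest" as a set of column indexes, plus a keep/remove "Filtering Strategy") can be sketched in a few lines of plain Java; `CsvColumnFilterSketch` and `filterLine` are illustrative names only, not part of the NiFi codebase:

```java
import java.util.*;
import java.util.stream.*;

public class CsvColumnFilterSketch {

    // columns = the "Columns of Interest" indexes; keep=true keeps only those
    // columns, keep=false removes them (the two "Filtering Strategy" options).
    static String filterLine(String line, Set<Integer> columns, boolean keep) {
        String[] cells = line.split(",", -1); // limit -1 preserves trailing empty cells
        return IntStream.range(0, cells.length)
                .filter(i -> columns.contains(i) == keep)
                .mapToObj(i -> cells[i])
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        System.out.println(filterLine("a,b,c,d", Set.of(1, 3), true));  // b,d
        System.out.println(filterLine("a,b,c,d", Set.of(1, 3), false)); // a,c
    }
}
```

The SQL approach the PR ultimately took subsumes this: a column projection is just a SELECT list, with no regular expressions required.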


[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959541#comment-15959541
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

Github user olegz commented on the issue:

https://github.com/apache/nifi/pull/1652
  
Reviewing...




[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-06 Thread Mark Payne (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959520#comment-15959520
 ] 

Mark Payne commented on NIFI-1280:
--

I've also attached a Template to aid in testing, since there's a lot going on here. It creates several controller services and processors. You would just need to enable the controller services, and the processors run through several scenarios.



[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-06 Thread Mark Payne (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959473#comment-15959473
 ] 

Mark Payne commented on NIFI-1280:
--

I've created a PR that I think is sufficient. There are a few more things that I would like to do, but this has dragged on long enough without me pushing anything, so I've pushed a PR so that people can review it and hopefully get it merged. I will create separate JIRAs for the remaining enhancements that I would like to make. The most significant is to allow more flexibility in choosing the schema to use: rather than requiring that a Schema Name be provided along with a Schema Registry, I would like to allow the user to use an attribute, or to read the schema from the content of the FlowFile itself in cases such as Avro. In addition, I want to include the schema on the outgoing records when appropriate.



[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959470#comment-15959470
 ] 

ASF GitHub Bot commented on NIFI-1280:
--

GitHub user markap14 opened a pull request:

https://github.com/apache/nifi/pull/1652

NIFI-1280: Renamed FilterCSVColumns to QueryFlowFile; introduced readers 
and writers as controller services

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/markap14/nifi NIFI-1280

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/1652.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1652


commit 4a10c7826051a85d8fab506a76cfc4144d483f0c
Author: Toivo Adams 
Date:   2016-05-07T09:29:15Z

NIFI-1280 Create FilterCSVColumns Processor.

commit 4f262960dcd4e17bc77c78b50c979c3717ab40ec
Author: Mark Payne 
Date:   2016-07-11T18:57:00Z

NIFI-1280: Refactoring to make more generic so that other data types can be 
supported; created InputStreams to content on-demand so that multiple passes 
can be made over FlowFile content if required. Created new Controller Services 
for reading and writing specific data types

commit 9def3d90374025a8e7172ac6d014f7b0ac644c4d
Author: Oleg Zhurakousky 
Date:   2017-03-30T13:12:07Z

NIFI-1280 added support for RecordSchema in SchemaRegistry

Signed-off-by: Mark Payne 

commit f1b061988352bd556b4a7a49afaf777c7be95315
Author: Mark Payne 
Date:   2017-03-30T14:19:54Z

NIFI-1280: Updated SimpleKeyValueSchemaRegistry to make use of new CHOICE 
RecordFieldType
- Update Record Readers to use SchemaRegistry controller service. Moved 
SchemaRegistry api into its own maven module and added to standard-services-api 
so that we can properly add dependencies on it. Code cleanup and bug fixes
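The second commit's note about creating InputStreams to the content on demand, so that multiple passes can be made over the FlowFile, can be sketched with a plain `Supplier<InputStream>` (JDK only; `MultiPassSketch` and `countLines` are hypothetical names, not NiFi API):

```java
import java.io.*;
import java.util.function.Supplier;

public class MultiPassSketch {

    // Instead of handing a reader a single InputStream, hand it a Supplier that
    // opens a fresh stream for each pass, so the same content can be re-read.
    static int countLines(Supplier<InputStream> content, int passes) {
        int total = 0;
        for (int i = 0; i < passes; i++) {
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(content.get()))) {
                while (reader.readLine() != null) {
                    total++;
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return total;
    }

    public static void main(String[] args) {
        byte[] data = "a\nb\n".getBytes();
        // Two passes over the same two-line content: 4 lines read in total.
        System.out.println(countLines(() -> new ByteArrayInputStream(data), 2)); // 4
    }
}
```

This matters for a SQL processor because a query planner may need to scan the same table (the FlowFile content) more than once.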





[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor

2017-04-03 Thread Mark Payne (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954024#comment-15954024
 ] 

Mark Payne commented on NIFI-1280:
--

I am getting pretty close to this being in good shape now. It's taken far 
longer than I ever expected, but that's because it's really been transforming a 
lot of how we handle different data formats in the process. It's really 
incorporated almost all of the features proposed in 
https://cwiki.apache.org/confluence/display/NIFI/First-class+Avro+Support and 
fairly extensive mailing list conversations that have occurred around 
record-oriented data here: 
http://apache-nifi-developer-list.39713.n7.nabble.com/Looking-for-feedback-on-my-WIP-Design-td13097.html#none
 and in a couple of other emails. I need to do some cleanup and then I think I 
can get a PR in this week. I need to write some more unit tests and ensure that 
we are handling data type coercion consistently across the different 
readers/writers. I also need to update a bit how we are referencing schemas 
when appropriate.

With these Record Readers and Writers, I think we can actually remove the 
TransformXToY processors that were brought in recently in the 
nifi-registry-processors module. [~ozhurakousky] do you agree?
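The data-type-coercion concern raised above (readers and writers agreeing on how raw values map to a field's declared type) can be illustrated with a small JDK-only sketch; `FieldType` and `coerce` are hypothetical names, not the NiFi `RecordFieldType` API:

```java
public class CoercionSketch {

    enum FieldType { STRING, INT, DOUBLE }

    // Coerce a raw value (e.g. a String parsed from CSV, or a Number from Avro)
    // to the type declared by the record schema, the same way on read and write.
    static Object coerce(Object raw, FieldType type) {
        if (raw == null) {
            return null;
        }
        switch (type) {
            case INT:
                return raw instanceof Number ? ((Number) raw).intValue()
                        : Integer.valueOf(raw.toString().trim());
            case DOUBLE:
                return raw instanceof Number ? ((Number) raw).doubleValue()
                        : Double.valueOf(raw.toString().trim());
            default:
                return raw.toString();
        }
    }

    public static void main(String[] args) {
        System.out.println(coerce("49", FieldType.INT));    // 49
        System.out.println(coerce(1.5, FieldType.STRING));  // 1.5
    }
}
```

Centralizing this logic is what lets a CSV reader and an Avro writer (or vice versa) round-trip the same record without surprises.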
