[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16009723#comment-16009723 ]

ASF GitHub Bot commented on NIFI-1280:
--

Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/420

> Create QueryFlowFile Processor
> --
>
> Key: NIFI-1280
> URL: https://issues.apache.org/jira/browse/NIFI-1280
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Mark Payne
> Assignee: Mark Payne
> Fix For: 1.2.0
>
> Attachments: QueryFlowFile_Record_Reader-Writer_Examples.xml
>
>
> We should have a Processor that allows users to easily filter out specific
> columns from CSV data. For instance, a user would configure two different
> properties: "Columns of Interest" (a comma-separated list of column indexes)
> and "Filtering Strategy" (Keep Only These Columns, Remove Only These Columns).
> We can do this today with ReplaceText, but it is far more difficult than it
> would be with this Processor, as the user has to use Regular Expressions,
> etc. with ReplaceText.
> Eventually a Custom UI could even be built that allows a user to upload a
> Sample CSV and choose which columns from there, similar to the way that Excel
> works when importing CSV by dragging and selecting the desired columns? That
> would certainly be a larger undertaking and would not need to be done for an
> initial implementation.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
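The filtering described in the issue (a comma-separated "Columns of Interest" list plus a "Keep Only These Columns" / "Remove Only These Columns" strategy) can be sketched in Java as follows. This is a minimal illustration, not the code that was eventually merged. The merged QueryFlowFile uses SQL instead. The class, enum, and method names are hypothetical, and the sketch assumes naive CSV in which fields never contain quoted commas.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the column filtering proposed in NIFI-1280.
// Assumes naive CSV: fields are split on ',' and never contain quoted commas.
class CsvColumnFilter {

    enum Strategy { KEEP_ONLY, REMOVE_ONLY }

    // Parses a "Columns of Interest" value such as "0, 2" into a set of column indexes.
    static Set<Integer> parseColumns(String columnsOfInterest) {
        Set<Integer> indexes = new HashSet<>();
        for (String token : columnsOfInterest.split(",")) {
            String trimmed = token.trim();
            if (!trimmed.isEmpty()) {
                indexes.add(Integer.parseInt(trimmed));
            }
        }
        return indexes;
    }

    // Applies the filtering strategy to one CSV line and returns the rewritten line.
    static String filterLine(String line, Set<Integer> columns, Strategy strategy) {
        String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
        List<String> kept = new ArrayList<>();
        for (int i = 0; i < fields.length; i++) {
            boolean listed = columns.contains(i);
            // KEEP_ONLY keeps listed columns; REMOVE_ONLY keeps unlisted ones.
            if ((strategy == Strategy.KEEP_ONLY) == listed) {
                kept.add(fields[i]);
            }
        }
        return String.join(",", kept);
    }

    public static void main(String[] args) {
        Set<Integer> cols = parseColumns("0, 2");
        System.out.println(filterLine("id,name,age", cols, Strategy.KEEP_ONLY));   // id,age
        System.out.println(filterLine("id,name,age", cols, Strategy.REMOVE_ONLY)); // name
    }
}
```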
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15965135#comment-15965135 ]

ASF GitHub Bot commented on NIFI-1280:
--

Github user asfgit closed the pull request at:

    https://github.com/apache/nifi/pull/1652
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15965137#comment-15965137 ]

ASF subversion and git services commented on NIFI-1280:
---

Commit 68c592ea43d30754ec07c42cf10563fe9db185ae in nifi's branch refs/heads/master from [~ozhurakousky]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=68c592e ]

NIFI-1280 added support for RecordSchema in SchemaRegistry

Signed-off-by: Mark Payne
Signed-off-by: Matt Burgess

NIFI-1280: Updated SimpleKeyValueSchemaRegistry to make use of new CHOICE RecordFieldType
- Update Record Readers to use SchemaRegistry controller service. Moved SchemaRegistry api into its own maven module and added to standard-services-api so that we can properly add dependencies on it.

Code cleanup and bug fixes

Signed-off-by: Matt Burgess

NIFI-1280: Fixed checkstyle violations and license exclusions for RAT plugin

Signed-off-by: Matt Burgess

NIFI-1280: Addressed feedback from PR Review

Signed-off-by: Matt Burgess

NIFI-1280: Additional changes/doc to support QueryFlowFile and Record Readers/Writers

This closes #1652
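The commit above mentions a new CHOICE RecordFieldType, i.e. a field whose value may be one of several types. One plausible way a reader could resolve such a field against a concrete value is to take the first candidate type that accepts it. The sketch below assumes that first-match rule and uses a simplified, hypothetical FieldType enum; it is not NiFi's RecordFieldType or DataTypeUtils API.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: resolving a CHOICE field type against a concrete value.
// FieldType is a simplified stand-in, not NiFi's RecordFieldType.
class ChoiceResolver {

    enum FieldType {
        INT, LONG, DOUBLE, BOOLEAN, STRING;

        // Returns true if the raw value can be represented as this type without conversion.
        boolean accepts(Object value) {
            switch (this) {
                case INT:
                    return value instanceof Integer;
                case LONG:
                    return value instanceof Integer || value instanceof Long;
                case DOUBLE:
                    return value instanceof Number;
                case BOOLEAN:
                    return value instanceof Boolean;
                case STRING:
                    return value instanceof String;
                default:
                    return false;
            }
        }
    }

    // Assumed rule: pick the first type in the CHOICE (declaration order) that accepts the value.
    static Optional<FieldType> resolve(List<FieldType> choice, Object value) {
        for (FieldType candidate : choice) {
            if (candidate.accepts(value)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<FieldType> choice = List.of(FieldType.INT, FieldType.STRING);
        System.out.println(resolve(choice, 42));    // Optional[INT]
        System.out.println(resolve(choice, "n/a")); // Optional[STRING]
        System.out.println(resolve(choice, 1.5));   // Optional.empty
    }
}
```

Ordering matters under this rule: a CHOICE of [DOUBLE, INT] would resolve 42 to DOUBLE, so a real implementation may prefer the most specific compatible type instead.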
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15965134#comment-15965134 ]

ASF subversion and git services commented on NIFI-1280:
---

Commit 68c592ea43d30754ec07c42cf10563fe9db185ae in nifi's branch refs/heads/master from [~ozhurakousky]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=68c592e ]
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15965136#comment-15965136 ]

ASF subversion and git services commented on NIFI-1280:
---

Commit 68c592ea43d30754ec07c42cf10563fe9db185ae in nifi's branch refs/heads/master from [~ozhurakousky]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=68c592e ]
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15965132#comment-15965132 ]

ASF subversion and git services commented on NIFI-1280:
---

Commit 68c592ea43d30754ec07c42cf10563fe9db185ae in nifi's branch refs/heads/master from [~ozhurakousky]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=68c592e ]
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15965129#comment-15965129 ]

ASF GitHub Bot commented on NIFI-1280:
--

Github user mattyb149 commented on the issue:

    https://github.com/apache/nifi/pull/1652

    +1 LGTM ran a full build with unit tests and contrib-check, tried various RecordReaders and Writers and QueryFlowFile queries, great work! Merging to master
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15965131#comment-15965131 ]

ASF subversion and git services commented on NIFI-1280:
---

Commit a88d3bfa3c53d9cbe375f2b89eaa9248eb92df29 in nifi's branch refs/heads/master from [~markap14]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=a88d3bf ]

NIFI-1280: Refactoring to make more generic so that other data types can be supported; created InputStreams to content on-demand so that multiple passes can be made over FlowFile content if required. Created new Controller Services for reading and writing specific data types

Signed-off-by: Matt Burgess
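The commit describes creating InputStreams to FlowFile content on demand so that the content can be read more than once. Here is a minimal sketch of that idea, assuming a byte array stands in for FlowFile content. The consumer receives a java.util.function.Supplier<InputStream> and opens a fresh stream for each pass, rather than receiving a single stream that is exhausted after the first read. MultiPassContent and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

// Hypothetical sketch: handing out a Supplier<InputStream> instead of a single stream,
// so a consumer can make more than one pass over the same content.
class MultiPassContent {

    // First pass: counts newline-terminated lines; opens (and closes) its own stream.
    static int countLines(Supplier<InputStream> content) {
        int lines = 0;
        try (InputStream in = content.get()) {
            int b;
            while ((b = in.read()) != -1) {
                if (b == '\n') {
                    lines++;
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    // Second pass: reads the whole content as UTF-8 text from a fresh stream.
    static String readAll(Supplier<InputStream> content) {
        try (InputStream in = content.get()) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] flowFileContent = "a,1\nb,2\n".getBytes(StandardCharsets.UTF_8);
        // Each get() returns a new stream positioned at the start of the content.
        Supplier<InputStream> supplier = () -> new ByteArrayInputStream(flowFileContent);

        System.out.println(countLines(supplier)); // 2
        System.out.print(readAll(supplier));      // prints the content again
    }
}
```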
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15965133#comment-15965133 ]

ASF subversion and git services commented on NIFI-1280:
---

Commit 68c592ea43d30754ec07c42cf10563fe9db185ae in nifi's branch refs/heads/master from [~ozhurakousky]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=68c592e ]
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962902#comment-15962902 ]

ASF GitHub Bot commented on NIFI-1280:
--

Github user markap14 commented on the issue:

    https://github.com/apache/nifi/pull/1652

    ok @olegz @mattyb149 I have submitted a new commit that addresses all of the feedback above.
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962814#comment-15962814 ]

ASF GitHub Bot commented on NIFI-1280:
--

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1652#discussion_r110647631

--- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java ---
@@ -2201,7 +2202,10 @@ public int read(final byte[] b, final int off, final int len) throws IOException
 
             @Override
             public void close() throws IOException {
-                StandardProcessSession.this.bytesRead += countingStream.getBytesRead();
+                if (!closed) {
+                    StandardProcessSession.this.bytesRead += countingStream.getBytesRead();
+                    closed = true;
+                }
--- End diff --

    Agreed, it is not thread-safe. But ProcessSession is not intended to be thread-safe :)
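The change under review makes close() idempotent: the stream's byte count is added to the session's bytesRead only on the first call, so closing twice (for example once explicitly and once via try-with-resources) no longer double-counts. The following self-contained sketch applies the same guard, assuming an AtomicLong stands in for the session's counter. It is used here only as a mutable holder, not for thread safety. ReportOnceInputStream is a hypothetical name. As noted in the review, the boolean flag is not thread-safe, which is acceptable only because the owning object is not meant to be shared across threads.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the guard discussed in the review: the byte count is
// reported to the owner once, no matter how many times close() is called.
// Like the reviewed code, the 'closed' check is not synchronized (not thread-safe).
class ReportOnceInputStream extends FilterInputStream {
    private final AtomicLong sessionBytesRead; // stand-in for the session's bytesRead counter
    private long bytesRead;
    private boolean closed;

    ReportOnceInputStream(InputStream in, AtomicLong sessionBytesRead) {
        super(in);
        this.sessionBytesRead = sessionBytesRead;
    }

    @Override
    public int read() throws IOException {
        int b = super.read();
        if (b != -1) {
            bytesRead++;
        }
        return b;
    }

    // FilterInputStream.read(byte[]) delegates here, so both array reads are counted.
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        int n = super.read(b, off, len);
        if (n > 0) {
            bytesRead += n;
        }
        return n;
    }

    @Override
    public void close() throws IOException {
        if (!closed) {
            sessionBytesRead.addAndGet(bytesRead);
            closed = true;
        }
        super.close();
    }

    public static void main(String[] args) throws IOException {
        AtomicLong total = new AtomicLong();
        InputStream in = new ReportOnceInputStream(new ByteArrayInputStream(new byte[10]), total);
        in.read(new byte[4]);
        in.close();
        in.close(); // second close does not add to the counter again
        System.out.println(total.get()); // 4
    }
}
```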
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962811#comment-15962811 ]

ASF GitHub Bot commented on NIFI-1280:
--

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1652#discussion_r110647404

--- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java ---
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import org.apache.calcite.config.CalciteConnectionProperty;
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.jdbc.CalciteConnection;
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.queryflowfile.FlowFileTable;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.ResultSetRecordSet;
+import org.apache.nifi.util.StopWatch;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"sql", "query", "calcite", "route", "record", "transform", "select", "update", "modify", "etl", "filter", "record", "csv", "json", "logs", "text", "avro", "aggregate"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Evaluates one or more SQL queries against the contents of a FlowFile. The result of the "
+    + "SQL query then becomes the content of the output FlowFile. This can be used, for
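Per the capability description above, QueryFlowFile evaluates SQL against the contents of a FlowFile and writes the query result as the output FlowFile's content, and the imports show Apache Calcite doing the SQL work. The Calcite-free sketch below only illustrates what a query such as SELECT name FROM FLOWFILE WHERE age > 30 computes over a list of records (filter, then project). This also covers the column filtering originally requested in this issue. The FLOWFILE table name, the Map-based record representation, and the class name are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical, Calcite-free illustration of what a query such as
//   SELECT name FROM FLOWFILE WHERE age > 30
// computes: keep records matching the WHERE predicate, then keep only the SELECTed columns.
class RecordQuerySketch {

    static List<Map<String, Object>> select(List<Map<String, Object>> records,
                                            List<String> columns,
                                            Predicate<Map<String, Object>> where) {
        List<Map<String, Object>> result = new ArrayList<>();
        for (Map<String, Object> record : records) {
            if (!where.test(record)) {
                continue; // WHERE clause rejected this record
            }
            Map<String, Object> projected = new LinkedHashMap<>();
            for (String column : columns) {
                projected.put(column, record.get(column)); // projection, in SELECT order
            }
            result.add(projected);
        }
        return result;
    }

    // Builds one record with "name" and "age" fields.
    static Map<String, Object> row(String name, int age) {
        Map<String, Object> r = new LinkedHashMap<>();
        r.put("name", name);
        r.put("age", age);
        return r;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> flowFile = List.of(row("alice", 34), row("bob", 27));
        List<Map<String, Object>> out = select(flowFile, List.of("name"),
                r -> (Integer) r.get("age") > 30);
        System.out.println(out); // [{name=alice}]
    }
}
```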
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962813#comment-15962813 ]

ASF GitHub Bot commented on NIFI-1280:
--

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1652#discussion_r110647534

--- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java ---
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.json;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.serialization.record.util.DataTypeUtils;
+import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
+import org.codehaus.jackson.JsonNode;
+
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.DocumentContext;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.PathNotFoundException;
+import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
+
+public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
+    private static final Configuration STRICT_PROVIDER_CONFIGURATION = Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();
+
+    private final LinkedHashMap<String, JsonPath> jsonPaths;
+    private final InputStream in;
+    private RecordSchema schema;
+    private final String dateFormat;
+    private final String timeFormat;
+    private final String timestampFormat;
+
+    public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger,
+        final String dateFormat, final String timeFormat, final String timestampFormat)
+        throws MalformedRecordException, IOException {
+        super(in, logger);
+
+        this.dateFormat = dateFormat;
+        this.timeFormat = timeFormat;
+        this.timestampFormat = timestampFormat;
+
+        this.schema = schema;
+        this.jsonPaths = jsonPaths;
+        this.in = in;
+    }
+
+    @Override
+    public void close() throws IOException {
+        in.close();
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        return schema;
+    }
+
+    @Override
+    protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema) throws IOException {
+        if (jsonNode == null) {
+            return null;
+        }
+
+        final DocumentContext ctx = JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(jsonNode.toString());
+        final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
+
+        for (final Map.Entry<String, JsonPath> entry : jsonPaths.entrySet()) {
+            final String fieldName = entry.getKey();
+            final DataType desiredType = schema.getDataType(fieldName).orElse(null);
+            if (desiredType == null) {
+                continue;
+            }
+
+            final JsonPath jsonPath = entry.getValue();
+
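JsonPathRowRecordReader maps each record field name to a JsonPath and, as the visible part of convertJsonNodeToRecord shows, skips any mapped field that has no data type in the schema. The sketch below illustrates that field-to-path mapping with nested Maps in place of a JSON document and a tiny evaluator that understands only "$.a.b" paths. It is not Jayway JsonPath, and PathRecordSketch is a hypothetical name. Returning null for a missing path is an assumption of this sketch. The real reader imports PathNotFoundException, but its handling is cut off in the diff above.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the field-to-path mapping idea behind JsonPathRowRecordReader.
// The JSON document is modeled as nested Maps, and only simple "$.a.b" paths are
// supported; this is NOT Jayway JsonPath.
class PathRecordSketch {

    // Evaluates a "$.a.b" path; returns null when any segment is missing (assumed behavior).
    static Object evaluate(Map<String, Object> document, String path) {
        if (!path.startsWith("$.")) {
            throw new IllegalArgumentException("Only $.a.b style paths are supported: " + path);
        }
        Object current = document;
        for (String segment : path.substring(2).split("\\.")) {
            if (!(current instanceof Map)) {
                return null;
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    // Builds one record: a mapped field is populated only if the schema also knows it,
    // mirroring the "skip if there is no desired type" check in the diff.
    static Map<String, Object> toRecord(Map<String, Object> document,
                                        LinkedHashMap<String, String> fieldPaths,
                                        Set<String> schemaFields) {
        Map<String, Object> values = new HashMap<>();
        for (Map.Entry<String, String> entry : fieldPaths.entrySet()) {
            String fieldName = entry.getKey();
            if (!schemaFields.contains(fieldName)) {
                continue;
            }
            values.put(fieldName, evaluate(document, entry.getValue()));
        }
        return values;
    }

    public static void main(String[] args) {
        Map<String, Object> address = Map.of("city", "Boston");
        Map<String, Object> doc = Map.of("name", "alice", "address", address);

        LinkedHashMap<String, String> paths = new LinkedHashMap<>();
        paths.put("name", "$.name");
        paths.put("city", "$.address.city");
        paths.put("zip", "$.address.zip"); // missing in the document, so null

        System.out.println(toRecord(doc, paths, Set.of("name", "city", "zip")));
    }
}
```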
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962805#comment-15962805 ] ASF GitHub Bot commented on NIFI-1280: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110646586 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.grok; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.time.FastDateFormat; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +import io.thekraken.grok.api.Grok; +import io.thekraken.grok.api.GrokUtils; +import io.thekraken.grok.api.Match; + +public class GrokRecordReader implements RecordReader { +private final BufferedReader reader; +private final Grok grok; +private RecordSchema schema; + +private String nextLine; + +static final String STACK_TRACE_COLUMN_NAME = "STACK_TRACE"; +private static final Pattern STACK_TRACE_PATTERN = Pattern.compile( +"^\\s*(?:(?:    |\\t)+at )|" ++ "(?:(?:    |\\t)+\\[CIRCULAR REFERENCE\\:)|" ++ "(?:Caused by\\: )|" ++ "(?:Suppressed\\: )|" ++ "(?:\\s+... \\d+ (?:more|common frames?
omitted)$)"); + +private static final FastDateFormat TIME_FORMAT_DATE; +private static final FastDateFormat TIME_FORMAT_TIME; +private static final FastDateFormat TIME_FORMAT_TIMESTAMP; + +static { +final TimeZone gmt = TimeZone.getTimeZone("GMT"); +TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt); +TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt); +TIME_FORMAT_TIMESTAMP = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt); +} + +public GrokRecordReader(final InputStream in, final Grok grok, final RecordSchema schema) { +this.reader = new BufferedReader(new InputStreamReader(in)); +this.grok = grok; +this.schema = schema; +} + +@Override +public void close() throws IOException { +reader.close(); +} + +@Override +public Record nextRecord() throws IOException, MalformedRecordException { +final String line = nextLine == null ? reader.readLine() : nextLine; +nextLine = null; // ensure that we don't process nextLine again +if (line == null) { +return null; +} + +final RecordSchema schema = getSchema(); + +final Match match = grok.match(line); +match.captures(); +final Map<String, Object> valueMap = match.toMap(); +if (valueMap.isEmpty()) { // We were unable to match the pattern so return an empty Object array. +return new
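STACK_TRACE_PATTERN above decides whether a line that fails the Grok expression looks like part of a Java stack trace. The stand-alone check below exercises that pattern on a few sample lines. It assumes the indentation alternative is four spaces or a tab, and it uses find() rather than matches() because only the first alternative is anchored with `^`. The class and method names are hypothetical, not part of the pull request.

```java
import java.util.regex.Pattern;

public class StackTraceLineCheck {
    // Assumed reconstruction of GrokRecordReader's STACK_TRACE_PATTERN:
    // the indentation alternative is taken to be four spaces or a tab.
    static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
        "^\\s*(?:(?:    |\\t)+at )|"
            + "(?:(?:    |\\t)+\\[CIRCULAR REFERENCE\\:)|"
            + "(?:Caused by\\: )|"
            + "(?:Suppressed\\: )|"
            + "(?:\\s+... \\d+ (?:more|common frames? omitted)$)");

    // find() searches anywhere in the line; only the first alternative is anchored at the start.
    public static boolean isStackTraceLine(String line) {
        return STACK_TRACE_PATTERN.matcher(line).find();
    }

    public static void main(String[] args) {
        String[] lines = {
            "    at org.apache.nifi.Foo.bar(Foo.java:42)",
            "Caused by: java.io.IOException: boom",
            "\t... 12 more",
            "2017-04-10 12:00:00 INFO Started"
        };
        for (String line : lines) {
            System.out.println(isStackTraceLine(line) + " : " + line);
        }
    }
}
```

The last sample, an ordinary log line, is the only one the pattern should reject.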
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962804#comment-15962804 ] ASF GitHub Bot commented on NIFI-1280: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110646390 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java --- @@ -0,0 +1,318 @@
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962800#comment-15962800 ] ASF GitHub Bot commented on NIFI-1280: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110646007 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.grok; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RowRecordReaderFactory; +import org.apache.nifi.serialization.SchemaRegistryRecordReader; +import org.apache.nifi.serialization.record.RecordSchema; + +import io.thekraken.grok.api.Grok; +import io.thekraken.grok.api.exception.GrokException; + +@Tags({"grok", "logs", "logfiles", "parse", "unstructured", "text", "record", "reader", "regex", "pattern", "logstash"}) +@CapabilityDescription("Provides a mechanism for reading unstructured text data, such as log files, and structuring the data " ++ "so that it can be processed. The service is configured using Grok patterns. " ++ "The service reads from a stream of data and splits each message that it finds into a separate Record, each containing the fields that are configured. " ++ "If a line in the input does not match the expected message pattern, the line of text is considered to be part of the previous " ++ "message, with the exception of stack traces. A stack trace that is found at the end of a log message is considered to be part " ++ "of the previous message but is added to the 'STACK_TRACE' field of the Record. If a record has no stack trace, it will have a NULL value " ++ "for the STACK_TRACE field. All fields that are parsed are considered to be of type String by default. 
If there is need to change the type of a field, " ++ "this can be accomplished by configuring the Schema Registry to use and adding the appropriate schema.") +public class GrokReader extends SchemaRegistryRecordReader implements RowRecordReaderFactory { +private volatile Grok grok; +private volatile boolean useSchemaRegistry; + +private static final String DEFAULT_PATTERN_NAME = "/default-grok-patterns.txt"; + +static final PropertyDescriptor PATTERN_FILE = new PropertyDescriptor.Builder() +.name("Grok Pattern File") +.description("Path to a file that contains Grok Patterns to use for parsing logs. If not specified, a built-in default Pattern file " ++ "will be used. If specified, all patterns in the given pattern file will override the default patterns. See the Controller Service's " ++ "Additional Details for a list of pre-defined patterns.") +.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) +.expressionLanguageSupported(true) +.required(false) +.build(); + +static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor.Builder() +.name("Grok Expression") +.description("Specifies the format of a log line in Grok format. This allows the Record
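The capability description above says that a line not matching the Grok expression is folded into the previous message, while a trailing stack trace is routed to the STACK_TRACE field. The sketch below shows that grouping rule in isolation. It is hypothetical code, not GrokRecordReader's implementation: the caller supplies both the "starts a new message" test and the "stack trace" test, and those predicates are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class LogMessageGrouper {
    // One grouped message; stackTrace stays null when no stack trace follows the message.
    public static final class Message {
        public final StringBuilder text = new StringBuilder();
        public StringBuilder stackTrace;
    }

    // Hypothetical helper, not NiFi code. The very first line, and any line accepted by
    // startsMessage, opens a new message; any other line continues the previous one.
    // From the first continuation line accepted by isStackTrace onward, continuation
    // lines go to the stack-trace buffer instead of the message text.
    public static List<Message> group(List<String> lines,
                                      Predicate<String> startsMessage,
                                      Predicate<String> isStackTrace) {
        List<Message> messages = new ArrayList<>();
        Message current = null;
        for (String line : lines) {
            if (current == null || startsMessage.test(line)) {
                current = new Message();
                current.text.append(line);
                messages.add(current);
            } else if (current.stackTrace != null) {
                current.stackTrace.append('\n').append(line);
            } else if (isStackTrace.test(line)) {
                current.stackTrace = new StringBuilder(line);
            } else {
                current.text.append('\n').append(line);
            }
        }
        return messages;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "2017-04-10 12:00:00 ERROR Failed to process",
            "java.io.IOException: boom",
            "    at org.example.Foo.bar(Foo.java:42)",
            "2017-04-10 12:00:01 INFO Recovered");
        List<Message> messages = group(lines,
            line -> line.matches("\\d{4}-\\d{2}-\\d{2} .*"),
            line -> line.matches("[\\w.$]+(?:Exception|Error)(?:: .*)?") || line.startsWith("    at "));
        for (Message m : messages) {
            System.out.println("TEXT: " + m.text + " | STACK_TRACE: " + m.stackTrace);
        }
    }
}
```

With these sample predicates, the first message keeps only its log line as text and carries the exception plus its frame as the stack trace. The second message has a null stack trace, matching the "NULL value for the STACK_TRACE field" behaviour described above.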
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962797#comment-15962797 ] ASF GitHub Bot commented on NIFI-1280: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110645869 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.csv; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.csv.CSVFormat; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.DateTimeTextRecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; + +@Tags({"csv", "result", "set", "recordset", "record", "writer", "serializer", "row", "tsv", "tab", "separated", "delimited"}) +@CapabilityDescription("Writes the contents of a RecordSet as CSV data. The first line written " ++ "will be the column names. All subsequent lines will be the values corresponding to those columns.") +public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory { + +private volatile CSVFormat csvFormat; + +@Override +protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { +final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); +properties.add(CSVUtils.CSV_FORMAT); +properties.add(CSVUtils.VALUE_SEPARATOR); +properties.add(CSVUtils.QUOTE_CHAR); +properties.add(CSVUtils.ESCAPE_CHAR); +properties.add(CSVUtils.COMMENT_MARKER); +properties.add(CSVUtils.NULL_STRING); +properties.add(CSVUtils.TRIM_FIELDS); +properties.add(CSVUtils.QUOTE_MODE); +properties.add(CSVUtils.RECORD_SEPARATOR); +properties.add(CSVUtils.TRAILING_DELIMITER); +return properties; +} --- End diff -- Can do.
> Create QueryFlowFile Processor > -- > > Key: NIFI-1280 > URL: https://issues.apache.org/jira/browse/NIFI-1280 > Project: Apache NiFi > Issue Type: Task > Components: Extensions >Reporter: Mark Payne >Assignee: Mark Payne > Fix For: 1.2.0 > > Attachments: QueryFlowFile_Record_Reader-Writer_Examples.xml > > > We should have a Processor that allows users to easily filter out specific > columns from CSV data. For instance, a user would configure two different > properties: "Columns of Interest" (a comma-separated list of column indexes) > and "Filtering Strategy" (Keep Only These Columns, Remove Only These Columns). > We can do this today with ReplaceText, but it is far more difficult than it > would be with this Processor, as the user has to use Regular Expressions, > etc. with ReplaceText. > Eventually a Custom UI could even be built that allows a user to upload a > Sample CSV and choose which columns from there, similar to the way that Excel > works when importing CSV by dragging and selecting the desired columns? That > would certainly be a larger undertaking and would not need to be done for an > initial implementation. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
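The CSVRecordSetWriter description (a header line of column names, followed by one line of values per record) can be illustrated without Commons CSV. The following is a minimal standard-library sketch with hypothetical names. The service above instead configures a Commons CSV `CSVFormat` from the CSVUtils properties. The comma separator, CRLF record separator, and quote-only-when-needed rule used here are assumptions modelled on RFC 4180.

```java
import java.util.Arrays;
import java.util.List;

public class SimpleCsvWriter {
    // Quote a value only when it contains the separator, a quote, or a line break;
    // embedded quotes are doubled. Null values are written as empty fields.
    static String escape(String value) {
        if (value == null) {
            return "";
        }
        if (value.contains(",") || value.contains("\"") || value.contains("\n") || value.contains("\r")) {
            return "\"" + value.replace("\"", "\"\"") + "\"";
        }
        return value;
    }

    // Append one comma-separated line terminated by CRLF.
    static void appendLine(StringBuilder out, List<String> values) {
        for (int i = 0; i < values.size(); i++) {
            if (i > 0) {
                out.append(',');
            }
            out.append(escape(values.get(i)));
        }
        out.append("\r\n");
    }

    // Header line first (column names), then one line per record in column order.
    public static String write(List<String> columns, List<List<String>> rows) {
        StringBuilder out = new StringBuilder();
        appendLine(out, columns);
        for (List<String> row : rows) {
            appendLine(out, row);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String csv = write(Arrays.asList("id", "name"),
            Arrays.asList(Arrays.asList("1", "Smith, John"), Arrays.asList("2", "say \"hi\"")));
        System.out.print(csv);
    }
}
```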
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962793#comment-15962793 ] ASF GitHub Bot commented on NIFI-1280: -- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110645816 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore --- @@ -0,0 +1 @@ +/bin/ --- End diff -- Yeah that was added by my IDE. Will fix.
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962167#comment-15962167 ] ASF GitHub Bot commented on NIFI-1280: -- Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110541128 --- Diff: nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java --- @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schemaregistry.services; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.avro.LogicalType; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +@Tags({ "schema", "registry", "avro", "json", "csv" }) +@CapabilityDescription("Provides a service for registering and accessing schemas. 
You can register a schema " ++ "as a dynamic property where 'name' represents the schema name and 'value' represents the textual " ++ "representation of the actual schema following the syntax and semantics of Avro's Schema format.") +public class AvroSchemaRegistry extends AbstractControllerService implements SchemaRegistry { + +private final Map<String, String> schemaNameToSchemaMap; + +private static final String LOGICAL_TYPE_DATE = "date"; +private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis"; +private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros"; +private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis"; +private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros"; + + +public AvroSchemaRegistry() { +this.schemaNameToSchemaMap = new HashMap<>(); +} + +@OnEnabled +public void enable(ConfigurationContext configurationContext) throws InitializationException { + this.schemaNameToSchemaMap.putAll(configurationContext.getProperties().entrySet().stream() +.filter(propEntry -> propEntry.getKey().isDynamic()) +.collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue()))); +} + +@Override +public String retrieveSchemaText(String schemaName) { +if (!this.schemaNameToSchemaMap.containsKey(schemaName)) { +throw new IllegalArgumentException("Failed to find schema; Name: '" + schemaName + "'."); +} else { +return this.schemaNameToSchemaMap.get(schemaName); +} +} + +@Override +public String retrieveSchemaText(String schemaName, Map<String, String> attributes) { +throw new UnsupportedOperationException("This version of schema registry does not " ++ "support this operation, since schemas are only identified by name."); --- End diff -- Perhaps instead of throwing the exception we should just delegate to the
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962169#comment-15962169 ] ASF GitHub Bot commented on NIFI-1280: -- Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110541076 --- Diff: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java --- @@ -2201,7 +2202,10 @@ public int read(final byte[] b, final int off, final int len) throws IOException @Override public void close() throws IOException { -StandardProcessSession.this.bytesRead += countingStream.getBytesRead(); +if (!closed) { +StandardProcessSession.this.bytesRead += countingStream.getBytesRead(); +closed = true; +} --- End diff -- Technically the above is not thread safe, consider adding some synchronization.
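The concern in the review is the check-then-act on a plain `closed` boolean: two threads calling close() at the same time can both see `!closed` and both add the byte count. One common fix is an `AtomicBoolean` with `compareAndSet`, sketched here on a hypothetical stand-alone stream. The class name and the `AtomicLong` that stands in for the session's `bytesRead` field are assumptions, not NiFi code. Declaring close() `synchronized` would be an equally valid alternative.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the session's counting stream; not NiFi code.
public class CountingOnceInputStream extends FilterInputStream {
    private final AtomicLong sessionBytesRead;      // stands in for StandardProcessSession.bytesRead
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private volatile long bytesRead;                // written only by the reading thread

    public CountingOnceInputStream(InputStream in, AtomicLong sessionBytesRead) {
        super(in);
        this.sessionBytesRead = sessionBytesRead;
    }

    @Override
    public int read() throws IOException {
        int b = super.read();
        if (b != -1) {
            bytesRead++;
        }
        return b;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        int n = super.read(b, off, len);
        if (n > 0) {
            bytesRead += n;
        }
        return n;
    }

    @Override
    public void close() throws IOException {
        // compareAndSet flips false -> true exactly once, so the count is added a single
        // time even if close() is called repeatedly or from several threads at once.
        if (closed.compareAndSet(false, true)) {
            sessionBytesRead.addAndGet(bytesRead);
            super.close();
        }
    }

    public static void main(String[] args) throws IOException {
        AtomicLong total = new AtomicLong();
        CountingOnceInputStream in = new CountingOnceInputStream(new ByteArrayInputStream(new byte[10]), total);
        in.read(new byte[4]);
        in.close();
        in.close(); // second close is a no-op
        System.out.println(total.get());
    }
}
```

The sketch only makes close() idempotent. It still assumes that reads themselves happen on one thread, which is how a stream handed out by a session is normally used.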
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962173#comment-15962173 ] ASF GitHub Bot commented on NIFI-1280: -- Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110541830 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java --- @@ -0,0 +1,318 @@
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962170#comment-15962170 ] ASF GitHub Bot commented on NIFI-1280: -- Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110541217 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java --- @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import org.apache.calcite.config.CalciteConnectionProperty; +import org.apache.calcite.config.Lex; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.DynamicRelationship; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import 
org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.queryflowfile.FlowFileTable; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.RowRecordReaderFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.ResultSetRecordSet; +import org.apache.nifi.util.StopWatch; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"sql", "query", "calcite", "route", "record", "transform", "select", "update", "modify", "etl", "filter", "record", "csv", "json", "logs", "text", "avro", "aggregate"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Evaluates one or more SQL queries against the contents of a FlowFile. The result of the " ++ "SQL query then becomes the content of the output FlowFile. This can be used, for example, "
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962168#comment-15962168 ] ASF GitHub Bot commented on NIFI-1280: -- Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110541764 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java --- @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.json; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.RecordDataType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; +import org.codehaus.jackson.JsonNode; + +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.PathNotFoundException; +import com.jayway.jsonpath.spi.json.JacksonJsonProvider; + +public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader { +private static final Configuration STRICT_PROVIDER_CONFIGURATION = Configuration.builder().jsonProvider(new JacksonJsonProvider()).build(); + +private final LinkedHashMap<String, JsonPath> jsonPaths; +private final InputStream in; +private RecordSchema schema; +private final String dateFormat; +private final String timeFormat; +private final String timestampFormat; + +public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger, +final String dateFormat, final String timeFormat, final String timestampFormat) +throws MalformedRecordException, IOException { +super(in, logger); + +this.dateFormat = dateFormat; +this.timeFormat = timeFormat; +this.timestampFormat =
timestampFormat; + +this.schema = schema; +this.jsonPaths = jsonPaths; +this.in = in; +} + +@Override +public void close() throws IOException { +in.close(); +} + +@Override +public RecordSchema getSchema() { +return schema; +} + +@Override +protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema) throws IOException { +if (jsonNode == null) { +return null; +} + +final DocumentContext ctx = JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(jsonNode.toString()); +final Map<String, Object> values = new HashMap<>(schema.getFieldCount()); + +for (final Map.Entry<String, JsonPath> entry : jsonPaths.entrySet()) { +final String fieldName = entry.getKey(); +final DataType desiredType = schema.getDataType(fieldName).orElse(null); +if (desiredType == null) { +continue; +} + +final JsonPath jsonPath = entry.getValue(); + +Object
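The loop at the end of this hunk works through the configured field names, each paired with a compiled `JsonPath`. It skips fields the schema does not declare and collects the evaluated values by field name. The sketch below reproduces that control flow using only the standard library. `FieldPathExtractor` is hypothetical, and its dotted-path lookup over nested `Map`s is a deliberately tiny stand-in for Jayway JsonPath, which supports far richer expressions.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; the "$.a.b" lookup below is a toy stand-in for JsonPath.
final class FieldPathExtractor {
    static Object evaluate(final String path, final Map<String, Object> document) {
        if (!path.startsWith("$.")) {
            throw new IllegalArgumentException("Only simple '$.a.b' paths are supported: " + path);
        }
        Object current = document;
        for (final String key : path.substring(2).split("\\.")) {
            if (!(current instanceof Map)) {
                return null; // the path runs past a leaf value
            }
            current = ((Map<?, ?>) current).get(key);
        }
        return current;
    }

    // Mirrors the loop in the diff: skip fields unknown to the schema,
    // evaluate each remaining path and collect the results by field name.
    static Map<String, Object> extract(final LinkedHashMap<String, String> fieldPaths,
                                       final Set<String> schemaFields,
                                       final Map<String, Object> document) {
        final Map<String, Object> values = new HashMap<>(fieldPaths.size());
        for (final Map.Entry<String, String> entry : fieldPaths.entrySet()) {
            final String fieldName = entry.getKey();
            if (!schemaFields.contains(fieldName)) {
                continue;
            }
            values.put(fieldName, evaluate(entry.getValue(), document));
        }
        return values;
    }
}
```

A field that the schema declares but the document lacks ends up mapped to `null` rather than being omitted, so every schema field still appears in the resulting record.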
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962166#comment-15962166 ] ASF GitHub Bot commented on NIFI-1280: -- Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110544739 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java --- @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.grok; + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RowRecordReaderFactory; +import org.apache.nifi.serialization.SchemaRegistryRecordReader; +import org.apache.nifi.serialization.record.RecordSchema; + +import io.thekraken.grok.api.Grok; +import io.thekraken.grok.api.exception.GrokException; + +@Tags({"grok", "logs", "logfiles", "parse", "unstructured", "text", "record", "reader", "regex", "pattern", "logstash"}) +@CapabilityDescription("Provides a mechanism for reading unstructured text data, such as log files, and structuring the data " ++ "so that it can be processed. The service is configured using Grok patterns. " ++ "The service reads from a stream of data and splits each message that it finds into a separate Record, each containing the fields that are configured. " ++ "If a line in the input does not match the expected message pattern, the line of text is considered to be part of the previous " ++ "message, with the exception of stack traces. A stack trace that is found at the end of a log message is considered to be part " ++ "of the previous message but is added to the 'STACK_TRACE' field of the Record. If a record has no stack trace, it will have a NULL value " ++ "for the STACK_TRACE field. All fields that are parsed are considered to be of type String by default. 
If there is need to change the type of a field, " ++ "this can be accomplished by configuring the Schema Registry to use and adding the appropriate schema.") +public class GrokReader extends SchemaRegistryRecordReader implements RowRecordReaderFactory { +private volatile Grok grok; +private volatile boolean useSchemaRegistry; + +private static final String DEFAULT_PATTERN_NAME = "/default-grok-patterns.txt"; + +static final PropertyDescriptor PATTERN_FILE = new PropertyDescriptor.Builder() +.name("Grok Pattern File") +.description("Path to a file that contains Grok Patterns to use for parsing logs. If not specified, a built-in default Pattern file " ++ "will be used. If specified, all patterns in the given pattern file will override the default patterns. See the Controller Service's " ++ "Additional Details for a list of pre-defined patterns.") +.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) +.expressionLanguageSupported(true) +.required(false) +.build(); + +static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor.Builder() +.name("Grok Expression") +.description("Specifies the format of a log line in Grok format. This allows the Record
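The grouping rule in the capability description can be sketched as follows. Lines that do not match the message pattern join the previous message, and trailing stack-trace lines are collected separately for `STACK_TRACE`. This is a hypothetical illustration. A plain `java.util.regex` pattern stands in for the Grok expression, and the stack-trace detector is simplified rather than copied from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch of the GrokReader grouping rule; not the PR's code.
final class LogMessageGrouper {
    // Stand-in for a Grok expression: a message starts with a timestamp and level.
    static final Pattern MESSAGE_START =
        Pattern.compile("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2} (?:INFO|WARN|ERROR) ");
    // Simplified stack-trace line detector.
    static final Pattern STACK_TRACE_LINE =
        Pattern.compile("^(?:\\s+at |\\s*Caused by: |\\s*Suppressed: |\\s+\\.\\.\\. \\d+ (?:more|common frames? omitted)$)");

    static final class Message {
        final StringBuilder text = new StringBuilder();
        final StringBuilder stackTrace = new StringBuilder();
    }

    static List<Message> group(final List<String> lines) {
        final List<Message> messages = new ArrayList<>();
        Message current = null;
        for (final String line : lines) {
            if (MESSAGE_START.matcher(line).find()) {
                current = new Message();
                current.text.append(line);
                messages.add(current);
            } else if (current == null) {
                continue; // this sketch drops lines that precede the first message
            } else if (current.stackTrace.length() > 0 || STACK_TRACE_LINE.matcher(line).find()) {
                // Once a trace has started, the rest of the message belongs to it.
                if (current.stackTrace.length() > 0) {
                    current.stackTrace.append('\n');
                }
                current.stackTrace.append(line);
            } else {
                current.text.append('\n').append(line); // ordinary continuation line
            }
        }
        return messages;
    }
}
```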
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962172#comment-15962172 ] ASF GitHub Bot commented on NIFI-1280: -- Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110541788 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java --- @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.grok; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TimeZone; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.time.FastDateFormat; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; + +import io.thekraken.grok.api.Grok; +import io.thekraken.grok.api.GrokUtils; +import io.thekraken.grok.api.Match; + +public class GrokRecordReader implements RecordReader { +private final BufferedReader reader; +private final Grok grok; +private RecordSchema schema; + +private String nextLine; + +static final String STACK_TRACE_COLUMN_NAME = "STACK_TRACE"; +private static final Pattern STACK_TRACE_PATTERN = Pattern.compile( +"^\\s*(?:(?:|\\t)+at )|" ++ "(?:(?:|\\t)+\\[CIRCULAR REFERENCE\\:)|" ++ "(?:Caused by\\: )|" ++ "(?:Suppressed\\: )|" ++ "(?:\\s+... \\d+ (?:more|common frames? 
omitted)$)"); + +private static final FastDateFormat TIME_FORMAT_DATE; +private static final FastDateFormat TIME_FORMAT_TIME; +private static final FastDateFormat TIME_FORMAT_TIMESTAMP; + +static { +final TimeZone gmt = TimeZone.getTimeZone("GMT"); +TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt); +TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt); +TIME_FORMAT_TIMESTAMP = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt); +} + +public GrokRecordReader(final InputStream in, final Grok grok, final RecordSchema schema) { +this.reader = new BufferedReader(new InputStreamReader(in)); +this.grok = grok; +this.schema = schema; +} + +@Override +public void close() throws IOException { +reader.close(); +} + +@Override +public Record nextRecord() throws IOException, MalformedRecordException { +final String line = nextLine == null ? reader.readLine() : nextLine; +nextLine = null; // ensure that we don't process nextLine again +if (line == null) { +return null; +} + +final RecordSchema schema = getSchema(); + +final Match match = grok.match(line); +match.captures(); +final Map<String, Object> valueMap = match.toMap(); +if (valueMap.isEmpty()) { // We were unable to match the pattern so return an empty Object array. +return new
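`nextRecord()` uses a one-line lookahead. `nextLine` holds a line that was already read but belongs to the following message, and it is cleared as soon as it is consumed. Below is a minimal standalone sketch of that pushback pattern. It assumes a caller-supplied predicate that recognizes the first line of a message. `LookaheadMessageReader` and `nextMessage` are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.util.function.Predicate;

// Hypothetical sketch of the one-line lookahead used in nextRecord().
final class LookaheadMessageReader {
    private final BufferedReader reader;
    private final Predicate<String> isMessageStart;
    private String nextLine; // a line already read that starts the next message

    LookaheadMessageReader(final BufferedReader reader, final Predicate<String> isMessageStart) {
        this.reader = reader;
        this.isMessageStart = isMessageStart;
    }

    /** Returns the next message (first line plus continuation lines), or null at end of input. */
    String nextMessage() throws IOException {
        final String line = nextLine == null ? reader.readLine() : nextLine;
        nextLine = null; // ensure the buffered line is consumed only once
        if (line == null) {
            return null;
        }
        final StringBuilder message = new StringBuilder(line);
        String continuation;
        while ((continuation = reader.readLine()) != null) {
            if (isMessageStart.test(continuation)) {
                nextLine = continuation; // push back: it belongs to the next message
                break;
            }
            message.append('\n').append(continuation);
        }
        return message.toString();
    }
}
```

Keeping only a single buffered line means the reader never needs `mark()`/`reset()` on the underlying stream, and memory use stays bounded by the size of one message.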
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962174#comment-15962174 ] ASF GitHub Bot commented on NIFI-1280: -- Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110544779 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.csv; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.csv.CSVFormat; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.DateTimeTextRecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; + +@Tags({"csv", "result", "set", "recordset", "record", "writer", "serializer", "row", "tsv", "tab", "separated", "delimited"}) +@CapabilityDescription("Writes the contents of a RecordSet as CSV data. The first line written " ++ "will be the column names. All subsequent lines will be the values corresponding to those columns.") +public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory { + +private volatile CSVFormat csvFormat; + +@Override +protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { +final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); +properties.add(CSVUtils.CSV_FORMAT); +properties.add(CSVUtils.VALUE_SEPARATOR); +properties.add(CSVUtils.QUOTE_CHAR); +properties.add(CSVUtils.ESCAPE_CHAR); +properties.add(CSVUtils.COMMENT_MARKER); +properties.add(CSVUtils.NULL_STRING); +properties.add(CSVUtils.TRIM_FIELDS); +properties.add(CSVUtils.QUOTE_MODE); +properties.add(CSVUtils.RECORD_SEPARATOR); +properties.add(CSVUtils.TRAILING_DELIMITER); +return properties; +} --- End diff -- Same as before: this method will be called at least 3 times, so the list could be built once in a static initializer. This pattern appears in many places throughout this PR, so consider fixing it everywhere.
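The suggestion above is to build the descriptor list once instead of on every call. The following is a sketch under stated assumptions. Plain `String`s stand in for NiFi `PropertyDescriptor` objects, and `BaseWriter`, `CsvWriterSketch` and `BASE_PROPERTIES` are hypothetical names. The CSV entries echo the `CSVUtils` constant names from the diff, while the base entries are placeholders.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical parent: exposes its descriptors through a static constant so
// subclasses can build on them without an instance.
class BaseWriter {
    static final List<String> BASE_PROPERTIES = List.of("DATE_FORMAT", "TIME_FORMAT", "TIMESTAMP_FORMAT");

    protected List<String> getSupportedPropertyDescriptors() {
        return BASE_PROPERTIES;
    }
}

class CsvWriterSketch extends BaseWriter {
    // Built once, when the class is initialized, instead of on every call.
    private static final List<String> PROPERTIES;

    static {
        final List<String> properties = new ArrayList<>(BASE_PROPERTIES);
        properties.add("CSV_FORMAT");
        properties.add("VALUE_SEPARATOR");
        properties.add("QUOTE_CHAR");
        properties.add("ESCAPE_CHAR");
        PROPERTIES = Collections.unmodifiableList(properties);
    }

    @Override
    protected List<String> getSupportedPropertyDescriptors() {
        return PROPERTIES; // the same immutable list on every call
    }
}
```

A static initializer cannot call `super.getSupportedPropertyDescriptors()`, so the parent class must expose its own descriptors through a static member as well. Returning an unmodifiable list matters once the list is shared, because no caller can then alter it for everyone else.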
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15962171#comment-15962171 ] ASF GitHub Bot commented on NIFI-1280: -- Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110544874 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore --- @@ -0,0 +1 @@ +/bin/ --- End diff -- I think it's covered by the global .gitignore.
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15961299#comment-15961299 ] ASF GitHub Bot commented on NIFI-1280: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110460868 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.RowRecordReaderFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; + +public class TestQueryFlowFile { + +static { +System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); +System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); +System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard.SQLTransform", "debug"); +} + +private static final String REL_NAME = "success"; + +@Test +public void testSimple() throws InitializationException, IOException, SQLException { +final 
MockRecordParser parser = new MockRecordParser(); +parser.addSchemaField("name", RecordFieldType.STRING); +parser.addSchemaField("age", RecordFieldType.INT); +parser.addRecord("Tom", 49); + +final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\""); + +final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class); +runner.addControllerService("parser", parser); +runner.enableControllerService(parser); +runner.addControllerService("writer", writer); +runner.enableControllerService(writer); + +runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''"); +runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser"); +runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer"); + +final int numIterations = 1; +for (int i = 0; i < numIterations; i++) { +runner.enqueue(new byte[0]); +} + +runner.setThreadCount(4); +runner.run(2 * numIterations); + +runner.assertTransferCount(REL_NAME, 1); +final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0); +
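The test configures the query `select name, age from FLOWFILE WHERE name <> ''`. It keeps records whose `name` is non-empty and projects two columns. The sketch below spells out those semantics over in-memory records. `QuerySemanticsSketch` is hypothetical, and this is not how the processor evaluates SQL, since it hands the query to Apache Calcite.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java illustration of:  select name, age from FLOWFILE WHERE name <> ''
final class QuerySemanticsSketch {
    static List<Map<String, Object>> selectNameAgeWhereNameNotEmpty(final List<Map<String, Object>> flowFileRecords) {
        return flowFileRecords.stream()
            // SQL three-valued logic: NULL <> '' is unknown, so null names are dropped too.
            .filter(r -> r.get("name") != null && !"".equals(r.get("name")))
            // Projection: keep only the selected columns, in SELECT order.
            .map(r -> {
                final Map<String, Object> row = new LinkedHashMap<>();
                row.put("name", r.get("name"));
                row.put("age", r.get("age"));
                return row;
            })
            .collect(Collectors.toList());
    }
}
```

With the single record the test enqueues ("Tom", 49), exactly one row passes the filter, which matches the test's expectation of one FlowFile on the `success` relationship.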
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15961298#comment-15961298 ] ASF GitHub Bot commented on NIFI-1280: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110460794 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryFlowFile.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.RowRecordReaderFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Assert; +import org.junit.Test; + +public class TestQueryFlowFile { + +static { +System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "info"); +System.setProperty("org.slf4j.simpleLogger.showDateTime", "true"); +System.setProperty("org.slf4j.simpleLogger.log.nifi.io.nio", "debug"); + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard.SQLTransform", "debug"); +} + +private static final String REL_NAME = "success"; + +@Test +public void testSimple() throws InitializationException, IOException, SQLException { +final 
MockRecordParser parser = new MockRecordParser(); +parser.addSchemaField("name", RecordFieldType.STRING); +parser.addSchemaField("age", RecordFieldType.INT); +parser.addRecord("Tom", 49); + +final MockRecordWriter writer = new MockRecordWriter("\"name\",\"points\""); + +final TestRunner runner = TestRunners.newTestRunner(QueryFlowFile.class); +runner.addControllerService("parser", parser); +runner.enableControllerService(parser); +runner.addControllerService("writer", writer); +runner.enableControllerService(writer); + +runner.setProperty(REL_NAME, "select name, age from FLOWFILE WHERE name <> ''"); +runner.setProperty(QueryFlowFile.RECORD_READER_FACTORY, "parser"); +runner.setProperty(QueryFlowFile.RECORD_WRITER_FACTORY, "writer"); + +final int numIterations = 1; +for (int i = 0; i < numIterations; i++) { +runner.enqueue(new byte[0]); +} + +runner.setThreadCount(4); +runner.run(2 * numIterations); + +runner.assertTransferCount(REL_NAME, 1); +final MockFlowFile out = runner.getFlowFilesForRelationship(REL_NAME).get(0); +
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15960845#comment-15960845 ] ASF GitHub Bot commented on NIFI-1280: -- Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110397744 --- Diff: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryFlowFile.java --- @@ -0,0 +1,550 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.standard; + +import java.io.Closeable; +import java.io.IOException; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import org.apache.calcite.config.CalciteConnectionProperty; +import org.apache.calcite.config.Lex; +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.sql.parser.SqlParser; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.DynamicRelationship; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.flowfile.FlowFile; +import 
org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.queryflowfile.FlowFileTable; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.RowRecordReaderFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.ResultSetRecordSet; +import org.apache.nifi.util.StopWatch; + +@EventDriven +@SideEffectFree +@SupportsBatching +@Tags({"sql", "query", "calcite", "route", "record", "transform", "select", "update", "modify", "etl", "filter", "record", "csv", "json", "logs", "text", "avro", "aggregate"}) +@InputRequirement(Requirement.INPUT_REQUIRED) +@CapabilityDescription("Evaluates one or more SQL queries against the contents of a FlowFile. The result of the " ++ "SQL query then becomes the content of the output FlowFile. This can be used, for
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15960775#comment-15960775 ] ASF GitHub Bot commented on NIFI-1280: -- Github user markap14 commented on the issue: https://github.com/apache/nifi/pull/1652 @olegz thanks for reviewing! Just a heads up: I pushed a new commit to address some RAT exclusions and checkstyle violations. I had forgotten to run with -Pcontrib-check before creating the PR. > Create QueryFlowFile Processor > -- > > Key: NIFI-1280 > URL: https://issues.apache.org/jira/browse/NIFI-1280 > Project: Apache NiFi > Issue Type: Task > Components: Extensions >Reporter: Mark Payne >Assignee: Mark Payne > Fix For: 1.2.0 > > Attachments: QueryFlowFile_Record_Reader-Writer_Examples.xml > > > We should have a Processor that allows users to easily filter out specific > columns from CSV data. For instance, a user would configure two different > properties: "Columns of Interest" (a comma-separated list of column indexes) > and "Filtering Strategy" (Keep Only These Columns, Remove Only These Columns). > We can do this today with ReplaceText, but it is far more difficult than it > would be with this Processor, as the user has to use Regular Expressions, > etc. with ReplaceText. > Eventually a Custom UI could even be built that allows a user to upload a > Sample CSV and choose which columns from there, similar to the way that Excel > works when importing CSV by dragging and selecting the desired columns? That > would certainly be a larger undertaking and would not need to be done for an > initial implementation. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
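The behavior requested in the issue description, a "Columns of Interest" list of column indexes combined with a "Filtering Strategy" of Keep Only These Columns or Remove Only These Columns, fits in a few lines of Java. This is a minimal sketch under stated assumptions, not the processor that was eventually merged (the work was renamed to the SQL-based QueryFlowFile). The class and enum names are hypothetical, indexes are assumed to be zero-based, and each row is assumed to be already split into fields, so CSV quoting and escaping are ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of "Columns of Interest" plus "Filtering Strategy" from the NIFI-1280 description. */
final class CsvColumnFilter {

    /** The two strategies named in the issue description. */
    enum FilteringStrategy { KEEP_ONLY_THESE_COLUMNS, REMOVE_ONLY_THESE_COLUMNS }

    private final Set<Integer> columnsOfInterest = new LinkedHashSet<>();
    private final FilteringStrategy strategy;

    /** @param columnsOfInterest comma-separated, zero-based column indexes, e.g. "0, 2" (assumption) */
    CsvColumnFilter(String columnsOfInterest, FilteringStrategy strategy) {
        for (String token : columnsOfInterest.split(",")) {
            String trimmed = token.trim();
            if (!trimmed.isEmpty()) {
                this.columnsOfInterest.add(Integer.parseInt(trimmed));
            }
        }
        this.strategy = strategy;
    }

    /** Returns the selected fields of one already-split row, preserving their original order. */
    List<String> apply(List<String> row) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < row.size(); i++) {
            boolean listed = columnsOfInterest.contains(i);
            // Keep listed columns under KEEP_ONLY; keep unlisted columns under REMOVE_ONLY.
            boolean keep = (strategy == FilteringStrategy.KEEP_ONLY_THESE_COLUMNS) == listed;
            if (keep) {
                result.add(row.get(i));
            }
        }
        return result;
    }
}
```

With Columns of Interest set to "0, 2", the row id,name,age becomes id,age under Keep Only These Columns and name under Remove Only These Columns. A single boolean comparison covers both strategies because they are exact complements of each other.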
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959541#comment-15959541 ] ASF GitHub Bot commented on NIFI-1280: -- Github user olegz commented on the issue: https://github.com/apache/nifi/pull/1652 Reviewing. . .
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959520#comment-15959520 ] Mark Payne commented on NIFI-1280: -- I've also attached a Template to aid in testing, since there's a lot going on here. It creates several controller services and processors. You would just need to enable the controller services and start the processors, which then run through several scenarios.
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959473#comment-15959473 ] Mark Payne commented on NIFI-1280: -- I've created a PR that I think is sufficient. There are a few more things I would like to do, but this has dragged on long enough without my pushing anything, so I've pushed a PR so that people can review it and it can hopefully get merged. I will create separate JIRAs for the remaining enhancements I would like to make. The most significant is to allow more flexibility in choosing which schema to use: rather than requiring that a Schema Name be provided along with a Schema Registry, I would like to allow the user to use an attribute, or to read the schema from the content of the FlowFile itself in cases such as Avro. In addition, I want to include the schema on the outgoing records when appropriate.
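The schema-selection enhancement described in Mark Payne's comment (a Schema Name looked up in a Schema Registry, versus a schema supplied through a FlowFile attribute, versus a schema read from the content itself, as with Avro) can be pictured as a small resolver. This is a hypothetical sketch, not NiFi's API: the enum, the class, and the attribute name "schema.text" are assumptions made for illustration, the registry is modeled as a plain map from name to schema text, and reading an embedded schema is delegated to a caller-supplied function.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical sketch: alternative ways a record reader might locate its schema. */
final class SchemaResolver {

    /** Illustrative strategies mirroring the options discussed in the comment; not NiFi's API. */
    enum SchemaAccessStrategy { NAME_FROM_REGISTRY, TEXT_FROM_ATTRIBUTE, EMBEDDED_IN_CONTENT }

    /** Hypothetical attribute carrying the full schema text. */
    static final String SCHEMA_TEXT_ATTRIBUTE = "schema.text";

    private final Map<String, String> registry; // schema name -> schema text

    SchemaResolver(Map<String, String> registry) {
        this.registry = registry;
    }

    /**
     * @param schemaName     configured name, used only by NAME_FROM_REGISTRY
     * @param attributes     the FlowFile's attributes
     * @param embeddedSchema reads a schema from the content itself, if the format carries one
     */
    String resolve(SchemaAccessStrategy strategy, String schemaName,
                   Map<String, String> attributes, Supplier<Optional<String>> embeddedSchema) {
        switch (strategy) {
            case NAME_FROM_REGISTRY:
                require(schemaName, "schema name");
                return require(registry.get(schemaName), "registered schema '" + schemaName + "'");
            case TEXT_FROM_ATTRIBUTE:
                return require(attributes.get(SCHEMA_TEXT_ATTRIBUTE), "attribute " + SCHEMA_TEXT_ATTRIBUTE);
            case EMBEDDED_IN_CONTENT:
                // e.g. an Avro data file stores its writer schema in the file header
                return require(embeddedSchema.get().orElse(null), "embedded schema");
            default:
                throw new IllegalStateException("Unknown strategy: " + strategy);
        }
    }

    /** Fails fast with a descriptive message instead of letting a null schema propagate. */
    private static String require(String value, String description) {
        if (value == null) {
            throw new IllegalArgumentException("Missing " + description);
        }
        return value;
    }
}
```

Keeping the embedded case behind a Supplier means the content is only opened when that strategy is actually selected.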
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15959470#comment-15959470 ] ASF GitHub Bot commented on NIFI-1280: -- GitHub user markap14 opened a pull request: https://github.com/apache/nifi/pull/1652
NIFI-1280: Renamed FilterCSVColumns to QueryFlowFile; introduced readers and writers as controller services
Thank you for submitting a contribution to Apache NiFi.
In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:
### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [ ] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [ ] Is your initial contribution a single, squashed commit?
### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?
### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/markap14/nifi NIFI-1280
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/nifi/pull/1652.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #1652
commit 4a10c7826051a85d8fab506a76cfc4144d483f0c
Author: Toivo Adams
Date: 2016-05-07T09:29:15Z
    NIFI-1280 Create FilterCSVColumns Processor.
commit 4f262960dcd4e17bc77c78b50c979c3717ab40ec
Author: Mark Payne
Date: 2016-07-11T18:57:00Z
    NIFI-1280: Refactoring to make more generic so that other data types can be supported; created InputStreams to content on-demand so that multiple passes can be made over FlowFile content if required. Created new Controller Services for reading and writing specific data types
commit 9def3d90374025a8e7172ac6d014f7b0ac644c4d
Author: Oleg Zhurakousky
Date: 2017-03-30T13:12:07Z
    NIFI-1280 added support for RecordSchema in SchemaRegistry
    Signed-off-by: Mark Payne
commit f1b061988352bd556b4a7a49afaf777c7be95315
Author: Mark Payne
Date: 2017-03-30T14:19:54Z
    NIFI-1280: Updated SimpleKeyValueSchemaRegistry to make use of new CHOICE RecordFieldType
    - Update Record Readers to use SchemaRegistry controller service. Moved SchemaRegistry api into its own maven module and added to standard-services-api so that we can properly add dependencies on it. Code cleanup and bug fixes
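One of the commits in this pull request mentions creating "InputStreams to content on-demand so that multiple passes can be made over FlowFile content if required." A minimal sketch of that idea, assuming the content can be reopened from its source for every pass, is to hold a Supplier<InputStream> instead of a single stream and open a fresh stream per scan. The class name is hypothetical, and a byte array stands in for FlowFile content; inside a processor the supplier would presumably open the FlowFile's content instead.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical sketch: a "table" over content that reopens its stream for every scan. */
final class RescannableContent {

    private final Supplier<InputStream> contentSupplier;

    RescannableContent(Supplier<InputStream> contentSupplier) {
        this.contentSupplier = contentSupplier;
    }

    /** One full pass over the content; each call opens (and closes) a fresh stream, so passes are independent. */
    List<String> scan(Predicate<String> filter) {
        List<String> matches = new ArrayList<>();
        try (InputStream in = contentSupplier.get();
             BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (filter.test(line)) {
                    matches.add(line);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return matches;
    }
}
```

For example, `new RescannableContent(() -> new ByteArrayInputStream(bytes))` can be scanned once to filter lines and again to count them, without buffering the whole content in memory between passes. The trade-off is that every pass pays the cost of reading the content again.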
[jira] [Commented] (NIFI-1280) Create QueryFlowFile Processor
[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954024#comment-15954024 ] Mark Payne commented on NIFI-1280: -- I am getting pretty close to having this in good shape now. It has taken far longer than I ever expected, because in the process it has ended up transforming a lot of how we handle different data formats. It incorporates almost all of the features proposed in https://cwiki.apache.org/confluence/display/NIFI/First-class+Avro+Support as well as the fairly extensive mailing list conversations about record-oriented data here: http://apache-nifi-developer-list.39713.n7.nabble.com/Looking-for-feedback-on-my-WIP-Design-td13097.html#none and in a couple of other emails. I need to do some cleanup, and then I think I can get a PR in this week. I need to write some more unit tests and ensure that we are handling data type coercion consistently across the different readers/writers. I also need to update how we reference schemas where appropriate. With these Record Readers and Writers, I think we can actually remove the TransformXToY processors that were recently brought in with the nifi-registry-processors module. [~ozhurakousky] do you agree?
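The comment mentions making data type coercion consistent across the different readers and writers. One way to picture that, sketched here under assumptions, is a single shared coercion routine that every reader calls, so that the CSV text "42" and the JSON number 42 both end up as the same Long. The class and its four-value FieldType enum are hypothetical and far smaller than NiFi's RecordFieldType (which, per the commits in the pull request, also gained a CHOICE type).

```java
/** Hypothetical sketch: one shared coercion routine that every reader and writer would call. */
final class FieldCoercion {

    /** Illustrative subset of field types; not NiFi's RecordFieldType. */
    enum FieldType { STRING, LONG, DOUBLE, BOOLEAN }

    private FieldCoercion() {
    }

    /** Converts a raw value (text from CSV, a parsed JSON number, ...) to the Java type for {@code type}. */
    static Object coerce(Object value, FieldType type) {
        if (value == null) {
            return null; // nulls pass through unchanged for every type
        }
        switch (type) {
            case STRING:
                return value.toString();
            case LONG:
                if (value instanceof Number) {
                    return ((Number) value).longValue(); // truncates any fractional part
                }
                return Long.parseLong(value.toString().trim());
            case DOUBLE:
                if (value instanceof Number) {
                    return ((Number) value).doubleValue();
                }
                return Double.parseDouble(value.toString().trim());
            case BOOLEAN: {
                if (value instanceof Boolean) {
                    return value;
                }
                String text = value.toString().trim();
                if (text.equalsIgnoreCase("true")) {
                    return Boolean.TRUE;
                }
                if (text.equalsIgnoreCase("false")) {
                    return Boolean.FALSE;
                }
                throw new IllegalArgumentException("Cannot coerce '" + value + "' to BOOLEAN");
            }
            default:
                throw new IllegalStateException("Unknown type: " + type);
        }
    }
}
```

Routing every reader through one function like this means a rule such as "whitespace around numbers is ignored" is decided once rather than separately in each format's reader.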