[jira] [Commented] (NIFI-5021) Move Elastic Search controller service API out of standard services
[ https://issues.apache.org/jira/browse/NIFI-5021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16421920#comment-16421920 ]

ASF GitHub Bot commented on NIFI-5021:
--------------------------------------

Github user JPercivall commented on the issue:

    https://github.com/apache/nifi/pull/2586

    @bbende @MikeThomsen since there is another RC of 1.6.0, did you want to merge this in?

> Move Elastic Search controller service API out of standard services
> -------------------------------------------------------------------
>
>                 Key: NIFI-5021
>                 URL: https://issues.apache.org/jira/browse/NIFI-5021
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Bryan Bende
>            Assignee: Bryan Bende
>            Priority: Minor
>
> We should move the Elastic Search CS API out of standard services and into
> its own NAR under the ES bundle.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (NIFI-4637) Add support for HBase visibility labels to HBase processors and controller services
[ https://issues.apache.org/jira/browse/NIFI-4637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16421916#comment-16421916 ]

ASF GitHub Bot commented on NIFI-4637:
--------------------------------------

Github user joshelser commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2518#discussion_r178483377

    --- Diff: nifi-nar-bundles/nifi-hbase-bundle/nifi-hbase-processors/pom.xml ---
    @@ -79,6 +80,73 @@
             test
    +        <dependency>
    +            <groupId>org.apache.hbase</groupId>
    +            <artifactId>hbase-server</artifactId>
    +            <version>1.3.1</version>
    +            <type>test-jar</type>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.hbase</groupId>
    +            <artifactId>hbase-server</artifactId>
    +            <version>1.3.1</version>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.hadoop</groupId>
    +            <artifactId>hadoop-hdfs</artifactId>
    +            <version>2.9.0</version>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.hadoop</groupId>
    +            <artifactId>hadoop-hdfs</artifactId>
    +            <type>test-jar</type>
    +            <version>2.9.0</version>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.hadoop</groupId>
    +            <artifactId>hadoop-common</artifactId>
    +            <version>2.9.0</version>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.hadoop</groupId>
    +            <artifactId>hadoop-common</artifactId>
    +            <version>2.9.0</version>
    +            <type>test-jar</type>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.hadoop</groupId>
    +            <artifactId>hadoop-auth</artifactId>
    +            <version>2.9.0</version>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.hbase</groupId>
    +            <artifactId>hbase-hadoop-compat</artifactId>
    +            <version>1.3.1</version>
    +            <type>test-jar</type>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.hbase</groupId>
    +            <artifactId>hbase-hadoop2-compat</artifactId>
    +            <version>1.3.1</version>
    +            <type>test-jar</type>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-hbase_1_1_2-client-service</artifactId>
    +            <version>1.6.0-SNAPSHOT</version>
    --- End diff --

    What I'm used to seeing is that, in dependencyManagement for the appropriate parent pom (in this case, https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/pom.xml), this GAV would be declared, which would turn this local GAV into just GA.

> Add support for HBase visibility labels to HBase processors and controller
> services
> --------------------------------------------------------------------------
>
>                 Key: NIFI-4637
>                 URL: https://issues.apache.org/jira/browse/NIFI-4637
>             Project: Apache NiFi
>          Issue Type: Improvement
>            Reporter: Mike Thomsen
>            Assignee: Mike Thomsen
>            Priority: Major
>
> HBase supports visibility labels, but you can't use them from NiFi because
> there is no way to set them. The existing processors and services should be
> upgraded to handle this capability.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
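[Editor's note] joshelser's suggestion is the standard Maven `dependencyManagement` pattern: the parent POM pins the version (the full GAV) once, and child modules declare only the groupId/artifactId (GA) and inherit the version. A hedged sketch of what that would look like; the parent pom location is the one he links, but the exact placement in the NiFi build is an assumption:

```xml
<!-- In the parent pom (e.g. nifi-nar-bundles/nifi-standard-services/pom.xml),
     dependencyManagement declares the full GAV once: -->
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.apache.nifi</groupId>
            <artifactId>nifi-hbase_1_1_2-client-service</artifactId>
            <version>1.6.0-SNAPSHOT</version>
        </dependency>
    </dependencies>
</dependencyManagement>

<!-- A child pom such as nifi-hbase-processors then needs only GA;
     the version is inherited from the parent: -->
<dependency>
    <groupId>org.apache.nifi</groupId>
    <artifactId>nifi-hbase_1_1_2-client-service</artifactId>
</dependency>
```

This keeps the SNAPSHOT version in one place, so a release bump touches only the parent pom.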
[GitHub] nifi issue #2518: NIFI-4637 Added support for visibility labels to the HBase...
Github user joshelser commented on the issue:

    https://github.com/apache/nifi/pull/2518

    > Would you be open to a code review in the future (months away at least) if I were to start adding Accumulo support?

    Feel free to hit me up :)

---
[jira] [Commented] (NIFI-4932) Enable S2S work behind a Reverse Proxy
[ https://issues.apache.org/jira/browse/NIFI-4932?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16421903#comment-16421903 ]

ASF GitHub Bot commented on NIFI-4932:
--------------------------------------

Github user ijokarumawak commented on the issue:

    https://github.com/apache/nifi/pull/2510

    @mcgilman Thanks for clarifying the String manipulation exception; I just didn't have enough imagination to come up with such invalid inputs. I switched to using a regex to check and parse property keys. Now it should be more robust and provide more user-friendly error messages. Also added a try/catch for EL parse failure. Thanks!

> Enable S2S work behind a Reverse Proxy
> --------------------------------------
>
>                 Key: NIFI-4932
>                 URL: https://issues.apache.org/jira/browse/NIFI-4932
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Core Framework
>            Reporter: Koji Kawamura
>            Assignee: Koji Kawamura
>            Priority: Major
>
> Currently, NiFi UI and REST API work through a reverse proxy, but NiFi
> Site-to-Site does not. The core issue is how a NiFi node introduces remote
> peers to Site-to-Site clients. NiFi should provide more flexible
> configuration so that users can define remote Site-to-Site endpoints that can
> work for both routes: through a reverse proxy, and directly.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
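[Editor's note] The switch described here, from ad-hoc String manipulation to a regex, makes malformed property keys fail with a clear message instead of an index-out-of-bounds style exception. A minimal sketch of the idea; the class name and the property-key pattern below are hypothetical illustrations, not NiFi's actual S2S routing implementation:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: validate a routing property key with a single regex
// instead of chained indexOf/substring calls, so invalid input produces a
// friendly error rather than a StringIndexOutOfBoundsException.
public class RoutePropertyParser {

    // Example key shape only: nifi.remote.route.{protocol}.{name}.{target}
    private static final Pattern ROUTE_KEY = Pattern.compile(
            "nifi\\.remote\\.route\\.(raw|http)\\.(\\w+)\\.(when|hostname|port|secure)");

    /** Returns the route name embedded in the key, or throws with a clear message. */
    public static String routeName(String key) {
        Matcher m = ROUTE_KEY.matcher(key);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected property key '" + key
                    + "'; expected nifi.remote.route.{protocol}.{name}.{target}");
        }
        return m.group(2);  // the {name} capture group
    }
}
```

The regex both validates and parses in one step, which is what makes the error messages easy to improve: the failure point is a single `matches()` call.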
[jira] [Commented] (NIFI-4927) Create InfluxDB Query Processor
[ https://issues.apache.org/jira/browse/NIFI-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16421898#comment-16421898 ]

ASF GitHub Bot commented on NIFI-4927:
--------------------------------------

Github user mans2singh commented on the issue:

    https://github.com/apache/nifi/pull/2562

    @MikeThomsen - I've updated the code based on your comments. Let me know if you have any more recommendations. Thanks

> Create InfluxDB Query Processor
> -------------------------------
>
>                 Key: NIFI-4927
>                 URL: https://issues.apache.org/jira/browse/NIFI-4927
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.5.0
>            Reporter: Mans Singh
>            Assignee: Mans Singh
>            Priority: Minor
>              Labels: measurements, query, realtime, timeseries
>
> Create InfluxDB Query processor

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (NIFI-4927) Create InfluxDB Query Processor
[ https://issues.apache.org/jira/browse/NIFI-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16421896#comment-16421896 ]

ASF GitHub Bot commented on NIFI-4927:
--------------------------------------

Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2562#discussion_r178480144

    --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.influxdb;
    +
    +import org.apache.nifi.util.TestRunner;
    +import org.influxdb.InfluxDB;
    +import org.influxdb.InfluxDBFactory;
    +import org.influxdb.dto.Query;
    +import org.influxdb.dto.QueryResult;
    +import org.junit.After;
    +
    +/**
    + * Base integration test class for InfluxDB processors
    + */
    +public class AbstractITInfluxDB {
    +    protected TestRunner runner;
    +    protected InfluxDB influxDB;
    +    protected String dbName = "test";
    +    protected String dbUrl = "http://localhost:8086";
    +    protected String user = "admin";
    +    protected String password = "admin";
    +    protected static final String DEFAULT_RETENTION_POLICY = "autogen";
    +
    +    protected void initInfluxDB() throws InterruptedException, Exception {
    +        influxDB = InfluxDBFactory.connect(dbUrl, user, password);
    +        cleanUpDatabase();
    --- End diff --

    Removed as recommended.

> Create InfluxDB Query Processor
> -------------------------------
>
>                 Key: NIFI-4927
>                 URL: https://issues.apache.org/jira/browse/NIFI-4927
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 1.5.0
>            Reporter: Mans Singh
>            Assignee: Mans Singh
>            Priority: Minor
>              Labels: measurements, query, realtime, timeseries
>
> Create InfluxDB Query processor

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2562#discussion_r178480359

    --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java ---
    @@ -0,0 +1,231 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.influxdb;
    +
    +import static org.junit.Assert.assertEquals;
    +
    +import org.junit.Assert;
    +import java.util.List;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.util.MockFlowFile;
    +import org.apache.nifi.util.TestRunners;
    +import org.influxdb.InfluxDB;
    +import org.influxdb.dto.QueryResult;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +/**
    + * Integration test for executing InfluxDB queries. Please ensure that the InfluxDB is running
    + * on local host with default port and has database test with table test. Please set user
    + * and password if applicable before running the integration tests.
    + */
    +public class ITExecuteInfluxDBQuery extends AbstractITInfluxDB {
    +
    +    @Before
    +    public void setUp() throws Exception {
    +        initInfluxDB();
    +        runner = TestRunners.newTestRunner(ExecuteInfluxDBQuery.class);
    +        initializeRunner();
    +    }
    +
    +    @Test
    +    public void testValidScheduleQueryWithNoIncoming() {
    +        String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652";
    +        influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message);
    +
    +        String query = "select * from water";
    +        runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY, query);
    +
    +        runner.setIncomingConnection(false);
    +        runner.run(1, true, true);
    +        runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1);
    +        List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS);
    +        assertEquals("Value should be equal", 1, flowFiles.size());
    +        assertEquals("Value should be equal", null, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE));
    +        assertEquals("Value should be equal", query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY));
    +        flowFiles.get(0).assertContentEquals(
    --- End diff --

    Updated to parse json and compare typed results.

---
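[Editor's note] The test above writes a point in InfluxDB line protocol: `measurement,tag=value,... field=value,... timestamp`. As a rough stdlib-only illustration of that format's structure (this sketch ignores the escaping and quoting rules the real protocol supports, and is not part of the PR):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Rough illustration of InfluxDB line-protocol structure:
//   <measurement>[,<tag>=<value>...] <field>=<value>[,...] [<timestamp>]
// e.g. "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652"
public class LineProtocolSketch {

    /** Measurement name is everything before the first ',' or ' '. */
    public static String measurement(String line) {
        return line.split("[, ]")[0];
    }

    /** Tags are the key=value pairs between the measurement and the first space. */
    public static Map<String, String> tags(String line) {
        String head = line.split(" ")[0];           // "water,country=US,city=newark"
        String[] parts = head.split(",");
        Map<String, String> tags = new LinkedHashMap<>();
        for (int i = 1; i < parts.length; i++) {    // index 0 is the measurement name
            String[] kv = parts[i].split("=");
            tags.put(kv[0], kv[1]);
        }
        return tags;
    }
}
```

For the point written in the test, `measurement(...)` yields `water` and the tags map contains `country=US` and `city=newark`; the fields (`rain`, `humidity`) and the nanosecond timestamp follow after the first space.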
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2562#discussion_r178480131

    --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.influxdb;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.influxdb.dto.Query;
    +import org.influxdb.dto.QueryResult;
    +import com.google.gson.Gson;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.net.SocketTimeoutException;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +@SupportsBatching
    +@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"})
    +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile (preferred) or a scheduled query. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"),
    +    @WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"),
    +})
    +public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
    +
    +    public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query";
    +
    +    public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder()
    +            .name("influxdb-query-result-time-unit")
    +            .displayName("Query Result Time Units")
    +            .description("The time unit of query results from the InfluxDB")
    +            .defaultValue(TimeUnit.NANOSECONDS.name())
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .allowableValues(Arrays.stream(TimeUnit.values()).map(v -> v.name()).collect(Collectors.toSet()))
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor INFLUX_DB_QUERY = new PropertyDescriptor.Builder()
    +            .name("influxdb-query")
    +            .displayName("InfluxDB Query")
    +            .description("The InfluxDB query to execute. "
    +                    + "Note: If there are incoming connections, then the query is created
[jira] [Commented] (NIFI-4927) Create InfluxDB Query Processor
[ https://issues.apache.org/jira/browse/NIFI-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421894#comment-16421894 ] ASF GitHub Bot commented on NIFI-4927:
--
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2562#discussion_r178480116

--- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java ---
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.influxdb;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.influxdb.dto.Query;
+import org.influxdb.dto.QueryResult;
+import com.google.gson.Gson;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@EventDriven
+@SupportsBatching
+@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"})
+@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile (preferred) or a scheduled query. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).")
+@WritesAttributes({
+    @WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"),
+    @WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"),
+})
+public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
+
+    public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query";
+
+    public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder()
+        .name("influxdb-query-result-time-unit")
+        .displayName("Query Result Time Units")
+        .description("The time unit of query results from the InfluxDB")
+        .defaultValue(TimeUnit.NANOSECONDS.name())
+        .required(true)
+        .expressionLanguageSupported(true)
+        .allowableValues(Arrays.stream(TimeUnit.values()).map(v -> v.name()).collect(Collectors.toSet()))
+        .sensitive(false)
+        .build();
+
+    public static final PropertyDescriptor INFLUX_DB_QUERY = new PropertyDescriptor.Builder()
+        .name("influxdb-query")
+        .displayName("InfluxDB Query")
+        .description("The InfluxDB query to execute. "
+            + "Note: If there are incoming connections, then the query is created
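The `allowableValues(...)` call in the quoted diff populates the permitted values of the "Query Result Time Units" property by streaming over the `TimeUnit` enum. A minimal, self-contained sketch of that stream idiom (the class name `AllowableTimeUnits` is ours for illustration, not from the PR):

```java
import java.util.Arrays;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class AllowableTimeUnits {

    // Collect the names of all TimeUnit constants -- the same idiom the
    // PR uses to feed allowableValues(...) for the time-unit property.
    static Set<String> timeUnitNames() {
        return Arrays.stream(TimeUnit.values())
                .map(TimeUnit::name)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Set<String> names = timeUnitNames();
        System.out.println(names.size());                 // 7 constants in java.util.concurrent.TimeUnit
        System.out.println(names.contains("NANOSECONDS")); // the property's default value in the diff
    }
}
```

Because the set is derived from the enum itself, the allowable values stay in sync with `TimeUnit` and the `NANOSECONDS` default is guaranteed to be one of them.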
[jira] [Commented] (NIFI-4927) Create InfluxDB Query Processor
[ https://issues.apache.org/jira/browse/NIFI-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421893#comment-16421893 ] ASF GitHub Bot commented on NIFI-4927:
--
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2562#discussion_r178480026

--- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java ---
[jira] [Commented] (NIFI-4927) Create InfluxDB Query Processor
[ https://issues.apache.org/jira/browse/NIFI-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421890#comment-16421890 ] ASF GitHub Bot commented on NIFI-4927:
--
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2562#discussion_r178479729

--- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java ---
[jira] [Commented] (NIFI-4927) Create InfluxDB Query Processor
[ https://issues.apache.org/jira/browse/NIFI-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421889#comment-16421889 ] ASF GitHub Bot commented on NIFI-4927:
--
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2562#discussion_r178479713

--- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java ---
[jira] [Commented] (NIFI-4927) Create InfluxDB Query Processor
[ https://issues.apache.org/jira/browse/NIFI-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421891#comment-16421891 ] ASF GitHub Bot commented on NIFI-4927:
--
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2562#discussion_r178479805

--- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java ---
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2562#discussion_r178479729

    --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java ---
    @@ -0,0 +1,258 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.influxdb;
    +
    +import org.apache.nifi.annotation.behavior.EventDriven;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.behavior.WritesAttribute;
    +import org.apache.nifi.annotation.behavior.WritesAttributes;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.influxdb.dto.Query;
    +import org.influxdb.dto.QueryResult;
    +import com.google.gson.Gson;
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.net.SocketTimeoutException;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.TimeUnit;
    +import java.util.stream.Collectors;
    +
    +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
    +@EventDriven
    +@SupportsBatching
    +@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"})
    +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile (preferred) or a scheduled query. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).")
    +@WritesAttributes({
    +    @WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"),
    +    @WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"),
    +})
    +public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor {
    +
    +    public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query";
    +
    +    public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder()
    +            .name("influxdb-query-result-time-unit")
    +            .displayName("Query Result Time Units")
    +            .description("The time unit of query results from the InfluxDB")
    +            .defaultValue(TimeUnit.NANOSECONDS.name())
    +            .required(true)
    +            .expressionLanguageSupported(true)
    +            .allowableValues(Arrays.stream(TimeUnit.values()).map(v -> v.name()).collect(Collectors.toSet()))
    +            .sensitive(false)
    +            .build();
    +
    +    public static final PropertyDescriptor INFLUX_DB_QUERY = new PropertyDescriptor.Builder()
    +            .name("influxdb-query")
    +            .displayName("InfluxDB Query")
    +            .description("The InfluxDB query to execute. "
    +                    + "Note: If there are incoming connections, then the query is created from incoming FlowFile's content and scheduled query is ignored.")
    +            .required(false)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .expressionLanguageSupported(true)
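The processor quoted above executes a query and serializes the resulting InfluxDB QueryResult to JSON with Gson. As a hedged, standalone sketch (not part of the PR and independent of NiFi), the InfluxDB 1.x query-result shape — `results` → `series` → `columns`/`values` — can be flattened into per-row maps with only the Python standard library; the sample payload and the `rows` helper below are illustrative assumptions, not NiFi output:

```python
import json

# Hand-written illustration of the InfluxDB 1.x /query JSON shape, the
# same structure the QueryResult DTO carries before Gson serialization.
payload = json.loads("""
{
  "results": [
    {
      "series": [
        {
          "name": "water",
          "columns": ["time", "city", "level"],
          "values": [
            ["2018-03-30T00:00:00Z", "nyc", 2.0],
            ["2018-03-30T00:01:00Z", "nyc", 2.5]
          ]
        }
      ]
    }
  ]
}
""")

def rows(result):
    """Flatten a QueryResult-style dict into (measurement, row-dict) pairs."""
    for res in result.get("results", []):
        for series in res.get("series", []):
            cols = series["columns"]
            for vals in series["values"]:
                yield series["name"], dict(zip(cols, vals))

for measurement, row in rows(payload):
    print(measurement, row["time"], row["level"])
```

Pairing each `values` row with the shared `columns` header is what makes the result usable downstream; the column list is stated once per series, so it must be zipped against every row.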
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2562#discussion_r178479713

    --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java ---
[jira] [Commented] (NIFI-4927) Create InfluxDB Query Processor
[ https://issues.apache.org/jira/browse/NIFI-4927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421888#comment-16421888 ]

ASF GitHub Bot commented on NIFI-4927:
--

Github user mans2singh commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2562#discussion_r178479664

    --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java ---
[jira] [Commented] (NIFI-4185) Add XML record reader & writer services
[ https://issues.apache.org/jira/browse/NIFI-4185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421843#comment-16421843 ] ASF GitHub Bot commented on NIFI-4185: -- Github user JohannesDaniel commented on the issue: https://github.com/apache/nifi/pull/2587 Can you maybe post the XML that led to the empty record? > Add XML record reader & writer services > --- > > Key: NIFI-4185 > URL: https://issues.apache.org/jira/browse/NIFI-4185 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.3.0 >Reporter: Andy LoPresto >Assignee: Johannes Peter >Priority: Major > Labels: json, records, xml > > With the addition of the {{RecordReader}} and {{RecordSetWriter}} paradigm, > XML conversion has not yet been targeted. This will replace the previous > ticket for XML to JSON conversion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4185) Add XML record reader & writer services
[ https://issues.apache.org/jira/browse/NIFI-4185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421842#comment-16421842 ]

ASF GitHub Bot commented on NIFI-4185:
--

Github user JohannesDaniel commented on the issue:

    https://github.com/apache/nifi/pull/2587

    Hi @pvillard31, thank you for your comments! I have implemented all of your suggestions. I like your news regarding the performance :-) Which kind of transformation did you test? XML => Record or XML => JSON (e.g. with ConvertRecord)?

    For some reason, some tests disappeared in a certain commit at my local git (probably I wanted to reorder the tests but deleted them, omg ...). However, I inserted them again (this is why there are many more tests now). In addition, I adjusted the definition of how namespaces shall be treated.

    I implemented several tests for XMLReader to verify that the usage of expression language works as expected. However, I was not able to reproduce your observation regarding the empty record for the header

    ```
    ```

    I implemented the following tests:

    ```
    testSimpleRecordWithHeader()
    testSimpleRecordWithHeaderNoValidation()
    ```

    They work as expected.
[jira] [Commented] (NIFI-4185) Add XML record reader & writer services
[ https://issues.apache.org/jira/browse/NIFI-4185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421837#comment-16421837 ]

ASF GitHub Bot commented on NIFI-4185:
--

Github user JohannesDaniel commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2587#discussion_r178470638

    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java ---
    @@ -0,0 +1,121 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.xml;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.nifi.serialization.DateTimeUtils;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.RecordReader;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.SchemaRegistryService;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +@Tags({"xml", "record", "reader", "parser"})
    +@CapabilityDescription("Reads XML content and creates Record objects. Records are expected in the second level of " +
    +        "XML data, embedded in an enclosing root tag.")
    +public class XMLReader extends SchemaRegistryService implements RecordReaderFactory {
    +
    +    public static final PropertyDescriptor VALIDATE_ROOT_TAG = new PropertyDescriptor.Builder()
    --- End diff --

    done. additionally, I added some tests for class XMLReader
[jira] [Commented] (NIFI-4185) Add XML record reader & writer services
[ https://issues.apache.org/jira/browse/NIFI-4185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421838#comment-16421838 ]

ASF GitHub Bot commented on NIFI-4185:
--

Github user JohannesDaniel commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2587#discussion_r178470648

    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.xml.XMLReader/additionalDetails.html ---
[GitHub] nifi pull request #2587: NIFI-4185 Add XML Record Reader
Github user JohannesDaniel commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2587#discussion_r178470648

    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.xml.XMLReader/additionalDetails.html ---
    @@ -0,0 +1,378 @@

    XMLReader

    The XMLReader Controller Service reads XML content and creates Record objects. The Controller Service
    must be configured with a schema that describes the structure of the XML data. Fields in the XML data
    that are not defined in the schema will be skipped.

    Records are expected in the second level of the XML data, embedded within an enclosing root tag:

    <root>
      <record>
        <field1>content</field1>
        <field2>content</field2>
      </record>
      <record>
        <field1>content</field1>
        <field2>content</field2>
      </record>
    </root>

    For the following examples, it is assumed that the exemplary records are enclosed by a root tag.

    Example 1: Simple Fields

    The simplest kind of data within XML data are tags / fields only containing content (no attributes, no embedded tags).
    They can be described in the schema by simple types (e. g. INT, STRING, ...).

    <record>
      <simple_field>content</simple_field>
    </record>

    This record can be described by a schema containing one field (e. g. of type string). By providing this schema,
    the reader expects zero or one occurrences of "simple_field" in the record.

    {
      "namespace": "nifi",
      "name": "test",
      "type": "record",
      "fields": [
        { "name": "simple_field", "type": "string" }
      ]
    }

    Example 2: Arrays with Simple Fields

    Arrays are considered as repetitive tags / fields in XML data. For the following XML data, "array_field" is considered
    to be an array enclosing simple fields, whereas "simple_field" is considered to be a simple field not enclosed in
    an array.

    <record>
      <array_field>content</array_field>
      <array_field>content</array_field>
      <simple_field>content</simple_field>
    </record>

    This record can be described by the following schema:

    {
      "namespace": "nifi",
      "name": "test",
      "type": "record",
      "fields": [
        { "name": "array_field", "type":
          { "type": "array", "items": "string" }
        },
        { "name": "simple_field", "type": "string" }
      ]
    }

    If a field in a schema is embedded in an array, the reader expects zero, one or more occurrences of the field
    in a record. The field "array_field" principally also could be defined as a simple field, but then the second occurrence
    of this field would replace the first in the record object. Moreover, the field "simple_field" could also be defined
    as an array. In this case, the reader would put it into the record object as an array with one element.

    Example 3: Tags with Attributes

    XML fields frequently not only contain content, but also attributes. The following record contains a field with
    an attribute "attr" and content:

    <record>
      <field_with_attribute attr="attr_content">content of field</field_with_attribute>
    </record>

    To parse the content of the field "field_with_attribute" together with the attribute "attr", two requirements have
    to be fulfilled:
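The array rule described in the documentation above — a tag declared as an array collects every occurrence, while a non-array field keeps only its last occurrence — can be sketched outside NiFi with the Python standard library. This is only an illustration of the rule, not the XMLReader implementation (which is schema-driven Java); the ARRAY_FIELDS set stands in for the schema's array declarations:

```python
import xml.etree.ElementTree as ET
from collections import defaultdict

XML = """
<record>
  <array_field>a</array_field>
  <array_field>b</array_field>
  <simple_field>content</simple_field>
</record>
"""

# Hypothetical stand-in for the schema: names declared as arrays.
ARRAY_FIELDS = {"array_field"}

def to_record(xml_text, array_fields):
    """Map one <record> element to a dict following the documented rule."""
    root = ET.fromstring(xml_text)
    buckets = defaultdict(list)
    for child in root:                    # collect every occurrence per tag
        buckets[child.tag].append(child.text)
    record = {}
    for tag, values in buckets.items():
        if tag in array_fields:
            record[tag] = values          # zero, one, or more occurrences
        else:
            record[tag] = values[-1]      # a repeat replaces the earlier value
    return record

print(to_record(XML, ARRAY_FIELDS))
```

Treating "array_field" as a simple field instead (an empty set of array declarations) reproduces the documented replacement behavior: only the second occurrence survives.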
[jira] [Commented] (NIFI-4185) Add XML record reader & writer services
[ https://issues.apache.org/jira/browse/NIFI-4185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16421836#comment-16421836 ]

ASF GitHub Bot commented on NIFI-4185:
--

Github user JohannesDaniel commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2587#discussion_r178470625

--- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.xml.XMLReader/additionalDetails.html ---

(The quoted file content is the same additionalDetails.html excerpt shown in the previous comment.)
[GitHub] nifi pull request #2587: NIFI-4185 Add XML Record Reader
Github user JohannesDaniel commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2587#discussion_r178470638

--- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/xml/XMLReader.java ---
@@ -0,0 +1,121 @@

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements. See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License. You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package org.apache.nifi.xml;

    import org.apache.nifi.annotation.documentation.CapabilityDescription;
    import org.apache.nifi.annotation.documentation.Tags;
    import org.apache.nifi.annotation.lifecycle.OnEnabled;
    import org.apache.nifi.components.PropertyDescriptor;
    import org.apache.nifi.controller.ConfigurationContext;
    import org.apache.nifi.logging.ComponentLog;
    import org.apache.nifi.processor.util.StandardValidators;
    import org.apache.nifi.schema.access.SchemaNotFoundException;
    import org.apache.nifi.serialization.DateTimeUtils;
    import org.apache.nifi.serialization.MalformedRecordException;
    import org.apache.nifi.serialization.RecordReader;
    import org.apache.nifi.serialization.RecordReaderFactory;
    import org.apache.nifi.serialization.SchemaRegistryService;
    import org.apache.nifi.serialization.record.RecordSchema;

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    @Tags({"xml", "record", "reader", "parser"})
    @CapabilityDescription("Reads XML content and creates Record objects. Records are expected in the second level of "
            + "XML data, embedded in an enclosing root tag.")
    public class XMLReader extends SchemaRegistryService implements RecordReaderFactory {

        public static final PropertyDescriptor VALIDATE_ROOT_TAG = new PropertyDescriptor.Builder()

--- End diff --

Done. Additionally, I added some tests for the XMLReader class.

---
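Example 3 in the quoted documentation above is truncated before its two requirements are listed, so the reader's actual attribute handling is not reproduced here. Independent of those requirements, the underlying idea — an XML element can carry an attribute and text content at the same time, and both can be extracted while parsing — can be sketched with the JDK's standard StAX API. The class and method names below (XmlAttributeDemo, readFieldWithAttribute) are invented for this illustration and are not part of NiFi.

```java
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamReader;
import java.io.StringReader;

public class XmlAttributeDemo {

    // Extracts both the "attr" attribute and the text content of the
    // <field_with_attribute> element, joined as "attribute|content".
    static String readFieldWithAttribute(String xml) throws Exception {
        XMLStreamReader reader = XMLInputFactory.newFactory()
                .createXMLStreamReader(new StringReader(xml));
        String attr = null;
        StringBuilder content = new StringBuilder();
        while (reader.hasNext()) {
            int event = reader.next();
            if (event == XMLStreamConstants.START_ELEMENT
                    && "field_with_attribute".equals(reader.getLocalName())) {
                // null namespace URI means the attribute's namespace is not checked
                attr = reader.getAttributeValue(null, "attr");
            } else if (event == XMLStreamConstants.CHARACTERS) {
                content.append(reader.getText());
            }
        }
        return attr + "|" + content;
    }

    public static void main(String[] args) throws Exception {
        String xml = "<record><field_with_attribute attr=\"attr_content\">"
                + "content of field</field_with_attribute></record>";
        System.out.println(readFieldWithAttribute(xml));
        // attr_content|content of field
    }
}
```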