[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r223046927 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) --- End diff -- My thought was that the query results will be the flow file content, while preserving the query as an attribute. This way down stream processors can have access to both the results and the query for additional processing. ---
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r223045178 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONNECTION_URL = new PropertyDescriptor.Builder() +.name("neo4j-connection-url") +.displayName("Neo4j Connection URL") +.description("Neo4J endpoing to connect to.") +.required(true) +.defaultValue("bolt://localhost:7687") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() +.name("neo4j-username") +.displayName("Username") +.description("Username for accessing Neo4J") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() +.name("neo4j-password") +.displayName("Password") +.description("Password for Neo4J user") +.required(true) +.sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round Robin Strategy"); + +public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r223045802 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONNECTION_URL = new PropertyDescriptor.Builder() +.name("neo4j-connection-url") +.displayName("Neo4j Connection URL") +.description("Neo4J endpoing to connect to.") +.required(true) +.defaultValue("bolt://localhost:7687") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() +.name("neo4j-username") +.displayName("Username") +.description("Username for accessing Neo4J") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() +.name("neo4j-password") +.displayName("Password") +.description("Password for Neo4J user. A dummy non-blank password is required even if it disabled on the server.") +.required(true) +.sensitive(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round Robin Strategy"); + +public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r223044277 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONNECTION_URL = new PropertyDescriptor.Builder() +.name("neo4j-connection-url") +.displayName("Neo4j Connection URL") +.description("Neo4J endpoing to connect to.") +.required(true) +.defaultValue("bolt://localhost:7687") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() +.name("neo4j-username") +.displayName("Username") +.description("Username for accessing Neo4J") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() --- End diff -- @mattyb149 - Neo4j defaults to and prefers enforcing authentication. But let me know if you think we should keep it optional. ---
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r223043551 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/pom.xml --- @@ -0,0 +1,87 @@ + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> +4.0.0 + + +org.apache.nifi +nifi-neo4j-bundle +1.8.0-SNAPSHOT + + +nifi-neo4j-processors +jar + + + + org.neo4j.driver + neo4j-java-driver + 1.6.2 + compile + + +org.apache.nifi +nifi-api + + +org.apache.nifi +nifi-utils +1.8.0-SNAPSHOT + + +commons-io +commons-io +2.6 + + +org.apache.commons +commons-lang3 +3.7 + + +com.google.code.gson +gson +2.7 + + +org.apache.nifi +nifi-mock +1.8.0-SNAPSHOT +test + + +com.google.guava +guava +18.0 --- End diff -- Will work on it. ---
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r221975264 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONNECTION_URL = new PropertyDescriptor.Builder() +.name("neo4j-connection-url") +.displayName("Neo4j Connection URL") +.description("Neo4J endpoing to connect to.") +.required(true) +.defaultValue("bolt://localhost:7687") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() +.name("neo4j-username") +.displayName("Username") +.description("Username for accessing Neo4J") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() +.name("neo4j-password") +.displayName("Password") +.description("Password for Neo4J user") +.required(true) +.sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round Robin Strategy"); + +public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r221470898 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONNECTION_URL = new PropertyDescriptor.Builder() +.name("neo4j-connection-url") +.displayName("Neo4j Connection URL") +.description("Neo4J endpoing to connect to.") +.required(true) +.defaultValue("bolt://localhost:7687") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() +.name("neo4j-username") +.displayName("Username") +.description("Username for accessing Neo4J") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() +.name("neo4j-password") +.displayName("Password") +.description("Password for Neo4J user") +.required(true) +.sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round Robin Strategy"); + +public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n
[GitHub] nifi issue #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2956 @MikeThomsen - Regarding your graph output questions - Can you please let me know what are the commands/files that were ingested for above output ? ---
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r221104284 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/test/java/org/apache/nifi/processors/neo4j/TestNeo4JCyperExecutor.java --- @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.neo4j; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.Record; +import org.neo4j.driver.v1.summary.ResultSummary; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Answers; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +import java.io.File; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * Neo4J Cypher unit tests. + */ +public class TestNeo4JCyperExecutor { --- End diff -- Correcting. ---
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r221104258 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONNECTION_URL = new PropertyDescriptor.Builder() +.name("neo4j-connection-url") +.displayName("Neo4j Connection URL") +.description("Neo4J endpoing to connect to.") +.required(true) +.defaultValue("bolt://localhost:7687") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() +.name("neo4j-username") +.displayName("Username") +.description("Username for accessing Neo4J") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() +.name("neo4j-password") +.displayName("Password") +.description("Password for Neo4J user") +.required(true) +.sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round Robin Strategy"); + +public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r221103863 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONNECTION_URL = new PropertyDescriptor.Builder() +.name("neo4j-connection-url") +.displayName("Neo4j Connection URL") +.description("Neo4J endpoing to connect to.") +.required(true) +.defaultValue("bolt://localhost:7687") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() +.name("neo4j-username") +.displayName("Username") +.description("Username for accessing Neo4J") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() +.name("neo4j-password") +.displayName("Password") +.description("Password for Neo4J user") +.required(true) +.sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round Robin Strategy"); + +public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r221103455 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONNECTION_URL = new PropertyDescriptor.Builder() +.name("neo4j-connection-url") +.displayName("Neo4j Connection URL") +.description("Neo4J endpoing to connect to.") +.required(true) +.defaultValue("bolt://localhost:7687") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() +.name("neo4j-username") +.displayName("Username") +.description("Username for accessing Neo4J") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() +.name("neo4j-password") +.displayName("Password") +.description("Password for Neo4J user") +.required(true) +.sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round Robin Strategy"); + +public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r221101649 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONNECTION_URL = new PropertyDescriptor.Builder() +.name("neo4j-connection-url") +.displayName("Neo4j Connection URL") +.description("Neo4J endpoing to connect to.") +.required(true) +.defaultValue("bolt://localhost:7687") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() +.name("neo4j-username") +.displayName("Username") +.description("Username for accessing Neo4J") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() +.name("neo4j-password") +.displayName("Password") +.description("Password for Neo4J user") +.required(true) +.sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static AllowableValue LOAD_BALANCING_STRATEGY_ROUND_ROBIN = new AllowableValue(LoadBalancingStrategy.ROUND_ROBIN.name(), "Round Robin", "Round Robin Strategy"); + +public static AllowableValue LOAD_BALANCING_STRATEGY_LEAST_CONNECTED = new AllowableValue(LoadBalancingStrategy.LEAST_CONNECTED.n
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r221100711 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONNECTION_URL = new PropertyDescriptor.Builder() +.name("neo4j-connection-url") +.displayName("Neo4j Connection URL") +.description("Neo4J endpoing to connect to.") +.required(true) +.defaultValue("bolt://localhost:7687") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() +.name("neo4j-username") +.displayName("Username") +.description("Username for accessing Neo4J") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() +.name("neo4j-password") +.displayName("Password") +.description("Password for Neo4J user") +.required(true) +.sensitive(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) --- End diff -- @alopresto - I can remove this. ---
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r221100628 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONNECTION_URL = new PropertyDescriptor.Builder() +.name("neo4j-connection-url") +.displayName("Neo4j Connection URL") +.description("Neo4J endpoing to connect to.") +.required(true) +.defaultValue("bolt://localhost:7687") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder() +.name("neo4j-username") +.displayName("Username") +.description("Username for accessing Neo4J") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() --- End diff -- @MikeThomsen - Should I add a link to the documentation ? ---
[GitHub] nifi issue #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2956 Rebased against master. ---
[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2686 @ijokarumawak - Thanks for your comments. I will checkout the pointers you have given. Mans ---
[GitHub] nifi issue #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2956 @MikeThomsen - There is a docker compose file in the nifi-neo4j-examples project mentioned above. Note that to avoid any conflict with standalone server - the ports exposed in docker compose are different from the default neo4j server. ---
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r217887181 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/Neo4JCypherExecutor.java --- @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.neo4j; + +import java.io.ByteArrayInputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.summary.ResultSummary; +import org.neo4j.driver.v1.summary.SummaryCounters; + +import com.google.gson.Gson; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"neo4j", "graph", "network", "insert", "update", "delete", "put", "get", "node", "relationship", "connection", "executor"}) +@CapabilityDescription("This processor executes a Neo4J Query (https://www.neo4j.com/) defined in the 'Neo4j Query' property of the " ++ "FlowFile and writes the result to the FlowFile body in JSON format. The processor has been tested with Neo4j version 3.4.5") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.ERROR_MESSAGE, description = "Neo4J error message"), +@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.LABELS_ADDED, description = "Number of labels added"), +@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.NODES_CREATED, description = "Number of nodes created"), +@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.NODES_DELETED, description = "Number of nodes deleted"), +@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.PROPERTIES_SET, description = "Number of properties set"), +@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.RELATIONS_CREATED, description = "Number of relationships created"), +@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.RELATIONS_DELETED, description = "Number of relationships deleted"), +@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.ROWS_RETURNED, description = "Number of rows returned"), +}) +public class Neo4JCypherExecutor extends AbstractNeo4JCypherExecutor { + +private static final Set relationships; +private static final List propertyDescriptors; + +static { +f
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r217887154 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/AbstractNeo4JCypherExecutor.java --- @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.neo4j; + +import java.io.File; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Config; +import org.neo4j.driver.v1.Config.ConfigBuilder; +import org.neo4j.driver.v1.Config.LoadBalancingStrategy; +import org.neo4j.driver.v1.Config.TrustStrategy; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; + +/** + * Abstract base class for Neo4JCypherExecutor processors + */ +abstract class AbstractNeo4JCypherExecutor extends AbstractProcessor { + +protected static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder() +.name("neo4J-query") +.displayName("Neo4J Query") +.description("Specifies the Neo4j Query.") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONNECTION_URL = new PropertyDescriptor.Builder() --- End diff -- @MikeThomsen - Just wanted to get some clarifications - Currently, the abstract base class has the code for getting the client. Should I move that to a controller class ? If you have any additional pointers, please let know. ---
[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2686 @markap14 @ijokarumawak @jzonthemtn and Nifi Team: Please let me know if you have any additional comments for this processor. Thanks. ---
[GitHub] nifi issue #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2956 @ottobackwards Thanks for your comments. @joewitt - Please let me know if you have any advice/pointers for me. ---
[GitHub] nifi issue #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2956 @ottobackwards - I've update the code/documentation based on your feedback. Please let me know if you have any additional advice for me. Mans ---
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r215476392 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/test/java/org/apache/nifi/processors/neo4j/ITNeo4JCyperExecutor.java --- @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.neo4j; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.neo4j.driver.v1.AuthTokens; +import org.neo4j.driver.v1.Driver; +import org.neo4j.driver.v1.GraphDatabase; +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + --- End diff -- I've added the steps to the javadoc for the integration tests. ---
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2956#discussion_r215476339 --- Diff: nifi-nar-bundles/nifi-neo4j-bundle/nifi-neo4j-processors/src/main/java/org/apache/nifi/processors/neo4j/Neo4JCypherExecutor.java --- @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.neo4j; + +import java.io.ByteArrayInputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.neo4j.driver.v1.Session; +import org.neo4j.driver.v1.StatementResult; +import org.neo4j.driver.v1.summary.ResultSummary; +import org.neo4j.driver.v1.summary.SummaryCounters; + +import com.google.gson.Gson; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"neo4j", "graph", "network", "insert", "update", "delete", "put", "get", "node", "relationship", "connection", "executor"}) +@CapabilityDescription("This processor executes a Neo4J Query (https://www.neo4j.com/) defined in the 'Neo4j Query' property of the " ++ "FlowFile and writes the result to the FlowFile body in JSON format.") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.ERROR_MESSAGE, description = "Neo4J error message"), +@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.LABELS_ADDED, description = "Number of labels added"), +@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.NODES_CREATED, description = "Number of nodes created"), +@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.NODES_DELETED, description = "Number of nodes deleted"), +@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.PROPERTIES_SET, description = "Number of properties set"), +@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.RELATIONS_CREATED, description = "Number of relationships created"), +@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.RELATIONS_DELETED, description = "Number of relationships deleted"), +@WritesAttribute(attribute = AbstractNeo4JCypherExecutor.ROWS_RETURNED, description = "Number of rows returned"), +}) +public class Neo4JCypherExecutor extends AbstractNeo4JCypherExecutor { + +private static final Set relationships; +private static final List propertyDescriptors; + +static { +final Set tempRelationships = new
[GitHub] nifi issue #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2956 Hello @ottobackwards I've documented the steps for setting up neo4j and testing this processor in a project [nifi-neo4j-examples](https://github.com/mans2singh/nifi-flow-examples/tree/nifi-neo4j-examples) (branch nifi-neo4j-examples). The project's [README.md](https://github.com/mans2singh/nifi-flow-examples/blob/nifi-neo4j-examples/README.md) has the steps, along with input sample files (creating a node, select all the nodes, delete nodes, etc) and a NIFI Neo4J sample template. The nifi-neo4j-processors project also has full suite of integration tests which can be executed after setting up neo4j. Please let me know if this helps, If you have andy feedback or advice for me, please let me know. Thanks again. Mans ---
[GitHub] nifi issue #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2956 Hi @ottobackwards @joewitt I've renamed the bundle based on your feedback. Please let me know if you have any more comments/feedback. Thanks for your advice. ---
[GitHub] nifi issue #2956: NIFI-5537 Create Neo4J cypher execution processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2956 @ottobackwards - I am open to changing the name to your recommendation. Mans ---
[GitHub] nifi pull request #2956: NIFI-5537 Create Neo4J cypher execution processor
GitHub user mans2singh opened a pull request: https://github.com/apache/nifi/pull/2956 NIFI-5537 Create Neo4J cypher execution processor Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [x] Have you written or updated unit tests to verify your changes? - [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [x] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [x] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mans2singh/nifi NIFI-5537 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2956.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2956 commit 1e35578166fd0fbd588a4fe06571d1f4d9efc3b1 Author: mans2singh Date: 2018-08-20T02:18:13Z NIFI-5537 Create Neo4J cypher execution processor ---
[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2686 Hi @ijokarumawak - I've merged your changes. Please let me know if you have any more recommendations. Thanks for your help. Mans ---
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r200638844 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml --- @@ -0,0 +1,114 @@ + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> +4.0.0 + + +org.apache.nifi +nifi-deeplearning4j-bundle +1.7.0-SNAPSHOT + + +nifi-deeplearning4j-processors +jar + + + +org.nd4j +nd4j-api +1.0.0-alpha + + +org.nd4j +nd4j-native-platform +1.0.0-alpha + + + org.nd4j + nd4j-cuda-8.0-platform +1.0.0-alpha + + + org.nd4j + nd4j-cuda-9.0-platform +1.0.0-alpha + + + org.nd4j + nd4j-cuda-9.1-platform +1.0.0-alpha + --- End diff -- @ijokarumawak I tried you changes locally and they look great. Is it ok with you if I merge them into my branch ? Once again, thanks for your help. ---
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r199823948 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml --- @@ -0,0 +1,114 @@ + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> +4.0.0 + + +org.apache.nifi +nifi-deeplearning4j-bundle +1.7.0-SNAPSHOT + + +nifi-deeplearning4j-processors +jar + + + +org.nd4j +nd4j-api +1.0.0-alpha + + +org.nd4j +nd4j-native-platform +1.0.0-alpha + + + org.nd4j + nd4j-cuda-8.0-platform +1.0.0-alpha + + + org.nd4j + nd4j-cuda-9.0-platform +1.0.0-alpha + + + org.nd4j + nd4j-cuda-9.1-platform +1.0.0-alpha + --- End diff -- @ijokarumawak - Thanks for your updates, I will try them out locally. ---
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r199823718 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/pom.xml --- @@ -0,0 +1,114 @@ + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> +4.0.0 + + +org.apache.nifi +nifi-deeplearning4j-bundle +1.7.0-SNAPSHOT + + +nifi-deeplearning4j-processors +jar + + + +org.nd4j +nd4j-api +1.0.0-alpha + + +org.nd4j +nd4j-native-platform +1.0.0-alpha + + + org.nd4j + nd4j-cuda-8.0-platform +1.0.0-alpha + + + org.nd4j + nd4j-cuda-9.0-platform +1.0.0-alpha + + + org.nd4j + nd4j-cuda-9.1-platform +1.0.0-alpha + --- End diff -- @ijokarumawak - That sounds great, I will remove the remove cuda dependencies. ---
[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2686 @ijokarumawak - Just following-up: 1. Regarding implementing record look up service - I believe that the processor and a record service lookup can be separate components useful for different use cases. As the file/rdbms based flows I've mentioned above, show - the processor can be used as a transformer. 2. Regarding providing more tools to prepare data - You are right, we can do that once we have the basics in place and there is a need for it. 3. You had mentioned the concern of writing the results (predictions) in the body of the flow file - if you/community think we should keep the observations in the body and put the output in an attribute, I'd be happy to change that. Thanks again for your thoughts and let me know if you have any more advice/recommendations. Mans ---
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r196994774 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JMultiLayerPredictor.java --- @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.deeplearning4j; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.stream.io.StreamUtils; +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; +import org.deeplearning4j.util.ModelSerializer; +import org.nd4j.linalg.api.ndarray.INDArray; +import org.nd4j.linalg.factory.Nd4j; +import com.google.gson.Gson; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +@EventDriven +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"deeplearning4j", "dl4j", "multilayer", "predict", "classification", "regression", "deep", "learning", "neural", "network"}) +@CapabilityDescription("The DeepLearning4JMultiLayerPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. " ++ "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. " ++ "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. " ++ "Each record can contain multiple fields with each field separated by the 'Field Separator' property." +) +@WritesAttributes({ +@WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"), +@WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"), +}) +public class DeepLearning4JMultiLayerPredictor extends AbstractDeepLearning4JProcessor { + +static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("Successful DeepLearning4j results are routed to this relationship").build(); + +static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("Failed DeepLearning4j results are routed to this relationship").build(); + +protected final Gson gson = new Gson(); + +protected MultiLayerNetwork model = null; + +@OnStopp
[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2686 @ijokarumawak - I've created two flow templates for testing the DL4J processor. The first with a file input and file output. The second flow reads a row from rdbms based on id input file containing the single id, classifies the observation and save the classification results in same row. The supporting files along with a sample classification model that will work with the two templates are in the repository [nifi-flow-examples](https://github.com/mans2singh/nifi-flow-examples.git) branch nifi-dl4j-flow. Here are some details of the two flow templates: 1. The first template is simple one [NifiDL4JFileInputOutput.xml](https://github.com/mans2singh/nifi-flow-examples/blob/nifi-dl4j-flow/dl4jtemplate/NifiDL4JFileInputOutput.xml) that ingests file containing an observation record from directory (sample input in dl4jinput), applies the classification model and writes results to the output directory with the same file name as input. In this scenario, the correlation is based on the file names. 2. The second [NifiDL4JFileToRdbms.xml](https://github.com/mans2singh/nifi-flow-examples/blob/nifi-dl4j-flow/dl4jtemplate/NifiDL4JFileToRdbms.xml) reads a single id from input file (sample in dl4jinputid directory), queries a rdbms table for the observations for the input id, classifies the observation and updates the db row with classification results. In this flow template, the row id of the input is used as a correlation id which is used to update the output column of the corresponding row after the classification is done. The flow uses other Nifi processors to ingest, transform, save the classification results. The table creation and observation row insertion commands are in dl4jsql directory. The flow templates require setting the appropriate input/output files, dl4j model, db controller and rdbms table with the records. Please let me know your thoughts/feedback. Thanks Mans ---
[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2686 @joewitt - I will remove the nar from the assembly. Let me know if there is any additional feedback. Thanks. ---
[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2686 @ijokarumawak - I was thinking that the processor will be used as a transformer (predictor) and there would be a correlation attribute in the flow file which would be used to associate the results with the observations. This will keep the focus on transformation with simple outputs while still allowing the user the flexibility to use the correlation id to combine/enrich it with other data using a enrichment processor. I've added test cases which show how to use correlation id. What's your thought ? ---
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r194944047 --- Diff: nifi-assembly/pom.xml --- @@ -379,6 +379,12 @@ language governing permissions and limitations under the License. --> 1.7.0-SNAPSHOT nar + +org.apache.nifi +nifi-deeplearning4j-nar +1.7.0-SNAPSHOT +nar + --- End diff -- I would be happy to create a profile if that is the decision. ---
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r194943820 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/classification_test.txt --- @@ -0,0 +1,100 @@ +1.1,0.5,0.5,0.2,0 --- End diff -- @ijokarumawak - The tests included are to show how we can generate a model and configure the processor for regression and classification in the integration tests. My thought while creating the mock data was create a the model with reproducible results even with limited observations and few iterations. In real life, a multilayer model will be created, tested and validated by the user prior to plugging it into the component. ---
[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2686 @ijokarumawak - Thanks for your feedback. I was away for a few days and will respond to your comments soon. Mans ---
[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2686 @markap14, NIFI team Just wondering if you have any feedback on this processor. Thanks ---
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r187663098 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.deeplearning4j; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; +import org.nd4j.linalg.api.ndarray.INDArray; +import org.nd4j.linalg.factory.Nd4j; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +@EventDriven +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"}) +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. " ++ "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. " ++ "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. " ++ "Each record can contain multiple fields with each field separated by the 'Field Separator' property." +) +@WritesAttributes({ +@WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"), +@WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"), +}) +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor { + +static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("Successful DeepLearning4j results are routed to this relationship").build(); + +static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("Failed DeepLearning4j results are routed to this relationship").build(); + +protected Gson gson = new Gson(); + +private static final Set relationships; +private static final List propertyDescriptors; +static { +final Set tempRelationships = new HashSet<>(); +tempRelationships.add(REL_SUCCESS); +tempRelationships.add(REL_FAILURE);
[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2686 @markap14 - Thanks for your prompt review and advice. I've updated the code based on your review and am looking forward to your/other members feedback. Thanks again. Mans ---
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r187655735 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.deeplearning4j; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; +import org.nd4j.linalg.api.ndarray.INDArray; +import org.nd4j.linalg.factory.Nd4j; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +@EventDriven +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"}) +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. " ++ "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. " ++ "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. " ++ "Each record can contain multiple fields with each field separated by the 'Field Separator' property." +) +@WritesAttributes({ +@WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"), +@WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"), +}) +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor { + +static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("Successful DeepLearning4j results are routed to this relationship").build(); + +static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("Failed DeepLearning4j results are routed to this relationship").build(); + +protected Gson gson = new Gson(); + +private static final Set relationships; +private static final List propertyDescriptors; +static { +final Set tempRelationships = new HashSet<>(); +tempRelationships.add(REL_SUCCESS); +tempRelationships.add(REL_FAILURE);
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r187655641 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.deeplearning4j; +import java.io.IOException; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; +import org.deeplearning4j.util.ModelSerializer; + +/** + * Base class for deeplearning4j processors + */ +public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor { --- End diff -- @markap14 - This is to establish a foundation for future extensions which will be easier if some common base classes are present. I found this pattern to be useful and hope it's not an overkill. ---
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r187655109 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/test/resources/classification_test.txt --- @@ -0,0 +1,100 @@ +1.1,0.5,0.5,0.2,0 --- End diff -- I've mentioned at the beginning of the tests that these are based on deeplearning4j examples. I generated this very simple/small sample file to make consistent predictions for the tests even with just a few observations. ---
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r187654464 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.deeplearning4j; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; +import org.nd4j.linalg.api.ndarray.INDArray; +import org.nd4j.linalg.factory.Nd4j; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +@EventDriven +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"}) +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. " ++ "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. " ++ "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. " ++ "Each record can contain multiple fields with each field separated by the 'Field Separator' property." +) +@WritesAttributes({ +@WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"), +@WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"), +}) +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor { + +static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("Successful DeepLearning4j results are routed to this relationship").build(); + +static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("Failed DeepLearning4j results are routed to this relationship").build(); + +protected Gson gson = new Gson(); + +private static final Set relationships; +private static final List propertyDescriptors; +static { +final Set tempRelationships = new HashSet<>(); +tempRelationships.add(REL_SUCCESS); +tempRelationships.add(REL_FAILURE);
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r187654383 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.deeplearning4j; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; +import org.nd4j.linalg.api.ndarray.INDArray; +import org.nd4j.linalg.factory.Nd4j; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +@EventDriven +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"}) +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. " ++ "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. " ++ "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. " ++ "Each record can contain multiple fields with each field separated by the 'Field Separator' property." +) +@WritesAttributes({ +@WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"), +@WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"), +}) +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor { + +static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("Successful DeepLearning4j results are routed to this relationship").build(); + +static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("Failed DeepLearning4j results are routed to this relationship").build(); + +protected Gson gson = new Gson(); + +private static final Set relationships; +private static final List propertyDescriptors; +static { +final Set tempRelationships = new HashSet<>(); +tempRelationships.add(REL_SUCCESS); +tempRelationships.add(REL_FAILURE);
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r187654258 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.deeplearning4j; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; +import org.nd4j.linalg.api.ndarray.INDArray; +import org.nd4j.linalg.factory.Nd4j; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +@EventDriven +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"}) +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. " ++ "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. " ++ "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. " ++ "Each record can contain multiple fields with each field separated by the 'Field Separator' property." +) +@WritesAttributes({ +@WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"), +@WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"), +}) +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor { + +static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("Successful DeepLearning4j results are routed to this relationship").build(); + +static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("Failed DeepLearning4j results are routed to this relationship").build(); + +protected Gson gson = new Gson(); + +private static final Set relationships; +private static final List propertyDescriptors; +static { +final Set tempRelationships = new HashSet<>(); +tempRelationships.add(REL_SUCCESS); +tempRelationships.add(REL_FAILURE);
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r187654197 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/DeepLearning4JPredictor.java --- @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.deeplearning4j; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; +import org.nd4j.linalg.api.ndarray.INDArray; +import org.nd4j.linalg.factory.Nd4j; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +@EventDriven +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"deeplearning4j", "dl4j", "predict", "classification", "regression", "deep", "learning"}) +@CapabilityDescription("The DeepLearning4JPredictor predicts one or more value(s) based on provided deeplearning4j (https://github.com/deeplearning4j) model and the content of a FlowFile. " ++ "The processor supports both classification and regression by extracting the record from the FlowFile body and applying the model. " ++ "The processor supports batch by allowing multiple records to be passed in the FlowFile body with each record separated by the 'Record Separator' property. " ++ "Each record can contain multiple fields with each field separated by the 'Field Separator' property." +) +@WritesAttributes({ +@WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_ERROR_MESSAGE, description = "Deeplearning4J error message"), +@WritesAttribute(attribute = AbstractDeepLearning4JProcessor.DEEPLEARNING4J_OUTPUT_SHAPE, description = "Deeplearning4J output shape"), +}) +public class DeepLearning4JPredictor extends AbstractDeepLearning4JProcessor { --- End diff -- This is the intuitive name I could come up with. Please let me know if you have any other recommendations. ---
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r187653950 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.deeplearning4j; +import java.io.IOException; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; +import org.deeplearning4j.util.ModelSerializer; + +/** + * Base class for deeplearning4j processors + */ +public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor { + +public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() +.name("deeplearning4j-charset") +.displayName("Character Set") +.description("Specifies the character set of the document data.") +.required(true) +.defaultValue("UTF-8") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) +.build(); + +public static final PropertyDescriptor FIELD_SEPARATOR = new PropertyDescriptor.Builder() +.name("deeplearning4j-field-separator") +.displayName("Field Separator") +.description("Specifies the field separator in the records. (default is comma)") +.required(true) +.defaultValue(",") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor RECORD_SEPARATOR = new PropertyDescriptor.Builder() +.name("deeplearning4j-record-separator") +.displayName("Record Separator") +.description("Specifies the records separator in the message body. (defaults to new line)") +.required(true) +.defaultValue(System.lineSeparator()) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor MODEL_FILE = new PropertyDescriptor.Builder() +.name("model-file") +.displayName("Model File") +.description("Location of the Deeplearning4J model zip file") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) +.build(); + +public static final PropertyDescriptor RECORD_DIMENSIONS = new PropertyDescriptor.Builder() +.name("deeplearning4j-record-dimension") +.displayName("Record dimensions separated by field separator") +.description("Dimension of array in each a record (eg: 2,4 - a 2x4 array)") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final String DEEPLEARNING4J_ERROR_MESSAGE = "deeplear
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r187653982 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.deeplearning4j; +import java.io.IOException; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; +import org.deeplearning4j.util.ModelSerializer; + +/** + * Base class for deeplearning4j processors + */ +public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor { + +public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() +.name("deeplearning4j-charset") +.displayName("Character Set") +.description("Specifies the character set of the document data.") +.required(true) +.defaultValue("UTF-8") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) +.build(); + +public static final PropertyDescriptor FIELD_SEPARATOR = new PropertyDescriptor.Builder() +.name("deeplearning4j-field-separator") +.displayName("Field Separator") +.description("Specifies the field separator in the records. (default is comma)") +.required(true) +.defaultValue(",") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor RECORD_SEPARATOR = new PropertyDescriptor.Builder() +.name("deeplearning4j-record-separator") +.displayName("Record Separator") +.description("Specifies the records separator in the message body. (defaults to new line)") +.required(true) +.defaultValue(System.lineSeparator()) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor MODEL_FILE = new PropertyDescriptor.Builder() +.name("model-file") +.displayName("Model File") +.description("Location of the Deeplearning4J model zip file") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) +.build(); + +public static final PropertyDescriptor RECORD_DIMENSIONS = new PropertyDescriptor.Builder() +.name("deeplearning4j-record-dimension") +.displayName("Record dimensions separated by field separator") +.description("Dimension of array in each a record (eg: 2,4 - a 2x4 array)") +.required(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final String DEEPLEARNING4J_ERROR_MESSAGE = &
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r187653539 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.deeplearning4j; +import java.io.IOException; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; +import org.deeplearning4j.util.ModelSerializer; + +/** + * Base class for deeplearning4j processors + */ +public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor { + +public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() +.name("deeplearning4j-charset") +.displayName("Character Set") +.description("Specifies the character set of the document data.") +.required(true) +.defaultValue("UTF-8") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) +.build(); + +public static final PropertyDescriptor FIELD_SEPARATOR = new PropertyDescriptor.Builder() +.name("deeplearning4j-field-separator") +.displayName("Field Separator") +.description("Specifies the field separator in the records. (default is comma)") +.required(true) +.defaultValue(",") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.NON_EMPTY_VALIDATOR) +.build(); + +public static final PropertyDescriptor RECORD_SEPARATOR = new PropertyDescriptor.Builder() +.name("deeplearning4j-record-separator") +.displayName("Record Separator") +.description("Specifies the records separator in the message body. (defaults to new line)") --- End diff -- Corrected ---
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2686#discussion_r187653485 --- Diff: nifi-nar-bundles/nifi-deeplearning4j-bundle/nifi-deeplearning4j-processors/src/main/java/org/apache/nifi/processors/deeplearning4j/AbstractDeepLearning4JProcessor.java --- @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.deeplearning4j; +import java.io.IOException; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; +import org.deeplearning4j.util.ModelSerializer; + +/** + * Base class for deeplearning4j processors + */ +public abstract class AbstractDeepLearning4JProcessor extends AbstractProcessor { + +public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() +.name("deeplearning4j-charset") +.displayName("Character Set") +.description("Specifies the character set of the document data.") +.required(true) +.defaultValue("UTF-8") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) +.build(); + +public static final PropertyDescriptor FIELD_SEPARATOR = new PropertyDescriptor.Builder() +.name("deeplearning4j-field-separator") +.displayName("Field Separator") +.description("Specifies the field separator in the records. (default is comma)") --- End diff -- Corrected. ---
[GitHub] nifi issue #2686: NIFI-5166 - Deep learning classification and regression pr...
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2686 Good Morning Nifi Folks: The appveyor build is passing for this PR but travis-ci build is failing with the following message: `[ERROR] Failed to execute goal org.apache.maven.plugins:maven-surefire-plugin:2.20.1:test (default-test) on project nifi-cdc-mysql-processors: There are test failures. [ERROR] [ERROR] Please refer to /home/travis/build/apache/nifi/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/target/surefire-reports for the individual test results. [ERROR] Please refer to dump files (if any exist) [date]-jvmRun[N].dump, [date].dumpstream and [date]-jvmRun[N].dumpstream.` Can you please advice on how to resolve this error ? Thanks Mans ---
[GitHub] nifi pull request #2686: NIFI-5166 - Deep learning classification and regres...
GitHub user mans2singh opened a pull request: https://github.com/apache/nifi/pull/2686 NIFI-5166 - Deep learning classification and regression processor wit⦠â¦h deeplearning4j Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ x] Is your initial contribution a single, squashed commit? ### For code changes: - [ x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ x] Have you written or updated unit tests to verify your changes? - [ x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ x] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [x] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mans2singh/nifi NIFI-5166 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2686.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2686 commit f58316a906af820f9f056c6ebee171015685f86b Author: mans2singh Date: 2018-05-08T05:11:00Z NIFI-5166 - Deep learning classification and regression processor with deeplearning4j ---
[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2562 @MikeThomsen @joewitt - Thanks for your help in making this contribution possible. @timhallinflux - Please let me know if you have any additional enhancements possible. I will contact you via email. Thanks again everyone. ---
[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2562 @MikeThomsen - I've updated the code (added expression language scope and updated tests) based your review comments. The integration tests are passing. Please let me know if there is any other comment. Thanks Mans ---
[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2562 Thanks @MikeThomsen for your reveiw/advice. Also thanks @timhallinflux for the pointers on influxdb sandbox. Please let me know if there is other feedback. Mans ---
[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2562 @MikeThomsen - I have updated the test cases regarding usage of assertions. Let me know if there is anything else outstanding. Thanks ---
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r179012098 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java --- @@ -110,8 +125,72 @@ public void testCreateDB() { assertEquals("Value should be equal",null, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE)); assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY)); -flowFiles.get(0).assertContentEquals("{\"results\":[{}]}"); +QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class); +assertEquals("results array should be empty", 1, queryResult.getResults().size()); +assertEquals("No series", null, queryResult.getResults().get(0).getSeries()); --- End diff -- Corrected. ---
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r179012073 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java --- @@ -110,8 +125,72 @@ public void testCreateDB() { assertEquals("Value should be equal",null, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE)); assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY)); -flowFiles.get(0).assertContentEquals("{\"results\":[{}]}"); +QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class); +assertEquals("results array should be empty", 1, queryResult.getResults().size()); --- End diff -- Corrected. ---
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r179012036 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java --- @@ -110,8 +125,72 @@ public void testCreateDB() { assertEquals("Value should be equal",null, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE)); assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY)); -flowFiles.get(0).assertContentEquals("{\"results\":[{}]}"); +QueryResult queryResult = gson.fromJson(new StringReader(new String(flowFiles.get(0).toByteArray())), QueryResult.class); +assertEquals("results array should be empty", 1, queryResult.getResults().size()); +assertEquals("No series", null, queryResult.getResults().get(0).getSeries()); +} + +@Test +public void testEmptyFlowFileQueryWithScheduledQuery() { +String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652"; +influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message); + +String query = "select * from water"; +runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY, query); + +byte [] bytes = new byte [] {}; +runner.enqueue(bytes); +runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); + +List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); +assertEquals("Value should be equal", 1, flowFiles.size()); +assertEquals("Value should be equal",null, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE)); --- End diff -- Corrected. ---
[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2562 @MikeThomsen - I've updated the code based on your comments. Let me know if you have any more recommendations. Thanks ---
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r178480359 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java --- @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import static org.junit.Assert.assertEquals; +import org.junit.Assert; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunners; +import org.influxdb.InfluxDB; +import org.influxdb.dto.QueryResult; +import org.junit.Before; +import org.junit.Test; + +/** + * Integration test for executing InfluxDB queries. Please ensure that the InfluxDB is running + * on local host with default port and has database test with table test. Please set user + * and password if applicable before running the integration tests. + */ +public class ITExecuteInfluxDBQuery extends AbstractITInfluxDB { + +@Before +public void setUp() throws Exception { +initInfluxDB(); +runner = TestRunners.newTestRunner(ExecuteInfluxDBQuery.class); +initializeRunner(); +} + +@Test +public void testValidScheduleQueryWithNoIncoming() { +String message = "water,country=US,city=newark rain=1,humidity=0.6 1501002274856668652"; +influxDB.write(dbName, DEFAULT_RETENTION_POLICY, InfluxDB.ConsistencyLevel.ONE, message); + +String query = "select * from water"; +runner.setProperty(ExecuteInfluxDBQuery.INFLUX_DB_QUERY, query); + +runner.setIncomingConnection(false); +runner.run(1,true,true); + runner.assertAllFlowFilesTransferred(ExecuteInfluxDBQuery.REL_SUCCESS, 1); +List flowFiles = runner.getFlowFilesForRelationship(ExecuteInfluxDBQuery.REL_SUCCESS); +assertEquals("Value should be equal", 1, flowFiles.size()); +assertEquals("Value should be equal",null, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_ERROR_MESSAGE)); +assertEquals("Value should be equal",query, flowFiles.get(0).getAttribute(ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY)); +flowFiles.get(0).assertContentEquals( --- End diff -- Updated to parse json and compare typed results. ---
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r178480131 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"}) +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile (preferred) or a scheduled query. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), +@WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"), +}) +public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { + +public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query"; + +public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder() +.name("influxdb-query-result-time-unit") +.displayName("Query Result Time Units") +.description("The time unit of query results from the InfluxDB") +.defaultValue(TimeUnit.NANOSECONDS.name()) +.required(true) +.expressionLanguageSupported(true) +.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> v.name()).collect(Collectors.toSet())) +.sensitive(false) +.build(); + +public static final PropertyDescriptor INFLUX_DB_QUERY = new PropertyDescriptor.Builder() +.name("influxdb-query") +.displayName("InfluxDB Query") +.description("The InfluxDB query to execute. " ++ "Note: If there are incoming connections, then the query is created from incoming FlowFile's conte
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r178480144 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.util.TestRunner; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.junit.After; + +/** + * Base integration test class for InfluxDB processors + */ +public class AbstractITInfluxDB { +protected TestRunner runner; +protected InfluxDB influxDB; +protected String dbName = "test"; +protected String dbUrl = "http://localhost:8086";; +protected String user = "admin"; +protected String password = "admin"; +protected static final String DEFAULT_RETENTION_POLICY = "autogen"; + +protected void initInfluxDB() throws InterruptedException, Exception { +influxDB = InfluxDBFactory.connect(dbUrl,user,password); +cleanUpDatabase(); --- End diff -- Removed as recommended. ---
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r178480116 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"}) +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile (preferred) or a scheduled query. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), +@WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"), +}) +public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { + +public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query"; + +public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder() +.name("influxdb-query-result-time-unit") +.displayName("Query Result Time Units") +.description("The time unit of query results from the InfluxDB") +.defaultValue(TimeUnit.NANOSECONDS.name()) +.required(true) +.expressionLanguageSupported(true) +.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> v.name()).collect(Collectors.toSet())) +.sensitive(false) +.build(); + +public static final PropertyDescriptor INFLUX_DB_QUERY = new PropertyDescriptor.Builder() +.name("influxdb-query") +.displayName("InfluxDB Query") +.description("The InfluxDB query to execute. " ++ "Note: If there are incoming connections, then the query is created from incoming FlowFile's conte
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r178480026 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"}) +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile (preferred) or a scheduled query. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), +@WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"), +}) +public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { + +public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query"; + +public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder() +.name("influxdb-query-result-time-unit") +.displayName("Query Result Time Units") +.description("The time unit of query results from the InfluxDB") +.defaultValue(TimeUnit.NANOSECONDS.name()) +.required(true) +.expressionLanguageSupported(true) +.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> v.name()).collect(Collectors.toSet())) +.sensitive(false) +.build(); + +public static final PropertyDescriptor INFLUX_DB_QUERY = new PropertyDescriptor.Builder() +.name("influxdb-query") +.displayName("InfluxDB Query") +.description("The InfluxDB query to execute. " ++ "Note: If there are incoming connections, then the query is created from incoming FlowFile's conte
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r178479805 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"}) +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile (preferred) or a scheduled query. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), +@WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"), +}) +public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { + +public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query"; + +public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder() +.name("influxdb-query-result-time-unit") +.displayName("Query Result Time Units") +.description("The time unit of query results from the InfluxDB") +.defaultValue(TimeUnit.NANOSECONDS.name()) +.required(true) +.expressionLanguageSupported(true) +.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> v.name()).collect(Collectors.toSet())) +.sensitive(false) +.build(); + +public static final PropertyDescriptor INFLUX_DB_QUERY = new PropertyDescriptor.Builder() +.name("influxdb-query") +.displayName("InfluxDB Query") +.description("The InfluxDB query to execute. " ++ "Note: If there are incoming connections, then the query is created from incoming FlowFile's conte
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r178479729 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"}) +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile (preferred) or a scheduled query. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), +@WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"), +}) +public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { + +public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query"; + +public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder() +.name("influxdb-query-result-time-unit") +.displayName("Query Result Time Units") +.description("The time unit of query results from the InfluxDB") +.defaultValue(TimeUnit.NANOSECONDS.name()) +.required(true) +.expressionLanguageSupported(true) +.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> v.name()).collect(Collectors.toSet())) +.sensitive(false) +.build(); + +public static final PropertyDescriptor INFLUX_DB_QUERY = new PropertyDescriptor.Builder() +.name("influxdb-query") +.displayName("InfluxDB Query") +.description("The InfluxDB query to execute. " ++ "Note: If there are incoming connections, then the query is created from incoming FlowFile's conte
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r178479713 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"}) +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile (preferred) or a scheduled query. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), +@WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"), +}) +public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { + +public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query"; + +public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder() +.name("influxdb-query-result-time-unit") +.displayName("Query Result Time Units") +.description("The time unit of query results from the InfluxDB") +.defaultValue(TimeUnit.NANOSECONDS.name()) +.required(true) +.expressionLanguageSupported(true) +.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> v.name()).collect(Collectors.toSet())) +.sensitive(false) +.build(); + +public static final PropertyDescriptor INFLUX_DB_QUERY = new PropertyDescriptor.Builder() +.name("influxdb-query") +.displayName("InfluxDB Query") +.description("The InfluxDB query to execute. " ++ "Note: If there are incoming connections, then the query is created from incoming FlowFile's conte
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r178479664 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java --- @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED) +@EventDriven +@SupportsBatching +@Tags({"influxdb", "measurement", "get", "read", "query", "timeseries"}) +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile (preferred) or a scheduled query. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), +@WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"), +}) +public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { + +public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query"; + +public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder() +.name("influxdb-query-result-time-unit") +.displayName("Query Result Time Units") +.description("The time unit of query results from the InfluxDB") +.defaultValue(TimeUnit.NANOSECONDS.name()) +.required(true) +.expressionLanguageSupported(true) +.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> v.name()).collect(Collectors.toSet())) +.sensitive(false) +.build(); + +public static final PropertyDescriptor INFLUX_DB_QUERY = new PropertyDescriptor.Builder() +.name("influxdb-query") +.displayName("InfluxDB Query") +.description("The InfluxDB query to execute. " ++ "Note: If there are incoming connections, then the query is created from incoming FlowFile's conte
[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2562 @MikeThomsen - I've updated the code based on your comments (added check for query result null, changed scheduled query property name). One note, if we use the influxdb-compose.xml file for integration testing, we will need to change the name of the db (from test to something else) which the curl loader uses, since integration test use their own data. Please let me know your thoughts/recommendations. Thanks. ---
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r177296540 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"influxdb", "measurement","get", "read", "query", "timeseries"}) +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), +@WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"), +}) +public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { + +public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query"; + +public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder() +.name("influxdb-query-result-time-unit") +.displayName("Query Result Time Units") +.description("The time unit of query results from the InfluxDB") +.defaultValue(TimeUnit.NANOSECONDS.name()) +.required(true) +.expressionLanguageSupported(true) +.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> v.name()).collect(Collectors.toSet())) +.sensitive(false) +.build(); + +static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("Successful InfluxDB queries are routed to this relationship").build(); + +static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("Falied InfluxDB queries are routed to this relationship").build(); + +static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r177296457 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java --- @@ -36,14 +36,7 @@ protected void initInfluxDB() throws InterruptedException, Exception { influxDB = InfluxDBFactory.connect(dbUrl,user,password); -if ( influxDB.databaseExists(dbName) ) { -QueryResult result = influxDB.query(new Query("DROP measurement water", dbName)); -checkError(result); -result = influxDB.query(new Query("DROP measurement testm", dbName)); -checkError(result); -result = influxDB.query(new Query("DROP database " + dbName, dbName)); -Thread.sleep(1000); -} +cleanUpDatabase(); --- End diff -- I call the cleanup in setup as a precaution so that if there is any conflicting/previously existing data in the test database, it is removed and does not fail the integration test which depend on number of rows inserted. If you think it is unnecessary, I can remove it. ---
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r177296275 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java --- @@ -72,6 +74,16 @@ .sensitive(false) .build(); +public static final PropertyDescriptor INFLUX_DB_SCHEDULED_QUERY = new PropertyDescriptor.Builder() +.name("influxdb-scheduled-query") +.displayName("InfluxDB Schedued Query") --- End diff -- Changed the attribute to INFLUX_DB_QUERY. The description mentions that flow files and timed query both are allowed. ---
[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2562 @joewitt @MikeThomsen - I've updated the code based on your comments. I've also added a docker gist [influxdb-compose.xml](https://gist.github.com/mans2singh/5ee90620314d4bcf26d6c65de2540c77#file-influxdb-compose-xml) based on docker compose xml mentioned by @MikeThomsen . This docker compose starts a local influxdb and runs curl commands to create a `test` database and populate it with a metric every 5 seconds. To launch it we need to run `docker-compose -f up` on the local machine and then we can run the integration tests for the query processor or run Nifi locally with processor configured for scheduled query. I am also including two templates ([flow file driven](https://gist.github.com/mans2singh/5ee90620314d4bcf26d6c65de2540c77#file-nifi-influxdb-flow-file-driven-template) and [timer driven](https://gist.github.com/mans2singh/5ee90620314d4bcf26d6c65de2540c77#file-nifi-influxdb-scheduled-query-template)) to assist in testing the InfluxDB query processor. Please let me know if your comments/recommendations. Thanks again. ---
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r176974725 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/AbstractITInfluxDB.java --- @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.util.TestRunner; +import org.influxdb.InfluxDB; +import org.influxdb.InfluxDBFactory; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import org.junit.After; + +/** + * Base integration test class for InfluxDB processors + */ +public class AbstractITInfluxDB { +protected TestRunner runner; +protected InfluxDB influxDB; +protected String dbName = "test"; +protected String dbUrl = "http://localhost:8086";; +protected String user = "admin"; +protected String password = "admin"; +protected static final String DEFAULT_RETENTION_POLICY = "autogen"; + +protected void initInfluxDB() throws InterruptedException, Exception { +influxDB = InfluxDBFactory.connect(dbUrl,user,password); +if ( influxDB.databaseExists(dbName) ) { --- End diff -- I've refactored the code and call it both from `Before` and `After` to make sure that the test setup and tear down is clean. ---
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r176974663 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/test/java/org/apache/nifi/processors/influxdb/ITExecuteInfluxDBQuery.java --- @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import static org.junit.Assert.assertEquals; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunners; +import org.influxdb.InfluxDB; +import org.influxdb.dto.QueryResult; +import org.junit.Before; +import org.junit.Test; + +/** + * Integration test for executing InfluxDB queries. Please ensure that the InfluxDB is running + * on local host with default port and has database test with table test. Please set user + * and password if applicable before running the integration tests. + */ +public class ITExecuteInfluxDBQuery extends AbstractITInfluxDB { + +@Before +public void setUp() throws Exception { +runner = TestRunners.newTestRunner(ExecuteInfluxDBQuery.class); +initializeRunner(); --- End diff -- I've changed the order and call init database before initializing runner. Let me know if that is ok. ---
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r176974557 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) --- End diff -- Added support for timer based queries based on ExecuteSQL processesor as you had recommended and updated test cases. ---
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r176974503 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"influxdb", "measurement","get", "read", "query", "timeseries"}) +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).") --- End diff -- Influx queries supports data queries, dml and schema exploration. I've added integration tests for these. ---
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r176974419 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"influxdb", "measurement","get", "read", "query", "timeseries"}) +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), +@WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"), --- End diff -- Let me know if you have any recommendation for this. ---
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r176974380 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"influxdb", "measurement","get", "read", "query", "timeseries"}) +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), +@WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"), +}) +public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { + +public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query"; + +public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder() +.name("influxdb-query-result-time-unit") +.displayName("Query Result Time Units") +.description("The time unit of query results from the InfluxDB") +.defaultValue(TimeUnit.NANOSECONDS.name()) +.required(true) +.expressionLanguageSupported(true) +.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> v.name()).collect(Collectors.toSet())) +.sensitive(false) +.build(); + +static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("Successful InfluxDB queries are routed to this relationship").build(); + +static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("Falied InfluxDB queries are routed to this relationship").build(); + +static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r176974361 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"influxdb", "measurement","get", "read", "query", "timeseries"}) +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), +@WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"), +}) +public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { + +public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query"; + +public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder() +.name("influxdb-query-result-time-unit") +.displayName("Query Result Time Units") +.description("The time unit of query results from the InfluxDB") +.defaultValue(TimeUnit.NANOSECONDS.name()) +.required(true) +.expressionLanguageSupported(true) +.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> v.name()).collect(Collectors.toSet())) +.sensitive(false) +.build(); + +static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("Successful InfluxDB queries are routed to this relationship").build(); + +static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("Falied InfluxDB queries are routed to this relationship").build(); + +static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r176974332 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"influxdb", "measurement","get", "read", "query", "timeseries"}) +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), +@WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"), +}) +public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { + +public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query"; + +public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder() +.name("influxdb-query-result-time-unit") +.displayName("Query Result Time Units") +.description("The time unit of query results from the InfluxDB") +.defaultValue(TimeUnit.NANOSECONDS.name()) +.required(true) +.expressionLanguageSupported(true) +.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> v.name()).collect(Collectors.toSet())) +.sensitive(false) +.build(); + +static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("Successful InfluxDB queries are routed to this relationship").build(); + +static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("Falied InfluxDB queries are routed to this relationship").build(); + +static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r176974260 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"influxdb", "measurement","get", "read", "query", "timeseries"}) +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), +@WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"), +}) +public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { + +public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query"; + +public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder() +.name("influxdb-query-result-time-unit") +.displayName("Query Result Time Units") +.description("The time unit of query results from the InfluxDB") +.defaultValue(TimeUnit.NANOSECONDS.name()) +.required(true) +.expressionLanguageSupported(true) +.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> v.name()).collect(Collectors.toSet())) +.sensitive(false) +.build(); + +static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("Successful InfluxDB queries are routed to this relationship").build(); + +static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("Falied InfluxDB queries are routed to this relationship").build(); + +static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2562#discussion_r176974126 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/ExecuteInfluxDBQuery.java --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.influxdb.dto.Query; +import org.influxdb.dto.QueryResult; +import com.google.gson.Gson; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.net.SocketTimeoutException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"influxdb", "measurement","get", "read", "query", "timeseries"}) +@CapabilityDescription("Processor to execute InfluxDB query from the content of a FlowFile. Please check details of the supported queries in InfluxDB documentation (https://www.influxdb.com/).") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), +@WritesAttribute(attribute = ExecuteInfluxDBQuery.INFLUX_DB_EXECUTED_QUERY, description = "InfluxDB executed query"), +}) +public class ExecuteInfluxDBQuery extends AbstractInfluxDBProcessor { + +public static final String INFLUX_DB_EXECUTED_QUERY = "influxdb.executed.query"; + +public static final PropertyDescriptor INFLUX_DB_QUERY_RESULT_TIMEUNIT = new PropertyDescriptor.Builder() +.name("influxdb-query-result-time-unit") +.displayName("Query Result Time Units") +.description("The time unit of query results from the InfluxDB") +.defaultValue(TimeUnit.NANOSECONDS.name()) +.required(true) +.expressionLanguageSupported(true) +.allowableValues(Arrays.stream(TimeUnit.values()).map( v -> v.name()).collect(Collectors.toSet())) +.sensitive(false) +.build(); + +static final Relationship REL_SUCCESS = new Relationship.Builder().name("success") +.description("Successful InfluxDB queries are routed to this relationship").build(); + +static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") +.description("Falied InfluxDB queries are routed to this relationship").build(); + +static final Relationship REL_RETRY = new Relationship.Builder().name("retry") +
[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2562 Hi @MikeThomsen - Thanks for your review and comments. I will work on your and @joewitt 's recommendation this weekend. ---
[GitHub] nifi issue #2562: NIFI-4927 - InfluxDB Query Processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2562 Hi Nifi Team: Please let me know your suggestions/recommendations on this InfluxDB Query Processsor. Thanks ---
[GitHub] nifi pull request #2562: NIFI-4927 - InfluxDB Query Processor
GitHub user mans2singh opened a pull request: https://github.com/apache/nifi/pull/2562 NIFI-4927 - InfluxDB Query Processor Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [x] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [x] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [x] Has your PR been rebased against the latest commit within the target branch (typically master)? - [x] Is your initial contribution a single, squashed commit? ### For code changes: - [x] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [x] Have you written or updated unit tests to verify your changes? - [x] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [x] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [x] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [x] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [x] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/mans2singh/nifi NIFI-4927 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2562.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2562 commit 7c607eacb1aadb15295bc646ba380fed952cce6e Author: mans2singh Date: 2018-03-18T01:50:08Z NIFI-4927 - InfluxDB Query Processor ---
[GitHub] nifi issue #2101: NIFI-4289 - InfluxDB put processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2101 Thanks @MikeThomsen @pvillard31 @mattyb149 @joewitt @jskora and everyone for your advice/support. ---
[GitHub] nifi issue #2101: NIFI-4289 - InfluxDB put processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2101 @pvillard31 @mattyb149 @MikeThomsen I've added expression language support for username and password. Please let me know if there is any other recommendation. Thanks Mans ---
[GitHub] nifi issue #2101: NIFI-4289 - InfluxDB put processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2101 @pvillard31 - You were right - After renaming the integration tests they don't get executed by default mvn install. Also removed test profile from pom.xml as recommended. Please let me know if you have any additional recommendation. Thanks again. ---
[GitHub] nifi pull request #2101: NIFI-4289 - InfluxDB put processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2101#discussion_r169968256 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/pom.xml --- @@ -0,0 +1,88 @@ + + +http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> +4.0.0 + + +org.apache.nifi +nifi-influxdb-bundle +1.6.0-SNAPSHOT + + +nifi-influxdb-processors +jar + + + + org.influxdb + influxdb-java + + +org.apache.commons +commons-lang3 + + +org.apache.nifi +nifi-api + + +org.apache.nifi +nifi-utils + + +org.apache.nifi +nifi-mock +test + + +org.slf4j +slf4j-simple +test + + +junit +junit +test + + +com.google.guava +guava +test + + + + +default + +true + + + + +org.apache.maven.plugins +maven-surefire-plugin + + +**/IT*.java --- End diff -- @pvillard31 If I remove this profile/configuration from the pom.xml and run `mvn clean install` in that directory, the integration tests get executed and fail if InfluxDb is not running on the local server. ``` Tests in error: ITPutInfluxDBTest.setUp:58 » InfluxDBIO java.net.ConnectException: Failed to c... ITPutInfluxDBTest.setUp:58 » InfluxDBIO java.net.ConnectException: Failed to c... ITPutInfluxDBTest.setUp:58 » InfluxDBIO java.net.ConnectException: Failed to c... ITPutInfluxDBTest.setUp:58 » InfluxDBIO java.net.ConnectException: Failed to c... ITPutInfluxDBTest.setUp:58 » InfluxDBIO java.net.ConnectException: Failed to c... ITPutInfluxDBTest.setUp:58 » InfluxDBIO java.net.ConnectException: Failed to c... ``` Can you please let me know how I can skip running the integration tests without using this profile ? Thanks for @MikeThomsen and your feedback. ---
[GitHub] nifi issue #2101: NIFI-4289 - InfluxDB put processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2101 Hi @MikeThomsen @mattyb149 @joewitt I believe I have implemented all the review changes. Please let me know if there is anything I have missed or you have any additional recommendations. Thanks for your feedback. ---
[GitHub] nifi pull request #2101: NIFI-4289 - InfluxDB put processor
Github user mans2singh commented on a diff in the pull request: https://github.com/apache/nifi/pull/2101#discussion_r165818221 --- Diff: nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.influxdb; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.influxdb.InfluxDB; +import java.io.ByteArrayOutputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@EventDriven +@SupportsBatching +@Tags({"influxdb", "measurement","insert", "write", "put", "timeseries"}) +@CapabilityDescription("Processor to write the content of a FlowFile (in line protocol https://docs.influxdata.com/influxdb/v1.3/write_protocols/line_protocol_tutorial/) to InfluxDB (https://www.influxdb.com/). " ++ " The flow file can contain single measurement point or multiple measurement points separated by line seperator. The timestamp (last field) should be in nano-seconds resolution.") +@WritesAttributes({ +@WritesAttribute(attribute = AbstractInfluxDBProcessor.INFLUX_DB_ERROR_MESSAGE, description = "InfluxDB error message"), +}) +public class PutInfluxDB extends AbstractInfluxDBProcessor { + +public static AllowableValue CONSISTENCY_LEVEL_ALL = new AllowableValue("ALL", "All", "Return success when all nodes have responded with write success"); +public static AllowableValue CONSISTENCY_LEVEL_ANY = new AllowableValue("ANY", "Any", "Return success when any nodes have responded with write success"); +public static AllowableValue CONSISTENCY_LEVEL_ONE = new AllowableValue("ONE", "One", "Return success when one node has responded with write success"); +public static AllowableValue CONSISTENCY_LEVEL_QUORUM = new AllowableValue("QUORUM", "Quorum", "Return success when a majority of nodes have responded with write success"); + +public static final PropertyDescriptor CONSISTENCY_LEVEL = new PropertyDescriptor.Builder() +.name("influxdb-consistency-level") +.displayName("Consistency Level") +.description("InfluxDB consistency level") +.required(true) +.defaultValue(CONSISTENCY_LEVEL_ONE.getValue()) +.expressionLanguageSupported(true) +.allowableValues(CONSISTENCY_LEVEL_ONE, CONSISTENCY_LEVEL_ANY, CONSISTENCY_LEVEL_ALL, CONSISTENCY_LEVEL_QUORUM) +.build(); + +public static final PropertyDescriptor RETENTION_P
[GitHub] nifi issue #2101: NIFI-4289 - InfluxDB put processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2101 @MikeThomsen - If the fields, tags and timestamp for a measurement are the same, they are considered to be the same record. Regarding size limit - I did not find any mention of size limit for posting data in the influxdb docs. I think this all depends on use cases and with the size limit processor property available, the nifi admin configure the values based on their influx db cluster and load. Let me know if I have missed anything or anything else required. Thanks again for your advice/comments. ---
[GitHub] nifi issue #2101: NIFI-4289 - InfluxDB put processor
Github user mans2singh commented on the issue: https://github.com/apache/nifi/pull/2101 @mattyb149 @MikeThomsen Please let me know if you have any addiional comments/recommendations for this processor. Thanks ---