[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/237 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56276158 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java --- @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.InvalidTypeException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.QueryExecutionException; +import com.datastax.driver.core.exceptions.QueryValidationException; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@SupportsBatching +@Tags({"cassandra", "cql", "put", "insert", "update", "set"}) +@EventDriven +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Execute provided Cassandra Query Language (CQL) statement. The content of an incoming FlowFile " ++ "is expected to be the CQL command to execute. The CQL command may use the ? to escape parameters. In this " ++ "case, the parameters to use must exist as FlowFile attributes with the naming convention cql.args.N.type " ++ "and cql.args.N.value, where N is a positive integer. The cql.args.N.type is expected to be " ++ "a lowercase string indicating the Cassandra type.") +@ReadsAttributes({ +@ReadsAttribute(attribute = "cql.args.N.type", +description = "Incoming FlowFiles are expected to be parameterized CQL statements. The type of each " ++ "parameter is specified as a lowercase string corresponding to the Cassandra data type (text, " ++ "int, boolean, e.g.). In the case of collections, the primitive type(s) of the elements in the " ++ "collection should be comma-delimited, follow the collection type, and be enclosed in
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56237562 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java --- @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Configuration; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.Row; +import com.google.common.collect.Sets; +import org.apache.nifi.authorization.exception.ProviderCreationException; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import javax.net.ssl.SSLContext; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +/** + * Unit tests for the AbstractCassandraProcessor class + */ +public class AbstractCassandraProcessorTest { + +MockAbstractCassandraProcessor processor; +private TestRunner testRunner; + +@Before +public void setUp() throws Exception { +processor = new MockAbstractCassandraProcessor(); +testRunner = TestRunners.newTestRunner(processor); +} + +@Test +public void testCustomValidate() throws Exception { --- End diff -- Since you already have a testCustomValidate, might as well add check for testing a port that isn't in the valid range. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56237303 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Configuration; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.UnavailableException; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import javax.net.ssl.SSLContext; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the PutCassandraQL processor + */ +public class PutCassandraQLTest { + +private TestRunner testRunner; +private MockPutCassandraQL processor; + +@Before +public void setUp() throws Exception { +processor = new MockPutCassandraQL(); +testRunner = TestRunners.newTestRunner(processor); +} + +@Test +public void testProcessorConfigValidity() { +testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "localhost:9042"); +testRunner.assertValid(); +testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "password"); +testRunner.assertNotValid(); +testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "username"); + testRunner.setProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL, "ONE"); +testRunner.assertValid(); +} + +@Test +public void testProcessorHappyPath() { +setUpStandardTestConfig(); + +testRunner.run(1, true, true); + testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_SUCCESS, 1); +testRunner.clearTransferState(); +} + +@Test +public void testProcessorInvalidQueryException() { +setUpStandardTestConfig(); + +// Test exceptions +processor.setExceptionToThrow( +new InvalidQueryException(new InetSocketAddress("localhost", 9042), "invalid query")); +testRunner.enqueue("UPDATE users SET cities = [ 'New York', 'Los Angeles' ] WHERE user_id = 'coast2coast';"); +testRunner.run(1, true, true); + testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_FAILURE, 1); +testRunner.clearTransferState(); +} + +@Test +public void testProcessorUnavailableException() { +setUpStandardTestConfig(); + +processor.setExceptionToThrow( +new UnavailableException(new InetSocketAddress("localhost", 9042), ConsistencyLevel.ALL, 5, 2));
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56216735 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java --- @@ -0,0 +1,383 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.InvalidTypeException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.QueryExecutionException; +import com.datastax.driver.core.exceptions.QueryValidationException; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@SupportsBatching +@Tags({"cassandra", "cql", "put", "insert", "update", "set"}) +@EventDriven +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Execute provided Cassandra Query Language (CQL) statement. The content of an incoming FlowFile " ++ "is expected to be the CQL command to execute. The CQL command may use the ? to escape parameters. In this " ++ "case, the parameters to use must exist as FlowFile attributes with the naming convention cql.args.N.type " ++ "and cql.args.N.value, where N is a positive integer. The cql.args.N.type is expected to be " ++ "a lowercase string indicating the Cassandra type.") +@ReadsAttributes({ +@ReadsAttribute(attribute = "cql.args.N.type", +description = "Incoming FlowFiles are expected to be parameterized CQL statements. The type of each " ++ "parameter is specified as a lowercase string corresponding to the Cassandra data type (text, " ++ "int, boolean, e.g.). In the case of collections, the primitive type(s) of the elements in the " ++ "collection should be comma-delimited, follow the collection type, and be enclosed in
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56214671 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java --- @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.SSLOptions; +import com.datastax.driver.core.Session; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.exception.ProviderCreationException; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.security.util.SslContextFactory; +import org.apache.nifi.ssl.SSLContextService; + +import javax.net.ssl.SSLContext; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +/** + * AbstractCassandraProcessor is a base class for Cassandra processors and contains logic and variables common to most + * processors integrating with Apache Cassandra. + */ +public abstract class AbstractCassandraProcessor extends AbstractProcessor { + +public static final int DEFAULT_CASSANDRA_PORT = 9042; + +private static final Validator HOSTNAME_PORT_VALIDATOR = new Validator() { +@Override +public ValidationResult validate(final String subject, final String input, final ValidationContext context) { +final List esList = Arrays.asList(input.split(",")); +for (String hostnamePort : esList) { +String[] addresses = hostnamePort.split(":"); +// Protect against invalid input like http://127.0.0.1:9042 (URL scheme should not be there) +if (addresses.length != 2) { +return new ValidationResult.Builder().subject(subject).input(input).explanation( +"Each entry must be in hostname:port form (no scheme such as http://, and port must be specified)") --- End diff -- I may have commented on this before. Wouldn't it be easier to simply have two fields host/port and avid having this validator all together? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56191239 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java --- @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Configuration; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.UnavailableException; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import javax.net.ssl.SSLContext; +import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the PutCassandraQL processor + */ +public class PutCassandraQLTest { + +private TestRunner testRunner; +private MockPutCassandraQL processor; + +@Before +public void setUp() throws Exception { +processor = new MockPutCassandraQL(); +testRunner = TestRunners.newTestRunner(processor); +} + +@Test +public void testProcessor() { +testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, "localhost:9042"); +testRunner.assertValid(); +testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, "password"); +testRunner.assertNotValid(); +testRunner.setProperty(AbstractCassandraProcessor.USERNAME, "username"); + testRunner.setProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL, "ONE"); +testRunner.assertValid(); + +testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties) VALUES ?, ?, ?, ?", +new HashMap() { +{ +put("cql.args.1.type", "int"); +put("cql.args.1.value", "1"); +put("cql.args.2.type", "text"); +put("cql.args.2.value", "Joe"); +put("cql.args.3.type", "text"); +// No value for arg 3 to test setNull +put("cql.args.4.type", "map "); +put("cql.args.4.value", "{'a':'Hello', 'b':'World'}"); +put("cql.args.5.type", "list"); +put("cql.args.5.value", "[true,false,true]"); +put("cql.args.6.type", "set"); +put("cql.args.6.value", "{1.0, 2.0}"); +put("cql.args.7.type", "bigint"); +put("cql.args.7.value", "2000"); +put("cql.args.8.type", "float"); +put("cql.args.8.value", "1.0");
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56187278 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java --- @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.InvalidTypeException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.QueryExecutionException; +import com.datastax.driver.core.exceptions.QueryValidationException; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@SupportsBatching +@Tags({"cassandra", "cql", "put", "insert", "update", "set"}) +@EventDriven +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Execute provided Cassandra Query Language (CQL) statement. The content of an incoming FlowFile " ++ "is expected to be the CQL command to execute. The CQL command may use the ? to escape parameters. In this " ++ "case, the parameters to use must exist as FlowFile attributes with the naming convention cql.args.N.type " ++ "and cql.args.N.value, where N is a positive integer. The cql.args.N.type is expected to be " ++ "a lowercase string indicating the Cassandra type. The content of the FlowFile is expected to be in UTF-8 format.") +@ReadsAttributes({ +@ReadsAttribute(attribute = "cql.args.N.type", +description = "Incoming FlowFiles are expected to be parameterized CQL statements. The type of each " ++ "parameter is specified as a lowercase string corresponding to the Cassandra data type (text, " ++ "int, boolean, e.g.). In the case of collections, the primitive type(s) of the elements in the " ++ "collection should be
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56178725 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java --- @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.InvalidTypeException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.QueryExecutionException; +import com.datastax.driver.core.exceptions.QueryValidationException; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@SupportsBatching +@Tags({"cassandra", "cql", "put", "insert", "update", "set"}) +@EventDriven +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Execute provided Cassandra Query Language (CQL) statement. The content of an incoming FlowFile " ++ "is expected to be the CQL command to execute. The CQL command may use the ? to escape parameters. In this " ++ "case, the parameters to use must exist as FlowFile attributes with the naming convention cql.args.N.type " ++ "and cql.args.N.value, where N is a positive integer. The cql.args.N.type is expected to be " ++ "a lowercase string indicating the Cassandra type. The content of the FlowFile is expected to be in UTF-8 format.") +@ReadsAttributes({ +@ReadsAttribute(attribute = "cql.args.N.type", +description = "Incoming FlowFiles are expected to be parameterized CQL statements. The type of each " ++ "parameter is specified as a lowercase string corresponding to the Cassandra data type (text, " ++ "int, boolean, e.g.). In the case of collections, the primitive type(s) of the elements in the " ++ "collection should be
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user mattyb149 commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56176199 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java --- @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.InvalidTypeException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.QueryExecutionException; +import com.datastax.driver.core.exceptions.QueryValidationException; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@SupportsBatching +@Tags({"cassandra", "cql", "put", "insert", "update", "set"}) +@EventDriven +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Execute provided Cassandra Query Language (CQL) statement. The content of an incoming FlowFile " ++ "is expected to be the CQL command to execute. The CQL command may use the ? to escape parameters. In this " ++ "case, the parameters to use must exist as FlowFile attributes with the naming convention cql.args.N.type " ++ "and cql.args.N.value, where N is a positive integer. The cql.args.N.type is expected to be " ++ "a lowercase string indicating the Cassandra type. The content of the FlowFile is expected to be in UTF-8 format.") --- End diff -- Doh! Good catch, will update --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r56175269 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java --- @@ -0,0 +1,403 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSetFuture; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.AuthenticationException; +import com.datastax.driver.core.exceptions.InvalidTypeException; +import com.datastax.driver.core.exceptions.NoHostAvailableException; +import com.datastax.driver.core.exceptions.QueryExecutionException; +import com.datastax.driver.core.exceptions.QueryValidationException; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.ReadsAttribute; +import org.apache.nifi.annotation.behavior.ReadsAttributes; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +@SupportsBatching +@Tags({"cassandra", "cql", "put", "insert", "update", "set"}) +@EventDriven +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@CapabilityDescription("Execute provided Cassandra Query Language (CQL) statement. The content of an incoming FlowFile " ++ "is expected to be the CQL command to execute. The CQL command may use the ? to escape parameters. In this " ++ "case, the parameters to use must exist as FlowFile attributes with the naming convention cql.args.N.type " ++ "and cql.args.N.value, where N is a positive integer. The cql.args.N.type is expected to be " ++ "a lowercase string indicating the Cassandra type. The content of the FlowFile is expected to be in UTF-8 format.") --- End diff -- "The content of the FlowFile is expected to be in UTF-8 format." After adding the charset property I don't believe this is the case. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...
Github user olegz commented on a diff in the pull request: https://github.com/apache/nifi/pull/237#discussion_r54299220 --- Diff: nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java --- @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.cassandra; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.Metadata; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.SSLOptions; +import com.datastax.driver.core.Session; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.authorization.exception.ProviderCreationException; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.security.util.SslContextFactory; +import org.apache.nifi.ssl.SSLContextService; + +import javax.net.ssl.SSLContext; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +/** + * AbstractCassandraProcessor is a base class for Cassandra processors and contains logic and variables common to most + * processors integrating with Apache Cassandra. + */ +public abstract class AbstractCassandraProcessor extends AbstractProcessor { + +private static final Validator HOSTNAME_PORT_VALIDATOR = new Validator() { +@Override --- End diff -- Don't you think it would be easier to follow a common convention and simply have 'host', 'port' properties? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---