[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...

2016-03-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/237


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...

2016-03-15 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/237#discussion_r56276158
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
 ---
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.InvalidTypeException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.QueryExecutionException;
+import com.datastax.driver.core.exceptions.QueryValidationException;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@SupportsBatching
+@Tags({"cassandra", "cql", "put", "insert", "update", "set"})
+@EventDriven
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Execute provided Cassandra Query Language (CQL) 
statement. The content of an incoming FlowFile "
++ "is expected to be the CQL command to execute. The CQL command 
may use the ? to escape parameters. In this "
++ "case, the parameters to use must exist as FlowFile attributes 
with the naming convention cql.args.N.type "
++ "and cql.args.N.value, where N is a positive integer. The 
cql.args.N.type is expected to be "
++ "a lowercase string indicating the Cassandra type.")
+@ReadsAttributes({
+@ReadsAttribute(attribute = "cql.args.N.type",
+description = "Incoming FlowFiles are expected to be 
parameterized CQL statements. The type of each "
++ "parameter is specified as a lowercase string 
corresponding to the Cassandra data type (text, "
++ "int, boolean, e.g.). In the case of 
collections, the primitive type(s) of the elements in the "
++ "collection should be comma-delimited, follow 
the collection type, and be enclosed in 

[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...

2016-03-15 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/237#discussion_r56237562
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessorTest.java
 ---
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Configuration;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Row;
+import com.google.common.collect.Sets;
+import org.apache.nifi.authorization.exception.ProviderCreationException;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for the AbstractCassandraProcessor class
+ */
+public class AbstractCassandraProcessorTest {
+
+MockAbstractCassandraProcessor processor;
+private TestRunner testRunner;
+
+@Before
+public void setUp() throws Exception {
+processor = new MockAbstractCassandraProcessor();
+testRunner = TestRunners.newTestRunner(processor);
+}
+
+@Test
+public void testCustomValidate() throws Exception {
--- End diff --

Since you already have a testCustomValidate, might as well add check for 
testing a port that isn't in the valid range.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...

2016-03-15 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/237#discussion_r56237303
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Configuration;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.UnavailableException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the PutCassandraQL processor
+ */
+public class PutCassandraQLTest {
+
+private TestRunner testRunner;
+private MockPutCassandraQL processor;
+
+@Before
+public void setUp() throws Exception {
+processor = new MockPutCassandraQL();
+testRunner = TestRunners.newTestRunner(processor);
+}
+
+@Test
+public void testProcessorConfigValidity() {
+testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, 
"localhost:9042");
+testRunner.assertValid();
+testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, 
"password");
+testRunner.assertNotValid();
+testRunner.setProperty(AbstractCassandraProcessor.USERNAME, 
"username");
+
testRunner.setProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL, "ONE");
+testRunner.assertValid();
+}
+
+@Test
+public void testProcessorHappyPath() {
+setUpStandardTestConfig();
+
+testRunner.run(1, true, true);
+
testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_SUCCESS, 1);
+testRunner.clearTransferState();
+}
+
+@Test
+public void testProcessorInvalidQueryException() {
+setUpStandardTestConfig();
+
+// Test exceptions
+processor.setExceptionToThrow(
+new InvalidQueryException(new 
InetSocketAddress("localhost", 9042), "invalid query"));
+testRunner.enqueue("UPDATE users SET cities = [ 'New York', 'Los 
Angeles' ] WHERE user_id = 'coast2coast';");
+testRunner.run(1, true, true);
+
testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_FAILURE, 1);
+testRunner.clearTransferState();
+}
+
+@Test
+public void testProcessorUnavailableException() {
+setUpStandardTestConfig();
+
+processor.setExceptionToThrow(
+new UnavailableException(new 
InetSocketAddress("localhost", 9042), ConsistencyLevel.ALL, 5, 2));
   

[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...

2016-03-15 Thread olegz
Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/237#discussion_r56216735
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
 ---
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.InvalidTypeException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.QueryExecutionException;
+import com.datastax.driver.core.exceptions.QueryValidationException;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@SupportsBatching
+@Tags({"cassandra", "cql", "put", "insert", "update", "set"})
+@EventDriven
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Execute provided Cassandra Query Language (CQL) 
statement. The content of an incoming FlowFile "
++ "is expected to be the CQL command to execute. The CQL command 
may use the ? to escape parameters. In this "
++ "case, the parameters to use must exist as FlowFile attributes 
with the naming convention cql.args.N.type "
++ "and cql.args.N.value, where N is a positive integer. The 
cql.args.N.type is expected to be "
++ "a lowercase string indicating the Cassandra type.")
+@ReadsAttributes({
+@ReadsAttribute(attribute = "cql.args.N.type",
+description = "Incoming FlowFiles are expected to be 
parameterized CQL statements. The type of each "
++ "parameter is specified as a lowercase string 
corresponding to the Cassandra data type (text, "
++ "int, boolean, e.g.). In the case of 
collections, the primitive type(s) of the elements in the "
++ "collection should be comma-delimited, follow 
the collection type, and be enclosed in 

[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...

2016-03-15 Thread olegz
Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/237#discussion_r56214671
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
 ---
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SSLOptions;
+import com.datastax.driver.core.Session;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.exception.ProviderCreationException;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.ssl.SSLContextService;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * AbstractCassandraProcessor is a base class for Cassandra processors and 
contains logic and variables common to most
+ * processors integrating with Apache Cassandra.
+ */
+public abstract class AbstractCassandraProcessor extends AbstractProcessor 
{
+
+public static final int DEFAULT_CASSANDRA_PORT = 9042;
+
+private static final Validator HOSTNAME_PORT_VALIDATOR = new 
Validator() {
+@Override
+public ValidationResult validate(final String subject, final 
String input, final ValidationContext context) {
+final List esList = Arrays.asList(input.split(","));
+for (String hostnamePort : esList) {
+String[] addresses = hostnamePort.split(":");
+// Protect against invalid input like 
http://127.0.0.1:9042 (URL scheme should not be there)
+if (addresses.length != 2) {
+return new 
ValidationResult.Builder().subject(subject).input(input).explanation(
+"Each entry must be in hostname:port form (no 
scheme such as http://, and port must be specified)")
--- End diff --

I may have commented on this before. Wouldn't it be easier to simply have 
two fields host/port and avid having this validator all together?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...

2016-03-15 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/237#discussion_r56191239
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
 ---
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Configuration;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.UnavailableException;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for the PutCassandraQL processor
+ */
+public class PutCassandraQLTest {
+
+private TestRunner testRunner;
+private MockPutCassandraQL processor;
+
+@Before
+public void setUp() throws Exception {
+processor = new MockPutCassandraQL();
+testRunner = TestRunners.newTestRunner(processor);
+}
+
+@Test
+public void testProcessor() {
+testRunner.setProperty(AbstractCassandraProcessor.CONTACT_POINTS, 
"localhost:9042");
+testRunner.assertValid();
+testRunner.setProperty(AbstractCassandraProcessor.PASSWORD, 
"password");
+testRunner.assertNotValid();
+testRunner.setProperty(AbstractCassandraProcessor.USERNAME, 
"username");
+
testRunner.setProperty(AbstractCassandraProcessor.CONSISTENCY_LEVEL, "ONE");
+testRunner.assertValid();
+
+testRunner.enqueue("INSERT INTO users (user_id, first_name, 
last_name, properties) VALUES ?, ?, ?, ?",
+new HashMap() {
+{
+put("cql.args.1.type", "int");
+put("cql.args.1.value", "1");
+put("cql.args.2.type", "text");
+put("cql.args.2.value", "Joe");
+put("cql.args.3.type", "text");
+// No value for arg 3 to test setNull
+put("cql.args.4.type", "map");
+put("cql.args.4.value", "{'a':'Hello', 
'b':'World'}");
+put("cql.args.5.type", "list");
+put("cql.args.5.value", "[true,false,true]");
+put("cql.args.6.type", "set");
+put("cql.args.6.value", "{1.0, 2.0}");
+put("cql.args.7.type", "bigint");
+put("cql.args.7.value", "2000");
+put("cql.args.8.type", "float");
+put("cql.args.8.value", "1.0");

[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...

2016-03-15 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/237#discussion_r56187278
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
 ---
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.InvalidTypeException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.QueryExecutionException;
+import com.datastax.driver.core.exceptions.QueryValidationException;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@SupportsBatching
+@Tags({"cassandra", "cql", "put", "insert", "update", "set"})
+@EventDriven
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Execute provided Cassandra Query Language (CQL) 
statement. The content of an incoming FlowFile "
++ "is expected to be the CQL command to execute. The CQL command 
may use the ? to escape parameters. In this "
++ "case, the parameters to use must exist as FlowFile attributes 
with the naming convention cql.args.N.type "
++ "and cql.args.N.value, where N is a positive integer. The 
cql.args.N.type is expected to be "
++ "a lowercase string indicating the Cassandra type. The content 
of the FlowFile is expected to be in UTF-8 format.")
+@ReadsAttributes({
+@ReadsAttribute(attribute = "cql.args.N.type",
+description = "Incoming FlowFiles are expected to be 
parameterized CQL statements. The type of each "
++ "parameter is specified as a lowercase string 
corresponding to the Cassandra data type (text, "
++ "int, boolean, e.g.). In the case of 
collections, the primitive type(s) of the elements in the "
++ "collection should be 

[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...

2016-03-15 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/237#discussion_r56178725
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
 ---
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.InvalidTypeException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.QueryExecutionException;
+import com.datastax.driver.core.exceptions.QueryValidationException;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@SupportsBatching
+@Tags({"cassandra", "cql", "put", "insert", "update", "set"})
+@EventDriven
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Execute provided Cassandra Query Language (CQL) 
statement. The content of an incoming FlowFile "
++ "is expected to be the CQL command to execute. The CQL command 
may use the ? to escape parameters. In this "
++ "case, the parameters to use must exist as FlowFile attributes 
with the naming convention cql.args.N.type "
++ "and cql.args.N.value, where N is a positive integer. The 
cql.args.N.type is expected to be "
++ "a lowercase string indicating the Cassandra type. The content 
of the FlowFile is expected to be in UTF-8 format.")
+@ReadsAttributes({
+@ReadsAttribute(attribute = "cql.args.N.type",
+description = "Incoming FlowFiles are expected to be 
parameterized CQL statements. The type of each "
++ "parameter is specified as a lowercase string 
corresponding to the Cassandra data type (text, "
++ "int, boolean, e.g.). In the case of 
collections, the primitive type(s) of the elements in the "
++ "collection should be 

[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...

2016-03-15 Thread mattyb149
Github user mattyb149 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/237#discussion_r56176199
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
 ---
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.InvalidTypeException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.QueryExecutionException;
+import com.datastax.driver.core.exceptions.QueryValidationException;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@SupportsBatching
+@Tags({"cassandra", "cql", "put", "insert", "update", "set"})
+@EventDriven
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Execute provided Cassandra Query Language (CQL) 
statement. The content of an incoming FlowFile "
++ "is expected to be the CQL command to execute. The CQL command 
may use the ? to escape parameters. In this "
++ "case, the parameters to use must exist as FlowFile attributes 
with the naming convention cql.args.N.type "
++ "and cql.args.N.value, where N is a positive integer. The 
cql.args.N.type is expected to be "
++ "a lowercase string indicating the Cassandra type. The content 
of the FlowFile is expected to be in UTF-8 format.")
--- End diff --

Doh! Good catch, will update


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...

2016-03-15 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi/pull/237#discussion_r56175269
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
 ---
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AuthenticationException;
+import com.datastax.driver.core.exceptions.InvalidTypeException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.QueryExecutionException;
+import com.datastax.driver.core.exceptions.QueryValidationException;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.ReadsAttributes;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@SupportsBatching
+@Tags({"cassandra", "cql", "put", "insert", "update", "set"})
+@EventDriven
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Execute provided Cassandra Query Language (CQL) 
statement. The content of an incoming FlowFile "
++ "is expected to be the CQL command to execute. The CQL command 
may use the ? to escape parameters. In this "
++ "case, the parameters to use must exist as FlowFile attributes 
with the naming convention cql.args.N.type "
++ "and cql.args.N.value, where N is a positive integer. The 
cql.args.N.type is expected to be "
++ "a lowercase string indicating the Cassandra type. The content 
of the FlowFile is expected to be in UTF-8 format.")
--- End diff --

"The content of the FlowFile is expected to be in UTF-8 format." After 
adding the charset property I don't believe this is the case.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] nifi pull request: NIFI-901: Add QueryCassandra and PutCassandraQL...

2016-02-26 Thread olegz
Github user olegz commented on a diff in the pull request:

https://github.com/apache/nifi/pull/237#discussion_r54299220
  
--- Diff: 
nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java
 ---
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SSLOptions;
+import com.datastax.driver.core.Session;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.authorization.exception.ProviderCreationException;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.logging.ProcessorLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.SslContextFactory;
+import org.apache.nifi.ssl.SSLContextService;
+
+import javax.net.ssl.SSLContext;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * AbstractCassandraProcessor is a base class for Cassandra processors and 
contains logic and variables common to most
+ * processors integrating with Apache Cassandra.
+ */
+public abstract class AbstractCassandraProcessor extends AbstractProcessor 
{
+
+private static final Validator HOSTNAME_PORT_VALIDATOR = new 
Validator() {
+@Override
--- End diff --

Don't you think it would be easier to follow a common convention and simply 
have 'host', 'port' properties? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---