Repository: nifi
Updated Branches:
  refs/heads/master e7e349631 -> 7e63b0036


NIFI-2165: fix support for inserting timestamps into cassandra

Signed-off-by: Matt Burgess <mattyb...@apache.org>

This closes #602


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7e63b003
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7e63b003
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7e63b003

Branch: refs/heads/master
Commit: 7e63b0036429d739b5d2ee4bca578d99db2b0037
Parents: e7e3496
Author: jeffoxenberg <joxenb...@gmail.com>
Authored: Fri Jul 1 13:26:27 2016 -0500
Committer: Matt Burgess <mattyb...@apache.org>
Committed: Tue Jul 5 13:38:38 2016 -0400

----------------------------------------------------------------------
 .../processors/cassandra/PutCassandraQL.java    |  6 ++-
 .../cassandra/PutCassandraQLTest.java           | 43 +++++++++++++++++++-
 2 files changed, 45 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7e63b003/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
index 4af3aa7..1e0c973 100644
--- 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/PutCassandraQL.java
@@ -55,6 +55,7 @@ import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -318,7 +319,6 @@ public class PutCassandraQL extends 
AbstractCassandraProcessor {
                     if (mainType.equals(DataType.ascii())
                             || mainType.equals(DataType.text())
                             || mainType.equals(DataType.varchar())
-                            || mainType.equals(DataType.timestamp())
                             || mainType.equals(DataType.timeuuid())
                             || mainType.equals(DataType.uuid())
                             || mainType.equals(DataType.inet())
@@ -345,6 +345,8 @@ public class PutCassandraQL extends 
AbstractCassandraProcessor {
                     } else if (mainType.equals(DataType.blob())) {
                         statement.setBytes(paramIndex, (ByteBuffer) 
typeCodec.parse(paramValue));
 
+                    } else if (mainType.equals(DataType.timestamp())) {
+                        statement.setTimestamp(paramIndex, (Date) 
typeCodec.parse(paramValue));
                     }
                     return;
                 } else {
@@ -399,4 +401,4 @@ public class PutCassandraQL extends 
AbstractCassandraProcessor {
         super.stop();
     }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/7e63b003/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
index 1a40556..b3e4fe2 100644
--- 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/test/java/org/apache/nifi/processors/cassandra/PutCassandraQLTest.java
@@ -80,7 +80,7 @@ public class PutCassandraQLTest {
     public void testProcessorHappyPath() {
         setUpStandardTestConfig();
 
-        testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, 
properties, bits, scaleset, largenum, scale, byteobject) VALUES ?, ?, ?, ?, ?, 
?, ?, ?, ?",
+        testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, 
properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?",
                 new HashMap<String, String>() {
                     {
                         put("cql.args.1.type", "int");
@@ -101,6 +101,8 @@ public class PutCassandraQLTest {
                         put("cql.args.8.value", "1.0");
                         put("cql.args.9.type", "blob");
                         put("cql.args.9.value", "0xDEADBEEF");
+                        put("cql.args.10.type", "timestamp");
+                        put("cql.args.10.value", "2016-07-01T15:21:05Z");
 
                     }
                 });
@@ -111,6 +113,43 @@ public class PutCassandraQLTest {
     }
 
     @Test
+    public void testProcessorBadTimestamp() {
+        setUpStandardTestConfig();
+        processor.setExceptionToThrow(
+                new InvalidQueryException(new InetSocketAddress("localhost", 
9042), "invalid timestamp"));
+        testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, 
properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?",
+                new HashMap<String, String>() {
+                    {
+                        put("cql.args.1.type", "int");
+                        put("cql.args.1.value", "1");
+                        put("cql.args.2.type", "text");
+                        put("cql.args.2.value", "Joe");
+                        put("cql.args.3.type", "text");
+                        // No value for arg 3 to test setNull
+                        put("cql.args.4.type", "map<text,text>");
+                        put("cql.args.4.value", "{'a':'Hello', 'b':'World'}");
+                        put("cql.args.5.type", "list<boolean>");
+                        put("cql.args.5.value", "[true,false,true]");
+                        put("cql.args.6.type", "set<double>");
+                        put("cql.args.6.value", "{1.0, 2.0}");
+                        put("cql.args.7.type", "bigint");
+                        put("cql.args.7.value", "20000000");
+                        put("cql.args.8.type", "float");
+                        put("cql.args.8.value", "1.0");
+                        put("cql.args.9.type", "blob");
+                        put("cql.args.9.value", "0xDEADBEEF");
+                        put("cql.args.10.type", "timestamp");
+                        put("cql.args.10.value", "not a timestamp");
+
+                    }
+                });
+
+        testRunner.run(1, true, true);
+        testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_FAILURE, 
1);
+        testRunner.clearTransferState();
+    }
+
+    @Test
     public void testProcessorInvalidQueryException() {
         setUpStandardTestConfig();
 
@@ -216,4 +255,4 @@ public class PutCassandraQLTest {
         }
 
     }
-}
\ No newline at end of file
+}

Reply via email to