Pig support for hadoop CqlInputFormat

Patch by Alex Liu, reviewed by brandonwilliams for CASSANDRA-6454


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1e2a5b0e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1e2a5b0e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1e2a5b0e

Branch: refs/heads/trunk
Commit: 1e2a5b0ee334327f1ed62e181542328a0824c27c
Parents: ef894c2
Author: Brandon Williams <brandonwilli...@apache.org>
Authored: Wed Jul 23 10:26:50 2014 -0500
Committer: Brandon Williams <brandonwilli...@apache.org>
Committed: Wed Jul 23 10:26:50 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 build.xml                                       |   1 +
 .../cassandra/hadoop/cql3/CqlConfigHelper.java  |   1 -
 .../cassandra/hadoop/cql3/CqlInputFormat.java   |   2 -
 .../cassandra/hadoop/pig/CqlNativeStorage.java  | 289 +++++++++++++++++++
 .../apache/cassandra/hadoop/pig/CqlStorage.java |  12 +-
 test/conf/cassandra.yaml                        |   2 +
 .../cassandra/pig/CqlTableDataTypeTest.java     |  94 +++++-
 .../org/apache/cassandra/pig/CqlTableTest.java  | 101 ++++++-
 .../org/apache/cassandra/pig/PigTestBase.java   |   3 +
 .../cassandra/pig/ThriftColumnFamilyTest.java   |  73 +++--
 11 files changed, 529 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9909760..6f67720 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.10
+ * Pig support for hadoop CqlInputFormat (CASSANDRA-6454)
  * Fix ReversedType(DateType) mapping to native protocol (CASSANDRA-7576)
  * (Windows) force range-based repair to non-sequential mode (CASSANDRA-7541)
  * Fix range merging when DES scores are zero (CASSANDRA-7535)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index ddf74e4..68907df 100644
--- a/build.xml
+++ b/build.xml
@@ -1275,6 +1275,7 @@
   <classpathentry kind="src" path="interface/thrift/gen-java"/>
   <classpathentry kind="src" path="test/unit"/>
   <classpathentry kind="src" path="test/long"/>
+  <classpathentry kind="src" path="test/pig"/>
   <classpathentry kind="src" path="tools/stress/src"/>
   <classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
   <classpathentry kind="output" path="build/classes/main"/>

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java 
b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
index 63279d1..b375ce2 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlConfigHelper.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.hadoop.cql3;
 *
 */
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.KeyManagementException;
 import java.security.KeyStore;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java 
b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
index e1cdf32..09bd80c 100644
--- a/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
+++ b/src/java/org/apache/cassandra/hadoop/cql3/CqlInputFormat.java
@@ -18,8 +18,6 @@
 package org.apache.cassandra.hadoop.cql3;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Map;
 
 import org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat;
 import org.apache.hadoop.mapred.InputSplit;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java 
b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
new file mode 100644
index 0000000..948d21c
--- /dev/null
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlNativeStorage.java
@@ -0,0 +1,289 @@
+package org.apache.cassandra.hadoop.pig;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.hadoop.ConfigHelper;
+import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
+import org.apache.cassandra.thrift.CfDef;
+import org.apache.cassandra.thrift.ColumnDef;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import com.datastax.driver.core.Row;
+
+public class CqlNativeStorage extends CqlStorage
+{
+    private RecordReader<Long, Row> reader;
+    private String nativePort;
+    private String nativeCoreConnections;
+    private String nativeMaxConnections;
+    private String nativeMinSimultReqs;
+    private String nativeMaxSimultReqs;
+    private String nativeConnectionTimeout;
+    private String nativeReadConnectionTimeout;
+    private String nativeReceiveBufferSize;
+    private String nativeSendBufferSize;
+    private String nativeSolinger;
+    private String nativeTcpNodelay;
+    private String nativeReuseAddress;
+    private String nativeKeepAlive;
+    private String nativeAuthProvider;
+    private String nativeSSLTruststorePath;
+    private String nativeSSLKeystorePath;
+    private String nativeSSLTruststorePassword;
+    private String nativeSSLKeystorePassword;
+    private String nativeSSLCipherSuites;
+    private String inputCql;
+
+    /** Creates a storage handler with a page size of 1000 by delegating to {@link #CqlNativeStorage(int)}. */
+    public CqlNativeStorage()
+    {
+        this(1000);
+    }
+
+    /**
+     * Creates a storage handler with the given page size and selects
+     * org.apache.cassandra.hadoop.cql3.CqlInputFormat as the default input format.
+     *
+     * @param pageSize number of CQL rows per page; setLocation forwards it to
+     *                 CqlConfigHelper.setInputCQLPageRowSize
+     */
+    public CqlNativeStorage(int pageSize)
+    {
+        super(pageSize);
+        DEFAULT_INPUT_FORMAT = 
"org.apache.cassandra.hadoop.cql3.CqlInputFormat";
+    }
+
+    /**
+     * Stores the record reader that getNext() pulls rows from.
+     *
+     * @param reader record reader to read from; assigned unchecked to a RecordReader of Long keys and Row values
+     * @param split  not used by this implementation
+     */
+    public void prepareToRead(RecordReader reader, PigSplit split)
+    {
+        this.reader = reader;
+    }
+
+    /**
+     * Reads the next CQL row from the record reader and converts it to a Pig tuple.
+     *
+     * The tuple has one field per entry in the table's column metadata, in metadata
+     * order. A column whose value is null in the row produces a null field.
+     *
+     * @return the next tuple, or null when the reader has no more rows
+     * @throws IOException if reading or converting the row fails, or if the read is interrupted
+     */
+    public Tuple getNext() throws IOException
+    {
+        try
+        {
+            // load the next pair
+            if (!reader.nextKeyValue())
+                return null;
+
+            CfInfo cfInfo = getCfInfo(loadSignature);
+            CfDef cfDef = cfInfo.cfDef;
+            Row row = reader.getCurrentValue();
+            Tuple tuple = TupleFactory.getInstance().newTuple(cfDef.column_metadata.size());
+            int i = 0;
+            for (ColumnDef cdef : cfDef.column_metadata)
+            {
+                // duplicate() so decoding the column name does not move the shared buffer's position
+                ByteBuffer columnValue = row.getBytesUnsafe(ByteBufferUtil.string(cdef.name.duplicate()));
+                if (columnValue != null)
+                {
+                    Column column = new Column(cdef.name, columnValue);
+                    AbstractType<?> validator = getValidatorMap(cfDef).get(column.name());
+                    setTupleValue(tuple, i, cqlColumnToObj(column, cfDef), validator);
+                }
+                else
+                    tuple.set(i, null);
+                i++;
+            }
+            return tuple;
+        }
+        catch (InterruptedException e)
+        {
+            // restore the interrupt flag for callers further up the stack and keep
+            // the original exception as the cause instead of only its message
+            Thread.currentThread().interrupt();
+            throw new IOException(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Set read configuration settings.
+     *
+     * Parses the location URI, copies every setting that was supplied (credentials,
+     * split size, partitioner, addresses, ports and the native-transport options)
+     * into the job configuration, applies the PIG_INPUT_SPLIT_SIZE environment
+     * override, and finally initialises the schema for the load signature.
+     *
+     * @param location cql:// location string, see setLocationFromUri
+     * @param job      job whose configuration is populated
+     * @throws IOException if the location cannot be parsed, PIG_INPUT_SPLIT_SIZE is not a
+     *                     number, or no initial address or partitioner ends up configured
+     */
+    public void setLocation(String location, Job job) throws IOException
+    {
+        conf = job.getConfiguration();
+        setLocationFromUri(location);
+
+        if (username != null && password != null)
+        {
+            ConfigHelper.setInputKeyspaceUserNameAndPassword(conf, username, password);
+            CqlConfigHelper.setUserNameAndPassword(conf, username, password);
+        }
+        if (splitSize > 0)
+            ConfigHelper.setInputSplitSize(conf, splitSize);
+        if (partitionerClass != null)
+            ConfigHelper.setInputPartitioner(conf, partitionerClass);
+        if (initHostAddress != null)
+            ConfigHelper.setInputInitialAddress(conf, initHostAddress);
+        if (rpcPort != null)
+            ConfigHelper.setInputRpcPort(conf, rpcPort);
+        if (nativePort != null)
+            CqlConfigHelper.setInputNativePort(conf, nativePort);
+        if (nativeCoreConnections != null)
+            CqlConfigHelper.setInputCoreConnections(conf, nativeCoreConnections);
+        if (nativeMaxConnections != null)
+            CqlConfigHelper.setInputMaxConnections(conf, nativeMaxConnections);
+        if (nativeMinSimultReqs != null)
+            CqlConfigHelper.setInputMinSimultReqPerConnections(conf, nativeMinSimultReqs);
+        // guard on the max value itself; guarding on the min value dropped a lone
+        // max_simult_reqs and passed null when only min_simult_reqs was given
+        if (nativeMaxSimultReqs != null)
+            CqlConfigHelper.setInputMaxSimultReqPerConnections(conf, nativeMaxSimultReqs);
+        if (nativeConnectionTimeout != null)
+            CqlConfigHelper.setInputNativeConnectionTimeout(conf, nativeConnectionTimeout);
+        if (nativeReadConnectionTimeout != null)
+            CqlConfigHelper.setInputNativeReadConnectionTimeout(conf, nativeReadConnectionTimeout);
+        if (nativeReceiveBufferSize != null)
+            CqlConfigHelper.setInputNativeReceiveBufferSize(conf, nativeReceiveBufferSize);
+        if (nativeSendBufferSize != null)
+            CqlConfigHelper.setInputNativeSendBufferSize(conf, nativeSendBufferSize);
+        if (nativeSolinger != null)
+            CqlConfigHelper.setInputNativeSolinger(conf, nativeSolinger);
+        if (nativeTcpNodelay != null)
+            CqlConfigHelper.setInputNativeTcpNodelay(conf, nativeTcpNodelay);
+        if (nativeReuseAddress != null)
+            CqlConfigHelper.setInputNativeReuseAddress(conf, nativeReuseAddress);
+        if (nativeKeepAlive != null)
+            CqlConfigHelper.setInputNativeKeepAlive(conf, nativeKeepAlive);
+        if (nativeAuthProvider != null)
+            CqlConfigHelper.setInputNativeAuthProvider(conf, nativeAuthProvider);
+        if (nativeSSLTruststorePath != null)
+            CqlConfigHelper.setInputNativeSSLTruststorePath(conf, nativeSSLTruststorePath);
+        if (nativeSSLKeystorePath != null)
+            CqlConfigHelper.setInputNativeSSLKeystorePath(conf, nativeSSLKeystorePath);
+        if (nativeSSLTruststorePassword != null)
+            CqlConfigHelper.setInputNativeSSLTruststorePassword(conf, nativeSSLTruststorePassword);
+        if (nativeSSLKeystorePassword != null)
+            CqlConfigHelper.setInputNativeSSLKeystorePassword(conf, nativeSSLKeystorePassword);
+        if (nativeSSLCipherSuites != null)
+            CqlConfigHelper.setInputNativeSSLCipherSuites(conf, nativeSSLCipherSuites);
+
+        ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
+        setConnectionInformation();
+
+        CqlConfigHelper.setInputCQLPageRowSize(conf, String.valueOf(pageSize));
+        // NOTE(review): inputCql is still null when the URL carries no input_cql
+        // parameter; confirm that setInputCql tolerates a null value
+        CqlConfigHelper.setInputCql(conf, inputCql);
+        if (System.getenv(PIG_INPUT_SPLIT_SIZE) != null)
+        {
+            // the environment variable, when present, overrides the split_size URL parameter
+            try
+            {
+                ConfigHelper.setInputSplitSize(conf, Integer.parseInt(System.getenv(PIG_INPUT_SPLIT_SIZE)));
+            }
+            catch (NumberFormatException e)
+            {
+                throw new IOException("PIG_INPUT_SPLIT_SIZE is not a number", e);
+            }
+        }
+
+        if (ConfigHelper.getInputInitialAddress(conf) == null)
+            throw new IOException("PIG_INPUT_INITIAL_ADDRESS or PIG_INITIAL_ADDRESS environment variable not set");
+        if (ConfigHelper.getInputPartitioner(conf) == null)
+            throw new IOException("PIG_INPUT_PARTITIONER or PIG_PARTITIONER environment variable not set");
+        if (loadSignature == null)
+            loadSignature = location;
+
+        initSchema(loadSignature);
+    }
+
+    /**
+     * Parses a location of the form
+     * {@code cql://[username:password@]<keyspace>/<columnfamily>[?<options>]} and stores
+     * the keyspace, column family, optional credentials and every recognised query
+     * option in this instance's fields. Unrecognised options are ignored.
+     *
+     * @param location the location string handed to setLocation
+     * @throws IOException carrying the expected URL format if the scheme is not cql://
+     *                     or any part of the location cannot be parsed; the underlying
+     *                     exception is attached as the cause
+     */
+    private void setLocationFromUri(String location) throws IOException
+    {
+        try
+        {
+            if (!location.startsWith("cql://"))
+                throw new Exception("Bad scheme: " + location);
+
+            String[] urlParts = location.split("\\?");
+            if (urlParts.length > 1)
+            {
+                Map<String, String> urlQuery = getQueryMap(urlParts[1]);
+
+                // each page row size
+                if (urlQuery.containsKey("page_size"))
+                    pageSize = Integer.parseInt(urlQuery.get("page_size"));
+
+                // output prepared statement
+                if (urlQuery.containsKey("output_query"))
+                    outputQuery = urlQuery.get("output_query");
+
+                //split size
+                if (urlQuery.containsKey("split_size"))
+                    splitSize = Integer.parseInt(urlQuery.get("split_size"));
+                if (urlQuery.containsKey("partitioner"))
+                    partitionerClass = urlQuery.get("partitioner");
+                if (urlQuery.containsKey("use_secondary"))
+                    usePartitionFilter = Boolean.parseBoolean(urlQuery.get("use_secondary"));
+                if (urlQuery.containsKey("init_address"))
+                    initHostAddress = urlQuery.get("init_address");
+
+                // native transport options, kept as strings and applied in setLocation
+                if (urlQuery.containsKey("native_port"))
+                    nativePort = urlQuery.get("native_port");
+                if (urlQuery.containsKey("core_conns"))
+                    nativeCoreConnections = urlQuery.get("core_conns");
+                if (urlQuery.containsKey("max_conns"))
+                    nativeMaxConnections = urlQuery.get("max_conns");
+                if (urlQuery.containsKey("min_simult_reqs"))
+                    nativeMinSimultReqs = urlQuery.get("min_simult_reqs");
+                if (urlQuery.containsKey("max_simult_reqs"))
+                    nativeMaxSimultReqs = urlQuery.get("max_simult_reqs");
+                if (urlQuery.containsKey("native_timeout"))
+                    nativeConnectionTimeout = urlQuery.get("native_timeout");
+                if (urlQuery.containsKey("native_read_timeout"))
+                    nativeReadConnectionTimeout = urlQuery.get("native_read_timeout");
+                if (urlQuery.containsKey("rec_buff_size"))
+                    nativeReceiveBufferSize = urlQuery.get("rec_buff_size");
+                if (urlQuery.containsKey("send_buff_size"))
+                    nativeSendBufferSize = urlQuery.get("send_buff_size");
+                if (urlQuery.containsKey("solinger"))
+                    nativeSolinger = urlQuery.get("solinger");
+                if (urlQuery.containsKey("tcp_nodelay"))
+                    nativeTcpNodelay = urlQuery.get("tcp_nodelay");
+                if (urlQuery.containsKey("reuse_address"))
+                    nativeReuseAddress = urlQuery.get("reuse_address");
+                if (urlQuery.containsKey("keep_alive"))
+                    nativeKeepAlive = urlQuery.get("keep_alive");
+                if (urlQuery.containsKey("auth_provider"))
+                    nativeAuthProvider = urlQuery.get("auth_provider");
+                if (urlQuery.containsKey("trust_store_path"))
+                    nativeSSLTruststorePath = urlQuery.get("trust_store_path");
+                if (urlQuery.containsKey("key_store_path"))
+                    nativeSSLKeystorePath = urlQuery.get("key_store_path");
+                if (urlQuery.containsKey("trust_store_password"))
+                    nativeSSLTruststorePassword = urlQuery.get("trust_store_password");
+                if (urlQuery.containsKey("key_store_password"))
+                    nativeSSLKeystorePassword = urlQuery.get("key_store_password");
+                if (urlQuery.containsKey("cipher_suites"))
+                    nativeSSLCipherSuites = urlQuery.get("cipher_suites");
+                if (urlQuery.containsKey("input_cql"))
+                    inputCql = urlQuery.get("input_cql");
+                if (urlQuery.containsKey("rpc_port"))
+                    rpcPort = urlQuery.get("rpc_port");
+            }
+            // "cql://ks/cf" splits on runs of '/' into ["cql:", "ks", "cf"]
+            String[] parts = urlParts[0].split("/+");
+            String[] credentialsAndKeyspace = parts[1].split("@");
+            if (credentialsAndKeyspace.length > 1)
+            {
+                String[] credentials = credentialsAndKeyspace[0].split(":");
+                username = credentials[0];
+                password = credentials[1];
+                keyspace = credentialsAndKeyspace[1];
+            }
+            else
+            {
+                keyspace = parts[1];
+            }
+            column_family = parts[2];
+        }
+        catch (Exception e)
+        {
+            // NOTE(review): the usage text lists columns and where_clause, which this
+            // method does not read, and omits rpc_port, which it does
+            throw new IOException("Expected 'cql://[username:password@]<keyspace>/<columnfamily>" +
+                    "[?[page_size=<size>][&columns=<col1,col2>][&output_query=<prepared_statement>]" +
+                    "[&where_clause=<clause>][&split_size=<size>][&partitioner=<partitioner>][&use_secondary=true|false]" +
+                    "[&init_address=<host>][&native_port=<native_port>][&core_conns=<core_conns>]" +
+                    "[&max_conns=<max_conns>][&min_simult_reqs=<min_simult_reqs>][&max_simult_reqs=<max_simult_reqs>]" +
+                    "[&native_timeout=<native_timeout>][&native_read_timeout=<native_read_timeout>][&rec_buff_size=<rec_buff_size>]" +
+                    "[&send_buff_size=<send_buff_size>][&solinger=<solinger>][&tcp_nodelay=<tcp_nodelay>][&reuse_address=<reuse_address>]" +
+                    "[&keep_alive=<keep_alive>][&auth_provider=<auth_provider>][&trust_store_path=<trust_store_path>]" +
+                    "[&key_store_path=<key_store_path>][&trust_store_password=<trust_store_password>]" +
+                    "[&key_store_password=<key_store_password>][&cipher_suites=<cipher_suites>][&input_cql=<input_cql>]]': " + e.getMessage(), e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java 
b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
index 284b72a..02a6d98 100644
--- a/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
+++ b/src/java/org/apache/cassandra/hadoop/pig/CqlStorage.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
 import java.nio.charset.CharacterCodingException;
 import java.util.*;
 
-
 import org.apache.cassandra.hadoop.HadoopCompat;
 import org.apache.cassandra.cql3.CFDefinition;
 import org.apache.cassandra.db.Column;
@@ -33,7 +32,6 @@ import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
-
 import org.apache.hadoop.mapreduce.*;
 import org.apache.pig.Expression;
 import org.apache.pig.Expression.OpType;
@@ -58,11 +56,11 @@ public class CqlStorage extends AbstractCassandraStorage
     private static final Logger logger = 
LoggerFactory.getLogger(CqlStorage.class);
 
     private RecordReader<Map<String, ByteBuffer>, Map<String, ByteBuffer>> 
reader;
-    private RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
+    protected RecordWriter<Map<String, ByteBuffer>, List<ByteBuffer>> writer;
 
-    private int pageSize = 1000;
+    protected int pageSize = 1000;
     private String columns;
-    private String outputQuery;
+    protected String outputQuery;
     private String whereClause;
     private boolean hasCompactValueAlias = false;
         
@@ -130,7 +128,7 @@ public class CqlStorage extends AbstractCassandraStorage
     }
 
     /** set the value to the position of the tuple */
-    private void setTupleValue(Tuple tuple, int position, Object value, 
AbstractType<?> validator) throws ExecException
+    protected void setTupleValue(Tuple tuple, int position, Object value, 
AbstractType<?> validator) throws ExecException
     {
         if (validator instanceof CollectionType)
             setCollectionTupleValues(tuple, position, value, validator);
@@ -184,7 +182,7 @@ public class CqlStorage extends AbstractCassandraStorage
     }
 
     /** convert a cql column to an object */
-    private Object cqlColumnToObj(Column col, CfDef cfDef) throws IOException
+    protected Object cqlColumnToObj(Column col, CfDef cfDef) throws IOException
     {
         // standard
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index a207bc6..d92eba6 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -32,3 +32,5 @@ server_encryption_options:
     truststore_password: cassandra
 incremental_backups: true
 compaction_throughput_mb_per_sec: 0
+start_native_transport: true
+native_transport_port: 9052

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java 
b/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java
index 2020b0a..1819c61 100644
--- a/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java
+++ b/test/pig/org/apache/cassandra/pig/CqlTableDataTypeTest.java
@@ -220,7 +220,24 @@ public class CqlTableDataTypeTest extends PigTestBase
     public void testCqlStorageRegularType()
     throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, SchemaDisagreementException, IOException
     {
-        pig.registerQuery("rows = LOAD 'cql://cql3ks/cqltable?" + 
defaultParameters + "' USING CqlStorage();");
+        cqlTableTest("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters 
+ "' USING CqlStorage();");
+        counterTableTest("cc_rows = LOAD 'cql://cql3ks/countertable?" + 
defaultParameters + "' USING CqlStorage();");
+    }
+
+    /** Loads cqltable and countertable through CqlNativeStorage with an explicit input_cql and checks the regular-type values. */
+    @Test
+    public void testCqlNativeStorageRegularType()
+    throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+    {
+        // the loader must be CqlNativeStorage: nativeParameters and input_cql are only parsed there
+        //input_cql=select * from cqltable where token(key) > ? and token(key) <= ?
+        cqlTableTest("rows = LOAD 'cql://cql3ks/cqltable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20cqltable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+
+        //input_cql=select * from countertable where token(key) > ? and token(key) <= ?
+        counterTableTest("cc_rows = LOAD 'cql://cql3ks/countertable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20countertable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+    }
+
+    private void cqlTableTest(String initialQuery) throws IOException
+    {
+        pig.registerQuery(initialQuery);
         Iterator<Tuple> it = pig.openIterator("rows");
         //{key: int, 
         //col_ascii: chararray, 
@@ -257,21 +274,45 @@ public class CqlTableDataTypeTest extends PigTestBase
             Assert.assertEquals(t.get(14), "varchar");
             Assert.assertEquals(t.get(15), 123);
         }
-        
-        pig.registerQuery("cc_rows = LOAD 'cql://cql3ks/countertable?" + 
defaultParameters + "' USING CqlStorage();");
-        it = pig.openIterator("cc_rows");
+        else
+        {
+            Assert.fail("Failed to get data for query " + initialQuery);
+        }
+    }
+
+    /**
+     * Registers the given Pig query and checks the first tuple of the cc_rows alias:
+     * field 0 must equal 1 and field 1 must equal 3L. Fails if the alias yields no tuple.
+     *
+     * @param initialQuery Pig LOAD statement; it must define the cc_rows alias read below
+     */
+    private void counterTableTest(String initialQuery) throws IOException
+    {
+        pig.registerQuery(initialQuery);
+        Iterator<Tuple>  it = pig.openIterator("cc_rows");
         if (it.hasNext()) {
             Tuple t = it.next();
             Assert.assertEquals(t.get(0), 1);
             Assert.assertEquals(t.get(1), 3L);
         }
+        else
+        {
+            Assert.fail("Failed to get data for query " + initialQuery);
+        }
     }
 
     @Test
     public void testCqlStorageSetType()
     throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, SchemaDisagreementException, IOException
     {
-        pig.registerQuery("set_rows = LOAD 'cql://cql3ks/settable?" + 
defaultParameters + "' USING CqlStorage();");
+        settableTest("set_rows = LOAD 'cql://cql3ks/settable?" + 
defaultParameters + "' USING CqlStorage();");
+    }
+
+    /** Loads settable through CqlNativeStorage with an explicit input_cql and checks the set-typed columns. */
+    @Test
+    public void testCqlNativeStorageSetType()
+    throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+    {
+        // the loader must be CqlNativeStorage: nativeParameters and input_cql are only parsed there
+        //input_cql=select * from settable where token(key) > ? and token(key) <= ?
+        settableTest("set_rows = LOAD 'cql://cql3ks/settable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20settable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+    }
+
+    private void settableTest(String initialQuery) throws IOException
+    {
+        pig.registerQuery(initialQuery);
         Iterator<Tuple> it = pig.openIterator("set_rows");
         if (it.hasNext()) {
             Tuple t = it.next();
@@ -322,13 +363,30 @@ public class CqlTableDataTypeTest extends PigTestBase
             Assert.assertEquals(innerTuple.get(0), 123);
             Assert.assertEquals(innerTuple.get(1), 124);
         }
+        else
+        {
+            Assert.fail("Failed to get data for query " + initialQuery);
+        }
     }
 
     @Test
     public void testCqlStorageListType()
     throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, SchemaDisagreementException, IOException
     {
-        pig.registerQuery("list_rows = LOAD 'cql://cql3ks/listtable?" + 
defaultParameters + "' USING CqlStorage();");
+        listtableTest("list_rows = LOAD 'cql://cql3ks/listtable?" + 
defaultParameters + "' USING CqlStorage();");
+    }
+
+    /** Loads listtable through CqlNativeStorage with an explicit input_cql and checks the list-typed columns. */
+    @Test
+    public void testCqlNativeStorageListType()
+    throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+    {
+        // the loader must be CqlNativeStorage: nativeParameters and input_cql are only parsed there
+        //input_cql=select * from listtable where token(key) > ? and token(key) <= ?
+        listtableTest("list_rows = LOAD 'cql://cql3ks/listtable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20listtable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+    }
+
+    private void listtableTest(String initialQuery) throws IOException
+    {
+        pig.registerQuery(initialQuery);
         Iterator<Tuple> it = pig.openIterator("list_rows");
         if (it.hasNext()) {
             Tuple t = it.next();
@@ -379,13 +437,30 @@ public class CqlTableDataTypeTest extends PigTestBase
             Assert.assertEquals(innerTuple.get(1), 123);
             Assert.assertEquals(innerTuple.get(0), 124);
         }
+        else
+        {
+            Assert.fail("Failed to get data for query " + initialQuery);
+        }
     }
 
     @Test
     public void testCqlStorageMapType()
     throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, SchemaDisagreementException, IOException
     {
-        pig.registerQuery("map_rows = LOAD 'cql://cql3ks/maptable?" + 
defaultParameters + "' USING CqlStorage();");
+        maptableTest("map_rows = LOAD 'cql://cql3ks/maptable?" + 
defaultParameters + "' USING CqlStorage();");
+    }
+
+    /** Loads maptable through CqlNativeStorage with an explicit input_cql and checks the map-typed columns. */
+    @Test
+    public void testCqlNativeStorageMapType()
+    throws AuthenticationException, AuthorizationException, InvalidRequestException, UnavailableException, TimedOutException, TException, NotFoundException, SchemaDisagreementException, IOException
+    {
+        // the loader must be CqlNativeStorage: nativeParameters and input_cql are only parsed there
+        //input_cql=select * from maptable where token(key) > ? and token(key) <= ?
+        maptableTest("map_rows = LOAD 'cql://cql3ks/maptable?" + defaultParameters + nativeParameters + "&input_cql=select%20*%20from%20maptable%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F' USING CqlNativeStorage();");
+    }
+
+    private void maptableTest(String initialQuery) throws IOException
+    {
+        pig.registerQuery(initialQuery);
         Iterator<Tuple> it = pig.openIterator("map_rows");
         if (it.hasNext()) {
             Tuple t = it.next();
@@ -436,5 +511,10 @@ public class CqlTableDataTypeTest extends PigTestBase
             Assert.assertEquals(innerTuple.get(0), 123);
             Assert.assertEquals(innerTuple.get(1), 124);
         }
+        else
+        {
+            Assert.fail("Failed to get data for query " + initialQuery);
+        }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/test/pig/org/apache/cassandra/pig/CqlTableTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/CqlTableTest.java 
b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
index 15d49f2..f5adef8 100644
--- a/test/pig/org/apache/cassandra/pig/CqlTableTest.java
+++ b/test/pig/org/apache/cassandra/pig/CqlTableTest.java
@@ -93,7 +93,41 @@ public class CqlTableTest extends PigTestBase
     public void testCqlStorageSchema()
     throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, SchemaDisagreementException, IOException
     {
-        pig.registerQuery("rows = LOAD 'cql://cql3ks/cqltable?" + 
defaultParameters + "' USING CqlStorage();");
+        cqlTableSchemaTest("rows = LOAD 'cql://cql3ks/cqltable?" + 
defaultParameters + "' USING CqlStorage();");
+        compactCqlTableSchemaTest("rows = LOAD 'cql://cql3ks/compactcqltable?" 
+ defaultParameters + "' USING CqlStorage();");
+    }
+
+    @Test
+    public void testCqlNativeStorageSchema()
+    throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, SchemaDisagreementException, IOException
+    {
+        //input_cql=select * from cqltable where token(key1) > ? and 
token(key1) <= ?
+        cqlTableSchemaTest("rows = LOAD 'cql://cql3ks/cqltable?" + 
defaultParameters + nativeParameters +  
"&input_cql=select%20*%20from%20cqltable%20where%20token(key1)%20%3E%20%3F%20and%20token(key1)%20%3C%3D%20%3F'
 USING CqlNativeStorage();");
+
+        //input_cql=select * from compactcqltable where token(key1) > ? and 
token(key1) <= ?
+        compactCqlTableSchemaTest("rows = LOAD 'cql://cql3ks/compactcqltable?" 
+ defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20compactcqltable%20where%20token(key1)%20%3E%20%3F%20and%20token(key1)%20%3C%3D%20%3F'
 USING CqlNativeStorage();");
+    }
+
+    private void compactCqlTableSchemaTest(String initialQuery) throws 
IOException
+    {
+        pig.registerQuery(initialQuery);
+        Iterator<Tuple>  it = pig.openIterator("rows");
+        if (it.hasNext()) {
+            Tuple t = it.next();
+            Assert.assertEquals(t.get(0).toString(), "key1");
+            Assert.assertEquals(t.get(1), 100);
+            Assert.assertEquals(t.get(2), 10.1f);
+            Assert.assertEquals(3, t.size());
+        }
+        else
+        {
+            Assert.fail("Failed to get data for query " + initialQuery);
+        }
+    }
+
+    private void cqlTableSchemaTest(String initialQuery) throws IOException
+    {
+        pig.registerQuery(initialQuery);
         Iterator<Tuple> it = pig.openIterator("rows");
         if (it.hasNext()) {
             Tuple t = it.next();
@@ -103,15 +137,9 @@ public class CqlTableTest extends PigTestBase
             Assert.assertEquals(t.get(3), 10.1f);
             Assert.assertEquals(4, t.size());
         }
-
-        pig.registerQuery("rows = LOAD 'cql://cql3ks/compactcqltable?" + 
defaultParameters + "' USING CqlStorage();");
-        it = pig.openIterator("rows");
-        if (it.hasNext()) {
-            Tuple t = it.next();
-            Assert.assertEquals(t.get(0).toString(), "key1");
-            Assert.assertEquals(t.get(1), 100);
-            Assert.assertEquals(t.get(2), 10.1f);
-            Assert.assertEquals(3, t.size());
+        else
+        {
+            Assert.fail("Failed to get data for query " + initialQuery);
         }
     }
 
@@ -119,8 +147,23 @@ public class CqlTableTest extends PigTestBase
     public void testCqlStorageSingleKeyTable()
     throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, SchemaDisagreementException, IOException
     {
+        SingleKeyTableTest("moretestvalues= LOAD 'cql://cql3ks/moredata?" + 
defaultParameters + "' USING CqlStorage();");
+
+    }
+
+    @Test
+    public void testCqlNativeStorageSingleKeyTable()
+    throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, SchemaDisagreementException, IOException
+    {
+        //input_cql=select * from moredata where token(x) > ? and token(x) <= ?
+        SingleKeyTableTest("moretestvalues= LOAD 'cql://cql3ks/moredata?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20moredata%20where%20token(x)%20%3E%20%3F%20and%20token(x)%20%3C%3D%20%3F'
 USING CqlNativeStorage();");
+    }
+
+    private void SingleKeyTableTest(String initialQuery)
+    throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, SchemaDisagreementException, IOException
+    {
         pig.setBatchOn();
-        pig.registerQuery("moretestvalues= LOAD 'cql://cql3ks/moredata?" + 
defaultParameters + "' USING CqlStorage();");
+        pig.registerQuery(initialQuery);
         pig.registerQuery("insertformat= FOREACH moretestvalues GENERATE 
TOTUPLE(TOTUPLE('a',x)),TOTUPLE(y);");
         pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/test?" + 
defaultParameters + "&output_query=UPDATE+cql3ks.test+set+b+%3D+%3F' USING 
CqlStorage();");
         pig.executeBatch();
@@ -132,7 +175,7 @@ public class CqlTableTest extends PigTestBase
         //(1,1)
         pig.registerQuery("result= LOAD 'cql://cql3ks/test?" + 
defaultParameters + "' USING CqlStorage();");
         Iterator<Tuple> it = pig.openIterator("result");
-        if (it.hasNext()) {
+        while (it.hasNext()) {
             Tuple t = it.next();
             Assert.assertEquals(t.get(0), t.get(1));
         }
@@ -142,8 +185,22 @@ public class CqlTableTest extends PigTestBase
     public void testCqlStorageCompositeKeyTable()
     throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, SchemaDisagreementException, IOException
     {
+        CompositeKeyTableTest("moredata= LOAD 'cql://cql3ks/compmore?" + 
defaultParameters + "' USING CqlStorage();");
+    }
+
+    @Test
+    public void testCqlNativeStorageCompositeKeyTable()
+    throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, SchemaDisagreementException, IOException
+    {
+        //input_cql=select * from compmore where token(id) > ? and token(id) 
<= ?
+        CompositeKeyTableTest("moredata= LOAD 'cql://cql3ks/compmore?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20compmore%20where%20token(id)%20%3E%20%3F%20and%20token(id)%20%3C%3D%20%3F'
 USING CqlNativeStorage();");
+    }
+
+    private void CompositeKeyTableTest(String initialQuery)
+    throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, SchemaDisagreementException, IOException
+    {
         pig.setBatchOn();
-        pig.registerQuery("moredata= LOAD 'cql://cql3ks/compmore?" + 
defaultParameters + "' USING CqlStorage();");
+        pig.registerQuery(initialQuery);
         pig.registerQuery("insertformat = FOREACH moredata GENERATE TOTUPLE 
(TOTUPLE('a',x),TOTUPLE('b',y), TOTUPLE('c',z)),TOTUPLE(data);");
         pig.registerQuery("STORE insertformat INTO 'cql://cql3ks/compotable?" 
+ defaultParameters + 
"&output_query=UPDATE%20cql3ks.compotable%20SET%20d%20%3D%20%3F' USING 
CqlStorage();");
         pig.executeBatch();
@@ -171,8 +228,22 @@ public class CqlTableTest extends PigTestBase
     public void testCqlStorageCollectionColumnTable()
     throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, SchemaDisagreementException, IOException
     {
+        CollectionColumnTableTest("collectiontable= LOAD 
'cql://cql3ks/collectiontable?" + defaultParameters + "' USING CqlStorage();");
+    }
+
+    @Test
+    public void testCqlNativeStorageCollectionColumnTable()
+    throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, SchemaDisagreementException, IOException
+    {
+        //input_cql=select * from collectiontable where token(m) > ? and 
token(m) <= ?
+        CollectionColumnTableTest("collectiontable= LOAD 
'cql://cql3ks/collectiontable?" + defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20collectiontable%20where%20token(m)%20%3E%20%3F%20and%20token(m)%20%3C%3D%20%3F'
 USING CqlNativeStorage();");
+    }
+
+    private void CollectionColumnTableTest(String initialQuery)
+    throws AuthenticationException, AuthorizationException, 
InvalidRequestException, UnavailableException, TimedOutException, TException, 
NotFoundException, SchemaDisagreementException, IOException
+    {
         pig.setBatchOn();
-        pig.registerQuery("collectiontable= LOAD 
'cql://cql3ks/collectiontable?" + defaultParameters + "' USING CqlStorage();");
+        pig.registerQuery(initialQuery);
         pig.registerQuery("recs= FOREACH collectiontable GENERATE 
TOTUPLE(TOTUPLE('m', m) ), TOTUPLE(TOTUPLE('map', TOTUPLE('m', 'mm'), 
TOTUPLE('n', 'nn')));");
         pig.registerQuery("STORE recs INTO 'cql://cql3ks/collectiontable?" + 
defaultParameters + "&output_query=update+cql3ks.collectiontable+set+n+%3D+%3F' 
USING CqlStorage();");
         pig.executeBatch();
@@ -183,7 +254,7 @@ public class CqlTableTest extends PigTestBase
         //(book1,((m,mm),(n,nn)))
         pig.registerQuery("result= LOAD 'cql://cql3ks/collectiontable?" + 
defaultParameters + "' USING CqlStorage();");
         Iterator<Tuple> it = pig.openIterator("result");
-        while (it.hasNext()) {
+        if (it.hasNext()) {
             Tuple t = it.next();
             Tuple t1 = (Tuple) t.get(1);
             Assert.assertEquals(t1.size(), 2);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/test/pig/org/apache/cassandra/pig/PigTestBase.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/PigTestBase.java 
b/test/pig/org/apache/cassandra/pig/PigTestBase.java
index ea06b8c..ed307f4 100644
--- a/test/pig/org/apache/cassandra/pig/PigTestBase.java
+++ b/test/pig/org/apache/cassandra/pig/PigTestBase.java
@@ -66,6 +66,9 @@ public class PigTestBase extends SchemaLoader
     protected static MiniCluster cluster; 
     protected static PigServer pig;
     protected static String defaultParameters= 
"init_address=localhost&rpc_port=9170&partitioner=org.apache.cassandra.dht.ByteOrderedPartitioner";
+    protected static String nativeParameters = 
"&core_conns=2&max_conns=10&min_simult_reqs=3&max_simult_reqs=10&native_timeout=10000000"
  +
+                                               
"&native_read_timeout=10000000&send_buff_size=4096&receive_buff_size=4096&solinger=3"
 +
+                                               
"&tcp_nodelay=true&reuse_address=true&keep_alive=true&native_port=9052";
 
     @AfterClass
     public static void oneTimeTearDown() throws Exception {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1e2a5b0e/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
----------------------------------------------------------------------
diff --git a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java 
b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
index e114d37..3f1d5a1 100644
--- a/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
+++ b/test/pig/org/apache/cassandra/pig/ThriftColumnFamilyTest.java
@@ -183,6 +183,13 @@ public class ThriftColumnFamilyTest extends PigTestBase
                         "and comparator = LongType;"
     };
 
+    private static String[] deleteCopyOfSomeAppTableData = { "use thriftKs;",
+            "DEL CopyOfSomeApp ['foo']",
+            "DEL CopyOfSomeApp ['bar']",
+            "DEL CopyOfSomeApp ['baz']",
+            "DEL CopyOfSomeApp ['qux']"
+    };
+
     @BeforeClass
     public static void setup() throws TTransportException, IOException, 
InterruptedException, ConfigurationException,
                                       AuthenticationException, 
AuthorizationException, InvalidRequestException, UnavailableException, 
TimedOutException, TException, NotFoundException, CharacterCodingException, 
ClassNotFoundException, NoSuchFieldException, IllegalAccessException, 
InstantiationException
@@ -196,7 +203,35 @@ public class ThriftColumnFamilyTest extends PigTestBase
     public void testCqlStorage() throws IOException, ClassNotFoundException, 
TException, TimedOutException, NotFoundException, InvalidRequestException, 
NoSuchFieldException, UnavailableException, IllegalAccessException, 
InstantiationException, AuthenticationException, AuthorizationException
     {
         //regular thrift column families
-        pig.registerQuery("data = load 'cql://thriftKs/SomeApp?" + 
defaultParameters + "' using CqlStorage();");
+        cqlStorageTest("data = load 'cql://thriftKs/SomeApp?" + 
defaultParameters + "' using CqlStorage();");
+
+        //Test counter column family
+        // This test fails for CASSANDRA-7059
+        //cqlStorageCounterTableTest("cc_data = load 'cql://thriftKs/CC?" + 
defaultParameters + "' using CqlStorage();");
+
+        //Test composite column family
+        cqlStorageCompositeTableTest("compo_data = load 
'cql://thriftKs/Compo?" + defaultParameters + "' using CqlStorage();");
+    }
+
+    @Test
+    public void testCqlNativeStorage() throws IOException, 
ClassNotFoundException, TException, TimedOutException, NotFoundException, 
InvalidRequestException, NoSuchFieldException, UnavailableException, 
IllegalAccessException, InstantiationException, AuthenticationException, 
AuthorizationException
+    {
+        //regular thrift column families
+        //input_cql=select * from "SomeApp" where token(key) > ? and 
token(key) <= ?
+        cqlStorageTest("data = load 'cql://thriftKs/SomeApp?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20%22SomeApp%22%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
 using CqlNativeStorage();");
+
+        //Test counter column family
+        //input_cql=select * from "CC" where token(key) > ? and token(key) <= ?
+        cqlStorageCounterTableTest("cc_data = load 'cql://thriftKs/CC?" + 
defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20%22CC%22%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
 using CqlNativeStorage();");
+
+        //Test composite column family
+        //input_cql=select * from "Compo" where token(key) > ? and token(key) 
<= ?
+        cqlStorageCompositeTableTest("compo_data = load 
'cql://thriftKs/Compo?" + defaultParameters + nativeParameters + 
"&input_cql=select%20*%20from%20%22Compo%22%20where%20token(key)%20%3E%20%3F%20and%20token(key)%20%3C%3D%20%3F'
 using CqlNativeStorage();");
+    }
+
+    private void cqlStorageTest(String initialQuery) throws IOException
+    {
+        pig.registerQuery(initialQuery);
 
         //(bar,3.141592653589793,1335890877,User Bar,35.0,9,15000,like)
         //(baz,1.61803399,1335890877,User Baz,95.3,3,512000,dislike)
@@ -249,16 +284,18 @@ public class ThriftColumnFamilyTest extends PigTestBase
             }
         }
         Assert.assertEquals(count, 4);
+    }
 
-        //Test counter colun family
-        pig.registerQuery("cc_data = load 'cql://thriftKs/CC?" + 
defaultParameters + "' using CqlStorage();");
+    private void cqlStorageCounterTableTest(String initialQuery) throws 
IOException
+    {
+        pig.registerQuery(initialQuery);
 
         //(chuck,fist,1)
         //(chuck,kick,3)
 
         // {key: chararray,column1: chararray,value: long}
-        it = pig.openIterator("cc_data");
-        count = 0;
+        Iterator<Tuple> it = pig.openIterator("cc_data");
+        int count = 0;
         while (it.hasNext()) {
             count ++;
             Tuple t = it.next();
@@ -268,9 +305,11 @@ public class ThriftColumnFamilyTest extends PigTestBase
                 Assert.assertEquals(t.get(2), 3L);
         }
         Assert.assertEquals(count, 2);
+    }
 
-        //Test composite column family
-        pig.registerQuery("compo_data = load 'cql://thriftKs/Compo?" + 
defaultParameters + "' using CqlStorage();");
+    private void cqlStorageCompositeTableTest(String initialQuery) throws 
IOException
+    {
+        pig.registerQuery(initialQuery);
 
         //(kick,bruce,bruce,watch it, mate)
         //(kick,bruce,lee,oww)
@@ -278,8 +317,8 @@ public class ThriftColumnFamilyTest extends PigTestBase
         //(punch,bruce,lee,ouch)
 
         //{key: chararray,column1: chararray,column2: chararray,value: 
chararray}
-        it = pig.openIterator("compo_data");
-        count = 0;
+        Iterator<Tuple> it = pig.openIterator("compo_data");
+        int count = 0;
         while (it.hasNext()) {
             count ++;
             Tuple t = it.next();
@@ -351,7 +390,6 @@ public class ThriftColumnFamilyTest extends PigTestBase
     @Test
     public void testCassandraStorageFullCopy() throws IOException, 
ClassNotFoundException, TException, TimedOutException, NotFoundException, 
InvalidRequestException, NoSuchFieldException, UnavailableException, 
IllegalAccessException, InstantiationException, AuthenticationException, 
AuthorizationException
     {
-        createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
         pig.setBatchOn();
         pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + 
defaultParameters + "' USING CassandraStorage();");
         //full copy
@@ -365,7 +403,7 @@ public class ThriftColumnFamilyTest extends PigTestBase
     @Test
     public void testCassandraStorageSigleTupleCopy() throws IOException, 
ClassNotFoundException, TException, TimedOutException, NotFoundException, 
InvalidRequestException, NoSuchFieldException, UnavailableException, 
IllegalAccessException, InstantiationException, AuthenticationException, 
AuthorizationException
     {
-        createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+        executeCliStatements(deleteCopyOfSomeAppTableData);
         pig.setBatchOn();
         pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + 
defaultParameters + "' USING CassandraStorage();");
         //sigle tuple
@@ -399,7 +437,7 @@ public class ThriftColumnFamilyTest extends PigTestBase
     @Test
     public void testCassandraStorageBagOnlyCopy() throws IOException, 
ClassNotFoundException, TException, TimedOutException, NotFoundException, 
InvalidRequestException, NoSuchFieldException, UnavailableException, 
IllegalAccessException, InstantiationException, AuthenticationException, 
AuthorizationException
     {
-        createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+        executeCliStatements(deleteCopyOfSomeAppTableData);
         pig.setBatchOn();
         pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + 
defaultParameters + "' USING CassandraStorage();");
         //bag only
@@ -443,7 +481,7 @@ public class ThriftColumnFamilyTest extends PigTestBase
     @Test
     public void testCassandraStorageFilter() throws IOException, 
ClassNotFoundException, TException, TimedOutException, NotFoundException, 
InvalidRequestException, NoSuchFieldException, UnavailableException, 
IllegalAccessException, InstantiationException, AuthenticationException, 
AuthorizationException
     {
-        createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+        executeCliStatements(deleteCopyOfSomeAppTableData);
         pig.setBatchOn();
         pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + 
defaultParameters + "' USING CassandraStorage();");
 
@@ -476,7 +514,7 @@ public class ThriftColumnFamilyTest extends PigTestBase
         if (value != null)
             Assert.fail();
 
-        createColumnFamily("thriftKs", "CopyOfSomeApp", statements[3]);
+        executeCliStatements(deleteCopyOfSomeAppTableData);
         pig.setBatchOn();
         pig.registerQuery("rows = LOAD 'cassandra://thriftKs/SomeApp?" + 
defaultParameters + "' USING CassandraStorage();");
         pig.registerQuery("dislikes_extras = FILTER rows by vote_type.value eq 
'dislike' AND COUNT(columns) > 0;");
@@ -746,17 +784,16 @@ public class ThriftColumnFamilyTest extends PigTestBase
         return parseType(validator).getString(got.getColumn().value);
     }
 
-    private void createColumnFamily(String ks, String cf, String statement) 
throws CharacterCodingException, ClassNotFoundException, TException, 
TimedOutException, NotFoundException, InvalidRequestException, 
NoSuchFieldException, UnavailableException, IllegalAccessException, 
InstantiationException
+    private void executeCliStatements(String[] statements) throws 
CharacterCodingException, ClassNotFoundException, TException, 
TimedOutException, NotFoundException, InvalidRequestException, 
NoSuchFieldException, UnavailableException, IllegalAccessException, 
InstantiationException
     {
         CliMain.connect("127.0.0.1", 9170);
         try
         {
-            CliMain.processStatement("use " + ks + ";");
-            CliMain.processStatement("drop column family " + cf + ";");
+            for (String stmt : statements)
+                CliMain.processStatement(stmt);
         }
         catch (Exception e)
         {
         }
-        CliMain.processStatement(statement);
     }
 }

Reply via email to