http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
new file mode 100644
index 0000000..928370c
--- /dev/null
+++ 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
@@ -0,0 +1,836 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.gora.cassandra.serializers;
+
+import com.datastax.driver.core.querybuilder.BuiltStatement;
+import com.datastax.driver.core.querybuilder.Delete;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.core.querybuilder.Select;
+import com.datastax.driver.core.querybuilder.Update;
+import com.datastax.driver.mapping.annotations.UDT;
+import org.apache.avro.Schema;
+import org.apache.gora.cassandra.bean.CassandraKey;
+import org.apache.gora.cassandra.bean.ClusterKeyField;
+import org.apache.gora.cassandra.bean.Field;
+import org.apache.gora.cassandra.bean.KeySpace;
+import org.apache.gora.cassandra.bean.PartitionKeyField;
+import org.apache.gora.cassandra.query.CassandraQuery;
+import org.apache.gora.cassandra.store.CassandraMapping;
+import org.apache.gora.cassandra.store.CassandraStore;
+import org.apache.gora.query.Query;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class is used to create Cassandra Queries.
+ */
+class CassandraQueryFactory {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CassandraQueryFactory.class); // class-wide SLF4J logger; getColumnNames uses it to warn about fields missing from the mapping
+
+  /**
+   * This method returns the CQL query to create key space.
+   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_keyspace_r.html
+   * <p>
+   * The generated statement has the shape
+   * {@code CREATE KEYSPACE IF NOT EXISTS <name> WITH REPLICATION = { 'class' : '<strategy>', ... } AND DURABLE_WRITES = <flag>}.
+   * For {@code SimpleStrategy} the replication map carries the replication factor, for
+   * {@code NetworkTopologyStrategy} it carries one entry per configured data center.
+   * The durable-writes flag is always written, so a mapping that disables durable writes
+   * is honoured instead of silently falling back to the server-side default.
+   *
+   * @param mapping Cassandra Mapping {@link CassandraMapping}
+   * @return CQL Query
+   */
+  static String getCreateKeySpaceQuery(CassandraMapping mapping) {
+    KeySpace keySpace = mapping.getKeySpace();
+    StringBuilder cql = new StringBuilder();
+    cql.append("CREATE KEYSPACE IF NOT EXISTS ").append(keySpace.getName())
+        .append(" WITH REPLICATION = { 'class' : ");
+    KeySpace.PlacementStrategy placementStrategy = keySpace.getPlacementStrategy();
+    // The trailing quote opens the first replication option name written by the switch below.
+    cql.append("'").append(placementStrategy).append("'").append(", ").append("'");
+    switch (placementStrategy) {
+      case SimpleStrategy:
+        cql.append("replication_factor").append("'").append(" : ")
+            .append(keySpace.getReplicationFactor()).append(" }");
+        break;
+      case NetworkTopologyStrategy:
+        boolean isCommaNeeded = false;
+        for (Map.Entry<String, Integer> entry : keySpace.getDataCenters().entrySet()) {
+          if (isCommaNeeded) {
+            // every data center after the first needs its own separator and opening quote
+            cql.append(", '");
+          }
+          cql.append(entry.getKey()).append("'").append(" : ").append(entry.getValue());
+          isCommaNeeded = true;
+        }
+        cql.append(" }");
+        break;
+    }
+    // Previously this clause was only emitted for 'true', which made 'false' a no-op.
+    cql.append(" AND DURABLE_WRITES = ").append(keySpace.isDurableWritesEnabled());
+    return cql.toString();
+  }
+
+  /**
+   * This method returns the CQL query to create the table.
+   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/create_table_r.html
+   * <p>
+   * Column order: the mapped persistent fields are written first, then the fields of the
+   * Cassandra key (when one is mapped), then a {@code PRIMARY KEY (...)} clause built from the
+   * partition key fields. Table options follow in this order: {@code COMPACT STORAGE}
+   * (mapping property "compactStorage"), {@code ID} (mapping property "id") and
+   * {@code CLUSTERING ORDER BY} (cluster key fields); the first option present is introduced
+   * with {@code WITH}, later ones with {@code AND}.
+   * <p>
+   * The separator in front of the key columns is only written when at least one persistent
+   * column precedes them, so a mapping consisting solely of key fields no longer produces a
+   * column list starting with a comma.
+   *
+   * @param mapping Cassandra mapping {@link CassandraMapping}
+   * @return CQL Query
+   */
+  static String getCreateTableQuery(CassandraMapping mapping) {
+    StringBuilder cql = new StringBuilder();
+    cql.append("CREATE TABLE IF NOT EXISTS ").append(mapping.getKeySpace().getName())
+        .append(".").append(mapping.getCoreName()).append(" (");
+    CassandraKey cassandraKey = mapping.getCassandraKey();
+    // appending Cassandra Persistent columns into db schema
+    List<Field> persistentFields = mapping.getFieldList();
+    processFieldsForCreateTableQuery(persistentFields, false, cql);
+
+    if (cassandraKey != null) {
+      // a leading ", " is only valid when a persistent column was already written
+      processFieldsForCreateTableQuery(cassandraKey.getFieldList(), !persistentFields.isEmpty(), cql);
+      List<PartitionKeyField> partitionKeys = cassandraKey.getPartitionKeyFields();
+      if (partitionKeys != null) {
+        cql.append(", PRIMARY KEY (");
+        boolean isCommaNeededToApply = false;
+        for (PartitionKeyField keyField : partitionKeys) {
+          if (isCommaNeededToApply) {
+            cql.append(",");
+          }
+          if (keyField.isComposite()) {
+            // composite partition key: its member columns are wrapped in their own parentheses
+            cql.append("(");
+            boolean isCommaNeededHere = false;
+            for (Field field : keyField.getFields()) {
+              if (isCommaNeededHere) {
+                cql.append(", ");
+              }
+              cql.append(field.getColumnName());
+              isCommaNeededHere = true;
+            }
+            cql.append(")");
+          } else {
+            cql.append(keyField.getColumnName());
+          }
+          isCommaNeededToApply = true;
+        }
+        cql.append(")");
+      }
+    }
+
+    cql.append(")");
+    // true until the first table option has been written; decides between WITH and AND
+    boolean isWithNeeded = true;
+    if (Boolean.parseBoolean(mapping.getProperty("compactStorage"))) {
+      cql.append(" WITH COMPACT STORAGE ");
+      isWithNeeded = false;
+    }
+
+    String id = mapping.getProperty("id");
+    if (id != null) {
+      cql.append(isWithNeeded ? " WITH " : " AND ");
+      cql.append("ID = '").append(id).append("'");
+      isWithNeeded = false;
+    }
+    if (cassandraKey != null) {
+      List<ClusterKeyField> clusterKeyFields = cassandraKey.getClusterKeyFields();
+      if (clusterKeyFields != null) {
+        cql.append(isWithNeeded ? " WITH " : " AND ");
+        cql.append(" CLUSTERING ORDER BY (");
+        boolean isCommaNeededToApply = false;
+        for (ClusterKeyField keyField : clusterKeyFields) {
+          if (isCommaNeededToApply) {
+            cql.append(", ");
+          }
+          cql.append(keyField.getColumnName()).append(" ");
+          if (keyField.getOrder() != null) {
+            cql.append(keyField.getOrder());
+          }
+          isCommaNeededToApply = true;
+        }
+        cql.append(")");
+      }
+    }
+    return cql.toString();
+  }
+
+  private static void processFieldsForCreateTableQuery(List<Field> fields, 
boolean isCommaNeeded, StringBuilder stringBuilder) {
+    for (Field field : fields) {
+      if (isCommaNeeded) {
+        stringBuilder.append(", ");
+      }
+      stringBuilder.append(field.getColumnName()).append(" 
").append(field.getType());
+      boolean isStaticColumn = 
Boolean.parseBoolean(field.getProperty("static"));
+      boolean isPrimaryKey = 
Boolean.parseBoolean(field.getProperty("primarykey"));
+      if (isStaticColumn) {
+        stringBuilder.append(" STATIC");
+      }
+      if (isPrimaryKey) {
+        stringBuilder.append("  PRIMARY KEY ");
+      }
+      isCommaNeeded = true;
+    }
+  }
+
+  /**
+   * This method returns the CQL query to drop table.
+   * refer : 
http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_table_r.html
+   *
+   * @param mapping Cassandra Mapping {@link CassandraMapping}
+   * @return CQL query
+   */
+  static String getDropTableQuery(CassandraMapping mapping) {
+    return "DROP TABLE IF EXISTS " + mapping.getKeySpace().getName() + "." + 
mapping.getCoreName();
+  }
+
+  /**
+   * This method returns the CQL query to drop key space.
+   * refer : 
http://docs.datastax.com/en/cql/3.1/cql/cql_reference/drop_keyspace_r.html
+   *
+   * @param mapping Cassandra Mapping {@link CassandraMapping}
+   * @return CQL query
+   */
+  static String getDropKeySpaceQuery(CassandraMapping mapping) {
+    return "DROP KEYSPACE IF EXISTS " + mapping.getKeySpace().getName();
+  }
+
+  /**
+   * This method returns the CQL query to truncate (removes all the data) in 
the table.
+   * refer : 
http://docs.datastax.com/en/cql/3.1/cql/cql_reference/truncate_r.html
+   *
+   * @param mapping Cassandra Mapping {@link CassandraMapping}
+   * @return CQL query
+   */
+  static String getTruncateTableQuery(CassandraMapping mapping) {
+    return QueryBuilder.truncate(mapping.getKeySpace().getName(), 
mapping.getCoreName()).getQueryString();
+  }
+
+  /**
+   * This method returns the CQL query to insert data into the table.
+   * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/insert_r.html
+   * <p>
+   * Field names that cannot be resolved against the mapping are skipped by
+   * {@code getColumnNames}; the number of "?" bind markers therefore follows the number of
+   * resolved columns rather than the number of requested fields, so the column and value
+   * lists handed to the query builder always have the same length.
+   *
+   * @param mapping Cassandra Mapping {@link CassandraMapping}
+   * @param fields  available fields
+   * @return CQL Query
+   */
+  static String getInsertDataQuery(CassandraMapping mapping, List<String> fields) {
+    String[] columnNames = getColumnNames(mapping, fields);
+    // one bind marker per resolved column (not per requested field)
+    String[] bindMarkers = new String[columnNames.length];
+    Arrays.fill(bindMarkers, "?");
+    return QueryBuilder.insertInto(mapping.getKeySpace().getName(), mapping.getCoreName())
+        .values(columnNames, bindMarkers).getQueryString();
+  }
+
+  /**
+   * This method returns the CQL query to delete a persistent from the table.
+   * refer : http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlDelete.html
+   * <p>
+   * Each given field is resolved to its column name and turned into a
+   * {@code <column> = ?} restriction of the DELETE statement.
+   *
+   * @param mapping Cassandra Mapping {@link CassandraMapping}
+   * @param fields  field list identifying the row to be deleted
+   * @return CQL Query, or {@code null} when none of the fields resolves to a column
+   */
+  static String getDeleteDataQuery(CassandraMapping mapping, List<String> fields) {
+    String[] columnNames = getColumnNames(mapping, fields);
+    Delete delete = QueryBuilder.delete().from(mapping.getKeySpace().getName(), mapping.getCoreName());
+    return processKeys(columnNames, delete);
+  }
+
+  private static String processKeys(String[] columnNames, BuiltStatement 
delete) {
+    BuiltStatement query = null;
+    boolean isWhereNeeded = true;
+    for (String columnName : columnNames) {
+      if (isWhereNeeded) {
+        if (delete instanceof Delete) {
+          query = ((Delete) delete).where(QueryBuilder.eq(columnName, "?"));
+        } else {
+          query = ((Select) delete).where(QueryBuilder.eq(columnName, "?"));
+        }
+        isWhereNeeded = false;
+      } else {
+        if (delete instanceof Delete) {
+          query = ((Delete.Where) query).and(QueryBuilder.eq(columnName, "?"));
+        } else {
+          query = ((Select.Where) query).and(QueryBuilder.eq(columnName, "?"));
+        }
+      }
+    }
+    return query != null ? query.getQueryString() : null;
+  }
+
+  /**
+   * This method returns the CQL Select query to retrieve data from the table.
+   * refer: 
http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlSelect.html
+   *
+   * @param mapping   Cassandra Mapping {@link CassandraMapping}
+   * @param keyFields key fields
+   * @return CQL Query
+   */
+  static String getSelectObjectQuery(CassandraMapping mapping, List<String> 
keyFields) {
+    Select select = 
QueryBuilder.select().from(mapping.getKeySpace().getName(), 
mapping.getCoreName());
+    if (Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) {
+      select.allowFiltering();
+    }
+    String[] columnNames = getColumnNames(mapping, keyFields);
+    return processKeys(columnNames, select);
+  }
+
+  /**
+   * This method returns CQL Select query to retrieve data from the table with 
given fields.
+   * This method is used for Avro Serialization
+   * refer: 
http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlSelect.html
+   *
+   * @param mapping   Cassandra Mapping {@link CassandraMapping}
+   * @param fields    Given fields to retrieve
+   * @param keyFields key fields
+   * @return CQL Query
+   */
+  static String getSelectObjectWithFieldsQuery(CassandraMapping mapping, 
String[] fields, List<String> keyFields) {
+    Select select = QueryBuilder.select(getColumnNames(mapping, 
Arrays.asList(fields))).from(mapping.getKeySpace().getName(), 
mapping.getCoreName());
+    if (Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) {
+      select.allowFiltering();
+    }
+    String[] columnNames = getColumnNames(mapping, keyFields);
+    return processKeys(columnNames, select);
+  }
+
+  /**
+   * This method returns CQL Select query to retrieve data from the table with 
given fields.
+   * This method is used for Native Serialization
+   * refer: 
http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlSelect.html
+   *
+   * @param mapping Cassandra Mapping {@link CassandraMapping}
+   * @param fields  Given fields to retrieve
+   * @return CQL Query
+   */
+  static String getSelectObjectWithFieldsQuery(CassandraMapping mapping, 
String[] fields) {
+    String cqlQuery = null;
+    String[] columnNames = getColumnNames(mapping, Arrays.asList(fields));
+    Select select = 
QueryBuilder.select(columnNames).from(mapping.getKeySpace().getName(), 
mapping.getCoreName());
+    if (Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) {
+      select.allowFiltering();
+    }
+    CassandraKey cKey = mapping.getCassandraKey();
+    if (cKey != null) {
+      Select.Where query = null;
+      boolean isWhereNeeded = true;
+      for (Field field : cKey.getFieldList()) {
+        if (isWhereNeeded) {
+          query = select.where(QueryBuilder.eq(field.getColumnName(), "?"));
+          isWhereNeeded = false;
+        } else {
+          query = query.and(QueryBuilder.eq(field.getColumnName(), "?"));
+        }
+      }
+      cqlQuery = query != null ? query.getQueryString() : null;
+    } else {
+      for (Field field : mapping.getFieldList()) {
+        boolean isPrimaryKey = 
Boolean.parseBoolean(field.getProperty("primarykey"));
+        if (isPrimaryKey) {
+          cqlQuery = select.where(QueryBuilder.eq(field.getColumnName(), 
"?")).getQueryString();
+          break;
+        }
+      }
+    }
+    return cqlQuery;
+  }
+
+
+  /**
+   * This method returns CQL Query for execute method. This CQL contains a Select Query to
+   * retrieve data from the table.
+   * <p>
+   * A positive query limit is applied to the statement. The driver accepts the limit as an
+   * {@code int}, so limits above {@link Integer#MAX_VALUE} are clamped to that value instead of
+   * being narrowed, which could otherwise turn them into a negative or unrelated limit.
+   *
+   * @param mapping        Cassandra Mapping {@link CassandraMapping}
+   * @param cassandraQuery Query {@link CassandraQuery}
+   * @param objects        list receiving the values to bind to the "?" markers of the query
+   * @param fields         fields to retrieve
+   * @return CQL Query
+   */
+  static String getExecuteQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects, String[] fields) {
+    long limit = cassandraQuery.getLimit();
+    Select select = QueryBuilder.select(getColumnNames(mapping, Arrays.asList(fields)))
+        .from(mapping.getKeySpace().getName(), mapping.getCoreName());
+    if (limit > 0) {
+      // clamp before narrowing so that huge limits do not overflow the int parameter
+      select = select.limit((int) Math.min(limit, Integer.MAX_VALUE));
+    }
+    if (Boolean.parseBoolean(mapping.getProperty("allowFiltering"))) {
+      select.allowFiltering();
+    }
+    return processQuery(cassandraQuery, select, mapping, objects);
+  }
+
+  private static String processQuery(Query cassandraQuery, BuiltStatement 
select, CassandraMapping mapping, List<Object> objects) { // Adds the key / key-range restrictions of cassandraQuery to 'select' (a Select, a Delete or, failing both, an Update.Assignments - despite its name), appends the matching bind values to 'objects' and returns the CQL string.
+    String primaryKey = null; // column name of the single primary key; only resolved (via getPKey) when no CassandraKey is mapped
+    BuiltStatement query = null; // the statement once a where()/and() clause has been attached; null until then
+    Object startKey = cassandraQuery.getStartKey();
+    Object endKey = cassandraQuery.getEndKey();
+    Object key = cassandraQuery.getKey();
+    boolean isWhereNeeded = true; // true until the first clause is attached: the first clause uses where(), later ones and()
+    if (key != null) { // exact key given: equality restrictions only, start/end keys are not consulted in this branch
+      if (mapping.getCassandraKey() != null) { // mapped CassandraKey: one "column = ?" per key column
+        ArrayList<String> cassandraKeys = new ArrayList<>();
+        ArrayList<Object> cassandraValues = new ArrayList<>();
+        AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, 
cassandraValues);
+        String[] columnKeys = getColumnNames(mapping, cassandraKeys);
+        for (int i = 0; i < cassandraKeys.size(); i++) {
+          if (isWhereNeeded) {
+            if (select instanceof Select) {
+              query = ((Select) select).where(QueryBuilder.eq(columnKeys[i], 
"?"));
+            } else if (select instanceof Delete) {
+              query = ((Delete) select).where(QueryBuilder.eq(columnKeys[i], 
"?"));
+            } else {
+              query = ((Update.Assignments) 
select).where(QueryBuilder.eq(columnKeys[i], "?"));
+            }
+            objects.add(cassandraValues.get(i)); // bind values are appended in the same order as their "?" markers
+            isWhereNeeded = false;
+          } else {
+            if (select instanceof Select) {
+              query = ((Select.Where) 
query).and(QueryBuilder.eq(columnKeys[i], "?"));
+            } else if (select instanceof Delete) {
+              query = ((Delete.Where) 
query).and(QueryBuilder.eq(columnKeys[i], "?"));
+            } else {
+              query = ((Update.Where) 
query).and(QueryBuilder.eq(columnKeys[i], "?"));
+            }
+            objects.add(cassandraValues.get(i));
+          }
+        }
+      } else { // no CassandraKey mapped: restrict on the field flagged "primarykey" and bind the key itself
+        primaryKey = getPKey(mapping.getFieldList());
+        if (select instanceof Select) {
+          query = ((Select) select).where(QueryBuilder.eq(primaryKey, "?"));
+        } else if (select instanceof Delete) {
+          query = ((Delete) select).where(QueryBuilder.eq(primaryKey, "?"));
+        } else {
+          query = ((Update.Assignments) 
select).where(QueryBuilder.eq(primaryKey, "?"));
+        }
+        objects.add(key);
+      }
+    } else { // no exact key: apply an optional lower bound (startKey) and an optional upper bound (endKey)
+      if (startKey != null) { // lower bound: "column >= ?"
+        if (mapping.getCassandraKey() != null) {
+          ArrayList<String> cassandraKeys = new ArrayList<>();
+          ArrayList<Object> cassandraValues = new ArrayList<>();
+          AvroCassandraUtils.processKeys(mapping, startKey, cassandraKeys, 
cassandraValues);
+          String[] columnKeys = getColumnNames(mapping, cassandraKeys);
+          for (int i = 0; i < cassandraKeys.size(); i++) {
+            if (isWhereNeeded) {
+              if (select instanceof Select) {
+                query = ((Select) 
select).where(QueryBuilder.gte(columnKeys[i], "?"));
+              } else if (select instanceof Delete) {
+                /*
+                According to the JIRA 
https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but 
It seems this not fixed yet.
+                 */
+                throw new RuntimeException("Delete by Query is not suppoted 
for Key Ranges."); // every range branch rejects Delete statements this way
+              } else {
+                query = ((Update.Assignments) 
select).where(QueryBuilder.gte(columnKeys[i], "?"));
+              }
+              objects.add(cassandraValues.get(i));
+              isWhereNeeded = false;
+            } else {
+              if (select instanceof Select) {
+                query = ((Select.Where) 
query).and(QueryBuilder.gte(columnKeys[i], "?"));
+              } else if (select instanceof Delete) {
+                       /*
+                According to the JIRA 
https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but 
It seems this not fixed yet.
+                 */
+                throw new RuntimeException("Delete by Query is not suppoted 
for Key Ranges.");
+              } else {
+                query = ((Update.Where) 
query).and(QueryBuilder.gte(columnKeys[i], "?"));
+              }
+              objects.add(cassandraValues.get(i));
+            }
+          }
+        } else {
+          primaryKey = getPKey(mapping.getFieldList());
+          if (select instanceof Select) {
+            query = ((Select) select).where(QueryBuilder.gte(primaryKey, "?"));
+          } else if (select instanceof Delete) {
+                           /*
+                According to the JIRA 
https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but 
It seems this not fixed yet.
+                 */
+            throw new RuntimeException("Delete by Query is not suppoted for 
Key Ranges.");
+          } else {
+            query = ((Update.Assignments) 
select).where(QueryBuilder.gte(primaryKey, "?"));
+          }
+          objects.add(startKey);
+          isWhereNeeded = false; // the upper bound below must now be chained with and()
+        }
+      }
+      if (endKey != null) { // upper bound: "column <= ?", chained onto the lower bound when one was added
+        if (mapping.getCassandraKey() != null) {
+          ArrayList<String> cassandraKeys = new ArrayList<>();
+          ArrayList<Object> cassandraValues = new ArrayList<>();
+          AvroCassandraUtils.processKeys(mapping, endKey, cassandraKeys, 
cassandraValues);
+          String[] columnKeys = getColumnNames(mapping, cassandraKeys);
+          for (int i = 0; i < cassandraKeys.size(); i++) {
+            if (isWhereNeeded) {
+              if (select instanceof Select) {
+                query = ((Select) 
select).where(QueryBuilder.lte(columnKeys[i], "?"));
+              } else if (select instanceof Delete) {
+                               /*
+                According to the JIRA 
https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but 
It seems this not fixed yet.
+                 */
+                throw new RuntimeException("Delete by Query is not suppoted 
for Key Ranges.");
+              } else {
+                query = ((Update.Assignments) 
select).where(QueryBuilder.lte(columnKeys[i], "?"));
+              }
+              objects.add(cassandraValues.get(i));
+              isWhereNeeded = false;
+            } else {
+              if (select instanceof Select) {
+                query = ((Select.Where) 
query).and(QueryBuilder.lte(columnKeys[i], "?"));
+              } else if (select instanceof Delete) {
+                               /*
+                According to the JIRA 
https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but 
It seems this not fixed yet.
+                 */
+                throw new RuntimeException("Delete by Query is not suppoted 
for Key Ranges.");
+              } else {
+                query = ((Update.Where) 
query).and(QueryBuilder.lte(columnKeys[i], "?"));
+              }
+              objects.add(cassandraValues.get(i));
+            }
+          }
+        } else {
+          primaryKey = primaryKey != null ? primaryKey : 
getPKey(mapping.getFieldList()); // reuse the column name when the startKey branch already resolved it
+          if (isWhereNeeded) {
+            if (select instanceof Select) {
+              query = ((Select) select).where(QueryBuilder.lte(primaryKey, 
"?"));
+            } else if (select instanceof Delete) {
+                             /*
+                According to the JIRA 
https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but 
It seems this not fixed yet.
+                 */
+              throw new RuntimeException("Delete by Query is not suppoted for 
Key Ranges.");
+            } else {
+              query = ((Update.Assignments) 
select).where(QueryBuilder.lte(primaryKey, "?"));
+            }
+          } else {
+            if (select instanceof Select) {
+              query = ((Select.Where) query).and(QueryBuilder.lte(primaryKey, 
"?"));
+            } else if (select instanceof Delete) {
+              /*
+                According to the JIRA 
https://issues.apache.org/jira/browse/CASSANDRA-7651 this has been fixed, but 
It seems this not fixed yet.
+                */
+              throw new RuntimeException("Delete by Query is not suppoted for 
Key Ranges.");
+            } else {
+              query = ((Update.Where) query).and(QueryBuilder.lte(primaryKey, 
"?"));
+            }
+          }
+          objects.add(endKey);
+        }
+      }
+    }
+    if (startKey == null && endKey == null && key == null) { // unrestricted query: return the statement exactly as the caller built it
+      return select.getQueryString();
+    }
+    return query != null ? query.getQueryString() : null; // null when a key was supplied but no clause was attached (e.g. the key yielded no columns)
+  }
+
+  /**
+   * Translates field names into the column names declared in the mapping. A name is looked up
+   * among the persistent fields first and among the fields of the Cassandra key (if one is
+   * mapped) second. Names found in neither place are skipped with a warning, so the result
+   * may be shorter than the input.
+   *
+   * @param mapping Cassandra Mapping {@link CassandraMapping}
+   * @param fields  field names to translate
+   * @return column names of the resolvable fields, in input order
+   */
+  private static String[] getColumnNames(CassandraMapping mapping, List<String> fields) {
+    List<String> resolvedColumns = new ArrayList<>();
+    for (String fieldName : fields) {
+      Field persistentField = mapping.getFieldFromFieldName(fieldName);
+      CassandraKey cassandraKey = mapping.getCassandraKey();
+      Field keyField = (cassandraKey == null) ? null : cassandraKey.getFieldFromFieldName(fieldName);
+      // a persistent field takes precedence over a key field of the same name
+      Field match = (persistentField != null) ? persistentField : keyField;
+      if (match == null) {
+        LOG.warn("{} field is ignored, couldn't find relevant field in the persistent mapping", fieldName);
+        continue;
+      }
+      resolvedColumns.add(match.getColumnName());
+    }
+    return resolvedColumns.toArray(new String[0]);
+  }
+
+  /**
+   * Finds the column name of the first field whose "primarykey" property parses to true.
+   *
+   * @param fields fields to search
+   * @return column name of the primary key field, or {@code null} when no field is flagged
+   */
+  private static String getPKey(List<Field> fields) {
+    for (Field candidate : fields) {
+      if (!Boolean.parseBoolean(candidate.getProperty("primarykey"))) {
+        continue;
+      }
+      return candidate.getColumnName();
+    }
+    return null;
+  }
+
+  /**
+   * Builds the CQL Query for the DeleteByQuery method.
+   * refer: http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlDelete.html
+   * <p>
+   * When the query names fields, only the corresponding columns are deleted; otherwise the
+   * statement deletes without a column selection. The key restrictions of the query are
+   * added by {@code processQuery}.
+   *
+   * @param mapping        Cassandra Mapping {@link CassandraMapping}
+   * @param cassandraQuery Cassandra Query {@link CassandraQuery}
+   * @param objects        field values
+   * @return CQL Query
+   */
+  static String getDeleteByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) {
+    if (cassandraQuery.getFields() == null) {
+      // no field selection
+      Delete deleteWithoutColumns = QueryBuilder.delete()
+          .from(mapping.getKeySpace().getName(), mapping.getCoreName());
+      return processQuery(cassandraQuery, deleteWithoutColumns, mapping, objects);
+    }
+    String[] selectedColumns = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()));
+    Delete deleteColumns = QueryBuilder.delete(selectedColumns)
+        .from(mapping.getKeySpace().getName(), mapping.getCoreName());
+    return processQuery(cassandraQuery, deleteColumns, mapping, objects);
+  }
+
+  /**
+   * This method returns the CQL Query for UpdateByQuery method (Avro serialization).
+   * refer : http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlUpdate.html
+   * <p>
+   * Every field named by the query becomes a {@code <column> = ?} assignment; its update value
+   * is converted through {@code AvroCassandraUtils.getFieldValueFromAvroBean} using the field's
+   * Avro schema and appended to {@code objects}. The key restrictions of the query are added
+   * afterwards by {@code processQuery}.
+   *
+   * @param mapping        Cassandra mapping {@link CassandraMapping}
+   * @param cassandraQuery Cassandra Query {@link CassandraQuery}
+   * @param objects        field Objects list
+   * @param schema         Avro schema used to look up the schema of each updated field
+   * @return CQL Query
+   * @throws RuntimeException if the query is not a {@link CassandraQuery}, or if a
+   *                          {@link NullPointerException} occurs while resolving or converting a
+   *                          field (that exception is attached as the cause)
+   */
+  static String getUpdateByQueryForAvro(CassandraMapping mapping, Query cassandraQuery, List<Object> objects, Schema schema) {
+    Update update = QueryBuilder.update(mapping.getKeySpace().getName(), mapping.getCoreName());
+    Update.Assignments updateAssignments = null;
+    if (cassandraQuery instanceof CassandraQuery) {
+      String[] columnNames = getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()));
+      for (String column : columnNames) {
+        updateAssignments = update.with(QueryBuilder.set(column, "?"));
+        Field field = mapping.getFieldFromColumnName(column);
+        Object value = ((CassandraQuery) cassandraQuery).getUpdateFieldValue(field.getFieldName());
+        try {
+          Schema schemaField = schema.getField(field.getFieldName()).schema();
+          objects.add(AvroCassandraUtils.getFieldValueFromAvroBean(schemaField, schemaField.getType(), value, field));
+        } catch (NullPointerException e) {
+          // keep the original exception as the cause so the failing call stays traceable
+          throw new RuntimeException(field + " field couldn't find in the class " + mapping.getPersistentClass() + ".", e);
+        }
+      }
+    } else {
+      throw new RuntimeException("Please use Cassandra Query object to invoke, UpdateByQuery method.");
+    }
+    return processQuery(cassandraQuery, updateAssignments, mapping, objects);
+  }
+
+
+  /**
+   * Builds the CQL Query for the UpdateByQuery method when native serialization is used.
+   * refer : http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlUpdate.html
+   * <p>
+   * One assignment is added per selected column and the update value of the matching field is
+   * appended to <code>objects</code> in the same order.
+   *
+   * @param mapping        Cassandra mapping {@link CassandraMapping}
+   * @param cassandraQuery Cassandra Query {@link CassandraQuery}
+   * @param objects        field Objects list
+   * @return CQL Query
+   */
+  static String getUpdateByQueryForNative(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) {
+    Update update = QueryBuilder.update(mapping.getKeySpace().getName(), mapping.getCoreName());
+    if (!(cassandraQuery instanceof CassandraQuery)) {
+      throw new RuntimeException("Please use Cassandra Query object to invoke, UpdateByQuery method.");
+    }
+    CassandraQuery nativeQuery = (CassandraQuery) cassandraQuery;
+    Update.Assignments assignments = null;
+    for (String columnName : getColumnNames(mapping, Arrays.asList(cassandraQuery.getFields()))) {
+      assignments = update.with(QueryBuilder.set(columnName, "?"));
+      String fieldName = mapping.getFieldFromColumnName(columnName).getFieldName();
+      objects.add(nativeQuery.getUpdateFieldValue(fieldName));
+    }
+    return processQuery(cassandraQuery, assignments, mapping, objects);
+  }
+
+
+  /**
+   * Appends the CQL type name that corresponds to the given Avro schema to the builder.
+   * <p>
+   * MAP and ARRAY schemas recurse into their value / element schema, a UNION is mapped through
+   * its first branch that is not NULL. Schema types without a case below append nothing.
+   *
+   * @param schema  Avro schema to translate
+   * @param builder builder the CQL type is appended to
+   * @throws Exception when a RECORD branch of a UNION has a name that already occurs in the
+   *                   text collected by the builder
+   */
+  private static void populateFieldsToQuery(Schema schema, StringBuilder 
builder) throws Exception {
+    switch (schema.getType()) {
+      case INT:
+        builder.append("int");
+        break;
+      case MAP:
+        // Map keys are always written as text, only the value type is resolved recursively.
+        builder.append("map<text,");
+        populateFieldsToQuery(schema.getValueType(), builder);
+        builder.append(">");
+        break;
+      case ARRAY:
+        builder.append("list<");
+        populateFieldsToQuery(schema.getElementType(), builder);
+        builder.append(">");
+        break;
+      case LONG:
+        builder.append("bigint");
+        break;
+      case FLOAT:
+        builder.append("float");
+        break;
+      case DOUBLE:
+        builder.append("double");
+        break;
+      case BOOLEAN:
+        builder.append("boolean");
+        break;
+      case BYTES:
+        builder.append("blob");
+        break;
+      case RECORD:
+        // Records are referenced by their name, wrapped in frozen<>.
+        builder.append("frozen<").append(schema.getName()).append(">");
+        break;
+      case STRING:
+      case FIXED:
+      case ENUM:
+        builder.append("text");
+        break;
+      case UNION:
+        for (Schema unionElementSchema : schema.getTypes()) {
+          if (unionElementSchema.getType().equals(Schema.Type.RECORD)) {
+            String recordName = unionElementSchema.getName();
+            // The check runs against the whole builder text, so any earlier occurrence of the
+            // record name is treated as a recursive mapping.
+            if (!builder.toString().contains(recordName)) {
+              builder.append("frozen<").append(recordName).append(">");
+            } else {
+              LOG.warn("Same Field Type can't be mapped recursively. This is 
not supported with Cassandra UDT types, Please use byte dataType for recursive 
mapping.");
+              throw new Exception("Same Field Type has mapped recursively");
+            }
+            break;
+          } else if (!unionElementSchema.getType().equals(Schema.Type.NULL)) {
+            // First branch that is neither a record nor NULL decides the type.
+            populateFieldsToQuery(unionElementSchema, builder);
+            break;
+          }
+        }
+        break;
+    }
+  }
+
+  /**
+   * Appends the comma separated <code>name type</code> definitions of all fields of the given
+   * record schema to the builder.
+   * <p>
+   * A field whose type can't be mapped (the type resolution throws) is left out completely: the
+   * builder is rolled back to the length it had before that field was started, so neither the
+   * field name, a partially written type, nor a dangling separator stays behind.
+   *
+   * @param recordSchema  Avro record schema whose fields are written
+   * @param stringBuilder builder the field definitions are appended to
+   */
+  static void processRecord(Schema recordSchema, StringBuilder stringBuilder) {
+    boolean isCommaNeeded = false;
+    for (Schema.Field field : recordSchema.getFields()) {
+      // Remember where this field starts, so that it can be discarded as a whole on failure.
+      int fieldStart = stringBuilder.length();
+      if (isCommaNeeded) {
+        stringBuilder.append(", ");
+      }
+      stringBuilder.append(field.name()).append(" ");
+      try {
+        populateFieldsToQuery(field.schema(), stringBuilder);
+        isCommaNeeded = true;
+      } catch (Exception e) {
+        // Unsupported field: drop everything written for it. isCommaNeeded keeps its value, it
+        // still tells whether an earlier field has been written.
+        stringBuilder.setLength(fieldStart);
+      }
+    }
+  }
+
+  /**
+   * Builds the <code>CREATE TYPE IF NOT EXISTS</code> statement for the user defined type of a
+   * field of a native persistent class.
+   * <p>
+   * Only the fields of the UDT class that carry the DataStax mapping <code>@Field</code>
+   * annotation are written; the annotation name is used when it is set, otherwise the Java
+   * field name.
+   *
+   * @param mapping         Cassandra mapping, provides the keyspace name
+   * @param persistentClass persistent class that declares the field
+   * @param udtType         name of the user defined type
+   * @param fieldName       name of the field whose type is the UDT class
+   * @return CQL statement which creates the user defined type
+   * @throws NoSuchFieldException if the persistent class doesn't declare the field
+   * @throws RuntimeException     if the type of the field is not annotated with {@link UDT}
+   */
+  static String getCreateUDTTypeForNative(CassandraMapping mapping, Class persistentClass, String udtType, String fieldName) throws NoSuchFieldException {
+    Class<?> udtClass = persistentClass.getDeclaredField(fieldName).getType();
+    if (udtClass.getAnnotation(UDT.class) == null) {
+      throw new RuntimeException("Field " + fieldName + " of " + persistentClass + " is mapped to the user defined type "
+              + udtType + ", but " + udtClass.getName() + " is not annotated with @UDT.");
+    }
+    StringBuilder stringBuffer = new StringBuilder();
+    stringBuffer.append("CREATE TYPE IF NOT EXISTS ").append(mapping.getKeySpace().getName()).append(".").append(udtType).append(" (");
+    boolean isCommaNeeded = false;
+    for (java.lang.reflect.Field udtField : udtClass.getDeclaredFields()) {
+      com.datastax.driver.mapping.annotations.Field fieldAnnotation = udtField.getDeclaredAnnotation(com.datastax.driver.mapping.annotations.Field.class);
+      if (fieldAnnotation == null) {
+        // Fields without the mapping annotation are not part of the user defined type.
+        continue;
+      }
+      if (isCommaNeeded) {
+        stringBuffer.append(", ");
+      }
+      String columnName = fieldAnnotation.name().isEmpty() ? udtField.getName() : fieldAnnotation.name();
+      stringBuffer.append(columnName).append(" ");
+      stringBuffer.append(dataType(udtField, null));
+      isCommaNeeded = true;
+    }
+    stringBuffer.append(")");
+    return stringBuffer.toString();
+  }
+
+  /**
+   * Builds the <code>CREATE TYPE IF NOT EXISTS</code> statement for an Avro record that is
+   * stored as a user defined type.
+   *
+   * @param mapping     Cassandra mapping, provides the keyspace name
+   * @param udtType     name of the user defined type
+   * @param fieldSchema Avro record schema whose fields become the fields of the type
+   * @return CQL statement which creates the user defined type
+   */
+  static String getCreateUDTTypeForAvro(CassandraMapping mapping, String udtType, Schema fieldSchema) {
+    String keySpaceName = mapping.getKeySpace().getName();
+    StringBuilder createType = new StringBuilder();
+    createType.append("CREATE TYPE IF NOT EXISTS ");
+    createType.append(keySpaceName).append(".").append(udtType);
+    createType.append(" (");
+    processRecord(fieldSchema, createType);
+    return createType.append(")").toString();
+  }
+
+  /**
+   * Resolves the CQL data type of a Java field or of a generic type argument.
+   * <p>
+   * Exactly one of the two arguments is used: the field when it is not null, otherwise the
+   * type. Map, List and Set types are resolved recursively through their type arguments.
+   *
+   * @param field     Java field to resolve, may be null
+   * @param fieldType type to resolve when no field is given
+   * @return CQL data type
+   * @throws RuntimeException if the Java type has no CQL data type here
+   */
+  private static String dataType(java.lang.reflect.Field field, Type fieldType) {
+    String typeName;
+    if (field != null) {
+      typeName = field.getType().getName();
+    } else if (fieldType instanceof ParameterizedType) {
+      // Match on the raw container type only. The full generic name also contains the names of
+      // the type arguments, which made Set<List<String>> look like a List.
+      typeName = ((ParameterizedType) fieldType).getRawType().getTypeName();
+    } else {
+      typeName = fieldType.getTypeName();
+    }
+    switch (typeName) {
+      case "java.lang.String":
+      case "java.lang.CharSequence":
+        return "text";
+      case "int":
+      case "java.lang.Integer":
+        return "int";
+      case "double":
+      case "java.lang.Double":
+        return "double";
+      case "float":
+      case "java.lang.Float":
+        return "float";
+      case "boolean":
+      case "java.lang.Boolean":
+        return "boolean";
+      case "java.util.UUID":
+        return "uuid";
+      case "long": // primitive long was missing although the other primitives are supported
+      case "java.lang.Long":
+        return "bigint";
+      case "java.math.BigDecimal":
+        return "decimal";
+      case "java.net.InetAddress":
+        return "inet";
+      case "java.math.BigInteger":
+        return "varint";
+      case "java.nio.ByteBuffer":
+        return "blob";
+      default:
+        break;
+    }
+    boolean isMap = typeName.contains("Map");
+    boolean isList = !isMap && typeName.contains("List");
+    boolean isSet = !isMap && !isList && typeName.contains("Set");
+    if (!isMap && !isList && !isSet) {
+      throw new RuntimeException("Unsupported Cassandra DataType");
+    }
+    // Collections must be declared with their type arguments, otherwise this cast fails.
+    ParameterizedType collectionType = (ParameterizedType) (field != null ? field.getGenericType() : fieldType);
+    Type[] arguments = collectionType.getActualTypeArguments();
+    if (isMap) {
+      Type keyType = arguments[0];
+      Type valueType = arguments[1];
+      return "map<" + dataType(null, keyType) + "," + dataType(null, valueType) + ">";
+    }
+    return (isList ? "list<" : "set<") + dataType(null, arguments[0]) + ">";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
new file mode 100644
index 0000000..208428e
--- /dev/null
+++ 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
@@ -0,0 +1,225 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.gora.cassandra.serializers;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.TableMetadata;
+import org.apache.gora.cassandra.bean.Field;
+import org.apache.gora.cassandra.store.CassandraClient;
+import org.apache.gora.cassandra.store.CassandraMapping;
+import org.apache.gora.cassandra.store.CassandraStore;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * This is the abstract Cassandra Serializer class.
+ */
+public abstract class CassandraSerializer<K, T extends Persistent> {
+
+  // Logger of this class; it was created for CassandraStore before, which attributed the
+  // serializer's messages to the wrong category.
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraSerializer.class);
+
+  protected Class<K> keyClass;
+
+  protected Class<T> persistentClass;
+
+  protected CassandraMapping mapping;
+
+  protected CassandraClient client;
+
+  protected String readConsistencyLevel;
+
+  protected String writeConsistencyLevel;
+
+  protected Map<String, String> userDefineTypeMaps;
+
+  /**
+   * Stores the client, the key and persistent classes and the mapping, and copies the read and
+   * write consistency levels from the client.
+   *
+   * @param cc              Cassandra client
+   * @param keyClass        key class
+   * @param persistantClass persistent class
+   * @param mapping         Cassandra mapping
+   */
+  CassandraSerializer(CassandraClient cc, Class<K> keyClass, Class<T> 
persistantClass, CassandraMapping mapping) {
+    this.keyClass = keyClass;
+    this.persistentClass = persistantClass;
+    this.client = cc;
+    this.mapping = mapping;
+    this.readConsistencyLevel = client.getReadConsistencyLevel();
+    this.writeConsistencyLevel = client.getWriteConsistencyLevel();
+  }
+
+  /**
+   * Creates the Cassandra Serializer that matches the given serialization type. The native
+   * serializer is used when the type is null or empty.
+   *
+   * @param cc        Cassandra Client
+   * @param type      Serialization type
+   * @param dataStore Cassandra DataStore
+   * @param mapping   Cassandra Mapping
+   * @param <K>       key class
+   * @param <T>       persistent class
+   * @return Serializer
+   */
+  public static <K, T extends Persistent> CassandraSerializer getSerializer(CassandraClient cc, String type, final DataStore<K, T> dataStore, CassandraMapping mapping) {
+    CassandraStore.SerializerType serType;
+    if (type == null || type.isEmpty()) {
+      serType = CassandraStore.SerializerType.NATIVE;
+    } else {
+      serType = CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH));
+    }
+    if (serType == CassandraStore.SerializerType.AVRO) {
+      return new AvroSerializer(cc, dataStore, mapping);
+    }
+    // NATIVE and every other type fall back to the native serializer.
+    return new NativeSerializer(cc, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping);
+  }
+
+  /**
+   * Analyzes the persistent class to find inner records that are mapped to Cassandra user
+   * defined types (UDT). This method should be called in the constructor of every Cassandra
+   * serializer.
+   *
+   * @throws Exception if the persistent class can't be analyzed
+   */
+  protected abstract void analyzePersistent() throws Exception;
+
+
+  /**
+   * Creates the keyspace, then every user defined type collected in
+   * <code>userDefineTypeMaps</code>, and finally the table of the mapping.
+   */
+  public void createSchema() {
+    LOG.debug("creating Cassandra keyspace {}", mapping.getKeySpace().getName());
+    this.client.getSession().execute(CassandraQueryFactory.getCreateKeySpaceQuery(mapping));
+    // The map is keyed by the type name and holds the CREATE TYPE statement as value.
+    for (Map.Entry<String, String> udtType : userDefineTypeMaps.entrySet()) {
+      LOG.debug("creating Cassandra User Define Type {}", udtType.getKey());
+      this.client.getSession().execute(udtType.getValue());
+    }
+    LOG.debug("creating Cassandra column family / table {}", mapping.getCoreName());
+    this.client.getSession().execute(CassandraQueryFactory.getCreateTableQuery(mapping));
+  }
+
+  /**
+   * Drops the table of the mapping first and the keyspace afterwards.
+   */
+  public void deleteSchema() {
+    LOG.debug("dropping Cassandra table {}", mapping.getCoreName());
+    
this.client.getSession().execute(CassandraQueryFactory.getDropTableQuery(mapping));
+    LOG.debug("dropping Cassandra keyspace {}", 
mapping.getKeySpace().getName());
+    
this.client.getSession().execute(CassandraQueryFactory.getDropKeySpaceQuery(mapping));
+  }
+
+  /**
+   * Closes the Cassandra client used by this serializer.
+   */
+  public void close() {
+    this.client.close();
+  }
+
+  /**
+   * Executes the truncate statement for the table of the mapping.
+   */
+  public void truncateSchema() {
+    LOG.debug("truncating Cassandra table {}", mapping.getCoreName());
+    
this.client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping));
+  }
+
+  /**
+   * Checks through the cluster metadata whether both the keyspace and the table of the mapping
+   * exist.
+   *
+   * @return true if the keyspace exists and contains the table
+   */
+  public boolean schemaExists() {
+    String keySpaceName = mapping.getKeySpace().getName();
+    KeyspaceMetadata keyspaceMetadata = this.client.getCluster().getMetadata().getKeyspace(keySpaceName);
+    if (keyspaceMetadata == null) {
+      return false;
+    }
+    return keyspaceMetadata.getTable(mapping.getCoreName()) != null;
+  }
+
+  /**
+   * Collects the names of all fields of the mapping, in the order of the mapping's field list.
+   *
+   * @return field names
+   */
+  protected String[] getFields() {
+    List<String> fieldNames = new ArrayList<>();
+    for (Field mappedField : mapping.getFieldList()) {
+      String fieldName = mappedField.getFieldName();
+      fieldNames.add(fieldName);
+    }
+    String[] result = new String[fieldNames.size()];
+    return fieldNames.toArray(result);
+  }
+
+  /**
+   * Inserts the persistent Object
+   *
+   * @param key   key value
+   * @param value persistent value
+   */
+  public abstract void put(K key, T value);
+
+  /**
+   * Retrieves the persistent value according to the key
+   *
+   * @param key key value
+   * @return persistent value
+   */
+  public abstract T get(K key);
+
+  /**
+   * Deletes persistent value according to the key
+   *
+   * @param key key value
+   * @return isDeleted
+   */
+  public abstract boolean delete(K key);
+
+  /**
+   * Retrieves the persistent value according to the key and fields
+   *
+   * @param key    key value
+   * @param fields fields
+   * @return persistent value
+   */
+  public abstract T get(K key, String[] fields);
+
+  /**
+   * Executes the given query and returns the results.
+   *
+   * @param dataStore Cassandra data store
+   * @param query     Cassandra Query
+   * @return Cassandra Result
+   */
+  public abstract Result<K, T> execute(DataStore<K, T> dataStore, Query<K, T> 
query);
+
+  /**
+   * Update the persistent objects
+   *
+   * @param query Cassandra Query
+   * @return isUpdated
+   */
+  public abstract boolean updateByQuery(Query query);
+
+  /**
+   * Deletes the data selected by the query.
+   * <p>
+   * A query without key, start key and end key truncates the table when it selects no fields
+   * and is rejected with an error log otherwise. Every other query is executed as a delete
+   * statement with the write consistency level, if one is configured.
+   *
+   * @param query Cassandra Query
+   * @return always 0, the number of deleted elements is not available
+   */
+  public long deleteByQuery(Query query) {
+    List<Object> values = new ArrayList<>();
+    boolean hasNoKeys = query.getKey() == null && query.getEndKey() == null && query.getStartKey() == null;
+    if (!hasNoKeys) {
+      String cqlQuery = CassandraQueryFactory.getDeleteByQuery(mapping, query, values);
+      SimpleStatement statement = values.isEmpty()
+              ? new SimpleStatement(cqlQuery)
+              : new SimpleStatement(cqlQuery, values.toArray());
+      if (writeConsistencyLevel != null) {
+        statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel));
+      }
+      ResultSet results = client.getSession().execute(statement);
+      LOG.debug("Delete by Query was applied : " + results.wasApplied());
+    } else if (query.getFields() == null) {
+      client.getSession().execute(CassandraQueryFactory.getTruncateTableQuery(mapping));
+    } else {
+      LOG.error("Delete by Query is not supported for the Queries which didn't specify Query keys with fields.");
+    }
+    LOG.info("Delete By Query method doesn't return the deleted element count.");
+    return 0;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
new file mode 100644
index 0000000..bf28ee0
--- /dev/null
+++ 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
@@ -0,0 +1,243 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.gora.cassandra.serializers;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+import com.datastax.driver.mapping.Result;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.gora.cassandra.bean.Field;
+import org.apache.gora.cassandra.query.CassandraResultSet;
+import org.apache.gora.cassandra.store.CassandraClient;
+import org.apache.gora.cassandra.store.CassandraMapping;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.store.DataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This class contains the operations related to native serialization.
+ */
+class NativeSerializer<K, T extends Persistent> extends CassandraSerializer {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(NativeSerializer.class);
+
+  private Mapper<T> mapper;
+
+  /**
+   * Analyzes the persistent class, creates the schema and sets up the object mapper with the
+   * consistency levels of the client.
+   *
+   * @param cassandraClient Cassandra client
+   * @param keyClass        key class
+   * @param persistentClass persistent class
+   * @param mapping         Cassandra mapping
+   * @throws RuntimeException if the persistent class can't be analyzed, the original exception
+   *                          is attached as cause
+   */
+  NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) {
+    super(cassandraClient, keyClass, persistentClass, mapping);
+    try {
+      analyzePersistent();
+    } catch (Exception e) {
+      // Keep the cause, the message alone loses the stack trace of the actual failure.
+      throw new RuntimeException("Error occurred while analyzing the persistent class, :" + e.getMessage(), e);
+    }
+    this.createSchema();
+    MappingManager mappingManager = new MappingManager(cassandraClient.getSession());
+    mapper = mappingManager.mapper(persistentClass);
+    String writeLevel = cassandraClient.getWriteConsistencyLevel();
+    if (writeLevel != null) {
+      mapper.setDefaultDeleteOptions(Mapper.Option.consistencyLevel(ConsistencyLevel.valueOf(writeLevel)));
+      mapper.setDefaultSaveOptions(Mapper.Option.consistencyLevel(ConsistencyLevel.valueOf(writeLevel)));
+    }
+    String readLevel = cassandraClient.getReadConsistencyLevel();
+    if (readLevel != null) {
+      mapper.setDefaultGetOptions(Mapper.Option.consistencyLevel(ConsistencyLevel.valueOf(readLevel)));
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * The value is saved through the object mapper, the key is only used for the log message.
+   *
+   * @param key   key value
+   * @param value persistent value
+   */
+  @Override
+  public void put(Object key, Persistent value) {
+    // Logged before the save is executed.
+    LOG.debug("Object is saved with key : {} and value : {}", key, value);
+    mapper.save((T) value);
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * The object is loaded through the object mapper.
+   *
+   * @param key key value
+   * @return persistent value, or null when the mapper returns no object for the key
+   */
+  @Override
+  public T get(Object key) {
+    T object = mapper.get(key);
+    if (object != null) {
+      LOG.debug("Object is found for key : {}", key);
+    } else {
+      LOG.debug("Object is not found for key : {}", key);
+    }
+    return object;
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * The delete is issued through the object mapper.
+   *
+   * @param key key value
+   * @return always true
+   */
+  @Override
+  public boolean delete(Object key) {
+    // Logged before the delete is executed.
+    LOG.debug("Object is deleted for key : {}", key);
+    mapper.delete(key);
+    return true;
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * All fields of the mapping are selected when no fields are given. The select statement is
+   * executed with the read consistency level, if one is configured.
+   *
+   * @param key    key value
+   * @param fields fields to load, null selects all fields
+   * @return first persistent value of the result, or null when no row matches the key
+   */
+  @Override
+  public Persistent get(Object key, String[] fields) {
+    if (fields == null) {
+      fields = getFields();
+    }
+    String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields);
+    SimpleStatement statement = new SimpleStatement(cqlQuery, key);
+    if (readConsistencyLevel != null) {
+      statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel));
+    }
+    ResultSet results = client.getSession().execute(statement);
+    Result<T> objects = mapper.map(results);
+    List<T> objectList = objects.all();
+    // An empty list also means "not found"; checking for null alone let get(0) fail with an
+    // IndexOutOfBoundsException.
+    if (objectList == null || objectList.isEmpty()) {
+      LOG.debug("Object is not found for key : {}", key);
+      return null;
+    }
+    LOG.debug("Object is found for key : {}", key);
+    return objectList.get(0);
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * For every mapped field whose type contains "frozen", the create statement of its user
+   * defined type is stored in <code>userDefineTypeMaps</code> under the type name.
+   *
+   * @throws Exception
+   */
+  @Override
+  protected void analyzePersistent() throws Exception {
+    userDefineTypeMaps = new HashMap<>();
+    for (Field field : mapping.getFieldList()) {
+      String fieldType = field.getType();
+      if (fieldType.contains("frozen")) {
+        // The type name is the text between the first '<' and the first '>'.
+        String udtType = fieldType.substring(fieldType.indexOf("<") + 1, 
fieldType.indexOf(">"));
+        String createQuery = 
CassandraQueryFactory.getCreateUDTTypeForNative(mapping, persistentClass, 
udtType, field.getFieldName());
+        userDefineTypeMaps.put(udtType, createQuery);
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * The update statement is executed with the write consistency level, if one is configured.
+   *
+   * @param query Cassandra Query
+   * @return whether the statement was applied
+   */
+  @Override
+  public boolean updateByQuery(Query query) {
+    List<Object> values = new ArrayList<>();
+    String cqlQuery = CassandraQueryFactory.getUpdateByQueryForNative(mapping, query, values);
+    SimpleStatement statement = values.isEmpty()
+            ? new SimpleStatement(cqlQuery)
+            : new SimpleStatement(cqlQuery, values.toArray());
+    if (writeConsistencyLevel != null) {
+      statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel));
+    }
+    return client.getSession().execute(statement).wasApplied();
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * The key columns are always selected in addition to the fields of the query. The statement
+   * is executed with the read consistency level, if one is configured, in the same way as the
+   * other read of this serializer.
+   *
+   * @param dataStore Cassandra data store
+   * @param query     Cassandra Query
+   * @return Cassandra Result
+   */
+  @Override
+  public org.apache.gora.query.Result execute(DataStore dataStore, Query query) {
+    List<Object> objectArrayList = new ArrayList<>();
+    String[] fields = query.getFields();
+    if (fields != null) {
+      fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys());
+    } else {
+      fields = mapping.getAllFieldsIncludingKeys();
+    }
+    CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query);
+    String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields);
+    SimpleStatement statement;
+    if (objectArrayList.isEmpty()) {
+      statement = new SimpleStatement(cqlQuery);
+    } else {
+      statement = new SimpleStatement(cqlQuery, objectArrayList.toArray());
+    }
+    // The configured read consistency level was not applied to queries before.
+    if (readConsistencyLevel != null) {
+      statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel));
+    }
+    ResultSet results = client.getSession().execute(statement);
+    Result<T> objects = mapper.map(results);
+    for (T result : objects) {
+      K key = getKey(result);
+      cassandraResult.addResultElement(key, result);
+    }
+    return cassandraResult;
+  }
+
+  /**
+   * Reads the key of a persistent object.
+   * <p>
+   * The key field is the first field of the mapping whose "primarykey" property is true. Its
+   * value is read through a public getter without parameters (name compared ignoring case) and,
+   * if there is no such getter or the getter fails, through the public field of that name.
+   *
+   * @param object persistent object
+   * @return key value
+   * @throws RuntimeException if the mapping has no primary key field or the key can't be read
+   */
+  private K getKey(T object) {
+    String keyField = null;
+    for (Field field : mapping.getFieldList()) {
+      if (Boolean.parseBoolean(field.getProperty("primarykey"))) {
+        keyField = field.getFieldName();
+        break;
+      }
+    }
+    if (keyField == null) {
+      throw new RuntimeException("No primary key field is defined in the mapping of " + persistentClass);
+    }
+    Method keyMethod = null;
+    for (Method method : this.persistentClass.getMethods()) {
+      // Only a getter without parameters can be invoked with the object alone.
+      if (method.getParameterTypes().length == 0 && method.getName().equalsIgnoreCase("get" + keyField)) {
+        keyMethod = method;
+        break;
+      }
+    }
+    if (keyMethod != null) {
+      try {
+        return (K) keyMethod.invoke(object);
+      } catch (Exception e) {
+        // Fall back to the field access below, as before.
+        LOG.debug("Getter of the key field {} failed, trying the field access", keyField, e);
+      }
+    }
+    try {
+      return (K) this.persistentClass.getField(keyField).get(object);
+    } catch (Exception e1) {
+      throw new RuntimeException("Field " + keyField + " is not accessible in " + persistentClass + " : " + e1.getMessage(), e1);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
new file mode 100644
index 0000000..ce1e3e7
--- /dev/null
+++ 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains Cassandra store related util classes for serializer.
+ */
+package org.apache.gora.cassandra.serializers;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
new file mode 100644
index 0000000..f672884
--- /dev/null
+++ 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
@@ -0,0 +1,535 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.gora.cassandra.store;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.ProtocolOptions;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.TypeCodec;
+import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.DefaultRetryPolicy;
+import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.FallthroughRetryPolicy;
+import com.datastax.driver.core.policies.LatencyAwarePolicy;
+import com.datastax.driver.core.policies.LoggingRetryPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+import com.datastax.driver.extras.codecs.arrays.DoubleArrayCodec;
+import com.datastax.driver.extras.codecs.arrays.FloatArrayCodec;
+import com.datastax.driver.extras.codecs.arrays.IntArrayCodec;
+import com.datastax.driver.extras.codecs.arrays.LongArrayCodec;
+import com.datastax.driver.extras.codecs.arrays.ObjectArrayCodec;
+import com.datastax.driver.extras.codecs.date.SimpleDateCodec;
+import com.datastax.driver.extras.codecs.date.SimpleTimestampCodec;
+import com.datastax.driver.extras.codecs.jdk8.OptionalCodec;
+import org.apache.gora.cassandra.bean.Field;
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+import org.jdom.input.SAXBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+
+/**
+ * This class provides the Cassandra Client Connection.
+ * Initialize the Cassandra Connection according to the Properties.
+ */
+public class CassandraClient {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(CassandraClient.class);
+
+  private Cluster cluster;
+
+  private Session session;
+
+  private CassandraMapping mapping;
+
+  private String readConsistencyLevel;
+
+  private String writeConsistencyLevel;
+
+  /**
+   * Returns the session of this client; it is assigned in initialize.
+   *
+   * @return Cassandra session
+   */
+  public Session getSession() {
+    return session;
+  }
+
+  /**
+   * Returns the cluster of this client; it is assigned in initialize.
+   *
+   * @return Cassandra cluster
+   */
+  public Cluster getCluster() {
+    return cluster;
+  }
+
+  /**
+   * Builds the cluster from the properties, registers the codecs, reads the read and write
+   * consistency levels from the properties and connects the session.
+   *
+   * @param properties Cassandra store properties
+   * @param mapping    Cassandra mapping
+   * @throws Exception if the initialization fails
+   */
+  void initialize(Properties properties, CassandraMapping mapping) throws 
Exception {
+    Cluster.Builder builder = Cluster.builder();
+    List<String> codecs = readCustomCodec(properties);
+    builder = populateSettings(builder, properties);
+    // The mapping has to be set before registerOptionalCodecs, which reads its field list.
+    this.mapping = mapping;
+    this.cluster = builder.build();
+    if (codecs != null) {
+      registerCustomCodecs(codecs);
+    }
+    readConsistencyLevel = 
properties.getProperty(CassandraStoreParameters.READ_CONSISTENCY_LEVEL);
+    writeConsistencyLevel = 
properties.getProperty(CassandraStoreParameters.WRITE_CONSISTENCY_LEVEL);
+    registerOptionalCodecs();
+    this.session = this.cluster.connect();
+  }
+
+  /**
+   * Registers the optional codecs at the codec registry of the cluster: Optional wrappers of
+   * the native types, array codecs, simple date / timestamp codecs and Optional wrappers for
+   * the list, set and map column types of the mapping.
+   */
+  private void registerOptionalCodecs() {
+    // Fully qualified, CodecRegistry is not among the imports of this class.
+    com.datastax.driver.core.CodecRegistry registry = this.cluster.getConfiguration().getCodecRegistry();
+    // Optional Codecs for natives
+    registry.register(new OptionalCodec<>(TypeCodec.ascii()));
+    registry.register(new OptionalCodec<>(TypeCodec.bigint()));
+    registry.register(new OptionalCodec<>(TypeCodec.blob()));
+    registry.register(new OptionalCodec<>(TypeCodec.cboolean()));
+    registry.register(new OptionalCodec<>(TypeCodec.cdouble()));
+    registry.register(new OptionalCodec<>(TypeCodec.cfloat()));
+    registry.register(new OptionalCodec<>(TypeCodec.cint()));
+    registry.register(new OptionalCodec<>(TypeCodec.counter()));
+    registry.register(new OptionalCodec<>(TypeCodec.date()));
+    registry.register(new OptionalCodec<>(TypeCodec.decimal()));
+    registry.register(new OptionalCodec<>(TypeCodec.inet()));
+    registry.register(new OptionalCodec<>(TypeCodec.smallInt()));
+    registry.register(new OptionalCodec<>(TypeCodec.time()));
+    registry.register(new OptionalCodec<>(TypeCodec.timestamp()));
+    registry.register(new OptionalCodec<>(TypeCodec.timeUUID()));
+    registry.register(new OptionalCodec<>(TypeCodec.tinyInt()));
+    registry.register(new OptionalCodec<>(TypeCodec.varint()));
+    registry.register(new OptionalCodec<>(TypeCodec.varchar()));
+    registry.register(new OptionalCodec<>(TypeCodec.uuid()));
+    // Optional Array Codecs
+    registry.register(new IntArrayCodec());
+    registry.register(new DoubleArrayCodec());
+    registry.register(new FloatArrayCodec());
+    registry.register(new LongArrayCodec());
+    registry.register(new ObjectArrayCodec<>(
+            DataType.list(DataType.varchar()),
+            String[].class,
+            TypeCodec.varchar()));
+    // Optional Time Codecs
+    registry.register(new SimpleDateCodec());
+    registry.register(new SimpleTimestampCodec());
+
+    for (Field field : this.mapping.getFieldList()) {
+      String columnType = field.getType().toLowerCase(Locale.ENGLISH);
+      //http://docs.datastax.com/en/cql/3.3/cql/cql_reference/cql_data_types_c.html
+      if (columnType.contains("list")) {
+        String elementType = columnType.substring(columnType.indexOf("<") + 1, columnType.indexOf(">")).trim();
+        registry.register(new OptionalCodec<>(TypeCodec.list(getTypeCodec(elementType))));
+      } else if (columnType.contains("set")) {
+        String elementType = columnType.substring(columnType.indexOf("<") + 1, columnType.indexOf(">")).trim();
+        registry.register(new OptionalCodec<>(TypeCodec.set(getTypeCodec(elementType))));
+      } else if (columnType.contains("map")) {
+        String[] columnTypes = columnType.substring(columnType.indexOf("<") + 1, columnType.indexOf(">")).split(",");
+        // Key and value use their own codecs; wrapping them in set codecs registered a codec
+        // for a map of sets, which doesn't match the column. The trim accepts "map<text, int>".
+        registry.register(new OptionalCodec<>(TypeCodec.map(getTypeCodec(columnTypes[0].trim()),
+                getTypeCodec(columnTypes[1].trim()))));
+      }
+    }
+  }
+
+  /**
+   * Resolves the driver codec for a CQL type name.
+   *
+   * @param columnType CQL type name; it is matched exactly against the case labels below
+   * @return the codec associated with the given type name
+   * @throws RuntimeException if the type name matches none of the supported names
+   */
+  private TypeCodec getTypeCodec(String columnType) {
+    switch (columnType) {
+      case "ascii":
+        return TypeCodec.ascii();
+      case "bigint":
+        return TypeCodec.bigint();
+      case "blob":
+        return TypeCodec.blob();
+      case "boolean":
+        return TypeCodec.cboolean();
+      case "counter":
+        return TypeCodec.counter();
+      case "date":
+        return TypeCodec.date();
+      case "decimal":
+        return TypeCodec.decimal();
+      case "double":
+        return TypeCodec.cdouble();
+      case "float":
+        return TypeCodec.cfloat();
+      case "inet":
+        return TypeCodec.inet();
+      case "int":
+        return TypeCodec.cint();
+      case "smallint":
+        return TypeCodec.smallInt();
+      case "time":
+        return TypeCodec.time();
+      case "timestamp":
+        return TypeCodec.timestamp();
+      case "timeuuid":
+        return TypeCodec.timeUUID();
+      case "tinyint":
+        return TypeCodec.tinyInt();
+      case "uuid":
+        return TypeCodec.uuid();
+      case "varint":
+        return TypeCodec.varint();
+      // "text" and "varchar" share the same codec.
+      case "varchar":
+      case "text":
+        return TypeCodec.varchar();
+      default:
+        LOG.error("Unsupported Cassandra datatype: {} ", columnType);
+        throw new RuntimeException("Unsupported Cassandra datatype: " + columnType);
+    }
+  }
+
+  /**
+   * Applies the connection settings found in the given properties to the cluster builder.
+   *
+   * <p>Contact points are read as a comma separated list; surrounding whitespace of each entry is
+   * ignored. JMX reporting and metrics are switched off unless their property parses as
+   * {@code true}. SSL is enabled only when its property parses as {@code true}.
+   *
+   * @param builder    cluster builder to configure
+   * @param properties store properties holding the Cassandra settings
+   * @return the configured cluster builder
+   * @throws IllegalArgumentException if the contact point property is absent, or if a numeric or
+   *                                  enum valued property cannot be parsed
+   */
+  private Cluster.Builder populateSettings(Cluster.Builder builder, Properties properties) {
+    String serversParam = properties.getProperty(CassandraStoreParameters.CASSANDRA_SERVERS);
+    if (serversParam == null) {
+      // Fail with a message naming the missing key instead of a bare NullPointerException.
+      throw new IllegalArgumentException(
+              "Missing required property: " + CassandraStoreParameters.CASSANDRA_SERVERS);
+    }
+    for (String server : serversParam.split(",")) {
+      // Tolerate "host1, host2" style lists.
+      builder = builder.addContactPoint(server.trim());
+    }
+    String portProp = properties.getProperty(CassandraStoreParameters.PORT);
+    if (portProp != null) {
+      builder = builder.withPort(Integer.parseInt(portProp));
+    }
+    String clusterNameProp = properties.getProperty(CassandraStoreParameters.CLUSTER_NAME);
+    if (clusterNameProp != null) {
+      builder = builder.withClusterName(clusterNameProp);
+    }
+    String compressionProp = properties.getProperty(CassandraStoreParameters.COMPRESSION);
+    if (compressionProp != null) {
+      // Enum constants are upper case; accept any casing in the configuration.
+      builder = builder.withCompression(
+              ProtocolOptions.Compression.valueOf(compressionProp.trim().toUpperCase(Locale.ENGLISH)));
+    }
+    builder = this.populateCredentials(properties, builder);
+    builder = this.populateLoadBalancingProp(properties, builder);
+    String enableJMXProp = properties.getProperty(CassandraStoreParameters.ENABLE_JMX_REPORTING);
+    if (!Boolean.parseBoolean(enableJMXProp)) {
+      builder = builder.withoutJMXReporting();
+    }
+    String enableMetricsProp = properties.getProperty(CassandraStoreParameters.ENABLE_METRICS);
+    if (!Boolean.parseBoolean(enableMetricsProp)) {
+      builder = builder.withoutMetrics();
+    }
+    builder = this.populatePoolingSettings(properties, builder);
+    String versionProp = properties.getProperty(CassandraStoreParameters.PROTOCOL_VERSION);
+    if (versionProp != null) {
+      builder = builder.withProtocolVersion(ProtocolVersion.fromInt(Integer.parseInt(versionProp)));
+    }
+    builder = this.populateQueryOptions(properties, builder);
+    builder = this.populateReconnectPolicy(properties, builder);
+    builder = this.populateRetrytPolicy(properties, builder);
+    builder = this.populateSocketOptions(properties, builder);
+    // parseBoolean(null) is false, so no separate null check is needed.
+    if (Boolean.parseBoolean(properties.getProperty(CassandraStoreParameters.ENABLE_SSL))) {
+      builder = builder.withSSL();
+    }
+    return builder;
+  }
+
+  /**
+   * Configures the load balancing policy named by the load balancing property, if one is set.
+   *
+   * <p>An unrecognised policy name is logged as an error and the builder is returned unchanged.
+   *
+   * @param properties store properties holding the Cassandra settings
+   * @param builder    cluster builder to configure
+   * @return the cluster builder, with a load balancing policy applied when a supported one is named
+   */
+  private Cluster.Builder populateLoadBalancingProp(Properties properties, Cluster.Builder builder) {
+    String loadBalancingProp = properties.getProperty(CassandraStoreParameters.LOAD_BALANCING_POLICY);
+    if (loadBalancingProp == null) {
+      return builder;
+    }
+    switch (loadBalancingProp) {
+      case "LatencyAwareRoundRobinPolicy":
+        return builder.withLoadBalancingPolicy(LatencyAwarePolicy.builder(new RoundRobinPolicy()).build());
+      case "RoundRobinPolicy":
+        return builder.withLoadBalancingPolicy(new RoundRobinPolicy());
+      case "DCAwareRoundRobinPolicy":
+        return builder.withLoadBalancingPolicy(buildDcAwarePolicy(properties));
+      case "TokenAwareRoundRobinPolicy":
+        return builder.withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()));
+      case "TokenAwareDCAwareRoundRobinPolicy":
+        return builder.withLoadBalancingPolicy(new TokenAwarePolicy(buildDcAwarePolicy(properties)));
+      default:
+        LOG.error("Unsupported Cassandra load balancing  policy: {} ", loadBalancingProp);
+        return builder;
+    }
+  }
+
+  /**
+   * Builds a DC aware round robin policy from the data center and remote DC properties.
+   *
+   * <p>The local data center is set only when the property is present and non-empty; remote DCs
+   * are allowed for local consistency levels only when that property parses as {@code true}.
+   *
+   * @param properties store properties holding the Cassandra settings
+   * @return the configured policy
+   */
+  private DCAwareRoundRobinPolicy buildDcAwarePolicy(Properties properties) {
+    String dataCenter = properties.getProperty(CassandraStoreParameters.DATA_CENTER);
+    boolean allowRemoteDCsForLocalConsistencyLevel = Boolean.parseBoolean(
+            properties.getProperty(CassandraStoreParameters.ALLOW_REMOTE_DCS_FOR_LOCAL_CONSISTENCY_LEVEL));
+    DCAwareRoundRobinPolicy.Builder policyBuilder = DCAwareRoundRobinPolicy.builder();
+    if (dataCenter != null && !dataCenter.isEmpty()) {
+      policyBuilder = policyBuilder.withLocalDc(dataCenter);
+    }
+    if (allowRemoteDCsForLocalConsistencyLevel) {
+      policyBuilder = policyBuilder.allowRemoteDCsForLocalConsistencyLevel();
+    }
+    return policyBuilder.build();
+  }
+
+  /**
+   * Applies username/password credentials to the builder when a username is configured.
+   *
+   * @param properties store properties holding the Cassandra settings
+   * @param builder    cluster builder to configure
+   * @return the cluster builder, with credentials applied if a username property exists
+   */
+  private Cluster.Builder populateCredentials(Properties properties, Cluster.Builder builder) {
+    String user = properties.getProperty(CassandraStoreParameters.USERNAME);
+    if (user == null) {
+      return builder;
+    }
+    String secret = properties.getProperty(CassandraStoreParameters.PASSWORD);
+    return builder.withCredentials(user, secret);
+  }
+
+  /**
+   * Builds the connection pooling options from the properties and applies them to the builder.
+   *
+   * <p>Each option is set only when its property is present; values are parsed as integers.
+   *
+   * @param properties store properties holding the Cassandra settings
+   * @param builder    cluster builder to configure
+   * @return the cluster builder with pooling options applied
+   */
+  private Cluster.Builder populatePoolingSettings(Properties properties, Cluster.Builder builder) {
+    PoolingOptions poolingOptions = new PoolingOptions();
+
+    String value = properties.getProperty(CassandraStoreParameters.LOCAL_CORE_CONNECTIONS_PER_HOST);
+    if (value != null) {
+      poolingOptions.setCoreConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(value));
+    }
+
+    value = properties.getProperty(CassandraStoreParameters.REMOTE_CORE_CONNECTIONS_PER_HOST);
+    if (value != null) {
+      poolingOptions.setCoreConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(value));
+    }
+
+    value = properties.getProperty(CassandraStoreParameters.LOCAL_MAX_CONNECTIONS_PER_HOST);
+    if (value != null) {
+      poolingOptions.setMaxConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(value));
+    }
+
+    value = properties.getProperty(CassandraStoreParameters.REMOTE_MAX_CONNECTIONS_PER_HOST);
+    if (value != null) {
+      poolingOptions.setMaxConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(value));
+    }
+
+    value = properties.getProperty(CassandraStoreParameters.LOCAL_NEW_CONNECTION_THRESHOLD);
+    if (value != null) {
+      poolingOptions.setNewConnectionThreshold(HostDistance.LOCAL, Integer.parseInt(value));
+    }
+
+    value = properties.getProperty(CassandraStoreParameters.REMOTE_NEW_CONNECTION_THRESHOLD);
+    if (value != null) {
+      poolingOptions.setNewConnectionThreshold(HostDistance.REMOTE, Integer.parseInt(value));
+    }
+
+    value = properties.getProperty(CassandraStoreParameters.LOCAL_MAX_REQUESTS_PER_CONNECTION);
+    if (value != null) {
+      poolingOptions.setMaxRequestsPerConnection(HostDistance.LOCAL, Integer.parseInt(value));
+    }
+
+    value = properties.getProperty(CassandraStoreParameters.REMOTE_MAX_REQUESTS_PER_CONNECTION);
+    if (value != null) {
+      poolingOptions.setMaxRequestsPerConnection(HostDistance.REMOTE, Integer.parseInt(value));
+    }
+
+    return builder.withPoolingOptions(poolingOptions);
+  }
+
+  /**
+   * Builds the query options (consistency levels and fetch size) and applies them to the builder.
+   *
+   * <p>Each option is set only when its property is present.
+   *
+   * @param properties store properties holding the Cassandra settings
+   * @param builder    cluster builder to configure
+   * @return the cluster builder with query options applied
+   */
+  private Cluster.Builder populateQueryOptions(Properties properties, Cluster.Builder builder) {
+    QueryOptions queryOptions = new QueryOptions();
+
+    String consistency = properties.getProperty(CassandraStoreParameters.CONSISTENCY_LEVEL);
+    if (consistency != null) {
+      queryOptions.setConsistencyLevel(ConsistencyLevel.valueOf(consistency));
+    }
+
+    String serialConsistency = properties.getProperty(CassandraStoreParameters.SERIAL_CONSISTENCY_LEVEL);
+    if (serialConsistency != null) {
+      queryOptions.setSerialConsistencyLevel(ConsistencyLevel.valueOf(serialConsistency));
+    }
+
+    String pageSize = properties.getProperty(CassandraStoreParameters.FETCH_SIZE);
+    if (pageSize != null) {
+      queryOptions.setFetchSize(Integer.parseInt(pageSize));
+    }
+
+    return builder.withQueryOptions(queryOptions);
+  }
+
+  /**
+   * Configures the reconnection policy named by the reconnection policy property, if one is set.
+   *
+   * <p>The delay properties of the selected policy are parsed as longs. An unrecognised policy
+   * name is logged as an error and the builder is returned unchanged.
+   *
+   * @param properties store properties holding the Cassandra settings
+   * @param builder    cluster builder to configure
+   * @return the cluster builder, with a reconnection policy applied when a supported one is named
+   */
+  private Cluster.Builder populateReconnectPolicy(Properties properties, Cluster.Builder builder) {
+    String policyName = properties.getProperty(CassandraStoreParameters.RECONNECTION_POLICY);
+    if (policyName == null) {
+      return builder;
+    }
+    switch (policyName) {
+      case "ConstantReconnectionPolicy": {
+        long delay = Long.parseLong(
+                properties.getProperty(CassandraStoreParameters.CONSTANT_RECONNECTION_POLICY_DELAY));
+        return builder.withReconnectionPolicy(new ConstantReconnectionPolicy(delay));
+      }
+      case "ExponentialReconnectionPolicy": {
+        String baseDelayProp =
+                properties.getProperty(CassandraStoreParameters.EXPONENTIAL_RECONNECTION_POLICY_BASE_DELAY);
+        String maxDelayProp =
+                properties.getProperty(CassandraStoreParameters.EXPONENTIAL_RECONNECTION_POLICY_MAX_DELAY);
+        long baseDelay = Long.parseLong(baseDelayProp);
+        long maxDelay = Long.parseLong(maxDelayProp);
+        return builder.withReconnectionPolicy(new ExponentialReconnectionPolicy(baseDelay, maxDelay));
+      }
+      default:
+        LOG.error("Unsupported reconnection policy : {} ", policyName);
+        return builder;
+    }
+  }
+
+  /**
+   * Configures the retry policy named by the retry policy property, if one is set.
+   *
+   * <p>Names prefixed with "Logging" wrap the corresponding policy in a {@code LoggingRetryPolicy}.
+   * An unrecognised policy name is logged as an error and the builder is returned unchanged.
+   *
+   * @param properties store properties holding the Cassandra settings
+   * @param builder    cluster builder to configure
+   * @return the cluster builder, with a retry policy applied when a supported one is named
+   */
+  private Cluster.Builder populateRetrytPolicy(Properties properties, Cluster.Builder builder) {
+    String policyName = properties.getProperty(CassandraStoreParameters.RETRY_POLICY);
+    if (policyName == null) {
+      return builder;
+    }
+    switch (policyName) {
+      case "DefaultRetryPolicy":
+        return builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE);
+      case "DowngradingConsistencyRetryPolicy":
+        return builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE);
+      case "FallthroughRetryPolicy":
+        return builder.withRetryPolicy(FallthroughRetryPolicy.INSTANCE);
+      case "LoggingDefaultRetryPolicy":
+        return builder.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE));
+      case "LoggingDowngradingConsistencyRetryPolicy":
+        return builder.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE));
+      case "LoggingFallthroughRetryPolicy":
+        return builder.withRetryPolicy(new LoggingRetryPolicy(FallthroughRetryPolicy.INSTANCE));
+      default:
+        LOG.error("Unsupported retry policy : {} ", policyName);
+        return builder;
+    }
+  }
+
+  /**
+   * Builds the socket options from the properties and applies them to the builder.
+   *
+   * <p>Each option is set only when its property is present; values are parsed as integers or
+   * booleans depending on the option.
+   *
+   * @param properties store properties holding the Cassandra settings
+   * @param builder    cluster builder to configure
+   * @return the cluster builder with socket options applied
+   */
+  private Cluster.Builder populateSocketOptions(Properties properties, Cluster.Builder builder) {
+    SocketOptions socketOptions = new SocketOptions();
+
+    String value = properties.getProperty(CassandraStoreParameters.CONNECTION_TIMEOUT_MILLIS);
+    if (value != null) {
+      socketOptions.setConnectTimeoutMillis(Integer.parseInt(value));
+    }
+
+    value = properties.getProperty(CassandraStoreParameters.KEEP_ALIVE);
+    if (value != null) {
+      socketOptions.setKeepAlive(Boolean.parseBoolean(value));
+    }
+
+    value = properties.getProperty(CassandraStoreParameters.READ_TIMEOUT_MILLIS);
+    if (value != null) {
+      socketOptions.setReadTimeoutMillis(Integer.parseInt(value));
+    }
+
+    value = properties.getProperty(CassandraStoreParameters.RECEIVER_BUFFER_SIZE);
+    if (value != null) {
+      socketOptions.setReceiveBufferSize(Integer.parseInt(value));
+    }
+
+    value = properties.getProperty(CassandraStoreParameters.REUSE_ADDRESS);
+    if (value != null) {
+      socketOptions.setReuseAddress(Boolean.parseBoolean(value));
+    }
+
+    value = properties.getProperty(CassandraStoreParameters.SEND_BUFFER_SIZE);
+    if (value != null) {
+      socketOptions.setSendBufferSize(Integer.parseInt(value));
+    }
+
+    value = properties.getProperty(CassandraStoreParameters.SO_LINGER);
+    if (value != null) {
+      socketOptions.setSoLinger(Integer.parseInt(value));
+    }
+
+    value = properties.getProperty(CassandraStoreParameters.TCP_NODELAY);
+    if (value != null) {
+      socketOptions.setTcpNoDelay(Boolean.parseBoolean(value));
+    }
+
+    return builder.withSocketOptions(socketOptions);
+  }
+
+  /**
+   * Reads the custom codec entries from the XML resource named by the custom codec file property.
+   *
+   * <p>The resource is loaded through this class's class loader and the text of every
+   * {@code codec} child of the root element is collected.
+   *
+   * @param properties store properties holding the Cassandra settings
+   * @return the codec entries in document order, or {@code null} when no codec file is configured
+   * @throws JDOMException if the resource cannot be parsed as XML
+   * @throws IOException   if the resource cannot be found or read
+   */
+  private List<String> readCustomCodec(Properties properties) throws JDOMException, IOException {
+    String filename = properties.getProperty(CassandraStoreParameters.CUSTOM_CODEC_FILE);
+    if (filename == null) {
+      return null;
+    }
+    // try-with-resources guarantees the classpath stream is released even when parsing fails.
+    try (java.io.InputStream codecStream = getClass().getClassLoader().getResourceAsStream(filename)) {
+      if (codecStream == null) {
+        // getResourceAsStream signals a missing resource with null rather than an exception.
+        throw new IOException("Custom codec file not found on the classpath: " + filename);
+      }
+      SAXBuilder builder = new SAXBuilder();
+      Document doc = builder.build(codecStream);
+      List<Element> codecElementList = doc.getRootElement().getChildren("codec");
+      List<String> codecs = new ArrayList<>();
+      for (Element codec : codecElementList) {
+        codecs.add(codec.getValue());
+      }
+      return codecs;
+    }
+  }
+
+  /**
+   * Returns the read consistency level held by this client.
+   *
+   * @return the read consistency level
+   */
+  public String getReadConsistencyLevel() {
+    return this.readConsistencyLevel;
+  }
+
+  /**
+   * Returns the write consistency level held by this client.
+   *
+   * @return the write consistency level
+   */
+  public String getWriteConsistencyLevel() {
+    return this.writeConsistencyLevel;
+  }
+
+
+  /**
+   * Closes the session and then the cluster.
+   *
+   * <p>Either may be absent (for example when initialization did not complete); the cluster is
+   * closed even if closing the session fails.
+   */
+  public void close() {
+    try {
+      if (this.session != null) {
+        this.session.close();
+      }
+    } finally {
+      if (this.cluster != null) {
+        this.cluster.close();
+      }
+    }
+  }
+
+  /**
+   * Instantiates each named codec class through its no-argument constructor and registers it
+   * with the cluster's codec registry.
+   *
+   * @param codecs fully qualified class names of {@link TypeCodec} implementations; {@code null}
+   *               is treated as "nothing to register"
+   * @throws Exception if a class cannot be loaded, instantiated or cast to {@link TypeCodec}
+   */
+  private void registerCustomCodecs(List<String> codecs) throws Exception {
+    if (codecs == null) {
+      return;
+    }
+    for (String codec : codecs) {
+      // Names may come from XML element text, so strip surrounding whitespace before loading.
+      TypeCodec<?> customCodec =
+              (TypeCodec<?>) Class.forName(codec.trim()).getDeclaredConstructor().newInstance();
+      this.cluster.getConfiguration().getCodecRegistry().register(customCodec);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
new file mode 100644
index 0000000..7d5ac05
--- /dev/null
+++ 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
@@ -0,0 +1,242 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.gora.cassandra.store;
+
+import org.apache.gora.cassandra.bean.CassandraKey;
+import org.apache.gora.cassandra.bean.Field;
+import org.apache.gora.cassandra.bean.KeySpace;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class represents the Cassandra Mapping: the key space, the persistent fields, the optional
+ * composite key and the key/value properties attached to the mapping.
+ */
+public class CassandraMapping {
+
+  private static final String PRIMARY_KEY = "primarykey";
+  private final Map<String, String> tableProperties;
+  private final List<Field> fieldList;
+  private CassandraKey cassandraKey;
+  private Class keyClass;
+  private Class persistentClass;
+  private KeySpace keySpace;
+  private Field inlinedDefinedPartitionKey;
+  private String coreName;
+
+  /**
+   * Constructor of the class; starts with an empty field list and no properties.
+   */
+  CassandraMapping() {
+    this.fieldList = new ArrayList<>();
+    this.tableProperties = new HashMap<>();
+  }
+
+  /**
+   * This method returns the KeySpace of the mapping.
+   *
+   * @return Key space {@link KeySpace}
+   */
+  public KeySpace getKeySpace() {
+    return keySpace;
+  }
+
+  /**
+   * This method sets the KeySpace in the Cassandra mapping.
+   *
+   * @param keySpace Key space {@link KeySpace}
+   */
+  void setKeySpace(KeySpace keySpace) {
+    this.keySpace = keySpace;
+  }
+
+  /**
+   * This method returns the fields registered for the Persistent Object. The returned list is
+   * the internal list of this mapping, not a copy.
+   *
+   * @return List of Fields
+   */
+  public List<Field> getFieldList() {
+    return fieldList;
+  }
+
+  /**
+   * This method returns the Persistent Object's Field from the mapping, according to the
+   * FieldName. The comparison ignores case.
+   *
+   * @param fieldName Field Name
+   * @return Field {@link Field}, or {@code null} if no field has that name
+   */
+  public Field getFieldFromFieldName(String fieldName) {
+    for (Field candidate : fieldList) {
+      if (candidate.getFieldName().equalsIgnoreCase(fieldName)) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * This method returns the Persistent Object's Field from the mapping, according to the
+   * ColumnName. The comparison ignores case.
+   *
+   * @param columnName Column Name
+   * @return Field {@link Field}, or {@code null} if no field has that column name
+   */
+  public Field getFieldFromColumnName(String columnName) {
+    for (Field candidate : fieldList) {
+      if (candidate.getColumnName().equalsIgnoreCase(columnName)) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * This method returns the names of the fields in the field list.
+   *
+   * @return array of Field Names
+   */
+  public String[] getFieldNames() {
+    List<String> fieldNames = new ArrayList<>(fieldList.size());
+    appendFieldNames(fieldList, fieldNames);
+    return fieldNames.toArray(new String[fieldNames.size()]);
+  }
+
+  /**
+   * This method returns the names of the fields in the field list, followed by the names of the
+   * fields of the Cassandra key when one is set.
+   *
+   * @return array of Field Names
+   */
+  public String[] getAllFieldsIncludingKeys() {
+    List<String> fieldNames = new ArrayList<>(fieldList.size());
+    appendFieldNames(fieldList, fieldNames);
+    if (cassandraKey != null) {
+      appendFieldNames(cassandraKey.getFieldList(), fieldNames);
+    }
+    return fieldNames.toArray(new String[fieldNames.size()]);
+  }
+
+  /**
+   * This method returns the names of the key fields: the fields of the Cassandra key when one is
+   * set, otherwise the single inlined (or default) partition key.
+   *
+   * @return field Names
+   */
+  public String[] getAllKeys() {
+    List<String> fieldNames = new ArrayList<>();
+    if (cassandraKey != null) {
+      appendFieldNames(cassandraKey.getFieldList(), fieldNames);
+    } else {
+      // Only resolve the inlined key when it is actually needed.
+      fieldNames.add(getInlinedDefinedPartitionKey().getFieldName());
+    }
+    return fieldNames.toArray(new String[fieldNames.size()]);
+  }
+
+  /**
+   * Appends the field name of every given field to the target list, in iteration order.
+   */
+  private static void appendFieldNames(Iterable<? extends Field> fields, List<String> target) {
+    for (Field field : fields) {
+      target.add(field.getFieldName());
+    }
+  }
+
+  /**
+   * This method returns the Cassandra key of the mapping.
+   *
+   * @return Cassandra key {@link CassandraKey}, or {@code null} if none has been set
+   */
+  public CassandraKey getCassandraKey() {
+    return cassandraKey;
+  }
+
+  /**
+   * This method sets the Cassandra key of the mapping.
+   *
+   * @param cassandraKey Cassandra key {@link CassandraKey}
+   */
+  void setCassandraKey(CassandraKey cassandraKey) {
+    this.cassandraKey = cassandraKey;
+  }
+
+  /**
+   * This method returns the core name of the mapping.
+   *
+   * @return core name
+   */
+  public String getCoreName() {
+    return coreName;
+  }
+
+  /**
+   * This method sets the core name of the mapping.
+   *
+   * @param coreName core name
+   */
+  void setCoreName(String coreName) {
+    this.coreName = coreName;
+  }
+
+  /**
+   * This method appends a field to the field list.
+   *
+   * @param field Field {@link Field}
+   */
+  void addCassandraField(Field field) {
+    this.fieldList.add(field);
+  }
+
+  /**
+   * This method stores a property on the mapping, replacing any previous value of the same key.
+   *
+   * @param key   property name
+   * @param value property value
+   */
+  void addProperty(String key, String value) {
+    this.tableProperties.put(key, value);
+  }
+
+  /**
+   * This method returns a property stored on the mapping.
+   *
+   * @param key property name
+   * @return property value, or {@code null} if the property has not been added
+   */
+  public String getProperty(String key) {
+    return this.tableProperties.get(key);
+  }
+
+  /**
+   * Creates the predefined partition key: a varchar field named "defaultId" that is flagged as
+   * primary key. A new instance is created on every call.
+   */
+  private Field getDefaultCassandraKey() {
+    Field field = new Field();
+    field.setFieldName("defaultId");
+    field.setColumnName("defaultId");
+    field.setType("varchar");
+    field.addProperty(PRIMARY_KEY, "true");
+    return field;
+  }
+
+  /**
+   * This method returns the key class of the mapping.
+   *
+   * @return key class
+   */
+  public Class getKeyClass() {
+    return keyClass;
+  }
+
+  /**
+   * This method sets the key class of the mapping.
+   *
+   * @param keyClass key class
+   */
+  public void setKeyClass(Class keyClass) {
+    this.keyClass = keyClass;
+  }
+
+  /**
+   * This method returns the persistent class of the mapping.
+   *
+   * @return persistent class
+   */
+  public Class getPersistentClass() {
+    return persistentClass;
+  }
+
+  /**
+   * This method sets the persistent class of the mapping.
+   *
+   * @param persistentClass persistent class
+   */
+  void setPersistentClass(Class persistentClass) {
+    this.persistentClass = persistentClass;
+  }
+
+  /**
+   * This method returns the inlined defined Partition Key: the first field of the field list
+   * whose "primarykey" property parses as true. Once found, the field is remembered for
+   * subsequent calls.
+   * If there isn't any inlined defined partition key, this method returns a newly created
+   * default partition key "defaultId", which is not remembered.
+   *
+   * @return Partition Key {@link Field}
+   */
+  public Field getInlinedDefinedPartitionKey() {
+    if (inlinedDefinedPartitionKey != null) {
+      return inlinedDefinedPartitionKey;
+    }
+    for (Field field : fieldList) {
+      if (Boolean.parseBoolean(field.getProperty(PRIMARY_KEY))) {
+        inlinedDefinedPartitionKey = field;
+        return field;
+      }
+    }
+    return getDefaultCassandraKey();
+  }
+
+  /**
+   * Adds the partition key field to the field list when it is not already contained and no
+   * Cassandra key is set.
+   */
+  void finalized() {
+    Field field = getInlinedDefinedPartitionKey();
+    if (!fieldList.contains(field) && cassandraKey == null) {
+      fieldList.add(field);
+    }
+  }
+}

Reply via email to