http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
 
b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
index 113d0c4..1d9093d 100644
--- 
a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
+++ 
b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
@@ -17,35 +17,8 @@
 
 package org.apache.gora.cassandra.store;
 
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.HostDistance;
-import com.datastax.driver.core.KeyspaceMetadata;
-import com.datastax.driver.core.PoolingOptions;
-import com.datastax.driver.core.ProtocolOptions;
-import com.datastax.driver.core.ProtocolVersion;
-import com.datastax.driver.core.QueryOptions;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.SocketOptions;
-import com.datastax.driver.core.TableMetadata;
-import com.datastax.driver.core.TypeCodec;
-import com.datastax.driver.core.policies.ConstantReconnectionPolicy;
-import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
-import com.datastax.driver.core.policies.DefaultRetryPolicy;
-import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy;
-import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
-import com.datastax.driver.core.policies.FallthroughRetryPolicy;
-import com.datastax.driver.core.policies.LatencyAwarePolicy;
-import com.datastax.driver.core.policies.LoggingRetryPolicy;
-import com.datastax.driver.core.policies.RoundRobinPolicy;
-import com.datastax.driver.core.policies.TokenAwarePolicy;
-import com.datastax.driver.mapping.Mapper;
-import com.datastax.driver.mapping.MappingManager;
-import org.apache.gora.cassandra.bean.CassandraKey;
-import org.apache.gora.cassandra.bean.ClusterKeyField;
-import org.apache.gora.cassandra.bean.Field;
-import org.apache.gora.cassandra.bean.KeySpace;
-import org.apache.gora.cassandra.bean.PartitionKeyField;
+import org.apache.gora.cassandra.persistent.CassandraNativePersistent;
+import org.apache.gora.cassandra.serializers.CassandraSerializer;
 import org.apache.gora.persistency.BeanFactory;
 import org.apache.gora.persistency.Persistent;
 import org.apache.gora.query.PartitionQuery;
@@ -53,18 +26,11 @@ import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.store.DataStoreFactory;
-import org.jdom.Attribute;
-import org.jdom.Document;
-import org.jdom.Element;
-import org.jdom.JDOMException;
-import org.jdom.input.SAXBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Locale;
 import java.util.Properties;
 
 /**
@@ -81,19 +47,23 @@ public class CassandraStore<K, T extends Persistent> 
implements DataStore<K, T>
 
   private BeanFactory<K, T> beanFactory;
 
-  private Cluster cluster;
+  private Class<K> keyClass;
 
-  private Class keyClass;
-
-  private Class persistentClass;
+  private Class<T> persistentClass;
 
   private CassandraMapping mapping;
 
-  private boolean isUseNativeSerialization;
+  private CassandraSerializer cassandraSerializer;
+
+  public enum SerializerType { // serializer kinds; NOTE(review): presumably matched against the CASSANDRA_SERIALIZATION_TYPE property - confirm
+    AVRO("AVRO"), NATIVE("NATIVE");
+    String val; // string form of the constant; assigned by the constructor below
 
-  private Mapper<T> mapper;
+    SerializerType(String v) {
+      this.val = v;
+    }
+  }
 
-  private Session session;
 
   public CassandraStore() {
     super();
@@ -113,22 +83,11 @@ public class CassandraStore<K, T extends Persistent> 
implements DataStore<K, T>
       this.keyClass = keyClass;
       this.persistentClass = persistentClass;
       String mappingFile = DataStoreFactory.getMappingFile(properties, this, 
DEFAULT_MAPPING_FILE);
-      List<String> codecs = readCustomCodec(properties);
-      mapping = readMapping(mappingFile);
-      isUseNativeSerialization = 
Boolean.parseBoolean(properties.getProperty(CassandraStoreParameters.USE_CASSANDRA_NATIVE_SERIALIZATION));
-      Cluster.Builder builder = Cluster.builder();
-      builder = populateSettings(builder, properties);
-      this.cluster = builder.build();
-      if (codecs != null) {
-        registerCustomCodecs(codecs);
-      }
-      this.session = this.cluster.connect();
-      if (isUseNativeSerialization) {
-        this.createSchema();
-        MappingManager mappingManager = new MappingManager(session);
-        mapper = mappingManager.mapper(persistentClass);
-      }
-
+      CassandraMappingBuilder mappingBuilder = new 
CassandraMappingBuilder(this);
+      mapping = mappingBuilder.readMapping(mappingFile);
+      CassandraClient cassandraClient = new CassandraClient();
+      cassandraClient.initialize(properties);
+      cassandraSerializer = CassandraSerializer.getSerializer(cassandraClient, 
properties.getProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE), 
keyClass, persistentClass, mapping);
     } catch (Exception e) {
       LOG.error("Error while initializing Cassandra store: {}",
               new Object[]{e.getMessage()});
@@ -136,501 +95,13 @@ public class CassandraStore<K, T extends Persistent> 
implements DataStore<K, T>
     }
   }
 
-  private void registerCustomCodecs(List<String> codecs) throws Exception {
-    for (String codec : codecs) {
-      
this.cluster.getConfiguration().getCodecRegistry().register((TypeCodec<?>) 
Class.forName(codec).newInstance());
-    }
-  }
-
-  /**
-   * In this method we reads the mapping file and creates the Cassandra 
Mapping.
-   *
-   * @param filename mapping file name
-   * @return @{@link CassandraMapping}
-   * @throws IOException
-   */
-  private CassandraMapping readMapping(String filename) throws IOException {
-    CassandraMapping map = new CassandraMapping();
-    try {
-      SAXBuilder builder = new SAXBuilder();
-      Document doc = 
builder.build(getClass().getClassLoader().getResourceAsStream(filename));
-
-      List<Element> keyspaces = doc.getRootElement().getChildren("keyspace");
-      List<Element> classes = doc.getRootElement().getChildren("class");
-      List<Element> keys = doc.getRootElement().getChildren("cassandraKey");
-
-      boolean classMatched = false;
-      for (Element classElement : classes) {
-        if (classElement.getAttributeValue("keyClass").equals(
-                keyClass.getCanonicalName())
-                && classElement.getAttributeValue("name").equals(
-                persistentClass.getCanonicalName())) {
-
-          classMatched = true;
-          String tableName = classElement.getAttributeValue("table");
-          map.setCoreName(tableName);
-
-          List classAttributes = classElement.getAttributes();
-          for (Object anAttributeList : classAttributes) {
-            Attribute attribute = (Attribute) anAttributeList;
-            String attributeName = attribute.getName();
-            String attributeValue = attribute.getValue();
-            map.addProperty(attributeName, attributeValue);
-          }
-
-          List<Element> fields = classElement.getChildren("field");
-
-          for (Element field : fields) {
-            Field cassandraField = new Field();
-
-            List fieldAttributes = field.getAttributes();
-            processAttributes(fieldAttributes, cassandraField);
-            map.addCassandraField(cassandraField);
-          }
-          break;
-        }
-        LOG.warn("Check that 'keyClass' and 'name' parameters in 
gora-solr-mapping.xml "
-                + "match with intended values. A mapping mismatch has been 
found therefore "
-                + "no mapping has been initialized for class mapping at 
position "
-                + " {} in mapping file.", classes.indexOf(classElement));
-      }
-      if (!classMatched) {
-        LOG.error("Check that 'keyClass' and 'name' parameters in {} no 
mapping has been initialized for {} class mapping", filename, persistentClass);
-      }
-
-      String keyspaceName = map.getProperty("keyspace");
-      if (keyspaceName != null) {
-        KeySpace keyspace;
-        for (Element keyspaceElement : keyspaces) {
-          if (keyspaceName.equals(keyspaceElement.getAttributeValue("name"))) {
-            keyspace = new KeySpace();
-            List fieldAttributes = keyspaceElement.getAttributes();
-            for (Object attributeObject : fieldAttributes) {
-              Attribute attribute = (Attribute) attributeObject;
-              String attributeName = attribute.getName();
-              String attributeValue = attribute.getValue();
-              switch (attributeName) {
-                case "name":
-                  keyspace.setName(attributeValue);
-                  break;
-                case "durableWrite":
-                  
keyspace.setDurableWritesEnabled(Boolean.parseBoolean(attributeValue));
-                  break;
-                default:
-                  keyspace.addProperty(attributeName, attributeValue);
-                  break;
-              }
-            }
-            Element placementStrategy = 
keyspaceElement.getChild("placementStrategy");
-            switch 
(KeySpace.PlacementStrategy.valueOf(placementStrategy.getAttributeValue("name")))
 {
-              case SimpleStrategy:
-                
keyspace.setPlacementStrategy(KeySpace.PlacementStrategy.SimpleStrategy);
-                
keyspace.setReplicationFactor(Integer.parseInt(placementStrategy.getAttributeValue("replication_factor")));
-                break;
-              case NetworkTopologyStrategy:
-                List<Element> dataCenters = 
placementStrategy.getChildren("datacenter");
-                
keyspace.setPlacementStrategy(KeySpace.PlacementStrategy.NetworkTopologyStrategy);
-                for (Element dataCenter : dataCenters) {
-                  String dataCenterName = dataCenter.getAttributeValue("name");
-                  Integer dataCenterReplicationFactor = 
Integer.valueOf(dataCenter.getAttributeValue("replication_factor"));
-                  keyspace.addDataCenter(dataCenterName, 
dataCenterReplicationFactor);
-                }
-                break;
-            }
-            map.setKeySpace(keyspace);
-            break;
-          }
-
-        }
-
-      }
-
-      for (Element key : keys) {
-        if (keyClass.getName().equals(key.getAttributeValue("name"))) {
-          CassandraKey cassandraKey = new CassandraKey(keyClass.getName());
-          Element partitionKeys = key.getChild("partitionKey");
-          Element clusterKeys = key.getChild("clusterKey");
-          List<Element> partitionKeyFields = 
partitionKeys.getChildren("field");
-          List<Element> partitionCompositeKeyFields = 
partitionKeys.getChildren("compositeKey");
-          // process non composite partition keys
-          for (Element partitionKeyField : partitionKeyFields) {
-            PartitionKeyField fieldKey = new PartitionKeyField();
-            List fieldAttributes = partitionKeyField.getAttributes();
-            processAttributes(fieldAttributes, fieldKey);
-            cassandraKey.addPartitionKeyField(fieldKey);
-          }
-          // process composite partitions keys
-          for (Element partitionCompositeKeyField : 
partitionCompositeKeyFields) {
-            PartitionKeyField compositeFieldKey = new PartitionKeyField();
-            compositeFieldKey.setComposite(true);
-            List<Element> compositeKeyFields = 
partitionCompositeKeyField.getChildren("field");
-            for (Element partitionKeyField : compositeKeyFields) {
-              PartitionKeyField fieldKey = new PartitionKeyField();
-              List fieldAttributes = partitionKeyField.getAttributes();
-              processAttributes(fieldAttributes, fieldKey);
-              compositeFieldKey.addField(fieldKey);
-            }
-            cassandraKey.addPartitionKeyField(compositeFieldKey);
-          }
-
-          //process cluster keys
-          List<Element> clusterKeyFields = clusterKeys.getChildren("field");
-          for (Element clusterKeyField : clusterKeyFields) {
-            ClusterKeyField keyField = new ClusterKeyField();
-            List fieldAttributes = clusterKeyField.getAttributes();
-            for (Object anAttributeList : fieldAttributes) {
-              Attribute attribute = (Attribute) anAttributeList;
-              String attributeName = attribute.getName();
-              String attributeValue = attribute.getValue();
-              switch (attributeName) {
-                case "name":
-                  keyField.setFieldName(attributeValue);
-                  break;
-                case "column":
-                  keyField.setColumnName(attributeValue);
-                  break;
-                case "type":
-                  keyField.setType(attributeValue);
-                  break;
-                case "order":
-                  
keyField.setOrder(ClusterKeyField.Order.valueOf(attributeValue.toUpperCase(Locale.ENGLISH)));
-                  break;
-                default:
-                  keyField.addProperty(attributeName, attributeValue);
-                  break;
-              }
-            }
-            cassandraKey.addClusterKeyField(keyField);
-          }
-          map.setCassandraKey(cassandraKey);
-        }
-      }
-    } catch (Exception ex) {
-      throw new IOException(ex);
-    }
-    return map;
-  }
-
-  private void processAttributes(List<Element> attributes, Field fieldKey) {
-    for (Object anAttributeList : attributes) {
-      Attribute attribute = (Attribute) anAttributeList;
-      String attributeName = attribute.getName();
-      String attributeValue = attribute.getValue();
-      switch (attributeName) {
-        case "name":
-          fieldKey.setFieldName(attributeValue);
-          break;
-        case "column":
-          fieldKey.setColumnName(attributeValue);
-          break;
-        case "type":
-          fieldKey.setType(attributeValue);
-          break;
-        default:
-          fieldKey.addProperty(attributeName, attributeValue);
-          break;
-      }
-    }
-  }
-
-  private List<String> readCustomCodec(Properties properties) throws 
JDOMException, IOException {
-    String filename = 
properties.getProperty(CassandraStoreParameters.CUSTOM_CODEC_FILE);
-    if (filename != null) {
-      List<String> codecs = new ArrayList<>();
-      SAXBuilder builder = new SAXBuilder();
-      Document doc = 
builder.build(getClass().getClassLoader().getResourceAsStream(filename));
-      List<Element> codecElementList = 
doc.getRootElement().getChildren("codec");
-      for (Element codec : codecElementList) {
-        codecs.add(codec.getValue());
-      }
-      return codecs;
-    }
-    return null;
-  }
-
-  private Cluster.Builder populateSettings(Cluster.Builder builder, Properties 
properties) {
-    String serversParam = 
properties.getProperty(CassandraStoreParameters.CASSANDRA_SERVERS);
-    String[] servers = serversParam.split(",");
-    for (String server : servers) {
-      builder = builder.addContactPoint(server);
-    }
-    String portProp = properties.getProperty(CassandraStoreParameters.PORT);
-    if (portProp != null) {
-      builder = builder.withPort(Integer.parseInt(portProp));
-    }
-    String clusterNameProp = 
properties.getProperty(CassandraStoreParameters.CLUSTER_NAME);
-    if (clusterNameProp != null) {
-      builder = builder.withClusterName(clusterNameProp);
-    }
-    String compressionProp = 
properties.getProperty(CassandraStoreParameters.COMPRESSION);
-    if (compressionProp != null) {
-      builder = 
builder.withCompression(ProtocolOptions.Compression.valueOf(compressionProp));
-    }
-    builder = this.populateCredentials(properties, builder);
-    builder = this.populateLoadBalancingProp(properties, builder);
-    String enableJMXProp = 
properties.getProperty(CassandraStoreParameters.ENABLE_JMX_REPORTING);
-    if (enableJMXProp != null) {
-      if (!Boolean.parseBoolean(enableJMXProp)) {
-        builder = builder.withoutJMXReporting();
-      }
-    }
-    String enableMetricsProp = 
properties.getProperty(CassandraStoreParameters.ENABLE_METRICS);
-    if (enableMetricsProp != null) {
-      if (!Boolean.parseBoolean(enableMetricsProp)) {
-        builder = builder.withoutMetrics();
-      }
-    }
-    builder = this.populatePoolingSettings(properties, builder);
-    String versionProp = 
properties.getProperty(CassandraStoreParameters.PROTOCOL_VERSION);
-    if (versionProp != null) {
-      builder = 
builder.withProtocolVersion(ProtocolVersion.fromInt(Integer.parseInt(versionProp)));
-    }
-    builder = this.populateQueryOptions(properties, builder);
-    builder = this.populateReconnectPolicy(properties, builder);
-    builder = this.populateRetrytPolicy(properties, builder);
-    builder = this.populateSocketOptions(properties, builder);
-    String enableSSLProp = 
properties.getProperty(CassandraStoreParameters.ENABLE_SSL);
-    if (enableSSLProp != null) {
-      if (Boolean.parseBoolean(enableSSLProp)) {
-        builder = builder.withSSL();
-      }
-    }
-    return builder;
-  }
-
-
-  private Cluster.Builder populateLoadBalancingProp(Properties properties, 
Cluster.Builder builder) {
-    String loadBalancingProp = 
properties.getProperty(CassandraStoreParameters.LOAD_BALANCING_POLICY);
-    if (loadBalancingProp != null) {
-      switch (loadBalancingProp) {
-        case "LatencyAwareRoundRobinPolicy":
-          builder = 
builder.withLoadBalancingPolicy(LatencyAwarePolicy.builder(new 
RoundRobinPolicy()).build());
-          break;
-        case "RoundRobinPolicy":
-          builder = builder.withLoadBalancingPolicy(new RoundRobinPolicy());
-          break;
-        case "DCAwareRoundRobinPolicy": {
-          String dataCenter = 
properties.getProperty(CassandraStoreParameters.DATA_CENTER);
-          boolean allowRemoteDCsForLocalConsistencyLevel = 
Boolean.parseBoolean(
-                  
properties.getProperty(CassandraStoreParameters.ALLOW_REMOTE_DCS_FOR_LOCAL_CONSISTENCY_LEVEL));
-          if (dataCenter != null && !dataCenter.isEmpty()) {
-            if (allowRemoteDCsForLocalConsistencyLevel) {
-              builder = builder.withLoadBalancingPolicy(
-                      DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter)
-                              
.allowRemoteDCsForLocalConsistencyLevel().build());
-            } else {
-              builder = builder.withLoadBalancingPolicy(
-                      
DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter).build());
-            }
-          } else {
-            if (allowRemoteDCsForLocalConsistencyLevel) {
-              builder = builder.withLoadBalancingPolicy(
-                      
(DCAwareRoundRobinPolicy.builder().allowRemoteDCsForLocalConsistencyLevel().build()));
-            } else {
-              builder = 
builder.withLoadBalancingPolicy((DCAwareRoundRobinPolicy.builder().build()));
-            }
-          }
-          break;
-        }
-        case "TokenAwareRoundRobinPolicy":
-          builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy(new 
RoundRobinPolicy()));
-          break;
-        case "TokenAwareDCAwareRoundRobinPolicy": {
-          String dataCenter = 
properties.getProperty(CassandraStoreParameters.DATA_CENTER);
-          boolean allowRemoteDCsForLocalConsistencyLevel = 
Boolean.parseBoolean(
-                  
properties.getProperty(CassandraStoreParameters.ALLOW_REMOTE_DCS_FOR_LOCAL_CONSISTENCY_LEVEL));
-          if (dataCenter != null && !dataCenter.isEmpty()) {
-            if (allowRemoteDCsForLocalConsistencyLevel) {
-              builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy(
-                      DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter)
-                              
.allowRemoteDCsForLocalConsistencyLevel().build()));
-            } else {
-              builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy(
-                      
DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter).build()));
-            }
-          } else {
-            if (allowRemoteDCsForLocalConsistencyLevel) {
-              builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy(
-                      
DCAwareRoundRobinPolicy.builder().allowRemoteDCsForLocalConsistencyLevel().build()));
-            } else {
-              builder = builder.withLoadBalancingPolicy(
-                      new 
TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build()));
-            }
-          }
-          break;
-        }
-        default:
-          LOG.error("Unsupported Cassandra load balancing  policy: {} ", 
loadBalancingProp);
-          break;
-      }
-    }
-    return builder;
-  }
-
-  private Cluster.Builder populateCredentials(Properties properties, 
Cluster.Builder builder) {
-    String usernameProp = 
properties.getProperty(CassandraStoreParameters.USERNAME);
-    String passwordProp = 
properties.getProperty(CassandraStoreParameters.PASSWORD);
-    if (usernameProp != null) {
-      builder = builder.withCredentials(usernameProp, passwordProp);
-    }
-    return builder;
-  }
-
-  private Cluster.Builder populatePoolingSettings(Properties properties, 
Cluster.Builder builder) {
-    String localCoreConnectionsPerHost = 
properties.getProperty(CassandraStoreParameters.LOCAL_CORE_CONNECTIONS_PER_HOST);
-    String remoteCoreConnectionsPerHost = 
properties.getProperty(CassandraStoreParameters.REMOTE_CORE_CONNECTIONS_PER_HOST);
-    String localMaxConnectionsPerHost = 
properties.getProperty(CassandraStoreParameters.LOCAL_MAX_CONNECTIONS_PER_HOST);
-    String remoteMaxConnectionsPerHost = 
properties.getProperty(CassandraStoreParameters.REMOTE_MAX_CONNECTIONS_PER_HOST);
-    String localNewConnectionThreshold = 
properties.getProperty(CassandraStoreParameters.LOCAL_NEW_CONNECTION_THRESHOLD);
-    String remoteNewConnectionThreshold = 
properties.getProperty(CassandraStoreParameters.REMOTE_NEW_CONNECTION_THRESHOLD);
-    String localMaxRequestsPerConnection = 
properties.getProperty(CassandraStoreParameters.LOCAL_MAX_REQUESTS_PER_CONNECTION);
-    String remoteMaxRequestsPerConnection = 
properties.getProperty(CassandraStoreParameters.REMOTE_MAX_REQUESTS_PER_CONNECTION);
-    PoolingOptions options = new PoolingOptions();
-    if (localCoreConnectionsPerHost != null) {
-      options.setCoreConnectionsPerHost(HostDistance.LOCAL, 
Integer.parseInt(localCoreConnectionsPerHost));
-    }
-    if (remoteCoreConnectionsPerHost != null) {
-      options.setCoreConnectionsPerHost(HostDistance.REMOTE, 
Integer.parseInt(remoteCoreConnectionsPerHost));
-    }
-    if (localMaxConnectionsPerHost != null) {
-      options.setMaxConnectionsPerHost(HostDistance.LOCAL, 
Integer.parseInt(localMaxConnectionsPerHost));
-    }
-    if (remoteMaxConnectionsPerHost != null) {
-      options.setMaxConnectionsPerHost(HostDistance.REMOTE, 
Integer.parseInt(remoteMaxConnectionsPerHost));
-    }
-    if (localNewConnectionThreshold != null) {
-      options.setNewConnectionThreshold(HostDistance.LOCAL, 
Integer.parseInt(localNewConnectionThreshold));
-    }
-    if (remoteNewConnectionThreshold != null) {
-      options.setNewConnectionThreshold(HostDistance.REMOTE, 
Integer.parseInt(remoteNewConnectionThreshold));
-    }
-    if (localMaxRequestsPerConnection != null) {
-      options.setMaxRequestsPerConnection(HostDistance.LOCAL, 
Integer.parseInt(localMaxRequestsPerConnection));
-    }
-    if (remoteMaxRequestsPerConnection != null) {
-      options.setMaxRequestsPerConnection(HostDistance.REMOTE, 
Integer.parseInt(remoteMaxRequestsPerConnection));
-    }
-    builder = builder.withPoolingOptions(options);
-    return builder;
-  }
-
-  private Cluster.Builder populateQueryOptions(Properties properties, 
Cluster.Builder builder) {
-    String consistencyLevelProp = 
properties.getProperty(CassandraStoreParameters.CONSISTENCY_LEVEL);
-    String serialConsistencyLevelProp = 
properties.getProperty(CassandraStoreParameters.SERIAL_CONSISTENCY_LEVEL);
-    String fetchSize = 
properties.getProperty(CassandraStoreParameters.FETCH_SIZE);
-    QueryOptions options = new QueryOptions();
-    if (consistencyLevelProp != null) {
-      
options.setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevelProp));
-    }
-    if (serialConsistencyLevelProp != null) {
-      
options.setSerialConsistencyLevel(ConsistencyLevel.valueOf(serialConsistencyLevelProp));
-    }
-    if (fetchSize != null) {
-      options.setFetchSize(Integer.parseInt(fetchSize));
-    }
-    return builder.withQueryOptions(options);
-  }
-
-  private Cluster.Builder populateReconnectPolicy(Properties properties, 
Cluster.Builder builder) {
-    String reconnectionPolicy = 
properties.getProperty(CassandraStoreParameters.RECONNECTION_POLICY);
-    if (reconnectionPolicy != null) {
-      switch (reconnectionPolicy) {
-        case "ConstantReconnectionPolicy": {
-          String constantReconnectionPolicyDelay = 
properties.getProperty(CassandraStoreParameters.CONSTANT_RECONNECTION_POLICY_DELAY);
-          ConstantReconnectionPolicy policy = new 
ConstantReconnectionPolicy(Long.parseLong(constantReconnectionPolicyDelay));
-          builder = builder.withReconnectionPolicy(policy);
-          break;
-        }
-        case "ExponentialReconnectionPolicy": {
-          String exponentialReconnectionPolicyBaseDelay = 
properties.getProperty(CassandraStoreParameters.EXPONENTIAL_RECONNECTION_POLICY_BASE_DELAY);
-          String exponentialReconnectionPolicyMaxDelay = 
properties.getProperty(CassandraStoreParameters.EXPONENTIAL_RECONNECTION_POLICY_MAX_DELAY);
-
-          ExponentialReconnectionPolicy policy = new 
ExponentialReconnectionPolicy(Long.parseLong(exponentialReconnectionPolicyBaseDelay),
-                  Long.parseLong(exponentialReconnectionPolicyMaxDelay));
-          builder = builder.withReconnectionPolicy(policy);
-          break;
-        }
-        default:
-          LOG.error("Unsupported reconnection policy : {} ", 
reconnectionPolicy);
-      }
-    }
-    return builder;
-  }
-
-  private Cluster.Builder populateRetrytPolicy(Properties properties, 
Cluster.Builder builder) {
-    String retryPolicy = 
properties.getProperty(CassandraStoreParameters.RETRY_POLICY);
-    if (retryPolicy != null) {
-      switch (retryPolicy) {
-        case "DefaultRetryPolicy":
-          builder = builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE);
-          break;
-        case "DowngradingConsistencyRetryPolicy":
-          builder = 
builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE);
-          break;
-        case "FallthroughRetryPolicy":
-          builder = builder.withRetryPolicy(FallthroughRetryPolicy.INSTANCE);
-          break;
-        case "LoggingDefaultRetryPolicy":
-          builder = builder.withRetryPolicy(new 
LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE));
-          break;
-        case "LoggingDowngradingConsistencyRetryPolicy":
-          builder = builder.withRetryPolicy(new 
LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE));
-          break;
-        case "LoggingFallthroughRetryPolicy":
-          builder = builder.withRetryPolicy(new 
LoggingRetryPolicy(FallthroughRetryPolicy.INSTANCE));
-          break;
-        default:
-          LOG.error("Unsupported retry policy : {} ", retryPolicy);
-          break;
-      }
-    }
-    return builder;
-  }
-
-  private Cluster.Builder populateSocketOptions(Properties properties, 
Cluster.Builder builder) {
-    String connectionTimeoutMillisProp = 
properties.getProperty(CassandraStoreParameters.CONNECTION_TIMEOUT_MILLIS);
-    String keepAliveProp = 
properties.getProperty(CassandraStoreParameters.KEEP_ALIVE);
-    String readTimeoutMillisProp = 
properties.getProperty(CassandraStoreParameters.READ_TIMEOUT_MILLIS);
-    String receiveBufferSizeProp = 
properties.getProperty(CassandraStoreParameters.RECEIVER_BUFFER_SIZE);
-    String reuseAddress = 
properties.getProperty(CassandraStoreParameters.REUSE_ADDRESS);
-    String sendBufferSize = 
properties.getProperty(CassandraStoreParameters.SEND_BUFFER_SIZE);
-    String soLinger = 
properties.getProperty(CassandraStoreParameters.SO_LINGER);
-    String tcpNoDelay = 
properties.getProperty(CassandraStoreParameters.TCP_NODELAY);
-    SocketOptions options = new SocketOptions();
-    if (connectionTimeoutMillisProp != null) {
-      
options.setConnectTimeoutMillis(Integer.parseInt(connectionTimeoutMillisProp));
-    }
-    if (keepAliveProp != null) {
-      options.setKeepAlive(Boolean.parseBoolean(keepAliveProp));
-    }
-    if (readTimeoutMillisProp != null) {
-      options.setReadTimeoutMillis(Integer.parseInt(readTimeoutMillisProp));
-    }
-    if (receiveBufferSizeProp != null) {
-      options.setReceiveBufferSize(Integer.parseInt(receiveBufferSizeProp));
-    }
-    if (reuseAddress != null) {
-      options.setReuseAddress(Boolean.parseBoolean(reuseAddress));
-    }
-    if (sendBufferSize != null) {
-      options.setSendBufferSize(Integer.parseInt(sendBufferSize));
-    }
-    if (soLinger != null) {
-      options.setSoLinger(Integer.parseInt(soLinger));
-    }
-    if (tcpNoDelay != null) {
-      options.setTcpNoDelay(Boolean.parseBoolean(tcpNoDelay));
-    }
-    return builder.withSocketOptions(options);
-  }
-
   /**
    * {@inheritDoc}
    * <p>
    * This is a setter method to set the class of persistent objects.
    *
    * @param persistentClass class of persistent objects
-   *                        {@link 
org.apache.gora.cassandra.serializers.CassandraNativePersistent}
+   *                        {@link CassandraNativePersistent}
    *                        {@link  org.apache.gora.persistency.Persistent}
    */
   @Override
@@ -638,9 +109,10 @@ public class CassandraStore<K, T extends Persistent> 
implements DataStore<K, T>
     this.persistentClass = persistentClass;
   }
 
+  @SuppressWarnings("all")
   @Override
   public Class<T> getPersistentClass() {
-    return this.persistentClass;
+    return (Class<T>) this.persistentClass; // NOTE(review): the field is declared Class<T> in this patch, so the cast and the suppression look redundant
   }
 
   @Override
@@ -650,20 +122,15 @@ public class CassandraStore<K, T extends Persistent> 
implements DataStore<K, T>
 
   @Override
   public void createSchema() {
-    LOG.debug("creating Cassandra keyspace {}", 
mapping.getKeySpace().getName());
-    
this.session.execute(CassandraQueryFactory.getCreateKeySpaceQuery(mapping));
-    LOG.debug("creating Cassandra column family / table {}", 
mapping.getCoreName());
-    this.session.execute(CassandraQueryFactory.getCreateTableQuery(mapping));
+    cassandraSerializer.createSchema(); // schema creation is delegated to the serializer built in initialize
   }
 
   @Override
   public void deleteSchema() {
-    LOG.debug("dropping Cassandra table {}", mapping.getCoreName());
-    this.session.execute(CassandraQueryFactory.getDropTableQuery(mapping));
-    LOG.debug("dropping Cassandra keyspace {}", 
mapping.getKeySpace().getName());
-    this.session.execute(CassandraQueryFactory.getDropKeySpaceQuery(mapping));
+    cassandraSerializer.deleteSchema(); // schema removal is delegated to the serializer built in initialize
   }
 
+  @SuppressWarnings("all")
   @Override
   public Class<K> getKeyClass() {
     return this.keyClass;
@@ -677,16 +144,32 @@ public class CassandraStore<K, T extends Persistent> 
implements DataStore<K, T>
   @Override
   public K newKey() {
     try {
-      return beanFactory.newKey();
+      if (beanFactory != null) {
+        return beanFactory.newKey();
+      } else {
+        LOG.warn("beanFactory is hasn't been initialized. It's recommended to 
initialize beanFactory.");
+        return keyClass.newInstance();
+      }
     } catch (Exception ex) {
       LOG.error(ex.getMessage(), ex);
       return null;
     }
   }
 
+  @SuppressWarnings("all")
   @Override
   public T newPersistent() {
-    return this.beanFactory.newPersistent();
+    try {
+      if (beanFactory != null) {
+        return this.beanFactory.newPersistent();
+      } else {
+        LOG.warn("beanFactory is hasn't been initialized. It's recommended to 
initialize beanFactory.");
+        return persistentClass.newInstance();
+      }
+    } catch (Exception ex) {
+      LOG.error(ex.getMessage(), ex);
+      return null;
+    }
   }
 
   @Override
@@ -701,17 +184,12 @@ public class CassandraStore<K, T extends Persistent> 
implements DataStore<K, T>
 
   @Override
   public void close() {
-    this.session.close();
-    this.cluster.close();
+    this.cassandraSerializer.close();
   }
 
   @Override
   public T get(K key) {
-    if (isUseNativeSerialization) {
-      return mapper.get(key);
-    } else {
-      return null;
-    }
+    return (T) cassandraSerializer.get(key);
   }
 
   @Override
@@ -721,21 +199,12 @@ public class CassandraStore<K, T extends Persistent> 
implements DataStore<K, T>
 
   @Override
   public void put(K key, T obj) {
-    if (isUseNativeSerialization) {
-      mapper.save(obj);
-    } else {
-
-    }
+    cassandraSerializer.put(key, obj);
   }
 
   @Override
   public boolean delete(K key) {
-    if (isUseNativeSerialization) {
-      mapper.delete(key);
-      return true;
-    } else {
-      return false;
-    }
+    return cassandraSerializer.delete(key);
   }
 
   @Override
@@ -760,7 +229,7 @@ public class CassandraStore<K, T extends Persistent> 
implements DataStore<K, T>
 
   @Override
   public void flush() {
-
+ // ignore since caching has been disabled
   }
 
   @Override
@@ -770,19 +239,12 @@ public class CassandraStore<K, T extends Persistent> 
implements DataStore<K, T>
 
   @Override
   public void truncateSchema() {
-    LOG.debug("truncating Cassandra table {}", mapping.getCoreName());
-    this.session.execute(CassandraQueryFactory.getTruncateTableQuery(mapping));
+    cassandraSerializer.truncateSchema();
   }
 
   @Override
   public boolean schemaExists() {
-    KeyspaceMetadata keyspace = 
cluster.getMetadata().getKeyspace(mapping.getKeySpace().getName());
-    if (keyspace != null) {
-      TableMetadata table = keyspace.getTable(mapping.getCoreName());
-      return table != null;
-    } else {
-      return false;
-    }
+    return cassandraSerializer.schemaExists();
   }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java
 
b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java
index c48b1c0..95e1c0f 100644
--- 
a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java
+++ 
b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java
@@ -218,7 +218,7 @@ public class CassandraStoreParameters {
    * Property pointing to use Native Cassandra Native Serialization.
    * boolean
    */
-  public static final String USE_CASSANDRA_NATIVE_SERIALIZATION = 
"gora.cassandrastore.useCassandraNativeSerialization";
+  public static final String CASSANDRA_SERIALIZATION_TYPE = 
"gora.cassandrastore.cassandraSerializationType";
   /**
    * Property pointing to the custom codec file.
    * string

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml 
b/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml
new file mode 100644
index 0000000..e99cf92
--- /dev/null
+++ b/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<!--
+   The value of 'host' attribute of keyspace tag should match exactly what is 
in
+   gora.properties file. Essentially this means that if you are using port 
number, you should
+   use it everywhere regardless of whether it is the default port or not.
+   At runtime Gora will otherwise try to connect to localhost
+   https://issues.apache.org/jira/browse/GORA-269
+
+   The values of the 'replication_factor' and 'placement_strategy' attributes of the 
keyspace tag
+   only apply if Gora creates the keyspace. They have no effect if this is 
being used against 
+   an existing keyspace. The default value for 'replication_factor' is '1'.
+   
+   The value of 'placement_strategy' should be a fully qualified class name 
that is known to
+   the Cassandra cluster, not the application or Gora. As of this writing, the 
classes that ship
+   with Cassandra are:
+   'org.apache.cassandra.locator.SimpleStrategy'
+   'org.apache.cassandra.locator.NetworkTopologyStrategy'
+   gora cassandra would use SimpleStrategy by default if no value for this 
attribute is specified
+   
+   The default value of 'gc_grace_seconds' is '0' which is ONLY VIABLE FOR 
SINGLE NODE
+   CLUSTER. you should update this value according to your cluster 
configuration. 
+   https://wiki.apache.org/cassandra/StorageConfiguration
+
+   The value of 'ttl' (time to live) attribute of field tag should most likely 
always
+   be zero unless you want Cassandra to create Tombstones and delete portions 
of your
+   data once this period expires. Any positive value is read and bound to the 
number
+   of seconds after which the value for that field will disappear. The default 
value of ttl
+   is '0'
+
+   More information on gora-cassandra configuration and mappings can be found
+   at http://gora.apache.org/current/gora-cassandra.html
+-->
+
+<gora-otd>
+
+  <keyspace name="avroKeySpace" durableWrite="false">
+    <placementStrategy name="SimpleStrategy" replication_factor="1"/>
+  </keyspace>
+  
+  <class name="org.apache.gora.examples.generated.WebPage" 
keyClass="java.lang.String" table="WebPage" keyspace="avroKeySpace">
+    <field name="url" column="url" length="128" primarykey="true"/>
+    <field name="content" column="content" type="blob"/>
+    <field name="parsedContent" column="parsedContent" type="list"/>
+    <field name="outlinks" column="outlinks" type="map"/>
+  </class>
+
+  <class name="org.apache.gora.examples.generated.Employee" 
keyClass="java.lang.String" keyspace="avroKeySpace"
+         table="Employee" compactStorage="true" >
+    <field name="name" column="name" type="text" ttl="10"/>
+    <field name="dateOfBirth" column="dob" type="bigint" ttl="10"/>
+    <field name="ssn" column="ssn" type="int" ttl="10" primarykey="true"/>
+    <field name="salary" column="salary" type="int" ttl="10" />
+   </class>
+
+</gora-otd>

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/test/conf/nativeSerialization/gora-cassandra-mapping.xml
----------------------------------------------------------------------
diff --git 
a/gora-cassandra-cql/src/test/conf/nativeSerialization/gora-cassandra-mapping.xml
 
b/gora-cassandra-cql/src/test/conf/nativeSerialization/gora-cassandra-mapping.xml
index 07d6590..d3dde32 100644
--- 
a/gora-cassandra-cql/src/test/conf/nativeSerialization/gora-cassandra-mapping.xml
+++ 
b/gora-cassandra-cql/src/test/conf/nativeSerialization/gora-cassandra-mapping.xml
@@ -55,7 +55,7 @@
         <placementStrategy name="SimpleStrategy" replication_factor="1"/>
     </keyspace>
 
-    <class name="org.apache.gora.cassandra.test.nativeSerialization.User" 
keyClass="java.util.UUID" keyspace="nativeTestKeySpace"
+    <class 
name="org.apache.gora.cassandra.example.generated.nativeSerialization.User" 
keyClass="java.util.UUID" keyspace="nativeTestKeySpace"
            table="Users" compactStorage="true" >
         <field name="userId" column="user_id" type="uuid" ttl="10" 
primarykey="true"/>
         <field name="name" column="name" type="text" ttl="10"/>

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/test/java/.gitignore
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/test/java/.gitignore 
b/gora-cassandra-cql/src/test/java/.gitignore
deleted file mode 100644
index 09697dc..0000000
--- a/gora-cassandra-cql/src/test/java/.gitignore
+++ /dev/null
@@ -1,15 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java
 
b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java
index 5bb1114..9ebcae5 100644
--- 
a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java
+++ 
b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java
@@ -1,7 +1,24 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.gora.cassandra.store;
 
 import org.apache.gora.cassandra.GoraCassandraTestDriver;
-import org.apache.gora.cassandra.test.nativeSerialization.User;
+import org.apache.gora.cassandra.example.generated.nativeSerialization.User;
 import org.apache.gora.store.DataStore;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -34,7 +51,7 @@ public class TestCassandraStoreWithNativeSerialization {
     parameter = new Properties();
     parameter.setProperty(CassandraStoreParameters.CASSANDRA_SERVERS, 
"localhost");
     parameter.setProperty(CassandraStoreParameters.PORT, "9042");
-    
parameter.setProperty(CassandraStoreParameters.USE_CASSANDRA_NATIVE_SERIALIZATION,
 "true");
+    
parameter.setProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE, 
"native");
     parameter.setProperty(CassandraStoreParameters.PROTOCOL_VERSION, "3");
     parameter.setProperty(CassandraStoreParameters.CLUSTER_NAME,"Test 
Cluster");
     parameter.setProperty("gora.cassandrastore.mapping.file", 
"nativeSerialization/gora-cassandra-mapping.xml");
@@ -75,11 +92,11 @@ public class TestCassandraStoreWithNativeSerialization {
     // storing data;
     userDataStore.put(id, user1);
     // get data;
-    User olduser =  userDataStore.get(id);
+    User olduser =  userDataStore.get(user1.getUserId());
     Assert.assertEquals(olduser.getName(), user1.getName());
     Assert.assertEquals(olduser.getDateOfBirth(), user1.getDateOfBirth());
     // delete data;
-    userDataStore.delete(id);
+    userDataStore.delete(user1.getUserId());
     // get data
     User deletedUser = userDataStore.get(id);
     Assert.assertNull(deletedUser);

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/DateAsStringCodec.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/DateAsStringCodec.java
 
b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/DateAsStringCodec.java
index f6061cf..b5ffedb 100644
--- 
a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/DateAsStringCodec.java
+++ 
b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/DateAsStringCodec.java
@@ -1,3 +1,20 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.gora.cassandra.test.nativeSerialization;
 
 import com.datastax.driver.core.TypeCodec;

http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/User.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/User.java
 
b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/User.java
deleted file mode 100644
index 0f4c7ee..0000000
--- 
a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/User.java
+++ /dev/null
@@ -1,66 +0,0 @@
-package org.apache.gora.cassandra.test.nativeSerialization;
-
-import com.datastax.driver.mapping.annotations.Column;
-import com.datastax.driver.mapping.annotations.PartitionKey;
-import com.datastax.driver.mapping.annotations.Table;
-import com.datastax.driver.mapping.annotations.Transient;
-import org.apache.gora.cassandra.serializers.CassandraNativePersistent;
-
-import java.util.Date;
-import java.util.UUID;
-
-/**
- * Created by madhawa on 6/23/17.
- */
-
-@Table(keyspace = "nativeTestKeySpace", name = "users",
-        readConsistency = "QUORUM",
-        writeConsistency = "QUORUM",
-        caseSensitiveKeyspace = false,
-        caseSensitiveTable = false)
-public class User extends CassandraNativePersistent {
-  @PartitionKey
-  @Column(name = "user_id")
-  private UUID userId;
-  @Column(name = "name")
-  private String name;
-  @Column(name = "dob")
-  private Date dateOfBirth;
-
-  @Transient
-  private boolean dirty;
-
-  public User() {
-
-  }
-
-  public User(UUID userId, String name, Date dateOfBirth) {
-    this.userId = userId;
-    this.name = name;
-    this.dateOfBirth = dateOfBirth;
-  }
-
-  public void setUserId(UUID userId) {
-    this.userId = userId;
-  }
-
-  public void setName(String name) {
-    this.name = name;
-  }
-
-  public void setDateOfBirth(Date dateOfBirth) {
-    this.dateOfBirth = dateOfBirth;
-  }
-
-  public UUID getUserId() {
-    return userId;
-  }
-
-  public String getName() {
-    return name;
-  }
-
-  public Date getDateOfBirth() {
-    return dateOfBirth;
-  }
-}

Reply via email to