http://git-wip-us.apache.org/repos/asf/gora/blob/cbed41d0/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java
deleted file mode 100644
index c5db72b..0000000
--- 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gora.cassandra.serializers;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.specific.SpecificFixed;
-import org.apache.avro.util.Utf8;
-import org.apache.gora.persistency.Persistent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Utility class that infers the concrete Serializer needed to turn a value 
into
- * its binary representation
- */
-public class TypeUtils {
-
-  public static final Logger LOG = LoggerFactory.getLogger(TypeUtils.class);
-
-  // @SuppressWarnings({ "rawtypes", "unchecked" })
-  public static Class<? extends Object> getClass(Object value) {
-    return value.getClass();
-  }
-
-  public static Schema getSchema(Object value) {
-    if (value instanceof GenericArray) {
-      return Schema.createArray( getElementSchema((GenericArray<?>)value) );
-    } else {
-      return getSchema( getClass(value) );
-    }
-  }
-
-  public static Type getType(Object value) {
-    return getType( getClass(value) );
-  }
-
-  public static Type getType(Class<?> clazz) {
-    if (clazz.equals(Utf8.class)) {
-      return Type.STRING;
-    } else if (clazz.equals(Boolean.class) || clazz.equals(boolean.class)) {
-      return Type.BOOLEAN;
-    } else if (clazz.equals(ByteBuffer.class)) {
-      return Type.BYTES;
-    } else if (clazz.equals(Double.class) || clazz.equals(double.class)) {
-      return Type.DOUBLE;
-    } else if (clazz.equals(Float.class) || clazz.equals(float.class)) {
-      return Type.FLOAT;
-    } else if (clazz.equals(Integer.class) || clazz.equals(int.class)) {
-      return Type.INT;
-    } else if (clazz.equals(Long.class) || clazz.equals(long.class)) {
-      return Type.LONG;
-    } else if (clazz.isAssignableFrom(List.class)) {
-      return Type.ARRAY;
-    } else if (clazz.isAssignableFrom(Map.class)) {
-      return Type.MAP;
-    } else if (clazz.equals(Persistent.class)) {
-      return Type.RECORD;
-    } else if (clazz.getSuperclass().equals(SpecificFixed.class)) {
-      return Type.FIXED;
-    } else {
-      return null;
-    }
-  }
-
-  public static Class<?> getClass(Type type) {
-    if (type == Type.STRING) {
-      return Utf8.class;
-    } else if (type == Type.BOOLEAN) {
-      return Boolean.class;
-    } else if (type == Type.BYTES) {
-      return ByteBuffer.class;
-    } else if (type == Type.DOUBLE) {
-      return Double.class;
-    } else if (type == Type.FLOAT) {
-      return Float.class;
-    } else if (type == Type.INT) {
-      return Integer.class;
-    } else if (type == Type.LONG) {
-      return Long.class;
-    } else if (type == Type.ARRAY) {
-      return List.class;
-    } else if (type == Type.MAP) {
-      return Map.class;
-    } else if (type == Type.RECORD) {
-      return Persistent.class;
-    } else if (type == Type.FIXED) {
-      // return SpecificFixed.class;
-      return null;
-    } else {
-      return null;
-    }
-  }
-
-  public static Schema getSchema(Class<?> clazz) {
-    Type type = getType(clazz);
-    if (type == null) {
-      return null;
-    } else if (type == Type.FIXED) {
-      int size = getFixedSize(clazz);
-      String name = clazz.getName();
-      String space = null;
-      int n = name.lastIndexOf(".");
-      if (n < 0) {
-        space = name.substring(0,n);
-        name = name.substring(n+1);
-      } else {
-        space = null;
-      }
-      String doc = null; // ?
-      // LOG.info(Schema.createFixed(name, doc, space, size).toString());
-      return Schema.createFixed(name, doc, space, size);
-    } else if (type == Type.ARRAY) {
-      Object obj = null;
-      try {
-        obj = clazz.newInstance();
-      } catch (InstantiationException e) {
-        LOG.warn(e.toString());
-        return null;
-      } catch (IllegalAccessException e) {
-        LOG.warn(e.toString());
-        return null;
-      }
-      return getSchema(obj);
-    } else if (type == Type.MAP) {
-      // TODO
-      // return Schema.createMap(...);
-      return null;
-    } else if (type == Type.RECORD) {
-      // TODO
-      // return Schema.createRecord(...);
-      return null;
-    } else {
-      return Schema.create(type);
-    }
-  }
-
-  public static Class<?> getClass(Schema schema) {
-    Type type = schema.getType();
-    if (type == null) {
-      return null;
-    } else if (type == Type.FIXED) {
-      try {
-        return Class.forName( schema.getFullName() );
-      } catch (ClassNotFoundException e) {
-        LOG.warn(e.toString() + " : " + schema);
-        return null;
-      }
-    } else {
-      return getClass(type);
-    }
-  }
-
-  public static int getFixedSize(Type type) {
-    if (type == Type.BOOLEAN) {
-      return 1;
-    } else if (type == Type.DOUBLE) {
-      return 8;
-    } else if (type == Type.FLOAT) {
-      return 4;
-    } else if (type == Type.INT) {
-      return 4;
-    } else if (type == Type.LONG) {
-      return 8;
-    } else {
-      return -1;
-    }
-  }
-
-  public static int getFixedSize(Schema schema) {
-    Type type = schema.getType();
-    if (type == Type.FIXED) {
-      return schema.getFixedSize();
-    } else {
-      return getFixedSize(type);
-    }
-  }
-
-  public static int getFixedSize(Class<?> clazz) {
-    Type type = getType(clazz);
-    if (type == Type.FIXED) {
-      try {
-        return ((SpecificFixed)clazz.newInstance()).bytes().length;
-      } catch (InstantiationException e) {
-        LOG.warn(e.toString());
-        return -1;
-      } catch (IllegalAccessException e) {
-        LOG.warn(e.toString());
-        return -1;
-      }
-    } else {
-      return getFixedSize(type);
-    }
-  }
-
-  public static Schema getElementSchema(GenericArray<?> array) {
-    Schema schema = array.getSchema();
-    return (schema.getType() == Type.ARRAY) ? schema.getElementType() : schema;
-  }
-
-  /*
-  public static Schema getValueSchema(StatefulHashMap map) {
-    return map.getSchema().getValueType();
-  }
-
-  public static Type getValueType(StatefulHashMap map) {
-    return getValueSchema(map).getType();
-  }
-  */
-
-}

http://git-wip-us.apache.org/repos/asf/gora/blob/cbed41d0/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
deleted file mode 100644
index 5d22d94..0000000
--- 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/package-info.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * This package contains Cassandra store related util classes for serializer.
- */
-package org.apache.gora.cassandra.serializers;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/cbed41d0/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
deleted file mode 100644
index 1f9d614..0000000
--- 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java
+++ /dev/null
@@ -1,658 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gora.cassandra.store;
-
-import static org.apache.gora.cassandra.store.CassandraStore.colFamConsLvl;
-import static org.apache.gora.cassandra.store.CassandraStore.readOpConsLvl;
-import static org.apache.gora.cassandra.store.CassandraStore.writeOpConsLvl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel;
-import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
-import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
-import me.prettyprint.cassandra.service.CassandraHostConfigurator;
-import me.prettyprint.hector.api.Cluster;
-import me.prettyprint.hector.api.Keyspace;
-import me.prettyprint.hector.api.beans.OrderedRows;
-import me.prettyprint.hector.api.beans.OrderedSuperRows;
-import me.prettyprint.hector.api.beans.Row;
-import me.prettyprint.hector.api.beans.SuperRow;
-import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
-import me.prettyprint.hector.api.ddl.ComparatorType;
-import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
-import me.prettyprint.hector.api.factory.HFactory;
-import me.prettyprint.hector.api.mutation.Mutator;
-import me.prettyprint.hector.api.query.QueryResult;
-import me.prettyprint.hector.api.query.RangeSlicesQuery;
-import me.prettyprint.hector.api.query.RangeSuperSlicesQuery;
-import me.prettyprint.hector.api.HConsistencyLevel;
-import me.prettyprint.hector.api.Serializer;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericArray;
-import org.apache.gora.cassandra.query.CassandraQuery;
-import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer;
-import org.apache.gora.mapreduce.GoraRecordReader;
-import org.apache.gora.persistency.impl.PersistentBase;
-import org.apache.gora.query.Query;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * CassandraClient is where all of the primary datastore functionality is 
- * executed. Typically CassandraClient is invoked by calling 
- * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, 
Class, Properties)}.
- * CassandraClient deals with Cassandra data model definition, mutation, 
- * and general/specific mappings.
- * {@link org.apache.gora.cassandra.store.CassandraStore#initialize} .
- *
- * @param <K>
- * @param <T>
- */
-public class CassandraClient<K, T extends PersistentBase> {
-  
-  /** The logging implementation */
-  public static final Logger LOG = 
LoggerFactory.getLogger(CassandraClient.class);
-  
-  private Cluster cluster;
-  private Keyspace keyspace;
-  private Mutator<K> mutator;
-  private Class<K> keyClass;
-  private Class<T> persistentClass;
-  
-  /** Object which holds the XML mapping for Cassandra. */
-  private CassandraMapping cassandraMapping = null;
-
-  /** Hector client default column family consistency level. */
-  public static final String DEFAULT_HECTOR_CONSIS_LEVEL = "QUORUM";
-  
-  /** Cassandra serializer to be used for serializing Gora's keys. */
-  private Serializer<K> keySerializer;
-
-  /**
-   * Method to maintain backward compatibility with earlier versions.
-   * @param keyClass
-   * @param persistentClass
-   * @throws Exception
-   */
-  public void initialize(Class<K> keyClass, Class<T> persistentClass)
-    throws Exception {
-       initialize(keyClass, persistentClass, null);
-  }
-
-  /**
-   * Given our key, persistentClass from
-   * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, 
Class, Properties)}
-   * we make best efforts to dictate our data model.
-   * We make a quick check within {@link 
org.apache.gora.cassandra.store.CassandraClient#checkKeyspace() }
-   * to see if our keyspace has already been invented, this simple check 
prevents us from
-   * recreating the keyspace if it already exists.
-   * We then simple specify (based on the input keyclass) an appropriate 
serializer
-   * via {@link 
org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer} before
-   * defining a mutator from and by which we can mutate this object.
-   *
-   * @param keyClass        the Key by which we wish o assign a record object
-   * @param persistentClass the generated {@link 
org.apache.gora.persistency.Persistent} bean representing the data.
-   * @param properties      key value pairs from gora.properties
-   * @throws Exception
-   */
-  public void initialize(Class<K> keyClass, Class<T> persistentClass, 
Properties properties) throws Exception {
-    this.keyClass = keyClass;
-
-    // get cassandra mapping with persistent class
-    this.persistentClass = persistentClass;
-    this.cassandraMapping = 
CassandraMappingManager.getManager().get(persistentClass);
-    Map<String, String> accessMap = null;
-    if (properties != null) {
-      String username = properties
-          .getProperty("gora.cassandrastore.username");
-      if (username != null) {
-        accessMap = new HashMap<>();
-        accessMap.put("username", username);
-        String password = properties
-            .getProperty("gora.cassandrastore.password");
-        if (password != null) {
-          accessMap.put("password", password);
-        }
-      }
-    }
-
-    this.cluster = 
HFactory.getOrCreateCluster(this.cassandraMapping.getClusterName(), 
-        new CassandraHostConfigurator(this.cassandraMapping.getHostName()), 
accessMap);
-    
-    // add keyspace to cluster
-    checkKeyspace();
-    
-    // Just create a Keyspace object on the client side, corresponding to an 
already 
-    // existing keyspace with already created column families.
-    this.keyspace = 
HFactory.createKeyspace(this.cassandraMapping.getKeyspaceName(), this.cluster);
-    
-    this.keySerializer = GoraSerializerTypeInferer.getSerializer(keyClass);
-    if (this.keySerializer == null)
-      LOG.error("Serializer for " + keyClass + " not found.");
-    this.mutator = HFactory.createMutator(this.keyspace, this.keySerializer);
-  }
-
-  /**
-   * Check if keyspace already exists.
-   *
-   * @return if keyspace already exists return true.
-   */
-  public boolean keyspaceExists() {
-    KeyspaceDefinition keyspaceDefinition = 
this.cluster.describeKeyspace(this.cassandraMapping.getKeyspaceName());
-    return (keyspaceDefinition != null);
-  }
-
-  /**
-   * Check if keyspace already exists. If not, create it.
-   * In this method, we also utilize Hector's
-   * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel} logic.
-   * It is set by passing a
-   * {@link me.prettyprint.cassandra.model.ConfigurableConsistencyLevel} 
object right
-   * when the {@link me.prettyprint.hector.api.Keyspace} is created.
-   * If we cannot find a consistency level within <code>gora.properites</code>,
-   * then column family consistency level is set to QUORUM (by default) which 
permits
-   * consistency to wait for a quorum of replicas to respond regardless of 
data center.
-   * QUORUM is Hector Client's default setting and we respect that here as 
well.
-   *
-   * @see <a 
href="http://hector-client.github.io/hector/build/html/content/consistency_level.html";>Consistency
 Level</a>
-   */
-  public void checkKeyspace() {
-    // "describe keyspace <keyspaceName>;" query
-    KeyspaceDefinition keyspaceDefinition = 
this.cluster.describeKeyspace(this.cassandraMapping.getKeyspaceName());
-    if (keyspaceDefinition == null) {
-      List<ColumnFamilyDefinition> columnFamilyDefinitions = 
this.cassandraMapping.getColumnFamilyDefinitions();
-
-      // GORA-197
-      for (ColumnFamilyDefinition cfDef : columnFamilyDefinitions) {
-        cfDef.setComparatorType(ComparatorType.BYTESTYPE);
-      }
-
-      keyspaceDefinition = HFactory.createKeyspaceDefinition(
-        this.cassandraMapping.getKeyspaceName(), 
-        this.cassandraMapping.getKeyspaceReplicationStrategy(),
-        this.cassandraMapping.getKeyspaceReplicationFactor(),
-        columnFamilyDefinitions
-      );
-      
-      this.cluster.addKeyspace(keyspaceDefinition, true);
-      
-      // GORA-167 Create a customized Consistency Level
-      ConfigurableConsistencyLevel ccl = new ConfigurableConsistencyLevel();
-      Map<String, HConsistencyLevel> clmap = 
getConsisLevelForColFams(columnFamilyDefinitions);
-      // Column family consistency levels
-      ccl.setReadCfConsistencyLevels(clmap);
-      ccl.setWriteCfConsistencyLevels(clmap);
-      // Operations consistency levels
-      String opConsisLvl = (readOpConsLvl!=null && 
!readOpConsLvl.isEmpty())?readOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
-      
ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl));
-      LOG.debug("Hector read consistency configured to '" + opConsisLvl + 
"'.");
-      opConsisLvl = (writeOpConsLvl!=null && 
!writeOpConsLvl.isEmpty())?writeOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
-      
ccl.setDefaultWriteConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl));
-      LOG.debug("Hector write consistency configured to '" + opConsisLvl + 
"'.");
-
-      HFactory.createKeyspace("Keyspace", this.cluster, ccl);
-      keyspaceDefinition = null;
-    }
-    else {
-      List<ColumnFamilyDefinition> cfDefs = keyspaceDefinition.getCfDefs();
-      if (cfDefs == null || cfDefs.size() == 0) {
-        LOG.warn(keyspaceDefinition.getName() + " does not have any column 
family.");
-      }
-      else {
-        for (ColumnFamilyDefinition cfDef : cfDefs) {
-          ComparatorType comparatorType = cfDef.getComparatorType();
-          if (! comparatorType.equals(ComparatorType.BYTESTYPE)) {
-            // GORA-197
-            LOG.warn("The comparator type of " + cfDef.getName() + " column 
family is " + comparatorType.getTypeName()
-              + ", not BytesType. It may cause a fatal error on column 
validation later.");
-          }
-          else {
-            LOG.debug("The comparator type of " + cfDef.getName() + " column 
family is " 
-              + comparatorType.getTypeName() + ".");
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Method in charge of setting the consistency level for defined column 
families.
-   * @param pColFams  Column families
-   * @return Map<String, HConsistencyLevel> with the mapping between colFams 
and consistency level.
-   */
-  private Map<String, HConsistencyLevel> 
getConsisLevelForColFams(List<ColumnFamilyDefinition> pColFams) {
-    Map<String, HConsistencyLevel> clMap = new HashMap<>();
-    // Get columnFamily consistency level.
-    String colFamConsisLvl = (colFamConsLvl != null && 
!colFamConsLvl.isEmpty())?colFamConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL;
-    LOG.debug("ColumnFamily consistency level configured to '" + 
colFamConsisLvl + "'.");
-    // Define consistency for ColumnFamily "ColumnFamily"
-    for (ColumnFamilyDefinition colFamDef : pColFams)
-      clMap.put(colFamDef.getName(), 
HConsistencyLevel.valueOf(colFamConsisLvl));
-    return clMap;
-  }
-  
-  /**
-   * Drop keyspace.
-   */
-  public void dropKeyspace() {
-    this.cluster.dropKeyspace(this.cassandraMapping.getKeyspaceName());
-  }
-
-  /**
-   * Insert a field in a column.
-   * @param key the row key
-   * @param fieldName the field name
-   * @param value the field value.
-   */
-  public void addColumn(K key, String fieldName, Object value) {
-    if (value == null) {
-       LOG.debug( "field:"+fieldName+", its value is null.");
-      return;
-    }
-
-    ByteBuffer byteBuffer = toByteBuffer(value);
-    String columnFamily = this.cassandraMapping.getFamily(fieldName);
-    String columnName = this.cassandraMapping.getColumn(fieldName);
-    
-    if (columnName == null) {
-       LOG.warn("Column name is null for field: " + fieldName );
-        return;
-    }
-      
-    if( LOG.isDebugEnabled() ) LOG.debug( "fieldName: "+fieldName +" 
columnName: " + columnName );
-    
-    String ttlAttr = this.cassandraMapping.getColumnsAttribs().get(columnName);
-    
-    if ( null == ttlAttr ){
-       ttlAttr = CassandraMapping.DEFAULT_COLUMNS_TTL;
-       if( LOG.isDebugEnabled() ) LOG.debug( "ttl was not set for field: " + 
fieldName + ". Using " + ttlAttr );
-    } else {
-       if( LOG.isDebugEnabled() ) LOG.debug( "ttl for field: " + fieldName + " 
is " + ttlAttr );
-    }
-
-    synchronized(mutator) {
-      HectorUtils.insertColumn(mutator, key, columnFamily, columnName, 
byteBuffer, ttlAttr);
-    }
-  }
-
-  /**
-   * Delete a row within the keyspace.
-   *
-   * @param key
-   * @param familyName
-   * @param columnName
-   */
-  public void deleteColumn(K key, String familyName, ByteBuffer columnName) {
-    synchronized(mutator) {
-      HectorUtils.deleteColumn(mutator, key, familyName, columnName);
-    }
-  }
-
-  /**
-   * Deletes an entry based on its key.
-   * @param key
-   */
-  public void deleteByKey(K key) {
-    Map<String, String> familyMap = this.cassandraMapping.getFamilyMap();
-    deleteColumn(key, familyMap.values().iterator().next(), null);
-  }
-
-  /**
-   * Insert a member in a super column. This is used for map and record Avro 
types.
-   * @param key the row key
-   * @param fieldName the field name
-   * @param columnName the column name (the member name, or the index of array)
-   * @param value the member value
-   */
-  public void addSubColumn(K key, String fieldName, ByteBuffer columnName, 
Object value) {
-    if (value == null) {
-      return;
-    }
-
-    ByteBuffer byteBuffer = toByteBuffer(value);
-    
-    String columnFamily = this.cassandraMapping.getFamily(fieldName);
-    String superColumnName = this.cassandraMapping.getColumn(fieldName);
-    String ttlAttr = 
this.cassandraMapping.getColumnsAttribs().get(superColumnName);
-    if ( null == ttlAttr ) {
-      ttlAttr = CassandraMapping.DEFAULT_COLUMNS_TTL;
-      if( LOG.isDebugEnabled() ) LOG.debug( "ttl was not set for field:" + 
fieldName + " .Using " + ttlAttr );
-    } else {
-      if( LOG.isDebugEnabled() ) LOG.debug( "ttl for field:" + fieldName + " 
is " + ttlAttr );
-    }
-
-    synchronized(mutator) {
-      HectorUtils.insertSubColumn(mutator, key, columnFamily, superColumnName, 
columnName, byteBuffer, ttlAttr);
-    }
-  }
-
-  /**
-   * Adds an subColumn inside the cassandraMapping file when a String is 
serialized
-   * @param key the row key
-   * @param fieldName the field name
-   * @param columnName the column name (the member name, or the index of array)
-   * @param value the member value
-   */
-  public void addSubColumn(K key, String fieldName, String columnName, Object 
value) {
-    addSubColumn(key, fieldName, 
StringSerializer.get().toByteBuffer(columnName), value);
-  }
-
-  /**
-   * Adds an subColumn inside the cassandraMapping file when an Integer is 
serialized
-   * @param key the row key
-   * @param fieldName the field name
-   * @param columnName the column name (the member name, or the index of array)
-   * @param value the member value
-   */
-  public void addSubColumn(K key, String fieldName, Integer columnName, Object 
value) {
-    addSubColumn(key, fieldName, 
IntegerSerializer.get().toByteBuffer(columnName), value);
-  }
-
-
-  /**
-   * Delete a member in a super column. This is used for map and record Avro 
types.
-   * @param key the row key
-   * @param fieldName the field name
-   * @param columnName the column name (the member name, or the index of array)
-   */
-  public void deleteSubColumn(K key, String fieldName, ByteBuffer columnName) {
-
-    String columnFamily = this.cassandraMapping.getFamily(fieldName);
-    String superColumnName = this.cassandraMapping.getColumn(fieldName);
-    
-    synchronized(mutator) {
-      HectorUtils.deleteSubColumn(mutator, key, columnFamily, superColumnName, 
columnName);
-    }
-  }
-
-  /**
-   * Deletes a subColumn 
-   * @param key
-   * @param fieldName
-   * @param columnName
-   */
-  public void deleteSubColumn(K key, String fieldName, String columnName) {
-    deleteSubColumn(key, fieldName, 
StringSerializer.get().toByteBuffer(columnName));
-  }
-
-  /**
-   * Deletes all subcolumns from a super column.
-   * @param key the row key.
-   * @param fieldName the field name.
-   */
-  public void deleteSubColumn(K key, String fieldName) {
-    String columnFamily = this.cassandraMapping.getFamily(fieldName);
-    String superColumnName = this.cassandraMapping.getColumn(fieldName);
-    synchronized(mutator) {
-      HectorUtils.deleteSubColumn(mutator, key, columnFamily, superColumnName, 
null);
-    }
-  }
-
-  public void deleteGenericArray(K key, String fieldName) {
-    //TODO Verify this. Everything that goes inside a genericArray will go 
inside a column so let's just delete that.
-    deleteColumn(key, cassandraMapping.getFamily(fieldName), 
toByteBuffer(fieldName));
-  }
-  
-  public void addGenericArray(K key, String fieldName, GenericArray<?> array) {
-    if (isSuper( cassandraMapping.getFamily(fieldName) )) {
-      int i= 0;
-      for (Object itemValue: array) {
-
-        // TODO: hack, do not store empty arrays
-        if (itemValue instanceof GenericArray<?>) {
-          if (((List<?>)itemValue).size() == 0) {
-            continue;
-          }
-        } else if (itemValue instanceof Map<?,?>) {
-          if (((Map<?, ?>)itemValue).size() == 0) {
-            continue;
-          }
-        }
-
-        addSubColumn(key, fieldName, i++, itemValue);
-      }
-    }
-    else {
-      addColumn(key, fieldName, array);
-    }
-  }
-
-  public void deleteStatefulHashMap(K key, String fieldName) {
-    if (isSuper( cassandraMapping.getFamily(fieldName) )) {
-      deleteSubColumn(key, fieldName);
-    } else {
-      deleteColumn(key, cassandraMapping.getFamily(fieldName), 
toByteBuffer(fieldName));
-    }
-  }
-
-  public void addStatefulHashMap(K key, String fieldName, 
Map<CharSequence,Object> map) {
-    if (isSuper( cassandraMapping.getFamily(fieldName) )) {
-      // as we don't know what has changed inside the map or If it's an empty 
map, then delete its content.
-      deleteSubColumn(key, fieldName);
-      // update if there is anything to update.
-      if (!map.isEmpty()) {
-        // If it's not empty, then update its content.
-        for (CharSequence mapKey: map.keySet()) {
-          // TODO: hack, do not store empty arrays
-          Object mapValue = map.get(mapKey);
-          if (mapValue instanceof GenericArray<?>) {
-            if (((List<?>)mapValue).size() == 0) {
-              continue;
-            }
-          } else if (mapValue instanceof Map<?,?>) {
-            if (((Map<?, ?>)mapValue).size() == 0) {
-              continue;
-            }
-          }
-          addSubColumn(key, fieldName, mapKey.toString(), mapValue);
-        }
-      }
-    }
-    else {
-      addColumn(key, fieldName, map);
-    }
-  }
-
-  /**
-   * Serialize value to ByteBuffer using 
-   * {@link 
org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer#getSerializer(Object)}.
-   * @param value the member value {@link java.lang.Object}.
-   * @return ByteBuffer object
-   */
-  public ByteBuffer toByteBuffer(Object value) {
-    ByteBuffer byteBuffer = null;
-    Serializer<Object> serializer = 
GoraSerializerTypeInferer.getSerializer(value);
-    if (serializer == null) {
-      LOG.warn("Serializer not found for: " + value.toString());
-    }
-    else {
-      LOG.debug(serializer.getClass() + " selected as appropriate 
Serializer.");
-      byteBuffer = serializer.toByteBuffer(value);
-    }
-    if (byteBuffer == null) {
-      LOG.warn("Serialization value for: " + value.getClass().getName() + " = 
null");
-    }
-    return byteBuffer;
-  }
-
-  /**
-   * Select a family column in the keyspace.
-   * @param cassandraQuery a wrapper of the query
-   * @param family the family name to be queried
-   * @return a list of family rows
-   */
-  public List<Row<K, ByteBuffer, ByteBuffer>> execute(CassandraQuery<K, T> 
cassandraQuery, String family) {
-    
-    String[] columnNames = cassandraQuery.getColumns(family);
-    ByteBuffer[] columnNameByteBuffers = new ByteBuffer[columnNames.length];
-    for (int i = 0; i < columnNames.length; i++) {
-      columnNameByteBuffers[i] = 
StringSerializer.get().toByteBuffer(columnNames[i]);
-    }
-    Query<K, T> query = cassandraQuery.getQuery();
-    int limit = (int) query.getLimit();
-    if (limit < 1) {
-      limit = Integer.MAX_VALUE;
-    }
-    K startKey = query.getStartKey();
-    K endKey = query.getEndKey();
-    
-    RangeSlicesQuery<K, ByteBuffer, ByteBuffer> rangeSlicesQuery = 
HFactory.createRangeSlicesQuery
-        (this.keyspace, this.keySerializer, ByteBufferSerializer.get(), 
ByteBufferSerializer.get());
-    rangeSlicesQuery.setColumnFamily(family);
-    rangeSlicesQuery.setKeys(startKey, endKey);
-    rangeSlicesQuery.setRange(ByteBuffer.wrap(new byte[0]), 
ByteBuffer.wrap(new byte[0]), false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
-    rangeSlicesQuery.setRowCount(limit);
-    rangeSlicesQuery.setColumnNames(columnNameByteBuffers);
-    
-    QueryResult<OrderedRows<K, ByteBuffer, ByteBuffer>> queryResult = 
rangeSlicesQuery.execute();
-    OrderedRows<K, ByteBuffer, ByteBuffer> orderedRows = queryResult.get();
-    
-    return orderedRows.getList();
-  }
-  
-  private String getMappingFamily(String pField){
-    String family = null;
-    // checking if it was a UNION field the one we are retrieving
-    if (pField.indexOf(CassandraStore.UNION_COL_SUFIX) > 0)
-      family = 
this.cassandraMapping.getFamily(pField.substring(0,pField.indexOf(CassandraStore.UNION_COL_SUFIX)));
-    else
-      family = this.cassandraMapping.getFamily(pField);
-     return family;
-   }
- 
-  private String getMappingColumn(String pField){
-    String column = null;
-    if (pField.indexOf(CassandraStore.UNION_COL_SUFIX) > 0)
-      column = pField;
-    else
-      column = this.cassandraMapping.getColumn(pField);
-      return column;
-    }
-
-  /**
-   * Select the families that contain at least one column mapped to a query 
field.
-   * @param query indicates the columns to select
-   * @return a map which keys are the family names and values the 
-   * corresponding column names required to get all the query fields.
-   */
-  public Map<String, List<String>> getFamilyMap(Query<K, T> query) {
-    Map<String, List<String>> map = new HashMap<>();
-    Schema persistentSchema = query.getDataStore().newPersistent().getSchema();
-    for (String field: query.getFields()) {
-      String family = this.getMappingFamily(field);
-      String column = this.getMappingColumn(field);
-      
-      // check if the family value was already initialized 
-      List<String> list = map.get(family);
-      if (list == null) {
-        list = new ArrayList<>();
-        map.put(family, list);
-      }
-      if (persistentSchema.getField(field).schema().getType() == Type.UNION)
-        list.add(field + CassandraStore.UNION_COL_SUFIX);
-      if (column != null) {
-        list.add(column);
-      }
-    }
-    
-    return map;
-  }
-
-  /**
-   * Retrieves the cassandraMapping which holds whatever was mapped 
-   * from the gora-cassandra-mapping.xml
-   * @return 
-   */
-  public CassandraMapping getCassandraMapping(){
-    return this.cassandraMapping;
-  }
-  
-  /**
-   * Select the field names according to the column names, which format 
-   * if fully qualified: "family:column"
-   * @param query
-   * @return a map which keys are the fully qualified column 
-   * names and values the query fields
-   */
-  public Map<String, String> getReverseMap(Query<K, T> query) {
-    Map<String, String> map = new HashMap<>();
-    Schema persistentSchema = query.getDataStore().newPersistent().getSchema();
-    for (String field: query.getFields()) {
-      String family = this.getMappingFamily(field);
-      String column = this.getMappingColumn(field);
-      if (persistentSchema.getField(field).schema().getType() == Type.UNION)
-        map.put(family + ":" + field + CassandraStore.UNION_COL_SUFIX, field + 
CassandraStore.UNION_COL_SUFIX);
-      map.put(family + ":" + column, field);
-    }
-    
-    return map;
-  }
-
-  /**
-   * Determines if a column is a superColumn or not.
-   * @param family
-   * @return boolean
-   */
-  public boolean isSuper(String family) {
-    return this.cassandraMapping.isSuper(family);
-  }
-
-  public List<SuperRow<K, String, ByteBuffer, ByteBuffer>> 
executeSuper(CassandraQuery<K, T> cassandraQuery, String family) {
-    String[] columnNames = cassandraQuery.getColumns(family);
-    Query<K, T> query = cassandraQuery.getQuery();
-    int limit = (int) query.getLimit();
-    if (limit < 1) {
-      limit = Integer.MAX_VALUE;
-    }
-    K startKey = query.getStartKey();
-    K endKey = query.getEndKey();
-    
-    RangeSuperSlicesQuery<K, String, ByteBuffer, ByteBuffer> 
rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery
-        (this.keyspace, this.keySerializer, StringSerializer.get(), 
ByteBufferSerializer.get(), ByteBufferSerializer.get());
-    rangeSuperSlicesQuery.setColumnFamily(family);    
-    rangeSuperSlicesQuery.setKeys(startKey, endKey);
-    rangeSuperSlicesQuery.setRange("", "", false, 
GoraRecordReader.BUFFER_LIMIT_READ_VALUE);
-    rangeSuperSlicesQuery.setRowCount(limit);
-    rangeSuperSlicesQuery.setColumnNames(columnNames);
-    
-    
-    QueryResult<OrderedSuperRows<K, String, ByteBuffer, ByteBuffer>> 
queryResult = rangeSuperSlicesQuery.execute();
-    OrderedSuperRows<K, String, ByteBuffer, ByteBuffer> orderedRows = 
queryResult.get();
-    return orderedRows.getList();
-
-
-  }
-
-  /**
-   * Obtain Schema/Keyspace name
-   * @return Keyspace
-   */
-  public String getKeyspaceName() {
-    return this.cassandraMapping.getKeyspaceName();
-  }
-}

http://git-wip-us.apache.org/repos/asf/gora/blob/cbed41d0/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
deleted file mode 100644
index b7a7087..0000000
--- 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java
+++ /dev/null
@@ -1,329 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gora.cassandra.store;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import me.prettyprint.cassandra.model.BasicColumnFamilyDefinition;
-import me.prettyprint.cassandra.service.ThriftCfDef;
-import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
-import me.prettyprint.hector.api.ddl.ColumnType;
-import me.prettyprint.hector.api.ddl.ComparatorType;
-
-import org.jdom.Element;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Mapping definitions for CouchDB.
- */
-public class CassandraMapping {
-  
-  public static final Logger LOG = 
LoggerFactory.getLogger(CassandraMapping.class);
-  
-  private static final String NAME_ATTRIBUTE = "name";
-  private static final String COLUMN_ATTRIBUTE = "qualifier";
-  private static final String FAMILY_ATTRIBUTE = "family";
-  private static final String SUPER_ATTRIBUTE = "type";
-  private static final String CLUSTER_ATTRIBUTE = "cluster";
-  private static final String HOST_ATTRIBUTE = "host";
-
-  private static final String GCGRACE_SECONDS_ATTRIBUTE = "gc_grace_seconds";
-  private static final String COLUMNS_TTL_ATTRIBUTE = "ttl";
-  private static final String REPLICATION_FACTOR_ATTRIBUTE = 
"replication_factor";     
-  private static final String REPLICATION_STRATEGY_ATTRIBUTE = 
"placement_strategy";
-  
-  public static final String DEFAULT_REPLICATION_FACTOR = "1";          
-  public static final String DEFAULT_REPLICATION_STRATEGY = 
"org.apache.cassandra.locator.SimpleStrategy";
-  public static final String DEFAULT_COLUMNS_TTL = "0";
-  public static final String DEFAULT_GCGRACE_SECONDS = "0";
-
-  private String hostName;
-  private String clusterName;
-  private String keyspaceName;
-  private String keyspaceStrategy;
-  private int   keyspaceRF;
-  
-  
-  /**
-   * List of the super column families.
-   */
-  private List<String> superFamilies = new ArrayList<>();
-
-  /**
-   * Look up the column family associated to the Avro field.
-   */
-  private Map<String, String> familyMap = new HashMap<>();
-  
-  /**
-   * Look up the column associated to the Avro field.
-   */
-  private Map<String, String> columnMap = new HashMap<>();
-
-  /**
-   * Helps storing attributes defined for each field.
-   */
-  private Map<String, String> columnAttrMap = new HashMap<>();
-  
-  /**
-   * Look up the column family from its name.
-   */
-  private Map<String, BasicColumnFamilyDefinition> columnFamilyDefinitions = 
-                 new HashMap<>();
-
-  
-  /**
-   * Simply gets the Cassandra host name.
-   * @return hostName
-   */
-  public String getHostName() {
-    return this.hostName;
-  }
-  
-  /**
-   * Simply gets the Cassandra cluster (the machines (nodes) 
-   * in a logical Cassandra instance) name.
-   * Clusters can contain multiple keyspaces. 
-   * @return clusterName
-   */
-  public String getClusterName() {
-    return this.clusterName;
-  }
-
-  /**
-   * Simply gets the Cassandra namespace for ColumnFamilies, typically one per 
application
-   * @return
-   */
-  public String getKeyspaceName() {
-    return this.keyspaceName;
-  }
-  
-  /**
-   * gets the replication strategy
-   * @return string class name to be used for strategy
-   */
-  public String getKeyspaceReplicationStrategy() {
-       return this.keyspaceStrategy;
-  }
-  
-  /**
-   * gets the replication factor
-   * @return int replication factor
-   */
-  public int getKeyspaceReplicationFactor() {
-       return this.keyspaceRF;
-  }
-
-  /**
-   * Primary class for loading Cassandra configuration from the 'MAPPING_FILE'.
-   * It should be noted that should the "qualifier" attribute and its 
associated
-   * value be absent from class field definition, it will automatically be set 
to 
-   * the field name value.
-   * 
-   */
-  @SuppressWarnings("unchecked")
-  public CassandraMapping(Element keyspace, Element mapping) {
-    if (keyspace == null) {
-      LOG.error("Keyspace element should not be null!");
-      return;
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Located Cassandra Keyspace");
-      }
-    }
-    this.keyspaceName = keyspace.getAttributeValue(NAME_ATTRIBUTE);
-    if (this.keyspaceName == null) {
-      LOG.error("Error locating Cassandra Keyspace name attribute!");
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Located Cassandra Keyspace name: '" + keyspaceName + "'");
-      }
-    }
-    this.clusterName = keyspace.getAttributeValue(CLUSTER_ATTRIBUTE);
-    if (this.clusterName == null) {
-       LOG.error("Error locating Cassandra Keyspace cluster attribute!");
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Located Cassandra Keyspace cluster: '" + clusterName + "'");
-      }
-    }
-    this.hostName = keyspace.getAttributeValue(HOST_ATTRIBUTE);
-    if (this.hostName == null) {
-       LOG.error("Error locating Cassandra Keyspace host attribute!");
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Located Cassandra Keyspace host: '" + hostName + "'");
-      }  
-    }
-    
-    // setting replication strategy
-    this.keyspaceStrategy = keyspace.getAttributeValue( 
REPLICATION_STRATEGY_ATTRIBUTE );
-    if( null == this.keyspaceStrategy ) {
-       this.keyspaceStrategy = DEFAULT_REPLICATION_STRATEGY;
-    }
-       if( LOG.isDebugEnabled() ) {
-               LOG.debug( "setting Keyspace replication strategy to " + 
this.keyspaceStrategy );
-       }
-
-       // setting replication factor
-       String tmp = keyspace.getAttributeValue( REPLICATION_FACTOR_ATTRIBUTE 
);        
-       if( null == tmp ) {
-               tmp = DEFAULT_REPLICATION_FACTOR;
-       }
-       this.keyspaceRF = Integer.parseInt( tmp );
-       if( LOG.isDebugEnabled() ) {
-               LOG.debug( "setting Keyspace replication factor to " + 
this.keyspaceRF );
-       }
-
-    
-    // load column family definitions
-    List<Element> elements = keyspace.getChildren();
-    for (Element element: elements) {
-      BasicColumnFamilyDefinition cfDef = new BasicColumnFamilyDefinition();
-      
-      String familyName = element.getAttributeValue(NAME_ATTRIBUTE);
-      if (familyName == null) {
-       LOG.error("Error locating column family name attribute!");
-       continue;
-      } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Located column family: '" + familyName + "'" );
-        }
-      }
-      String gcgrace_scs = 
element.getAttributeValue(GCGRACE_SECONDS_ATTRIBUTE);
-      if (gcgrace_scs == null) {
-        LOG.warn("Error locating gc_grace_seconds attribute for '" + 
familyName + "' column family");
-        LOG.warn("Using gc_grace_seconds default value which is: " + 
DEFAULT_GCGRACE_SECONDS 
-                       + " and is viable ONLY FOR A SINGLE NODE CLUSTER");
-        LOG.warn("please update the gora-cassandra-mapping.xml file to avoid 
seeing this warning");
-      } else {
-        if (LOG.isDebugEnabled()) {
-        LOG.debug("Located gc_grace_seconds: '" + gcgrace_scs + "'" );
-        }
-      }
-
-      String superAttribute = element.getAttributeValue(SUPER_ATTRIBUTE);
-      if (superAttribute != null) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Located super column family");
-        }
-        this.superFamilies.add(familyName);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Added super column family: '" + familyName + "'");
-        }
-        cfDef.setColumnType(ColumnType.SUPER);
-        cfDef.setSubComparatorType(ComparatorType.BYTESTYPE);
-      }
-
-      cfDef.setKeyspaceName(this.keyspaceName);
-      cfDef.setName(familyName);
-      cfDef.setComparatorType(ComparatorType.BYTESTYPE);
-      cfDef.setDefaultValidationClass(ComparatorType.BYTESTYPE.getClassName());
-
-      cfDef.setGcGraceSeconds(Integer.parseInt( 
gcgrace_scs!=null?gcgrace_scs:DEFAULT_GCGRACE_SECONDS));
-      this.columnFamilyDefinitions.put(familyName, cfDef);
-
-    }
-    
-    // load column definitions    
-    elements = mapping.getChildren();
-    for (Element element: elements) {
-      String fieldName = element.getAttributeValue(NAME_ATTRIBUTE);
-      String familyName = element.getAttributeValue(FAMILY_ATTRIBUTE);
-      String columnName = element.getAttributeValue(COLUMN_ATTRIBUTE);
-      String ttlValue = element.getAttributeValue(COLUMNS_TTL_ATTRIBUTE);
-      if (fieldName == null) {
-       LOG.error("Field name is not declared.");
-        continue;
-      }
-      if (familyName == null) {
-        LOG.error("Family name is not declared for \"" + fieldName + "\" 
field.");
-        continue;
-      }
-      if (columnName == null) {
-        LOG.warn("Column name (qualifier) is not declared for \"" + fieldName 
+ "\" field.");
-        columnName = fieldName;
-      }
-      if (ttlValue == null) {
-        LOG.warn("TTL value is not defined for \"" + fieldName + "\" field. \n 
Using default value: " + DEFAULT_COLUMNS_TTL);
-      }
-
-      BasicColumnFamilyDefinition columnFamilyDefinition = 
this.columnFamilyDefinitions.get(familyName);
-      if (columnFamilyDefinition == null) {
-        LOG.warn("Family " + familyName + " was not declared in the 
keyspace.");
-      }
-
-      this.familyMap.put(fieldName, familyName);
-      this.columnMap.put(fieldName, columnName);
-      // TODO we should find a way of storing more values into this map
-      this.columnAttrMap.put(columnName, 
ttlValue!=null?ttlValue:DEFAULT_COLUMNS_TTL);
-    }
-  }
-
-  /**
-   * Add new column to the CassandraMapping using the the below parameters
-   * @param pFamilyName the column family name
-   * @param pFieldName the Avro field from the Schema
-   * @param pColumnName the column name within the column family.
-   */
-  public void addColumn(String pFamilyName, String pFieldName, String 
pColumnName){
-    this.familyMap.put(pFieldName, pFamilyName);
-    this.columnMap.put(pFieldName, pColumnName);
-  }
-
-  public String getFamily(String name) {
-    return this.familyMap.get(name);
-  }
-
-  public String getColumn(String name) {
-    return this.columnMap.get(name);
-  }
-
-  public Map<String,String> getFamilyMap(){
-    return this.familyMap;
-  }
-
-  public Map<String, String> getColumnsAttribs(){
-    return this.columnAttrMap;
-  }
-
-  /**
-   * Read family super attribute.
-   * @param family the family name
-   * @return true is the family is a super column family
-   */
-  public boolean isSuper(String family) {
-    return this.superFamilies.indexOf(family) != -1;
-  }
-
-  public List<ColumnFamilyDefinition> getColumnFamilyDefinitions() {
-    List<ColumnFamilyDefinition> list = new ArrayList<>();
-    for (String key: this.columnFamilyDefinitions.keySet()) {
-      ColumnFamilyDefinition columnFamilyDefinition = 
this.columnFamilyDefinitions.get(key);
-      ThriftCfDef thriftCfDef = new ThriftCfDef(columnFamilyDefinition);
-      list.add(thriftCfDef);
-    }
-    
-    return list;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/gora/blob/cbed41d0/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
deleted file mode 100644
index 6b46eec..0000000
--- 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gora.cassandra.store;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.jdom.Document;
-import org.jdom.Element;
-import org.jdom.JDOMException;
-import org.jdom.input.SAXBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A builder for creating the mapper.
- */
-public class CassandraMappingManager {
-  
-  public static final Logger LOG = 
LoggerFactory.getLogger(CassandraMappingManager.class);
-  
-  private static final String MAPPING_FILE = "gora-cassandra-mapping.xml";
-  private static final String KEYSPACE_ELEMENT = "keyspace";
-  private static final String NAME_ATTRIBUTE = "name";
-  private static final String MAPPING_ELEMENT = "class";
-  private static final String KEYCLASS_ATTRIBUTE = "keyClass";
-  private static final String HOST_ATTRIBUTE = "host";
-  private static final String CLUSTER_ATTRIBUTE = "cluster";
-  // singleton
-  private static CassandraMappingManager manager = new 
CassandraMappingManager();
-
-  public static CassandraMappingManager getManager() {
-    return manager;
-  }
-
- /**
-  * Objects to maintain mapped keyspaces
-  */
-  private Map<String, Element> keyspaceMap = null;
-  private Map<String, Element>  mappingMap = null;
-
-  private CassandraMappingManager() {
-    keyspaceMap = new HashMap<>();
-    mappingMap  = new HashMap<>();
-    try {
-      loadConfiguration();
-    }
-    catch (JDOMException | IOException e) {
-      LOG.error(e.toString());
-    }
-  }
-
-  public CassandraMapping get(Class<?> persistentClass) {
-    String className = persistentClass.getName();
-    Element mappingElement = mappingMap.get(className);
-    if (mappingElement == null) {
-      LOG.error("Mapping element does not exist for className=" + className);
-      return null;
-    }
-    String keyspaceName = mappingElement.getAttributeValue(KEYSPACE_ELEMENT);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("persistentClassName=" + className + " -> keyspaceName=" + 
keyspaceName);
-    }
-    Element keyspaceElement = keyspaceMap.get(keyspaceName);
-    if (keyspaceElement == null) {
-      LOG.error("Keyspace element does not exist for keyspaceName=" + 
keyspaceName);
-      return null;
-    }
-    return new CassandraMapping(keyspaceElement, mappingElement);
-  }
-
-  /**
-   * Primary class for loading Cassandra configuration from the 'MAPPING_FILE'.
-   * 
-   * @throws JDOMException
-   * @throws IOException
-   */
-  @SuppressWarnings("unchecked")
-  public void loadConfiguration() throws JDOMException, IOException {
-    SAXBuilder saxBuilder = new SAXBuilder();
-    // get mapping file
-    InputStream inputStream = 
getClass().getClassLoader().getResourceAsStream(MAPPING_FILE);
-    if (inputStream == null){
-      LOG.warn("Mapping file '" + MAPPING_FILE + "' could not be found!");
-      throw new IOException("Mapping file '" + MAPPING_FILE + "' could not be 
found!");
-    }
-    Document document = saxBuilder.build(inputStream);
-    if (document == null) {
-      LOG.warn("Mapping file '" + MAPPING_FILE + "' could not be found!");
-      throw new IOException("Mapping file '" + MAPPING_FILE + "' could not be 
found!");
-    }
-    Element root = document.getRootElement();
-    // find cassandra keyspace element
-    List<Element> keyspaces = root.getChildren(KEYSPACE_ELEMENT);
-    if (keyspaces == null || keyspaces.size() == 0) {
-      LOG.error("Error locating Cassandra Keyspace element!");
-    }
-    else {
-      for (Element keyspace : keyspaces) {
-        // log name, cluster and host for given keyspace(s)
-        String keyspaceName = keyspace.getAttributeValue(NAME_ATTRIBUTE);
-        String clusterName = keyspace.getAttributeValue(CLUSTER_ATTRIBUTE);
-        String hostName = keyspace.getAttributeValue(HOST_ATTRIBUTE);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Located Cassandra Keyspace: '" + keyspaceName + "' in 
cluster '" + clusterName + 
-          "' on host '" + hostName + "'.");
-        }
-        if (keyspaceName == null) {
-          LOG.error("Error locating Cassandra Keyspace name attribute!");
-          continue;
-        }
-        keyspaceMap.put(keyspaceName, keyspace);
-      }
-    }
-      
-    // load column definitions    
-    List<Element> mappings = root.getChildren(MAPPING_ELEMENT);
-    if (mappings == null || mappings.size() == 0) {
-      LOG.error("Error locating Cassandra Mapping class element!");
-    }
-    else {
-      for (Element mapping : mappings) {
-        // associate persistent and class names for keyspace(s)
-        String className = mapping.getAttributeValue(NAME_ATTRIBUTE);
-        String keyClassName = mapping.getAttributeValue(KEYCLASS_ATTRIBUTE);
-        String keyspaceName = mapping.getAttributeValue(KEYSPACE_ELEMENT);
-        if (LOG.isDebugEnabled()) {
-        LOG.debug("Located Cassandra Mapping: keyClass: '" + keyClassName + "' 
in storage class '" 
-          + className + "' for Keyspace '" + keyspaceName + "'.");
-        }
-        if (className == null) {
-          LOG.error("Error locating Cassandra Mapping class name attribute!");
-          continue;
-        }
-        mappingMap.put(className, mapping);
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/gora/blob/cbed41d0/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
deleted file mode 100644
index 5f21aca..0000000
--- 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
+++ /dev/null
@@ -1,668 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gora.cassandra.store;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-
-import me.prettyprint.hector.api.beans.ColumnSlice;
-import me.prettyprint.hector.api.beans.HColumn;
-import me.prettyprint.hector.api.beans.HSuperColumn;
-import me.prettyprint.hector.api.beans.Row;
-import me.prettyprint.hector.api.beans.SuperRow;
-import me.prettyprint.hector.api.beans.SuperSlice;
-
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Type;
-import org.apache.avro.generic.GenericArray;
-import org.apache.avro.generic.GenericData.Array;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.gora.cassandra.query.CassandraQuery;
-import org.apache.gora.cassandra.query.CassandraResult;
-import org.apache.gora.cassandra.query.CassandraResultSet;
-import org.apache.gora.cassandra.query.CassandraRow;
-import org.apache.gora.cassandra.query.CassandraSubColumn;
-import org.apache.gora.cassandra.query.CassandraSuperColumn;
-import org.apache.gora.persistency.Persistent;
-import org.apache.gora.persistency.impl.DirtyListWrapper;
-import org.apache.gora.persistency.impl.PersistentBase;
-import org.apache.gora.query.PartitionQuery;
-import org.apache.gora.query.Query;
-import org.apache.gora.query.Result;
-import org.apache.gora.query.impl.PartitionQueryImpl;
-import org.apache.gora.store.DataStoreFactory;
-import org.apache.gora.store.impl.DataStoreBase;
-import org.apache.gora.cassandra.serializers.AvroSerializerUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * {@link org.apache.gora.cassandra.store.CassandraStore} is the primary class
- * responsible for directing Gora CRUD operations into Cassandra. We 
(delegate) rely
- * heavily on {@link org.apache.gora.cassandra.store.CassandraClient} for many 
operations
- * such as initialization, creating and deleting schemas (Cassandra 
Keyspaces), etc.
- */
-public class CassandraStore<K, T extends PersistentBase> extends 
DataStoreBase<K, T> {
-
-  /** Logging implementation */
-  public static final Logger LOG = 
LoggerFactory.getLogger(CassandraStore.class);
-
-  /** Consistency property level for Cassandra column families */
-  private static final String COL_FAM_CL = "cf.consistency.level";
-   
-  /** Consistency property level for Cassandra read operations. */
-  private static final String READ_OP_CL = "read.consistency.level";
-  
-  /** Consistency property level for Cassandra write operations. */
-  private static final String WRITE_OP_CL = "write.consistency.level";
-  
-  /** Variables to hold different consistency levels defined by the 
properties. */
-  public static String colFamConsLvl;
-  public static String readOpConsLvl;
-  public static String writeOpConsLvl;
-  
-  private CassandraClient<K, T> cassandraClient = new CassandraClient<>();
-
-  /**
-   * Fixed string with value "UnionIndex" used to generate an extra column 
based on 
-   * the original field's name
-   */
-  public static final String UNION_COL_SUFIX = "_UnionIndex";
-
-  /**
-   * Default schema index with value "0" used when AVRO Union data types are 
stored
-   */
-  public static final int DEFAULT_UNION_SCHEMA = 0;
-
-  /**
-   * The values are Avro fields pending to be stored.
-   *
-   * We want to iterate over the keys in insertion order.
-   * We don't want to lock the entire collection before iterating over the 
keys, 
-   * since in the meantime other threads are adding entries to the map.
-   */
-  private Map<K, T> buffer = Collections.synchronizedMap(new LinkedHashMap<K, 
T>());
-
-  public static final ThreadLocal<BinaryEncoder> encoders =
-      new ThreadLocal<>();
-  
-  /**
-   * Create a {@link java.util.concurrent.ConcurrentHashMap} for the 
-   * datum readers and writers. 
-   * This is necessary because they are not thread safe, at least not before 
-   * Avro 1.4.0 (See AVRO-650).
-   * When they are thread safe, it is possible to maintain a single reader and
-   * writer pair for every schema, instead of one for every thread.
-   * @see <a href="https://issues.apache.org/jira/browse/AVRO-650";>AVRO-650</a>
-   */
-  public static final ConcurrentHashMap<String, SpecificDatumWriter<?>> 
writerMap = 
-      new ConcurrentHashMap<>();
-  
-  /** The default constructor for CassandraStore */
-  public CassandraStore() throws Exception {
-  }
-
-  /** 
-   * Initialize is called when then the call to 
-   * {@link org.apache.gora.store.DataStoreFactory#createDataStore}
-   * is made. In this case, we merely delegate the store initialization to the 
-   * {@link org.apache.gora.cassandra.store.CassandraClient#initialize}.
-   *
-   * @param keyClass
-   * @param persistent
-   * @param properties
-   */
-  public void initialize(Class<K> keyClass, Class<T> persistent, Properties 
properties) {
-    try {
-      super.initialize(keyClass, persistent, properties);
-      if (autoCreateSchema) {
-        // If this is not set, then each Cassandra client should set its 
default
-        // column family
-        colFamConsLvl = DataStoreFactory.findProperty(properties, this, 
COL_FAM_CL, null);
-        // operations
-        readOpConsLvl = DataStoreFactory.findProperty(properties, this, 
READ_OP_CL, null);
-        writeOpConsLvl = DataStoreFactory.findProperty(properties, this, 
WRITE_OP_CL, null);
-      }
-      this.cassandraClient.initialize(keyClass, persistent, properties);
-    } catch (Exception e) {
-      LOG.error(e.getMessage(), e);
-    }
-  }
-
-  @Override
-  public void close() {
-    LOG.debug("close");
-    flush();
-  }
-
-  @Override
-  public void createSchema() {
-    LOG.debug("creating Cassandra keyspace");
-    this.cassandraClient.checkKeyspace();
-  }
-
-  @Override
-  public boolean delete(K key) {
-    this.cassandraClient.deleteByKey(key);
-    return true;
-  }
-
-  @Override
-  public long deleteByQuery(Query<K, T> query) {
-    LOG.debug("delete by query " + query);
-    return 0;
-  }
-
-  @Override
-  public void deleteSchema() {
-    LOG.debug("delete schema");
-    this.cassandraClient.dropKeyspace();
-  }
-
-  /**
-   * When executing Gora Queries in Cassandra we query the Cassandra keyspace 
by families.
-   * When we add sub/supercolumns, Gora keys are mapped to Cassandra partition 
keys only. 
-   * This is because we follow the Cassandra logic where column family data is 
-   * partitioned across nodes based on row Key.
-   */
-  @Override
-  public Result<K, T> execute(Query<K, T> query) {
-
-    Map<String, List<String>> familyMap = 
this.cassandraClient.getFamilyMap(query);
-    Map<String, String> reverseMap = this.cassandraClient.getReverseMap(query);
-
-    CassandraQuery<K, T> cassandraQuery = new CassandraQuery<>();
-    cassandraQuery.setQuery(query);
-    cassandraQuery.setFamilyMap(familyMap);
-
-    CassandraResult<K, T> cassandraResult = new CassandraResult<>(this, query);
-    cassandraResult.setReverseMap(reverseMap);
-
-    CassandraResultSet<K> cassandraResultSet = new CassandraResultSet<>();
-
-    // We query Cassandra keyspace by families.
-    for (String family : familyMap.keySet()) {
-      if (family == null) {
-        continue;
-      }
-      if (this.cassandraClient.isSuper(family)) {
-        addSuperColumns(family, cassandraQuery, cassandraResultSet);
-
-      } else {
-        addSubColumns(family, cassandraQuery, cassandraResultSet);
-      }
-    }
-
-    cassandraResult.setResultSet(cassandraResultSet);
-
-    return cassandraResult;
-  }
-
-  /**
-   * When we add subcolumns, Gora keys are mapped to Cassandra partition keys 
only. 
-   * This is because we follow the Cassandra logic where column family data is 
-   * partitioned across nodes based on row Key.
-   */
-  private void addSubColumns(String family, CassandraQuery<K, T> 
cassandraQuery,
-      CassandraResultSet<K> cassandraResultSet) {
-    // select family columns that are included in the query
-    List<Row<K, ByteBuffer, ByteBuffer>> rows = 
this.cassandraClient.execute(cassandraQuery, family);
-
-    for (Row<K, ByteBuffer, ByteBuffer> row : rows) {
-      K key = row.getKey();
-
-      // find associated row in the resultset
-      CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key);
-      if (cassandraRow == null) {
-        cassandraRow = new CassandraRow<>();
-        cassandraResultSet.putRow(key, cassandraRow);
-        cassandraRow.setKey(key);
-      }
-
-      ColumnSlice<ByteBuffer, ByteBuffer> columnSlice = row.getColumnSlice();
-
-      for (HColumn<ByteBuffer, ByteBuffer> hColumn : columnSlice.getColumns()) 
{
-        CassandraSubColumn cassandraSubColumn = new CassandraSubColumn();
-        cassandraSubColumn.setValue(hColumn);
-        cassandraSubColumn.setFamily(family);
-        cassandraRow.add(cassandraSubColumn);
-      }
-
-    }
-  }
-
-  /**
-   * When we add supercolumns, Gora keys are mapped to Cassandra partition 
keys only. 
-   * This is because we follow the Cassandra logic where column family data is 
-   * partitioned across nodes based on row Key.
-   */
-  private void addSuperColumns(String family, CassandraQuery<K, T> 
cassandraQuery, 
-      CassandraResultSet<K> cassandraResultSet) {
-
-    List<SuperRow<K, String, ByteBuffer, ByteBuffer>> superRows = 
this.cassandraClient.executeSuper(cassandraQuery, family);
-    for (SuperRow<K, String, ByteBuffer, ByteBuffer> superRow: superRows) {
-      K key = superRow.getKey();
-      CassandraRow<K> cassandraRow = cassandraResultSet.getRow(key);
-      if (cassandraRow == null) {
-        cassandraRow = new CassandraRow<>();
-        cassandraResultSet.putRow(key, cassandraRow);
-        cassandraRow.setKey(key);
-      }
-
-      SuperSlice<String, ByteBuffer, ByteBuffer> superSlice = 
superRow.getSuperSlice();
-      for (HSuperColumn<String, ByteBuffer, ByteBuffer> hSuperColumn: 
superSlice.getSuperColumns()) {
-        CassandraSuperColumn cassandraSuperColumn = new CassandraSuperColumn();
-        cassandraSuperColumn.setValue(hSuperColumn);
-        cassandraSuperColumn.setFamily(family);
-        cassandraRow.add(cassandraSuperColumn);
-      }
-    }
-  }
-
-  /**
-   * Flush the buffer which is a synchronized {@link java.util.LinkedHashMap}
-   * storing fields pending to be stored by 
-   * {@link org.apache.gora.cassandra.store.CassandraStore#put(Object, 
PersistentBase)}
-   * operations. Invoking this method therefore writes the buffered rows
-   * into Cassandra.
-   * @see org.apache.gora.store.DataStore#flush()
-   */
-  @Override
-  public void flush() {
-
-    Set<K> keys = this.buffer.keySet();
-
-    // this duplicates memory footprint
-    @SuppressWarnings("unchecked")
-    K[] keyArray = (K[]) keys.toArray();
-
-    // iterating over the key set directly would throw 
-    //ConcurrentModificationException with java.util.HashMap and subclasses
-    for (K key: keyArray) {
-      T value = this.buffer.get(key);
-      if (value == null) {
-        LOG.info("Value to update is null for key: " + key);
-        continue;
-      }
-      Schema schema = value.getSchema();
-
-      for (Field field: schema.getFields()) {
-        if (value.isDirty(field.pos())) {
-          addOrUpdateField(key, field, field.schema(), value.get(field.pos()));
-        }
-      }
-    }
-
-    // remove flushed rows from the buffer as all 
-    // added or updated fields should now have been written.
-    for (K key: keyArray) {
-      this.buffer.remove(key);
-    }
-  }
-
-  @Override
-  public T get(K key, String[] fields) {
-    CassandraQuery<K,T> query = new CassandraQuery<>();
-    query.setDataStore(this);
-    query.setKeyRange(key, key);
-    
-    if (fields == null){
-      fields = this.getFields();
-    }
-    query.setFields(fields);
-
-    query.setLimit(1);
-    Result<K,T> result = execute(query);
-    boolean hasResult = false;
-    try {
-      hasResult = result.next();
-    } catch (Exception e) {
-      LOG.error(e.getMessage());
-      throw new RuntimeException(e);
-    }
-    return hasResult ? result.get() : null;
-  }
-
-  @Override
-  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query)
-      throws IOException {
-    // TODO GORA-298 Implement CassandraStore#getPartitions
-    List<PartitionQuery<K,T>> partitions = new ArrayList<>();
-    PartitionQueryImpl<K, T> pqi = new PartitionQueryImpl<>(query);
-    pqi.setConf(getConf());
-    partitions.add(pqi);
-    return partitions;
-  }
-
-  /**
-   * In Cassandra Schemas are referred to as Keyspaces
-   * @return Keyspace
-   */
-  @Override
-  public String getSchemaName() {
-    return this.cassandraClient.getKeyspaceName();
-  }
-
-  @Override
-  public Query<K, T> newQuery() {
-    Query<K,T> query = new CassandraQuery<>(this);
-    query.setFields(getFieldsToQuery(null));
-    return query;
-  }
-
-  /**
-   * When doing the
-   * {@link org.apache.gora.cassandra.store.CassandraStore#put(Object, 
PersistentBase)}
-   * operation, the logic is as follows:
-   * <ol>
-   * <li>Obtain the Avro {@link org.apache.avro.Schema} for the object.</li>
-   * <li>Create a new duplicate instance of the object (explained in more 
detail below) **.</li>
-   * <li>Obtain a {@link java.util.List} of the {@link org.apache.avro.Schema}
-   * {@link org.apache.avro.Schema.Field}'s.</li>
-   * <li>Iterate through the field {@link java.util.List}. This allows us to
-   * consequently process each item.</li>
-   * <li>Check to see if the {@link org.apache.avro.Schema.Field} is NOT dirty.
-   * If this condition is true then we DO NOT process this field.</li>
-   * <li>Obtain the element at the specified position in this list so we can
-   * directly operate on it.</li>
-   * <li>Obtain the {@link org.apache.avro.Schema.Type} of the element obtained
-   * above and process it accordingly. N.B. For nested type ARRAY, MAP
-   * RECORD or UNION, we shadow the checks in bullet point 5 above to infer 
that the
-   * {@link org.apache.avro.Schema.Field} is either at
-   * position 0 OR it is NOT dirty. If one of these conditions is true then we 
DO NOT
-   * process this field. This is carried out in
-   * {@link 
org.apache.gora.cassandra.store.CassandraStore#getFieldValue(Schema, Type, 
Object)}</li>
-   * <li>We then insert the Key and Object into the {@link 
java.util.LinkedHashMap} buffer
-   * before being flushed. This performs a structural modification of the 
map.</li>
-   * </ol>
-   * ** We create a duplicate instance of the object to be persisted and 
insert processed
-   * objects into a synchronized {@link java.util.LinkedHashMap}. This allows
-   * us to keep all the objects in memory till flushing.
-   *
-   * @param key   for the Avro Record (object).
-   * @param value Record object to be persisted in Cassandra
-   * @see 
org.apache.gora.store.DataStore#put(java.lang.Object,org.apache.gora.persistency.Persistent)
-   */
-  @Override
-  public void put(K key, T value) {
-    Schema schema = value.getSchema();
-    @SuppressWarnings("unchecked")
-    T p = (T) SpecificData.get().newRecord(value, schema);
-    List<Field> fields = schema.getFields();
-    for (int i = 1; i < fields.size(); i++) {
-      if (!value.isDirty(i)) {
-        continue;
-      }
-      Field field = fields.get(i);
-      Type type = field.schema().getType();
-      Object fieldValue = value.get(field.pos());
-      Schema fieldSchema = field.schema();
-      // check if field has a nested structure (array, map, record or union)
-      fieldValue = getFieldValue(fieldSchema, type, fieldValue);
-      p.put(field.pos(), fieldValue);
-    }
-    // this performs a structural modification of the map
-    this.buffer.put(key, p);
-  }
-
-  /**
-   * For every field within an object, we pass in a field schema, Type and 
value.
-   * This enables us to process fields (based on their characteristics) 
-   * preparing them for persistence.
-   * @param fieldSchema the associated field schema
-   * @param type the field type
-   * @param fieldValue the field value.
-   * @return
-   */
-  private Object getFieldValue(Schema fieldSchema, Type type, Object 
fieldValue ){
-    switch(type) {
-    case RECORD:
-      PersistentBase persistent = (PersistentBase) fieldValue;
-      PersistentBase newRecord = (PersistentBase) 
SpecificData.get().newRecord(persistent, persistent.getSchema());
-      for (Field member: fieldSchema.getFields()) {
-        if (member.pos() == 0 || !persistent.isDirty()) {
-          continue;
-        }
-        Schema memberSchema = member.schema();
-        Type memberType = memberSchema.getType();
-        Object memberValue = persistent.get(member.pos());
-        newRecord.put(member.pos(), getFieldValue(memberSchema, memberType, 
memberValue));
-      }
-      fieldValue = newRecord;
-      break;
-    case MAP:
-      Map<?, ?> map = (Map<?, ?>) fieldValue;
-      fieldValue = map;
-      break;
-    case ARRAY:
-      fieldValue = (List<?>) fieldValue;
-      break;
-    case UNION:
-      // storing the union selected schema, the actual value will 
-      // be stored as soon as we get break out.
-      if (fieldValue != null){
-        int schemaPos = getUnionSchema(fieldValue,fieldSchema);
-        Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
-        Type unionType = unionSchema.getType();
-        fieldValue = getFieldValue(unionSchema, unionType, fieldValue);
-      }
-      //p.put( schemaPos, p.getSchema().getField(field.name() + 
CassandraStore.UNION_COL_SUFIX));
-      //p.put(fieldPos, fieldValue);
-      break;
-    default:
-      break;
-    }    
-    return fieldValue;
-  }
-  
-  /**
-   * Add a field to Cassandra according to its type.
-   * @param key     the key of the row where the field should be added
-   * @param field   the Avro field representing a datum
-   * @param schema  the schema belonging to the particular Avro field
-   * @param value   the field value
-   */
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  private void addOrUpdateField(K key, Field field, Schema schema, Object 
value) {
-    Type type = schema.getType();
-    // checking if the value to be updated is used for saving union schema
-    if (!field.name().contains(CassandraStore.UNION_COL_SUFIX)){
-      switch (type) {
-      case STRING:
-      case BOOLEAN:
-      case INT:
-      case LONG:
-      case BYTES:
-      case FLOAT:
-      case DOUBLE:
-      case FIXED:
-        this.cassandraClient.addColumn(key, field.name(), value);
-        break;
-      case RECORD:
-        if (value != null) {
-          if (value instanceof PersistentBase) {
-            PersistentBase persistentBase = (PersistentBase) value;            
-            try {
-              byte[] byteValue = AvroSerializerUtil.serializer(persistentBase, 
schema);
-              this.cassandraClient.addColumn(key, field.name(), byteValue);
-            } catch (IOException e) {
-              LOG.warn(field.name() + " named record could not be 
serialized.");
-            }
-          } else {
-            LOG.warn("Record with value: " + value.toString() + " not 
supported for field: " + field.name());
-          }
-        } else {
-          LOG.warn("Setting content of: " + field.name() + " to null.");
-          String familyName =  
this.cassandraClient.getCassandraMapping().getFamily(field.name());
-          this.cassandraClient.deleteColumn(key, familyName, 
this.cassandraClient.toByteBuffer(field.name()));
-        }
-        break;
-      case MAP:
-        if (value != null) {
-          if (value instanceof Map<?, ?>) {            
-            Map<CharSequence,Object> map = (Map<CharSequence,Object>)value;
-            Schema valueSchema = schema.getValueType();
-            Type valueType = valueSchema.getType();
-            if (Type.UNION.equals(valueType)){
-              Map<CharSequence,Object> valueMap = new HashMap<>();
-              for (CharSequence mapKey: map.keySet()) {
-                Object mapValue = map.get(mapKey);
-                int valueUnionIndex = getUnionSchema(mapValue, valueSchema);
-                valueMap.put((mapKey+UNION_COL_SUFIX), valueUnionIndex);
-                valueMap.put(mapKey, mapValue);
-              }
-              map = valueMap;
-            }
-            
-            String familyName = 
this.cassandraClient.getCassandraMapping().getFamily(field.name());
-            
-            // If map is not super column. We using Avro serializer. 
-            if (!this.cassandraClient.isSuper( familyName )){
-              try {
-                byte[] byteValue = AvroSerializerUtil.serializer(map, schema);
-                this.cassandraClient.addColumn(key, field.name(), byteValue);
-              } catch (IOException e) {
-                LOG.warn(field.name() + " named map could not be serialized.");
-              }
-            }else{
-              this.cassandraClient.addStatefulHashMap(key, field.name(), map); 
             
-            }
-          } else {
-            LOG.warn("Map with value: " + value.toString() + " not supported 
for field: " + field.name());
-          }
-        } else {
-          // delete map
-          LOG.warn("Setting content of: " + field.name() + " to null.");
-          this.cassandraClient.deleteStatefulHashMap(key, field.name());
-        }
-        break;
-      case ARRAY:
-        if (value != null) {
-          if (value instanceof DirtyListWrapper<?>) {
-            DirtyListWrapper fieldValue = (DirtyListWrapper<?>)value;
-            GenericArray valueArray = new Array(fieldValue.size(), schema);
-            for (int i = 0; i < fieldValue.size(); i++) {
-              valueArray.add(i, fieldValue.get(i));
-            }
-            this.cassandraClient.addGenericArray(key, field.name(), 
(GenericArray<?>)valueArray);
-          } else {
-            LOG.warn("Array with value: " + value.toString() + " not supported 
for field: " + field.name());
-          }
-        } else {
-          LOG.warn("Setting content of: " + field.name() + " to null.");
-          this.cassandraClient.deleteGenericArray(key, field.name());
-        }
-        break;
-      case UNION:
-     // adding union schema index
-        String columnName = field.name() + UNION_COL_SUFIX;
-        String familyName = 
this.cassandraClient.getCassandraMapping().getFamily(field.name());
-        if(value != null) {
-          int schemaPos = getUnionSchema(value, schema);
-          LOG.debug("Union with value: " + value.toString() + " at index: " + 
schemaPos + " supported for field: " + field.name());
-          this.cassandraClient.getCassandraMapping().addColumn(familyName, 
columnName, columnName);
-          if (this.cassandraClient.isSuper( familyName )){
-            this.cassandraClient.addSubColumn(key, columnName, columnName, 
schemaPos);
-          }else{
-            this.cassandraClient.addColumn(key, columnName, schemaPos);
-            
-          }
-          //this.cassandraClient.getCassandraMapping().addColumn(familyName, 
columnName, columnName);
-          // adding union value
-          Schema unionSchema = schema.getTypes().get(schemaPos);
-          addOrUpdateField(key, field, unionSchema, value);
-          //this.cassandraClient.addColumn(key, field.name(), value);
-        } else {
-          LOG.warn("Setting content of: " + field.name() + " to null.");
-          if (this.cassandraClient.isSuper( familyName )){
-            this.cassandraClient.deleteSubColumn(key, field.name());
-          } else {
-            this.cassandraClient.deleteColumn(key, familyName, 
this.cassandraClient.toByteBuffer(field.name()));
-          }
-        }
-        break;
-      default:
-        LOG.warn("Type: " + type.name() + " not considered for field: " + 
field.name() + ". Please report this to d...@gora.apache.org");
-      }
-    }
-  }
-
-  /**
-   * Given an object and the object schema this function obtains,
-   * from within the UNION schema, the position of the type used.
-   * If no data type can be inferred then we return a default value
-   * of position 0.
-   * @param pValue
-   * @param pUnionSchema
-   * @return the unionSchemaPosition.
-   */
-  private int getUnionSchema(Object pValue, Schema pUnionSchema){
-    int unionSchemaPos = 0;
-//    String valueType = pValue.getClass().getSimpleName();
-    for (Schema currentSchema : pUnionSchema.getTypes()) {
-      Type schemaType = currentSchema.getType();
-      if (pValue instanceof CharSequence && schemaType.equals(Type.STRING))
-        return unionSchemaPos;
-      else if (pValue instanceof ByteBuffer && schemaType.equals(Type.BYTES))
-        return unionSchemaPos;
-      else if (pValue instanceof Integer && schemaType.equals(Type.INT))
-        return unionSchemaPos;
-      else if (pValue instanceof Long && schemaType.equals(Type.LONG))
-        return unionSchemaPos;
-      else if (pValue instanceof Double && schemaType.equals(Type.DOUBLE))
-        return unionSchemaPos;
-      else if (pValue instanceof Float && schemaType.equals(Type.FLOAT))
-        return unionSchemaPos;
-      else if (pValue instanceof Boolean && schemaType.equals(Type.BOOLEAN))
-        return unionSchemaPos;
-      else if (pValue instanceof Map && schemaType.equals(Type.MAP))
-        return unionSchemaPos;
-      else if (pValue instanceof List && schemaType.equals(Type.ARRAY))
-        return unionSchemaPos;
-      else if (pValue instanceof Persistent && schemaType.equals(Type.RECORD))
-        return unionSchemaPos;
-      unionSchemaPos++;
-    }
-    // if we weren't able to determine which data type it is, then we return 
the default
-    return DEFAULT_UNION_SCHEMA;
-  }
-
-  /**
-   * Simple method to check if a Cassandra Keyspace exists.
-   * @return true if a Keyspace exists.
-   */
-  @Override
-  public boolean schemaExists() {
-    LOG.info("schema exists");
-    return cassandraClient.keyspaceExists();
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/gora/blob/cbed41d0/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
deleted file mode 100644
index 3f33202..0000000
--- 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/HectorUtils.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gora.cassandra.store;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import me.prettyprint.cassandra.serializers.ByteBufferSerializer;
-import me.prettyprint.cassandra.serializers.IntegerSerializer;
-import me.prettyprint.cassandra.serializers.StringSerializer;
-import me.prettyprint.hector.api.beans.HColumn;
-import me.prettyprint.hector.api.beans.HSuperColumn;
-import me.prettyprint.hector.api.factory.HFactory;
-import me.prettyprint.hector.api.mutation.Mutator;
-
-import org.apache.gora.persistency.Persistent;
-
-/**
- * This class it not thread safe.
- * According to Hector's JavaDoc a Mutator isn't thread safe, too.
- * @see CassandraClient for safe usage.
- */
-public class HectorUtils<K,T extends Persistent> {
-
-  public static<K> void insertColumn(Mutator<K> mutator, K key, String 
columnFamily, ByteBuffer columnName, ByteBuffer columnValue, String ttlAttr) {
-    mutator.insert(key, columnFamily, createColumn(columnName, columnValue, 
ttlAttr));
-  }
-
-  public static<K> void insertColumn(Mutator<K> mutator, K key, String 
columnFamily, String columnName, ByteBuffer columnValue, String ttlAttr) {
-    mutator.insert(key, columnFamily, createColumn(columnName, columnValue, 
ttlAttr));
-  }
-
-
-  public static<K> HColumn<ByteBuffer,ByteBuffer> createColumn(ByteBuffer 
name, ByteBuffer value, String ttlAttr) {
-       int ttl = Integer.parseInt(ttlAttr);
-       HColumn<ByteBuffer,ByteBuffer> col = HFactory.createColumn(name, value, 
ByteBufferSerializer.get(), ByteBufferSerializer.get());
-       
-       if( 0 < ttl ) {
-               col.setTtl( ttl ); 
-       }
-       
-       return col;
-  }
-
-  public static<K> HColumn<String,ByteBuffer> createColumn(String name, 
ByteBuffer value, String ttlAttr) {
-       int ttl = Integer.parseInt(ttlAttr);
-       HColumn<String,ByteBuffer> col = HFactory.createColumn(name, value, 
StringSerializer.get(), ByteBufferSerializer.get());
-       
-       if( 0 < ttl ) {
-               col.setTtl( ttl );
-       }
-       
-       return col;
-  }
-
-  public static<K> HColumn<Integer,ByteBuffer> createColumn(Integer name, 
ByteBuffer value, String ttlAttr) {
-    int ttl = Integer.parseInt(ttlAttr);
-    HColumn<Integer,ByteBuffer> col = HFactory.createColumn(name, value, 
IntegerSerializer.get(), ByteBufferSerializer.get());
-    
-    if( 0 < ttl ) {
-       col.setTtl( ttl );
-    }
-    
-       return col;
-  }
-
-
-  public static<K> void insertSubColumn(Mutator<K> mutator, K key, String 
columnFamily, String superColumnName, ByteBuffer columnName, ByteBuffer 
columnValue, String ttlAttr) {
-    mutator.insert(key, columnFamily, createSuperColumn(superColumnName, 
columnName, columnValue, ttlAttr));
-  }
-
-  public static<K> void insertSubColumn(Mutator<K> mutator, K key, String 
columnFamily, String superColumnName, String columnName, ByteBuffer 
columnValue, String ttlAttr) {
-    mutator.insert(key, columnFamily, createSuperColumn(superColumnName, 
columnName, columnValue, ttlAttr));
-  }
-
-  public static<K> void insertSubColumn(Mutator<K> mutator, K key, String 
columnFamily, String superColumnName, Integer columnName, ByteBuffer 
columnValue, String ttlAttr) {
-    mutator.insert(key, columnFamily, createSuperColumn(superColumnName, 
columnName, columnValue, ttlAttr));
-  }
-
-
-  public static<K> void deleteSubColumn(Mutator<K> mutator, K key, String 
columnFamily, String superColumnName, ByteBuffer columnName) {
-    mutator.subDelete(key, columnFamily, superColumnName, columnName, 
StringSerializer.get(), ByteBufferSerializer.get());
-  }
-
-  public static<K> void deleteColumn(Mutator<K> mutator, K key, String 
columnFamily, ByteBuffer columnName){
-    mutator.delete(key, columnFamily, columnName, ByteBufferSerializer.get());
-  }
-
-  public static<K> HSuperColumn<String,ByteBuffer,ByteBuffer> 
createSuperColumn(String superColumnName, ByteBuffer columnName, ByteBuffer 
columnValue, String ttlAttr) {
-    return HFactory.createSuperColumn(superColumnName, 
Arrays.asList(createColumn(columnName, columnValue, ttlAttr)), 
StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get());
-  }
-
-  public static<K> HSuperColumn<String,String,ByteBuffer> 
createSuperColumn(String superColumnName, String columnName, ByteBuffer 
columnValue, String ttlAttr) {
-    return HFactory.createSuperColumn(superColumnName, 
Arrays.asList(createColumn(columnName, columnValue, ttlAttr)), 
StringSerializer.get(), StringSerializer.get(), ByteBufferSerializer.get());
-  }
-
-  public static<K> HSuperColumn<String,Integer,ByteBuffer> 
createSuperColumn(String superColumnName, Integer columnName, ByteBuffer 
columnValue, String ttlAttr) {
-    return HFactory.createSuperColumn(superColumnName, 
Arrays.asList(createColumn(columnName, columnValue, ttlAttr)), 
StringSerializer.get(), IntegerSerializer.get(), ByteBufferSerializer.get());
-  }
-
-}

Reply via email to