http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/compiler/GoraCassandraNativeCompiler.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/compiler/GoraCassandraNativeCompiler.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/compiler/GoraCassandraNativeCompiler.java
new file mode 100644
index 0000000..cd0a5d2
--- /dev/null
+++ 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/compiler/GoraCassandraNativeCompiler.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.cassandra.compiler;
+
+import org.apache.commons.io.FilenameUtils;
+import org.apache.gora.cassandra.bean.Field;
+import org.apache.gora.cassandra.store.CassandraMapping;
+import org.apache.gora.cassandra.store.CassandraMappingBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * This class generates Java classes for Cassandra Native Serialization.
+ * <p>
+ * It generates specific Java classes for a defined Gora Cassandra mapping.
+ * Unlike {@code org.apache.gora.compiler.GoraCompiler},
+ * which uses an .avsc or .json schema definition, this compiler
+ * expects an XML mapping file as input.
+ */
+public class GoraCassandraNativeCompiler {
+
+  private static final Logger log = 
LoggerFactory.getLogger(GoraCassandraNativeCompiler.class);
+
+  private Writer out;
+  private File dest;
+
+  /**
+   * Creates a compiler that writes its generated sources beneath the given directory.
+   *
+   * @param dest root output directory; stored as-is, no validation happens here
+   */
+  GoraCassandraNativeCompiler(File dest) {
+    this.dest = dest;
+  }
+
+  /**
+   * Command-line entry point of the compiler.
+   * <p>
+   * Logs a usage message and exits with status 1 when fewer than two arguments
+   * are supplied. Any failure during compilation is logged and rethrown wrapped
+   * in a {@link RuntimeException}.
+   *
+   * @param args {@code args[0]} is the mapping file to compile and
+   *             {@code args[1]} is the directory the generated sources are written to
+   */
+  public static void main(String[] args) {
+    try {
+      if (args.length < 2) {
+        log.error("Usage: Compiler <mapping file> <output dir>");
+        System.exit(1);
+      }
+      compileSchema(new File(args[0]), new File(args[1]));
+    } catch (Exception e) {
+      // Hand the exception itself to the logger: the message has no "{}" placeholder,
+      // so an e.getMessage() argument would be dropped and the stack trace lost.
+      log.error("Something went wrong. Please check the input file.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Reads every mapping from {@code src} and generates one Java class per mapping
+   * beneath {@code dest}.
+   */
+  private static void compileSchema(File src, File dest) throws Exception {
+    log.info("Compiling {} to {}", src, dest);
+    final GoraCassandraNativeCompiler generator = new GoraCassandraNativeCompiler(dest);
+    for (CassandraMapping each : readMappingFile(src)) {
+      generator.compile(each);
+    }
+  }
+
+  /**
+   * Loads the mappings declared in {@code src} by delegating to
+   * {@code CassandraMappingBuilder#readMappingFile}.
+   */
+  private static List<CassandraMapping> readMappingFile(File src) throws Exception {
+    return new CassandraMappingBuilder().readMappingFile(src);
+  }
+
+  /**
+   * Returns the given string with its first character converted to upper case.
+   * <p>
+   * The conversion uses {@link Locale#ROOT} so that generated identifiers do not
+   * depend on the JVM's default locale (e.g. a Turkish locale would otherwise turn
+   * a leading "i" into a dotted capital I).
+   *
+   * @param name string to be converted; may be {@code null} or empty
+   * @return the capitalized string, or {@code name} itself when it is {@code null} or empty
+   */
+  static String cap(String name) {
+    if (name == null || name.isEmpty()) {
+      return name;
+    }
+    return name.substring(0, 1).toUpperCase(Locale.ROOT) + name.substring(1);
+  }
+
+  /**
+   * Generates the Java source file for a single mapping: the {@code @Table}
+   * annotated class, one annotated field with getter and setter per mapped
+   * column, and the no-op {@code Persistent} method stubs.
+   *
+   * @param mapping Cassandra mapping; its "name" property must be a fully
+   *                qualified class name (package plus simple name)
+   * @throws IllegalArgumentException if the "name" property is missing or has no package part
+   * @throws RuntimeException         wrapping any {@link IOException} raised while writing
+   */
+  private void compile(CassandraMapping mapping) {
+    String fullQualifiedName = mapping.getProperty("name");
+    int lastDot = fullQualifiedName == null ? -1 : fullQualifiedName.lastIndexOf('.');
+    // Without this check a name lacking a package fails later with an opaque
+    // StringIndexOutOfBoundsException from substring().
+    if (lastDot <= 0 || lastDot == fullQualifiedName.length() - 1) {
+      throw new IllegalArgumentException(
+              "Mapping name must be a fully qualified class name (package.Class) but was: " + fullQualifiedName);
+    }
+    String tableName = mapping.getCoreName();
+    String packageName = fullQualifiedName.substring(0, lastDot);
+    String className = fullQualifiedName.substring(lastDot + 1);
+    String keySpace = mapping.getKeySpace().getName();
+
+    try {
+      startFile(className, packageName);
+      // startFile() has just opened "out"; binding it here guarantees the stream is
+      // closed even when writing one of the sections below fails part-way.
+      try (Writer writer = out) {
+        setHeaders(packageName);
+        line(0, "");
+        line(0, "@Table(keyspace = \"" + keySpace + "\", name = \"" + tableName + "\"," +
+                "readConsistency = \"QUORUM\"," +
+                "writeConsistency = \"QUORUM\"," +
+                "caseSensitiveKeyspace = false," +
+                "caseSensitiveTable = false)");
+        line(0, "public class " + className + " implements Persistent {");
+        for (Field field : mapping.getFieldList()) {
+          processFields(field);
+          processGetterAndSetters(field);
+          line(2, "");
+        }
+
+        setDefaultMethods(2, className);
+        line(0, "}");
+        writer.flush();
+      }
+    } catch (IOException e) {
+      // The throwable is passed as the trailing argument so SLF4J logs the stack trace;
+      // a String message in that position would be ignored (only one "{}" placeholder).
+      log.error("Error while compiling table {}", className, e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Writes the package declaration (when a namespace is given) followed by the
+   * fixed list of imports every generated class relies on.
+   *
+   * @param namespace package of the generated class; the package line is skipped when {@code null}
+   * @throws IOException if writing to the output fails
+   */
+  private void setHeaders(String namespace) throws IOException {
+    if (namespace != null) {
+      line(0, "package " + namespace + ";\n");
+    }
+    // An empty entry produces the blank line separating JDK imports from library imports.
+    final String[] headerLines = {
+        "import java.util.List;",
+        "import java.util.Set;",
+        "import java.util.Map;",
+        "import java.util.UUID;",
+        "import java.math.BigDecimal;",
+        "import java.math.BigInteger;",
+        "import java.net.InetAddress;",
+        "import java.nio.ByteBuffer;",
+        "import java.util.Date;",
+        "",
+        "import org.apache.avro.Schema.Field;",
+        "import org.apache.gora.persistency.Persistent;",
+        "import org.apache.gora.persistency.Tombstone;",
+        "import com.datastax.driver.mapping.annotations.Column;",
+        "import com.datastax.driver.mapping.annotations.PartitionKey;",
+        "import com.datastax.driver.mapping.annotations.Table;",
+        "import com.datastax.driver.mapping.annotations.Transient;"
+    };
+    for (String headerLine : headerLines) {
+      line(0, headerLine);
+    }
+  }
+
+  /**
+   * Opens the output writer for a generated class, creating the package
+   * directory beneath the destination directory when necessary.
+   *
+   * @param name        simple class name; its capitalized form plus ".java" is the file name
+   * @param packageName package of the class, mapped to a directory path by replacing dots
+   * @throws IOException if the target path is invalid, the directory cannot be created
+   *                     or the file cannot be opened
+   */
+  private void startFile(String name, String packageName) throws IOException {
+    String fullDest = FilenameUtils.normalize(
+            dest.getAbsolutePath() + File.separatorChar + packageName.replace('.', File.separatorChar));
+    // normalize() returns null for paths it considers invalid; fail with a clear
+    // message instead of a NullPointerException from the File constructor.
+    if (fullDest == null) {
+      throw new IOException("Invalid output path for package " + packageName + " under " + dest);
+    }
+    File dir = new File(fullDest);
+    // mkdirs() also returns false when the directory already exists (e.g. created
+    // concurrently), so re-check isDirectory() before reporting a failure.
+    if (!dir.isDirectory() && !dir.mkdirs() && !dir.isDirectory()) {
+      throw new IOException("Unable to create " + dir);
+    }
+    File target = new File(dir, cap(name) + ".java");
+    // Generated sources are written as UTF-8 rather than in the platform default
+    // charset, so the output does not vary with the machine running the compiler.
+    out = new OutputStreamWriter(new FileOutputStream(target), Charset.forName("UTF-8"));
+  }
+
+  /**
+   * Emits the stub implementations of the {@code Persistent} methods. Each stub is
+   * preceded by {@code @Transient} and {@code @Override} lines.
+   *
+   * @param pIden     indentation level passed to {@link #line(int, String)}
+   * @param className simple name of the generated class, used by {@code newInstance()}
+   * @throws IOException if writing to the output fails
+   */
+  private void setDefaultMethods(int pIden, String className) throws IOException {
+    final String[] stubs = {
+        "public void clear() { }",
+        "public boolean isDirty() { return false; }",
+        "public boolean isDirty(int fieldIndex) { return false; }",
+        "public boolean isDirty(String field) { return false; }",
+        "public void setDirty() { }",
+        "public void setDirty(int fieldIndex) { }",
+        "public void setDirty(String field) { }",
+        "public void clearDirty(int fieldIndex) { }",
+        "public void clearDirty(String field) { }",
+        "public void clearDirty() { }",
+        "public Tombstone getTombstone() { return null; }",
+        "public List<Field> getUnmanagedFields() { return null; }",
+        "public Persistent newInstance() { return new " + className + "(); }"
+    };
+    for (String stub : stubs) {
+      line(pIden, "@Transient");
+      line(pIden, "@Override");
+      line(pIden, stub);
+    }
+  }
+
+  /**
+   * Writes the annotated private member for one mapped field: an optional
+   * {@code @PartitionKey}, the {@code @Column} annotation and the declaration itself.
+   */
+  private void processFields(Field field) throws IOException {
+    final boolean partitionKey = Boolean.parseBoolean(field.getProperty("primarykey"));
+    if (partitionKey) {
+      line(2, "@PartitionKey");
+    }
+    line(2, "@Column(name = \"" + field.getColumnName() + "\")");
+    final String javaType = getDataType(field.getType(), false);
+    line(2, "private " + javaType + " " + field.getFieldName() + ";");
+  }
+
+  /**
+   * Writes the getter and setter of one mapped field.
+   * <p>
+   * The generated setter parameter is always called {@code field}, so the member is
+   * qualified with {@code this.}; otherwise a mapped field that is itself named
+   * "field" would produce a self-assignment that never updates the member.
+   *
+   * @param field mapped field whose accessors are generated
+   * @throws IOException if writing to the output fails
+   */
+  private void processGetterAndSetters(Field field) throws IOException {
+    String dataType = getDataType(field.getType(), false);
+    String fieldName = field.getFieldName();
+    String accessorSuffix = cap(fieldName);
+    line(2, "public " + dataType + " get" + accessorSuffix + "() {");
+    // Method bodies go one level deeper than their enclosing braces.
+    line(3, "return this." + fieldName + ";");
+    line(2, "}");
+    line(2, "public void set" + accessorSuffix + "(" + dataType + " field) {");
+    line(3, "this." + fieldName + " = field;");
+    line(2, "}");
+  }
+
+  /**
+   * Maps a CQL type name from the mapping file to the Java type used in the
+   * generated class.
+   * <p>
+   * Matching ignores case and surrounding whitespace for scalar and collection
+   * types alike, so {@code "Map<text, int>"} is handled like {@code "map<text,int>"}.
+   *
+   * @param dbType  CQL type, e.g. {@code text}, {@code list<int>} or {@code map<text, int>}
+   * @param isInner {@code true} when the type is a collection type argument, in which
+   *                case the boxed form is returned instead of a primitive
+   * @return Java type name to emit
+   * @throws RuntimeException if the type is a frozen/user defined type or is not recognised
+   */
+  private String getDataType(String dbType, boolean isInner) {
+    String type = dbType.trim().toLowerCase(Locale.ROOT);
+    if (type.startsWith("list")) {
+      return "List<" + getDataType(typeArguments(type, dbType), true) + ">";
+    }
+    if (type.startsWith("set")) {
+      return "Set<" + getDataType(typeArguments(type, dbType), true) + ">";
+    }
+    if (type.startsWith("map")) {
+      String[] types = typeArguments(type, dbType).split(",");
+      if (types.length < 2) {
+        throw new RuntimeException("Invalid Cassandra DataType: " + dbType);
+      }
+      // The recursive calls trim each argument, so "map<text, int>" is accepted.
+      return "Map<" + getDataType(types[0], true) + "," + getDataType(types[1], true) + ">";
+    }
+    switch (type) {
+      case "uuid":
+      case "timeuuid":
+        return "UUID";
+      case "text":
+      case "ascii":
+      case "varchar":
+        return "String";
+      case "timestamp":
+        return "Date";
+      case "blob":
+        return "ByteBuffer";
+      case "int":
+        return isInner ? "Integer" : "int";
+      case "float":
+        return isInner ? "Float" : "float";
+      case "double":
+        return isInner ? "Double" : "double";
+      case "decimal":
+        return "BigDecimal";
+      case "bigint":
+      case "counter":
+        return "Long";
+      case "boolean":
+        return isInner ? "Boolean" : "boolean";
+      case "varint":
+        return "BigInteger";
+      case "inet":
+        return "InetAddress";
+      default:
+        break;
+    }
+    if (type.contains("frozen")) {
+      throw new RuntimeException("Compiler Doesn't support user define types");
+    }
+    throw new RuntimeException("Invalid Cassandra DataType: " + dbType);
+  }
+
+  /**
+   * Extracts the text between the first '&lt;' and the last '&gt;' of a collection type.
+   *
+   * @param type     normalised (trimmed, lower-cased) collection type
+   * @param original type as written in the mapping, used only for the error message
+   * @throws RuntimeException if the angle brackets are missing or out of order
+   */
+  private static String typeArguments(String type, String original) {
+    int open = type.indexOf('<');
+    int close = type.lastIndexOf('>');
+    if (open < 0 || close <= open) {
+      throw new RuntimeException("Invalid Cassandra DataType: " + original);
+    }
+    return type.substring(open + 1, close);
+  }
+
+  /**
+   * Appends one line of text to the output, prefixed by the requested indentation
+   * and terminated by a newline.
+   *
+   * @param indent number of indentation units; each unit is written as two spaces
+   * @param text   text to be written
+   * @throws IOException if writing to the output fails
+   */
+  private void line(int indent, String text) throws IOException {
+    int remaining = indent;
+    while (remaining-- > 0) {
+      out.append("  ");
+    }
+    out.append(text).append("\n");
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/package-info.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/package-info.java 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/package-info.java
new file mode 100644
index 0000000..3ad9186
--- /dev/null
+++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains all classes related to the Cassandra datastore.
+ */
+package org.apache.gora.cassandra;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
new file mode 100644
index 0000000..e24822d
--- /dev/null
+++ 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.cassandra.query;
+
+import org.apache.gora.filter.Filter;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.ws.impl.QueryWSBase;
+import org.apache.gora.store.DataStore;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Cassandra specific implementation of the {@link Query} interface.
+ * <p>
+ * Besides the filter settings it carries a map of field names to new values
+ * that callers register through {@link #addUpdateField(String, Object)}.
+ */
+public class CassandraQuery<K, T extends Persistent> extends QueryWSBase<K, T> {
+
+  private Filter<K, T> filter;
+
+  private boolean localFilterEnabled = true;
+
+  private Map<String, Object> updateFields = new HashMap<>();
+
+  public CassandraQuery(DataStore<K, T> dataStore) {
+    super(dataStore);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Filter<K, T> getFilter() {
+    return this.filter;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void setFilter(Filter<K, T> filter) {
+    this.filter = filter;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean isLocalFilterEnabled() {
+    return this.localFilterEnabled;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void setLocalFilterEnabled(boolean enable) {
+    this.localFilterEnabled = enable;
+  }
+
+  /**
+   * Registers the new value for a field that should be updated.
+   *
+   * @param field    name of the field
+   * @param newValue value the field should be set to
+   */
+  public void addUpdateField(String field, Object newValue) {
+    this.updateFields.put(field, newValue);
+  }
+
+  /**
+   * Looks up the value registered for a field through
+   * {@link #addUpdateField(String, Object)}.
+   *
+   * @param key name of the field
+   * @return the registered value, or {@code null} when none was registered
+   */
+  public Object getUpdateFieldValue(String key) {
+    return this.updateFields.get(key);
+  }
+
+  /**
+   * Returns the names of the registered update fields when there are any,
+   * otherwise the fields reported by the superclass.
+   */
+  @Override
+  public String[] getFields() {
+    if (!this.updateFields.isEmpty()) {
+      return this.updateFields.keySet().toArray(new String[this.updateFields.size()]);
+    }
+    return super.getFields();
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
new file mode 100644
index 0000000..4e44d0d
--- /dev/null
+++ 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.cassandra.query;
+
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.impl.ResultBase;
+import org.apache.gora.store.DataStore;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Cassandra specific implementation of the {@link org.apache.gora.query.Result}
+ * interface, backed by in-memory lists of keys and persistent objects that are
+ * filled through {@link #addResultElement(Object, Persistent)}.
+ */
+public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T> {
+
+  /** Persistent objects, in the order they were added. */
+  private List<T> persistentObject = new ArrayList<T>();
+
+  /** Keys, index-aligned with {@link #persistentObject}. */
+  private List<K> persistentKey = new ArrayList<K>();
+
+  /** Number of elements added so far; equals the length of both lists. */
+  private int size = 0;
+
+  /** Index of the next element to hand out from the lists. */
+  private int position = 0;
+
+  /**
+   * Constructor of the Cassandra Result
+   *
+   * @param dataStore Cassandra Data Store
+   * @param query     Cassandra Query
+   */
+  public CassandraResultSet(DataStore<K, T> dataStore, Query<K, T> query) {
+    super(dataStore, query);
+  }
+
+  /**
+   * Moves to the next stored element, if any, and exposes it through the
+   * inherited {@code persistent} and {@code key} fields.
+   *
+   * @return {@code true} if an element was loaded, {@code false} when all elements were consumed
+   * @throws IOException never thrown by this implementation
+   */
+  @Override
+  protected boolean nextInner() throws IOException {
+    // Guard on position, the index actually used below. The inherited offset is
+    // maintained by the superclass and may lag behind position (presumably when rows
+    // are filtered out - verify against ResultBase), which would otherwise run past
+    // the end of the lists.
+    if (position < size) {
+      persistent = persistentObject.get(position);
+      key = persistentKey.get(position);
+      position++;
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Reports the fraction of stored elements that have been consumed so far.
+   *
+   * @return a value between 0 and 1; 0 for an empty result instead of NaN
+   * @throws IOException          never thrown by this implementation
+   * @throws InterruptedException never thrown by this implementation
+   */
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    if (size == 0) {
+      // 0f / 0 would evaluate to NaN.
+      return 0f;
+    }
+    return ((float) position) / size;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public T get() {
+    return super.get();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public K getKey() {
+    return super.getKey();
+  }
+
+  /**
+   * Adds a result element to the result lists, so that it is handed out when the
+   * user iterates over this result.
+   *
+   * @param key   key
+   * @param token persistent Object
+   */
+  public void addResultElement(K key, T token) {
+    this.persistentKey.add(key);
+    this.persistentObject.add(token);
+    this.size++;
+  }
+
+  /**
+   * Returns whether the limit for the query is reached.
+   *
+   * @return true if result limit is reached
+   */
+  @Override
+  protected boolean isLimitReached() {
+    return (limit > 0 && offset >= limit) || (offset >= size);
+  }
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/package-info.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/package-info.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/package-info.java
new file mode 100644
index 0000000..275c8d9
--- /dev/null
+++ 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains the Cassandra store query representation class as well as
+ * the result set class returned when a query is executed over the Cassandra dataStore.
+ */
+package org.apache.gora.cassandra.query;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
new file mode 100644
index 0000000..9c33bf6
--- /dev/null
+++ 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java
@@ -0,0 +1,354 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.cassandra.serializers;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.cassandra.bean.CassandraKey;
+import org.apache.gora.cassandra.bean.Field;
+import org.apache.gora.cassandra.store.CassandraMapping;
+import org.apache.gora.hbase.util.HBaseByteInterface;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Utility class for Avro serialization.
+ */
+class AvroCassandraUtils {
+
+  /**
+   * Default schema index with value "0" used when AVRO Union data types are 
stored.
+   */
+  private static final int DEFAULT_UNION_SCHEMA = 0;
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AvroCassandraUtils.class);
+
+  /**
+   * Collects the key column names and their values for the given key object.
+   * <p>
+   * When the mapping declares a Cassandra key, the key object must be a
+   * {@link PersistentBase}; each of its schema fields that is mapped in the
+   * Cassandra key contributes one name/value pair. Otherwise the inlined
+   * partition key field name is paired with the key object itself.
+   *
+   * @param cassandraMapping mapping describing the key
+   * @param key              key object supplied by the caller
+   * @param keys             output list receiving the key field names
+   * @param values           output list receiving the matching values, index-aligned with {@code keys}
+   */
+  static void processKeys(CassandraMapping cassandraMapping, Object key, List<String> keys, List<Object> values) {
+    CassandraKey cassandraKey = cassandraMapping.getCassandraKey();
+    if (cassandraKey == null) {
+      keys.add(cassandraMapping.getInlinedDefinedPartitionKey().getFieldName());
+      values.add(key);
+      return;
+    }
+    if (!(key instanceof PersistentBase)) {
+      // Two placeholders for two arguments; with a single "{}" the second argument
+      // was dropped and the message named the wrong class.
+      LOG.error("Key bean {} isn't extended by {}.", new Object[]{cassandraMapping.getKeyClass(), PersistentBase.class});
+      return;
+    }
+    PersistentBase keyBase = (PersistentBase) key;
+    for (Schema.Field field : keyBase.getSchema().getFields()) {
+      Field mappedField = cassandraKey.getFieldFromFieldName(field.name());
+      if (mappedField != null) {
+        keys.add(field.name());
+        Object value = keyBase.get(field.pos());
+        value = getFieldValueFromAvroBean(field.schema(), field.schema().getType(), value, mappedField);
+        values.add(value);
+      } else {
+        LOG.debug("Ignoring field {}, Since field couldn't find in the {} mapping", new Object[]{field.name(), cassandraMapping.getPersistentClass()});
+      }
+    }
+  }
+
+  /**
+   * Converts an Avro field value into the representation handed on for persistence.
+   * <p>
+   * Records are serialized to a {@link ByteBuffer} (blob columns only), maps and
+   * arrays are copied with their elements converted recursively, unions are
+   * resolved to the branch matching the value, and strings are converted with
+   * {@code toString()}. All other types are returned unchanged. A {@code null}
+   * string, map or array is returned as {@code null}.
+   *
+   * @param fieldSchema the associated field schema
+   * @param type        the field type
+   * @param fieldValue  the field value, may be {@code null}
+   * @param field       the mapped Cassandra field the value belongs to
+   * @return the converted field value
+   * @throws RuntimeException if a record is mapped to a column type other than blob
+   */
+  static Object getFieldValueFromAvroBean(Schema fieldSchema, Schema.Type type, Object fieldValue, Field field) {
+    switch (type) {
+      // Only the blob representation of records is implemented here.
+      case RECORD:
+        if (field.getType().contains("blob")) {
+          try {
+            byte[] serializedBytes = HBaseByteInterface.toBytes(fieldValue, fieldSchema);
+            fieldValue = ByteBuffer.wrap(serializedBytes);
+          } catch (IOException e) {
+            // Best effort as before: the value is left unserialized, but the exception
+            // is now passed along so the stack trace reaches the log.
+            LOG.error("Error occurred when serializing {} field. {}", new Object[]{field.getFieldName(), e.getMessage(), e});
+          }
+        } else {
+          throw new RuntimeException("Unsupported Data Type for Record, Currently Supported Data Types are blob and UDT for Records");
+        }
+        break;
+      case MAP:
+        if (fieldValue != null) {
+          Schema mapValueSchema = fieldSchema.getValueType();
+          Schema.Type mapValueType = mapValueSchema.getType();
+          Map<String, Object> map = new HashMap<>();
+          for (Map.Entry<?, ?> e : ((Map<?, ?>) fieldValue).entrySet()) {
+            String mapKey = e.getKey().toString();
+            Object mapValue = getFieldValueFromAvroBean(mapValueSchema, mapValueType, e.getValue(), field);
+            map.put(mapKey, mapValue);
+          }
+          fieldValue = map;
+        }
+        break;
+      case ARRAY:
+        if (fieldValue != null) {
+          Schema elementSchema = fieldSchema.getElementType();
+          Schema.Type elementType = elementSchema.getType();
+          List<Object> list = new ArrayList<>();
+          for (Object item : (Collection<?>) fieldValue) {
+            list.add(getFieldValueFromAvroBean(elementSchema, elementType, item, field));
+          }
+          fieldValue = list;
+        }
+        break;
+      case UNION:
+        // Resolve the union branch that matches the value and convert using that
+        // branch's schema; a null value is passed through untouched.
+        if (fieldValue != null) {
+          int schemaPos = getUnionSchema(fieldValue, fieldSchema);
+          Schema unionSchema = fieldSchema.getTypes().get(schemaPos);
+          Schema.Type unionType = unionSchema.getType();
+          fieldValue = getFieldValueFromAvroBean(unionSchema, unionType, fieldValue, field);
+        }
+        break;
+      case STRING:
+        if (fieldValue != null) {
+          fieldValue = fieldValue.toString();
+        }
+        break;
+      default:
+        break;
+    }
+    return fieldValue;
+  }
+
+  /**
+   * Finds the position, within a UNION schema, of the first branch whose type is
+   * compatible with the runtime class of the given value. A {@link ByteBuffer}
+   * value is accepted for the BYTES, STRING, INT, LONG, DOUBLE, FLOAT and RECORD
+   * branches. When no branch matches, the default position 0 is returned.
+   *
+   * @param pValue       value whose branch is looked up
+   * @param pUnionSchema Avro union schema
+   * @return index of the matching branch, or the default position
+   */
+  private static int getUnionSchema(Object pValue, Schema pUnionSchema) {
+    final boolean isBuffer = pValue instanceof ByteBuffer;
+    int candidatePos = 0;
+    for (Schema candidate : pUnionSchema.getTypes()) {
+      final boolean matches;
+      switch (candidate.getType()) {
+        case STRING:
+          matches = pValue instanceof CharSequence || isBuffer;
+          break;
+        case BYTES:
+          matches = isBuffer;
+          break;
+        case INT:
+          matches = pValue instanceof Integer || isBuffer;
+          break;
+        case LONG:
+          matches = pValue instanceof Long || isBuffer;
+          break;
+        case DOUBLE:
+          matches = pValue instanceof Double || isBuffer;
+          break;
+        case FLOAT:
+          matches = pValue instanceof Float || isBuffer;
+          break;
+        case BOOLEAN:
+          matches = pValue instanceof Boolean;
+          break;
+        case MAP:
+          matches = pValue instanceof Map;
+          break;
+        case ARRAY:
+          matches = pValue instanceof List;
+          break;
+        case RECORD:
+          matches = pValue instanceof Persistent || isBuffer;
+          break;
+        default:
+          matches = false;
+          break;
+      }
+      if (matches) {
+        return candidatePos;
+      }
+      candidatePos++;
+    }
+    // No branch could be matched to the value: fall back to the default branch.
+    return DEFAULT_UNION_SCHEMA;
+  }
+
+  /**
+   * Converts a value read from Cassandra into the form stored in an Avro persistent
+   * bean for the given schema.
+   * <p>
+   * Maps and arrays are converted element by element and wrapped in
+   * {@link DirtyMapWrapper} / {@link DirtyListWrapper}; a {@code null} map or array
+   * yields an empty wrapper. Unions are resolved to a single branch and converted again
+   * with that branch. {@link ByteBuffer} values are decoded for RECORD, STRING, INT,
+   * FLOAT, DOUBLE and LONG schemas. Any other schema type returns the value unchanged.
+   *
+   * @param value  the raw value, may be {@code null}
+   * @param schema the Avro schema of the target field
+   * @return the converted value; {@code null} when a serialized record cannot be read,
+   * or when a {@code null} value is given for a BYTES, STRING, INT, FLOAT, DOUBLE,
+   * LONG or RECORD schema
+   */
+  static Object getAvroFieldValue(Object value, Schema schema) {
+    Object result;
+    switch (schema.getType()) {
+
+      case MAP:
+        Map<?, ?> rawMap = (Map<?, ?>) value;
+        Map<Utf8, Object> utf8ObjectHashMap = new HashMap<>();
+        if (rawMap != null) {
+          // the value schema is the same for every entry, so look it up once
+          Schema innerSchema = schema.getValueType();
+          for (Map.Entry<?, ?> e : rawMap.entrySet()) {
+            Object obj = getAvroFieldValue(e.getValue(), innerSchema);
+            Object rawKey = e.getKey();
+            // Avro map keys are Utf8; anything else is expected to be a String
+            Utf8 avroKey = rawKey instanceof Utf8 ? (Utf8) rawKey : new Utf8((String) rawKey);
+            utf8ObjectHashMap.put(avroKey, obj);
+          }
+        }
+        result = new DirtyMapWrapper<>(utf8ObjectHashMap);
+        break;
+
+      case ARRAY:
+        List<?> rawList = (List<?>) value;
+        List<Object> objectArrayList = new ArrayList<>();
+        if (rawList != null) {
+          Schema elementSchema = schema.getElementType();
+          for (Object item : rawList) {
+            objectArrayList.add(getAvroFieldValue(item, elementSchema));
+          }
+        }
+        result = new DirtyListWrapper<>(objectArrayList);
+        break;
+
+      case RECORD:
+        if (value instanceof ByteBuffer) {
+          ByteBuffer buffer = (ByteBuffer) value;
+          byte[] arr = new byte[buffer.remaining()];
+          buffer.get(arr);
+          try {
+            result = (PersistentBase) HBaseByteInterface.fromBytes(schema, arr);
+          } catch (IOException e) {
+            // pass the exception itself so the stack trace reaches the log
+            LOG.error("Error occurred while deserialize the Record. :" + e.getMessage(), e);
+            result = null;
+          }
+        } else {
+          result = (PersistentBase) value;
+        }
+        break;
+
+      case UNION:
+        int index = getUnionSchema(value, schema);
+        Schema resolvedSchema = schema.getTypes().get(index);
+        result = getAvroFieldValue(value, resolvedSchema);
+        break;
+
+      case ENUM:
+        result = org.apache.gora.util.AvroUtils.getEnumValue(schema, (String) value);
+        break;
+
+      case BYTES:
+        // instanceof is null-safe, unlike value.getClass(); a null column stays null
+        if (value == null || value instanceof ByteBuffer) {
+          result = value;
+        } else {
+          result = ByteBuffer.wrap((byte[]) value);
+        }
+        break;
+
+      case STRING:
+        if (value == null || value instanceof Utf8) {
+          result = value;
+        } else if (value instanceof ByteBuffer) {
+          // Copy only the readable bytes: array() would expose the whole backing array
+          // (ignoring position, limit and offset) and fails on read-only/direct buffers.
+          // duplicate() leaves the caller's position untouched, as array() did.
+          ByteBuffer source = ((ByteBuffer) value).duplicate();
+          byte[] bytes = new byte[source.remaining()];
+          source.get(bytes);
+          result = new Utf8(bytes);
+        } else {
+          result = new Utf8((String) value);
+        }
+        break;
+
+      case INT:
+        result = (value instanceof ByteBuffer) ? ((ByteBuffer) value).getInt() : value;
+        break;
+
+      case FLOAT:
+        result = (value instanceof ByteBuffer) ? ((ByteBuffer) value).getFloat() : value;
+        break;
+
+      case DOUBLE:
+        result = (value instanceof ByteBuffer) ? ((ByteBuffer) value).getDouble() : value;
+        break;
+
+      case LONG:
+        result = (value instanceof ByteBuffer) ? ((ByteBuffer) value).getLong() : value;
+        break;
+
+      default:
+        result = value;
+    }
+    return result;
+  }
+
+  /**
+   * Returns the Java class used to read values of the given CQL primitive type.
+   *
+   * @param dataType lower-case CQL type name, e.g. {@code "text"} or {@code "bigint"}
+   * @return the Java class matching the CQL type
+   * @throws IllegalArgumentException if the type is {@code null} or has no mapping here
+   */
+  static Class getRelevantClassForCassandraDataType(String dataType) {
+    if (dataType == null) {
+      // switching on a null String would fail with an unexplained NullPointerException
+      throw new IllegalArgumentException("Invalid Cassandra DataType : null");
+    }
+    switch (dataType) {
+      case "ascii":
+      case "text":
+      case "varchar":
+        return String.class;
+      case "blob":
+        return ByteBuffer.class;
+      case "int":
+        return Integer.class;
+      case "double":
+        return Double.class;
+      case "bigint":
+      case "counter":
+        return Long.class;
+      case "decimal":
+        return BigDecimal.class;
+      case "float":
+        return Float.class;
+      case "boolean":
+        return Boolean.class;
+      case "inet":
+        return InetAddress.class;
+      case "varint":
+        return BigInteger.class;
+      case "uuid":
+      case "timeuuid":
+        // a timeuuid is read as a java.util.UUID as well
+        return UUID.class;
+      case "timestamp":
+        return Date.class;
+      default:
+        // name the offending type so a mapping problem can be located
+        throw new IllegalArgumentException("Invalid Cassandra DataType : " + dataType);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
----------------------------------------------------------------------
diff --git 
a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
new file mode 100644
index 0000000..204ae52
--- /dev/null
+++ 
b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
@@ -0,0 +1,446 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.gora.cassandra.serializers;
+
+import com.datastax.driver.core.AbstractGettableData;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.UDTValue;
+import com.datastax.driver.core.UserType;
+import org.apache.avro.Schema;
+import org.apache.avro.specific.SpecificData;
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.gora.cassandra.bean.CassandraKey;
+import org.apache.gora.cassandra.bean.Field;
+import org.apache.gora.cassandra.query.CassandraResultSet;
+import org.apache.gora.cassandra.store.CassandraClient;
+import org.apache.gora.cassandra.store.CassandraMapping;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.DataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This class contains the operations related to Avro serialization.
+ */
+class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
+
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AvroSerializer.class);
+
+  private DataStore<K, T> cassandraDataStore;
+
+  private Schema persistentSchema;
+
+  /**
+   * Creates a serializer bound to the given data store and mapping, caches the Avro
+   * schema of the persistent class (when it extends {@link PersistentBase}) and
+   * analyzes the mapping for user defined types.
+   *
+   * @param cassandraClient client used to reach the Cassandra session
+   * @param dataStore       data store supplying the key and persistent classes
+   * @param mapping         Cassandra mapping of the persistent class
+   * @throws RuntimeException if analyzing the persistent class fails; the original
+   *                          exception is kept as the cause
+   */
+  AvroSerializer(CassandraClient cassandraClient, DataStore<K, T> dataStore, CassandraMapping mapping) {
+    super(cassandraClient, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping);
+    if (PersistentBase.class.isAssignableFrom(dataStore.getPersistentClass())) {
+      persistentSchema = ((PersistentBase) dataStore.getBeanFactory().getCachedPersistent()).getSchema();
+    } else {
+      persistentSchema = null;
+    }
+    this.cassandraDataStore = dataStore;
+    try {
+      analyzePersistent();
+    } catch (Exception e) {
+      // chain the cause: the message alone loses the stack trace of the real failure
+      throw new RuntimeException("Error occurred while analyzing the persistent class, :" + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws Exception
+   */
+  protected void analyzePersistent() throws Exception {
+    userDefineTypeMaps = new HashMap<>();
+    for (Field field : mapping.getFieldList()) {
+      String fieldType = field.getType();
+      if (fieldType.contains("frozen")) {
+        String udtType = fieldType.substring(fieldType.indexOf("<") + 1, 
fieldType.indexOf(">"));
+        if (PersistentBase.class.isAssignableFrom(persistentClass)) {
+          Schema fieldSchema = 
persistentSchema.getField(field.getFieldName()).schema();
+          if (fieldSchema.getType().equals(Schema.Type.UNION)) {
+            for (Schema currentSchema : fieldSchema.getTypes()) {
+              if (currentSchema.getType().equals(Schema.Type.RECORD)) {
+                fieldSchema = currentSchema;
+                break;
+              }
+            }
+          }
+          String createQuery = 
CassandraQueryFactory.getCreateUDTTypeForAvro(mapping, udtType, fieldSchema);
+          userDefineTypeMaps.put(udtType, createQuery);
+        } else {
+          throw new RuntimeException("Unsupported Class for User Define Types, 
Please use PersistentBase class. field : " + udtType);
+        }
+      }
+    }
+  }
+
+  /**
+   * Builds the CQL update for the given query, executes it with the configured write
+   * consistency level (when one is set) and reports the outcome.
+   *
+   * @param query the update query
+   * @return the value of {@code wasApplied()} on the result of the executed statement
+   */
+  @Override
+  public boolean updateByQuery(Query query) {
+    List<Object> boundValues = new ArrayList<>();
+    String cql = CassandraQueryFactory.getUpdateByQueryForAvro(mapping, query, boundValues, persistentSchema);
+    // use the value-less constructor when the factory produced no bind values
+    SimpleStatement update = boundValues.isEmpty()
+        ? new SimpleStatement(cql)
+        : new SimpleStatement(cql, boundValues.toArray());
+    if (writeConsistencyLevel != null) {
+      update.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel));
+    }
+    return client.getSession().execute(update).wasApplied();
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param key
+   * @param fields
+   * @return
+   */
+  @Override
+  public Persistent get(Object key, String[] fields) {
+    if (fields == null) {
+      fields = getFields();
+    }
+    ArrayList<String> cassandraKeys = new ArrayList<>();
+    ArrayList<Object> cassandraValues = new ArrayList<>();
+    AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, 
cassandraValues);
+    String cqlQuery = 
CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields, 
cassandraKeys);
+    SimpleStatement statement = new SimpleStatement(cqlQuery, 
cassandraValues.toArray());
+    if (readConsistencyLevel != null) {
+      
statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel));
+    }
+    ResultSet resultSet = this.client.getSession().execute(statement);
+    Iterator<Row> iterator = resultSet.iterator();
+    ColumnDefinitions definitions = resultSet.getColumnDefinitions();
+    T obj = null;
+    if (iterator.hasNext()) {
+      obj = cassandraDataStore.newPersistent();
+      AbstractGettableData row = (AbstractGettableData) iterator.next();
+      populateValuesToPersistent(row, definitions, obj, fields);
+    }
+    return obj;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @param key
+   * @param persistent
+   */
+  @Override
+  public void put(Object key, Persistent persistent) {
+    if (persistent instanceof PersistentBase) {
+      if (persistent.isDirty()) {
+        PersistentBase persistentBase = (PersistentBase) persistent;
+        ArrayList<String> fields = new ArrayList<>();
+        ArrayList<Object> values = new ArrayList<>();
+        AvroCassandraUtils.processKeys(mapping, key, fields, values);
+        for (Schema.Field f : persistentBase.getSchema().getFields()) {
+          String fieldName = f.name();
+          Field field = mapping.getFieldFromFieldName(fieldName);
+          if (field == null) {
+            LOG.debug("Ignoring {} adding field, {} field can't find in {} 
mapping", new Object[]{fieldName, fieldName, persistentClass});
+            continue;
+          }
+          if (persistent.isDirty(f.pos()) || 
mapping.getInlinedDefinedPartitionKey().equals(mapping.getFieldFromFieldName(fieldName)))
 {
+            Object value = persistentBase.get(f.pos());
+            String fieldType = field.getType();
+            if (fieldType.contains("frozen")) {
+              fieldType = fieldType.substring(fieldType.indexOf("<") + 1, 
fieldType.indexOf(">"));
+              UserType userType = 
client.getSession().getCluster().getMetadata().getKeyspace(mapping.getKeySpace().getName()).getUserType(fieldType);
+              UDTValue udtValue = userType.newValue();
+              Schema udtSchema = f.schema();
+              if (udtSchema.getType().equals(Schema.Type.UNION)) {
+                for (Schema schema : udtSchema.getTypes()) {
+                  if (schema.getType().equals(Schema.Type.RECORD)) {
+                    udtSchema = schema;
+                    break;
+                  }
+                }
+              }
+              PersistentBase udtObjectBase = (PersistentBase) value;
+              for (Schema.Field udtField : udtSchema.getFields()) {
+                Object udtFieldValue = 
AvroCassandraUtils.getFieldValueFromAvroBean(udtField.schema(), 
udtField.schema().getType(), udtObjectBase.get(udtField.name()), field);
+                if (udtField.schema().getType().equals(Schema.Type.MAP)) {
+                  udtValue.setMap(udtField.name(), (Map) udtFieldValue);
+                } else if 
(udtField.schema().getType().equals(Schema.Type.ARRAY)) {
+                  udtValue.setList(udtField.name(), (List) udtFieldValue);
+                } else {
+                  udtValue.set(udtField.name(), udtFieldValue, (Class) 
udtFieldValue.getClass());
+                }
+              }
+              value = udtValue;
+            } else {
+              value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), 
f.schema().getType(), value, field);
+            }
+            values.add(value);
+            fields.add(fieldName);
+          }
+        }
+        String cqlQuery = CassandraQueryFactory.getInsertDataQuery(mapping, 
fields);
+        SimpleStatement statement = new SimpleStatement(cqlQuery, 
values.toArray());
+        if (writeConsistencyLevel != null) {
+          
statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel));
+        }
+        client.getSession().execute(statement);
+      } else {
+        LOG.info("Ignored putting persistent bean {} in the store as it is 
neither "
+                + "new, neither dirty.", new Object[]{persistent});
+      }
+    } else {
+      LOG.error("{} Persistent bean isn't extended by {} .", new 
Object[]{this.persistentClass, PersistentBase.class});
+    }
+  }
+
+  /**
+   * Reads every mapped field of the object stored under a key.
+   *
+   * @param key the key of the object
+   * @return a new persistent filled from the first returned row, or {@code null} when
+   * the query returns no row
+   */
+  @Override
+  public Persistent get(Object key) {
+    ArrayList<String> keyColumns = new ArrayList<>();
+    ArrayList<Object> keyValues = new ArrayList<>();
+    AvroCassandraUtils.processKeys(mapping, key, keyColumns, keyValues);
+    SimpleStatement select = new SimpleStatement(
+        CassandraQueryFactory.getSelectObjectQuery(mapping, keyColumns), keyValues.toArray());
+    if (readConsistencyLevel != null) {
+      select.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel));
+    }
+    ResultSet rows = client.getSession().execute(select);
+    Iterator<Row> cursor = rows.iterator();
+    ColumnDefinitions columns = rows.getColumnDefinitions();
+    if (!cursor.hasNext()) {
+      return null;
+    }
+    // only the first row is consumed
+    T loaded = cassandraDataStore.newPersistent();
+    AbstractGettableData firstRow = (AbstractGettableData) cursor.next();
+    populateValuesToPersistent(firstRow, columns, loaded, mapping.getFieldNames());
+    return loaded;
+  }
+
+  /**
+   * Copies column values of one row into the matching fields of a persistent bean.
+   * <p>
+   * Field names that are missing from the mapping or from the Avro schema of the bean
+   * are skipped. Every other value is read from the row according to the column type,
+   * converted to its Avro representation and stored at the position of the Avro field.
+   *
+   * @param row               the row (or UDT value) to read from
+   * @param columnDefinitions the column metadata of the result set
+   * @param base              the bean to fill
+   * @param fields            the names of the fields to copy
+   */
+  private void populateValuesToPersistent(AbstractGettableData row, ColumnDefinitions columnDefinitions, PersistentBase base, String[] fields) {
+    for (String name : fields) {
+      Schema.Field avroField = base.getSchema().getField(name);
+      Field mappedField = mapping.getFieldFromFieldName(name);
+      boolean known = mappedField != null && avroField != null;
+      if (known) {
+        Schema avroSchema = avroField.schema();
+        String column = mappedField.getColumnName();
+        Object rawValue = getValue(row, columnDefinitions.getType(column), column, avroSchema);
+        Object avroValue = AvroCassandraUtils.getAvroFieldValue(rawValue, avroSchema);
+        base.put(avroField.pos(), avroValue);
+      }
+    }
+  }
+
+  /**
+   * Reads one column (or UDT field) from a row using the getter that matches its CQL
+   * type.
+   * <p>
+   * String columns are returned as read; every other type yields {@code null} when the
+   * column is null. Sets are returned as lists, inet addresses and tuples as their
+   * string form, and user defined types as a new instance of the Avro record class
+   * named by the schema, filled field by field through this same method.
+   *
+   * @param row        the row or UDT value to read from
+   * @param columnType the CQL type of the column
+   * @param columnName the name of the column or UDT field
+   * @param schema     the Avro schema of the target field; only used for UDT columns
+   * @return the value read, possibly {@code null}
+   * @throws RuntimeException if the Avro record class of a UDT cannot be loaded
+   */
+  private Object getValue(AbstractGettableData row, DataType columnType, String columnName, Schema schema) {
+    Object paramValue;
+    String dataType;
+    switch (columnType.getName()) {
+      case ASCII:
+      case TEXT:
+      case VARCHAR:
+        paramValue = row.getString(columnName);
+        break;
+      case BIGINT:
+      case COUNTER:
+        paramValue = row.isNull(columnName) ? null : row.getLong(columnName);
+        break;
+      case BLOB:
+      case CUSTOM:
+        paramValue = row.isNull(columnName) ? null : row.getBytes(columnName);
+        break;
+      case BOOLEAN:
+        paramValue = row.isNull(columnName) ? null : row.getBool(columnName);
+        break;
+      case DECIMAL:
+        paramValue = row.isNull(columnName) ? null : row.getDecimal(columnName);
+        break;
+      case DOUBLE:
+        paramValue = row.isNull(columnName) ? null : row.getDouble(columnName);
+        break;
+      case FLOAT:
+        paramValue = row.isNull(columnName) ? null : row.getFloat(columnName);
+        break;
+      case INET:
+        paramValue = row.isNull(columnName) ? null : row.getInet(columnName).toString();
+        break;
+      case INT:
+        paramValue = row.isNull(columnName) ? null : row.getInt(columnName);
+        break;
+      case TIMESTAMP:
+        // getTimestamp() is the accessor for CQL timestamp columns (java.util.Date);
+        // getDate() belongs to the CQL date type and cannot decode a timestamp.
+        paramValue = row.isNull(columnName) ? null : row.getTimestamp(columnName);
+        break;
+      case UUID:
+      case TIMEUUID:
+        paramValue = row.isNull(columnName) ? null : row.getUUID(columnName);
+        break;
+      case VARINT:
+        paramValue = row.isNull(columnName) ? null : row.getVarint(columnName);
+        break;
+      case LIST:
+        dataType = columnType.getTypeArguments().get(0).toString();
+        paramValue = row.isNull(columnName) ? null : row.getList(columnName,
+            AvroCassandraUtils.getRelevantClassForCassandraDataType(dataType));
+        break;
+      case SET:
+        dataType = columnType.getTypeArguments().get(0).toString();
+        // A set column must be read with getSet(); getList() is rejected by the driver.
+        // The elements are copied into a list because Avro arrays are handled as lists.
+        paramValue = row.isNull(columnName) ? null : new ArrayList<Object>(row.getSet(columnName,
+            AvroCassandraUtils.getRelevantClassForCassandraDataType(dataType)));
+        break;
+      case MAP:
+        dataType = columnType.getTypeArguments().get(1).toString();
+        // Avro supports only String for keys
+        paramValue = row.isNull(columnName) ? null : row.getMap(columnName, String.class,
+            AvroCassandraUtils.getRelevantClassForCassandraDataType(dataType));
+        break;
+      case UDT:
+        paramValue = row.isNull(columnName) ? null : row.getUDTValue(columnName);
+        if (paramValue != null) {
+          UDTValue udtValue = (UDTValue) paramValue;
+          // A nullable UDT field is declared as a union; the record branch names the
+          // bean class, while the full name of the union itself is not a class name.
+          Schema recordSchema = schema;
+          if (schema.getType().equals(Schema.Type.UNION)) {
+            for (Schema branch : schema.getTypes()) {
+              if (branch.getType().equals(Schema.Type.RECORD)) {
+                recordSchema = branch;
+                break;
+              }
+            }
+          }
+          try {
+            PersistentBase udtObject = (PersistentBase) SpecificData.newInstance(
+                Class.forName(recordSchema.getFullName()), recordSchema);
+            for (Schema.Field f : udtObject.getSchema().getFields()) {
+              DataType dType = udtValue.getType().getFieldType(f.name());
+              Object fieldValue = getValue(udtValue, dType, f.name(), f.schema());
+              udtObject.put(f.pos(), fieldValue);
+            }
+            paramValue = udtObject;
+          } catch (ClassNotFoundException e) {
+            // keep the cause so the missing class shows up in the stack trace
+            throw new RuntimeException("Error occurred while populating data to "
+                + recordSchema.getFullName() + " : " + e.getMessage(), e);
+          }
+        }
+        break;
+      case TUPLE:
+        paramValue = row.isNull(columnName) ? null : row.getTupleValue(columnName).toString();
+        break;
+      default:
+        paramValue = row.getString(columnName);
+        break;
+    }
+    return paramValue;
+  }
+
+  /**
+   * Deletes the object stored under a key, using the configured write consistency
+   * level when one is set.
+   *
+   * @param key the key of the object to delete
+   * @return the value of {@code wasApplied()} on the result of the delete statement
+   */
+  @Override
+  public boolean delete(Object key) {
+    ArrayList<String> keyColumns = new ArrayList<>();
+    ArrayList<Object> keyValues = new ArrayList<>();
+    AvroCassandraUtils.processKeys(mapping, key, keyColumns, keyValues);
+    SimpleStatement removal = new SimpleStatement(
+        CassandraQueryFactory.getDeleteDataQuery(mapping, keyColumns), keyValues.toArray());
+    if (writeConsistencyLevel != null) {
+      removal.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel));
+    }
+    return client.getSession().execute(removal).wasApplied();
+  }
+
+  /**
+   * Runs a query and collects every returned row as a key/persistent pair.
+   * <p>
+   * The key columns are always selected in addition to the fields of the query; when
+   * the query names no fields, all fields including keys are selected. The key of a
+   * row is filled from the composite Cassandra key of the mapping when there is one,
+   * otherwise it is read from the column of the inlined partition key.
+   *
+   * @param dataStore the data store the result belongs to
+   * @param query     the query to run
+   * @return the result holding one element per returned row
+   */
+  @Override
+  public Result execute(DataStore dataStore, Query query) {
+    List<Object> boundValues = new ArrayList<>();
+    String[] requested = query.getFields();
+    String[] fields = (requested != null)
+        ? (String[]) ArrayUtils.addAll(requested, mapping.getAllKeys())
+        : mapping.getAllFieldsIncludingKeys();
+    CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query);
+    String cql = CassandraQueryFactory.getExecuteQuery(mapping, query, boundValues, fields);
+    // use the value-less constructor when the factory produced no bind values
+    SimpleStatement select = boundValues.isEmpty()
+        ? new SimpleStatement(cql)
+        : new SimpleStatement(cql, boundValues.toArray());
+    if (readConsistencyLevel != null) {
+      select.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel));
+    }
+    ResultSet rows = client.getSession().execute(select);
+    Iterator<Row> cursor = rows.iterator();
+    ColumnDefinitions columns = rows.getColumnDefinitions();
+    CassandraKey compositeKey = mapping.getCassandraKey();
+    while (cursor.hasNext()) {
+      AbstractGettableData row = (AbstractGettableData) cursor.next();
+      T persistent = cassandraDataStore.newPersistent();
+      K rowKey = cassandraDataStore.newKey();
+      populateValuesToPersistent(row, columns, persistent, fields);
+      if (compositeKey == null) {
+        // single-column key: read it straight from the partition key column
+        Field partitionKey = mapping.getInlinedDefinedPartitionKey();
+        rowKey = (K) getValue(row, columns.getType(partitionKey.getColumnName()),
+            partitionKey.getColumnName(), null);
+      } else {
+        populateValuesToPersistent(row, columns, (PersistentBase) rowKey, compositeKey.getFieldNames());
+      }
+      cassandraResult.addResultElement(rowKey, persistent);
+    }
+    return cassandraResult;
+  }
+
+}

Reply via email to