http://git-wip-us.apache.org/repos/asf/gora/blob/655cd3aa/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java
----------------------------------------------------------------------
diff --git 
a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java 
b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java
index e445d51..699d051 100644
--- 
a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java
+++ 
b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/query/DynamoDBQuery.java
@@ -20,6 +20,12 @@ package org.apache.gora.dynamodb.query;
 
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.gora.filter.Filter;
 import org.apache.gora.persistency.Persistent;
@@ -29,13 +35,12 @@ import org.apache.gora.store.DataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.amazonaws.services.dynamodb.datamodeling.DynamoDBQueryExpression;
-import com.amazonaws.services.dynamodb.datamodeling.DynamoDBScanExpression;
-import com.amazonaws.services.dynamodb.model.AttributeValue;
-import com.amazonaws.services.dynamodb.model.ComparisonOperator;
-import com.amazonaws.services.dynamodb.model.Condition;
-import com.amazonaws.services.dynamodb.model.KeySchema;
-import com.amazonaws.services.dynamodb.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBScanExpression;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.ComparisonOperator;
+import com.amazonaws.services.dynamodbv2.model.Condition;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
 
 public class DynamoDBQuery<K, T extends Persistent> extends QueryWSBase<K, T> {
 
@@ -69,6 +74,8 @@ public class DynamoDBQuery<K, T extends Persistent> extends 
QueryWSBase<K, T> {
    */
   public static final String SCAN_QUERY = "scan";
 
+  public static final ComparisonOperator DEFAULT_SCAN_OP = 
ComparisonOperator.GE;
+
   /**
    * Query type property
    */
@@ -88,26 +95,29 @@ public class DynamoDBQuery<K, T extends Persistent> extends 
QueryWSBase<K, T> {
   /**
    * Key schema used for the query
    */
-  private KeySchema keySchema;
+  private ArrayList<KeySchemaElement> keySchema;
 
   /**
    * Hash key used for the query
    */
   private K hashKey;
 
+  private Map<String, String> keyItems;
+
   /**
    * Default Constructor
    */
   public DynamoDBQuery(){
-  super(null);
+    super(null);
   }
-  
+
   /**
    * Constructor
+   * 
    * @param dataStore
    */
   public DynamoDBQuery(DataStore<K, T> dataStore) {
-  super(dataStore);
+    super(dataStore);
   }
 
   /**
@@ -126,153 +136,213 @@ public class DynamoDBQuery<K, T extends Persistent> 
extends QueryWSBase<K, T> {
     return this.hashKey;
   }
 
+  private void defineQueryParams() {
+    if ((query.getStartKey() != null || query.getKey() != null)
+        && query.getEndKey() != null) {
+      DynamoDBQuery.setType(RANGE_QUERY);
+    } else if (query.getKey() != null || query.getStartKey() != null) {
+      DynamoDBQuery.setType(SCAN_QUERY);
+    }
+  }
+
   /**
    * Builds query expression depending on query type (range or scan) 
    */
-  public void buildExpression(){
-    AttributeValue hashAttrValue = buildKeyHashAttribute();
-    if (hashAttrValue == null)
-      throw new IllegalStateException("There is not a key schema defined.");
-    if (DynamoDBQuery.getType().equals(RANGE_QUERY)){
-      Condition newCondition = buildRangeCondition();
-      buildQueryExpression(newCondition, hashAttrValue);
+  public void buildExpression() {
+    defineQueryParams();
+    if (DynamoDBQuery.getType().equals(RANGE_QUERY)) {
+      buildRangeExpression();
+    } else if (DynamoDBQuery.getType().equals(SCAN_QUERY)) {
+      buildScanExpression();
+    } else {
+      throw new IllegalArgumentException("Query type not supported");
     }
-    if (DynamoDBQuery.getType().equals(SCAN_QUERY))
-      buildScanExpression(hashAttrValue);
   }
 
   /**
-   * Builds scan query expression using a hash attribute value where to start
-   * @param pHashAttrValueHash attribute value where to start scanning
+   * Builds hash key attribute from generic query received.
+   * 
+   * @param qKey
+   * 
+   * @returnAttributeValue build from query
    */
-  public void buildScanExpression(AttributeValue pHashAttrValue){
-    DynamoDBScanExpression newScanExpression = new DynamoDBScanExpression();
-    // TODO right now we only support scanning using the key, but we should 
support other types of scans
-    
newScanExpression.addFilterCondition(getKeySchema().getHashKeyElement().getAttributeName(),
 buildKeyScanCondition());
-    dynamoDBExpression = newScanExpression;
+  private Map<String, AttributeValue> buildHashKey(K qKey) {
+    Map<String, AttributeValue> hashKey = new HashMap<>();
+    for (KeySchemaElement key : getKeySchema()) {
+      AttributeValue attr = new AttributeValue();
+      if (key.getKeyType().equals(KeyType.HASH.toString())) {
+        if (keyItems.get(key.getAttributeName()).equals("N")) {
+          attr.withN(getHashKey(qKey).toString());
+        } else if (keyItems.get(key.getAttributeName()).equals("S")) {
+          attr.withS(getHashKey(qKey).toString());
+        } else if (keyItems.get(key.getAttributeName()).equals("B")) {
+          
attr.withB(ByteBuffer.wrap(getHashKey(qKey).toString().getBytes(Charset.defaultCharset())));
+        } else {
+          throw new IllegalArgumentException("Data type not supported for "
+              + key.getAttributeName());
+        }
+        hashKey.put(key.getAttributeName(), attr);
+      }
+    }
+    if (hashKey.isEmpty()) {
+      throw new IllegalStateException("No key value has been defined.");
+    }
+    return hashKey;
   }
 
   /**
-   * Builds range query expression
-   * @param pNewConditionCondition for querying
-   * @param pHashAttrValueHash attribute value where to start
-   */
-  public void buildQueryExpression(Condition pNewCondition, AttributeValue 
pHashAttrValue) {
-    DynamoDBQueryExpression newQueryExpression = new 
DynamoDBQueryExpression(pHashAttrValue); 
-    newQueryExpression.setConsistentRead(getConsistencyReadLevel());
-    newQueryExpression.setRangeKeyCondition(pNewCondition);
-    dynamoDBExpression = newQueryExpression;
+   * Builds range key attribute from generic query received.
+   * 
+   * @param qKey
+   * 
+   * @return
+   */
+  private Map<String, AttributeValue> buildRangeKey(K qKey) {
+    Map<String, AttributeValue> kAttrs = new HashMap<>();
+    for (KeySchemaElement key : getKeySchema()) {
+      AttributeValue attr = new AttributeValue();
+      if (key.getKeyType().equals(KeyType.RANGE.toString())) {
+        if (keyItems.get(key.getAttributeName()).equals("N")) {
+          attr.withN(getRangeKey(qKey).toString());
+        } else if (keyItems.get(key.getAttributeName()).equals("S")) {
+          attr.withS(getRangeKey(qKey).toString());
+        } else if (keyItems.get(key.getAttributeName()).equals("B")) {
+          
attr.withB(ByteBuffer.wrap(getRangeKey(qKey).toString().getBytes(Charset.defaultCharset())));
+        } else {
+          throw new IllegalArgumentException("Data type not supported for "
+              + key.getAttributeName());
+        }
+        kAttrs.put(key.getAttributeName(), attr);
+      }
+    }
+    return kAttrs;
   }
 
   /**
-   * Builds hash key attribute from generic query received
-   * @returnAttributeValue build from query
-   */
-  private AttributeValue buildKeyHashAttribute(){
-    String pAttrType = getKeySchema().getHashKeyElement().getAttributeType();
-    if(pAttrType.equals("S"))
-      return new AttributeValue().withS(getHashKey(query.getKey()).toString());
-    else if(pAttrType.equals("N"))
-      return new AttributeValue().withN(getHashKey(query.getKey()).toString());
-    return null;
+   * Builds scan query expression using a hash attribute value where to start
+   * 
+   * @param pHashAttrValueHash
+   *          attribute value where to start scanning
+   */
+  public void buildScanExpression() {
+    K qKey = getKey();
+    if (qKey == null) {
+      LOG.warn("No key defined. Trying with startKey.");
+      qKey = query.getStartKey();
+      if (qKey == null) {
+        throw new IllegalStateException("No key has been defined please 
check");
+      }
+    }
+    ComparisonOperator compOp = getScanCompOp() != null ? getScanCompOp()
+        : DEFAULT_SCAN_OP;
+
+    DynamoDBScanExpression newScanExpression = new DynamoDBScanExpression();
+    // hash key condition
+    Map<String, AttributeValue> hashAttrVals = buildHashKey(qKey);
+    for (Entry<String, AttributeValue> en : hashAttrVals.entrySet()) {
+      Condition scanFilterHashCondition = new 
Condition().withComparisonOperator(
+          compOp.toString()).withAttributeValueList(en.getValue());
+      newScanExpression.addFilterCondition(en.getKey(), 
scanFilterHashCondition);
+    }
+    // range key condition
+    Map<String, AttributeValue> rangeAttrVals = buildRangeKey(qKey);
+    for (Entry<String, AttributeValue> en : rangeAttrVals.entrySet()) {
+      Condition scanFilterRangeCondition = new 
Condition().withComparisonOperator(
+          compOp.toString()).withAttributeValueList(en.getValue());
+      newScanExpression.addFilterCondition(en.getKey(), 
scanFilterRangeCondition);
+    }
+    dynamoDBExpression = newScanExpression;
+  }
+
+  /**
+   * Builds range query expression
+   * 
+   */
+  public void buildRangeExpression() {
+    DynamoDBScanExpression queryExpression = new DynamoDBScanExpression();
+    ComparisonOperator compOp = ComparisonOperator.BETWEEN;
+    // hash key range
+    Map<String, AttributeValue> hashAttrVals = 
buildHashKey(query.getStartKey());
+    Map<String, AttributeValue> endHashAttrVals = 
buildHashKey(query.getEndKey());
+    for (Entry<String, AttributeValue> en : hashAttrVals.entrySet()) {
+      Condition scanFilterHashCondition = new 
Condition().withComparisonOperator(
+          compOp.toString()).withAttributeValueList(en.getValue(), 
endHashAttrVals.get(en.getKey()));
+      queryExpression.addFilterCondition(en.getKey(), scanFilterHashCondition);
+    }
+    // range key range
+    Map<String, AttributeValue> rangeAttrVals = 
buildRangeKey(query.getStartKey());
+    Map<String, AttributeValue> endRangeAttrVals = 
buildRangeKey(query.getEndKey());
+    for (Entry<String, AttributeValue> en : rangeAttrVals.entrySet()) {
+      Condition scanFilterRangeCondition = new 
Condition().withComparisonOperator(
+          compOp.toString()).withAttributeValueList(en.getValue(), 
endRangeAttrVals.get(en.getKey()));
+      queryExpression.addFilterCondition(en.getKey(), 
scanFilterRangeCondition);
+    }
+    dynamoDBExpression = queryExpression;
   }
 
   /**
    * Gets hash key for querying
+   * 
    * @param key
    * @return
    */
   private Object getHashKey(K key){
     Object hashKey = null;
     try {
-    // Our key may be have hash and range keys
-    for (Method met :key.getClass().getDeclaredMethods()){
-      if(met.getName().equals("getHashKey")){
-        Object [] params = null;
-        hashKey = met.invoke(key, params);
-        break;
+      // Our key may be have hash and range keys
+      for (Method met : key.getClass().getDeclaredMethods()) {
+        if (met.getName().equals("getHashKey")) {
+          Object[] params = null;
+          hashKey = met.invoke(key, params);
+          break;
+        }
       }
+    } catch (IllegalArgumentException e) {
+      LOG.info("DynamoDBStore: Error while trying to fetch range key.", 
e.getMessage());
+      throw new IllegalArgumentException(e);
+    } catch (IllegalAccessException e) {
+      LOG.info("DynamoDBStore: Error while trying to fetch range key.", 
e.getMessage());
+      throw new RuntimeException(e);
+    } catch (InvocationTargetException e) {
+      LOG.info("DynamoDBStore: Error while trying to fetch range key.", 
e.getMessage());
+      throw new RuntimeException(e);
     }
-  } catch (IllegalArgumentException e) {
-    LOG.info("DynamoDBStore: Error while trying to fetch range key.");
-    e.printStackTrace();
-  } catch (IllegalAccessException e) {
-    LOG.info("DynamoDBStore: Error while trying to fetch range key.");
-    e.printStackTrace();
-  } catch (InvocationTargetException e) {
-    LOG.info("DynamoDBStore: Error while trying to fetch range key.");
-    e.printStackTrace();
-  }
-  return hashKey;
+    return hashKey;
   }
 
   /**
    * Gets range key for querying from generic query object received
+   * 
    * @param key
    * @return
    */
   private Object getRangeKey(K key){
     Object rangeKey = null;
     try {
-        // Our key may be have hash and range keys
+      // Our key may be have hash and range keys
       for (Method met :key.getClass().getDeclaredMethods()){
-      if(met.getName().equals("getRangeKey")){
-        Object [] params = null;
-        rangeKey = met.invoke(key, params);
-        break;
+        if(met.getName().equals("getRangeKey")){
+          Object [] params = null;
+          rangeKey = met.invoke(key, params);
+          break;
         }
       }
     } catch (IllegalArgumentException e) {
-      LOG.info("DynamoDBStore: Error while trying to fetch range key.");
-    e.printStackTrace();
+      LOG.info("DynamoDBStore: Error while trying to fetch range key.", 
e.getMessage());
+      throw new IllegalArgumentException(e);
     } catch (IllegalAccessException e) {
-    LOG.info("DynamoDBStore: Error while trying to fetch range key.");
-    e.printStackTrace();
+      LOG.info("DynamoDBStore: Error while trying to fetch range key.", 
e.getMessage());
+      throw new RuntimeException(e);
     } catch (InvocationTargetException e) {
-    LOG.info("DynamoDBStore: Error while trying to fetch range key.");
-    e.printStackTrace();
+      LOG.info("DynamoDBStore: Error while trying to fetch range key.", 
e.getMessage());
+      throw new RuntimeException(e);
     }
     return rangeKey;
   }
 
   /**
-   * Builds key scan condition using scan comparator, and hash key attribute
-   * @return
-   */
-  private Condition buildKeyScanCondition(){
-  Condition scanKeyCondition = new Condition();
-  scanKeyCondition.setComparisonOperator(getScanCompOp());
-  scanKeyCondition.withAttributeValueList(buildKeyHashAttribute());
-  return scanKeyCondition;
-  }
-
-  /**
-   * Builds range condition based on elements set 
-   * @return
-   */
-  private Condition buildRangeCondition(){
-  KeySchemaElement kRangeSchema = getKeySchema().getRangeKeyElement();
-  Condition rangeKeyCondition = null;
-  if(kRangeSchema != null){
-    rangeKeyCondition = new Condition();
-    
rangeKeyCondition.setComparisonOperator(ComparisonOperator.BETWEEN.toString());
-    AttributeValue startVal = null, endVal = null;
-    //startVal = buildKeyHashAttribute();
-    if(kRangeSchema.getAttributeType().equals("S")){
-      startVal = new 
AttributeValue().withS(getRangeKey(query.getStartKey()).toString());
-      endVal = new 
AttributeValue().withS(getRangeKey(query.getEndKey()).toString());
-    }
-    else if (kRangeSchema.getAttributeType().equals("N")){
-      startVal = new 
AttributeValue().withN(getRangeKey(query.getStartKey()).toString());
-      endVal = new 
AttributeValue().withN(getRangeKey(query.getEndKey()).toString());
-    }
-    rangeKeyCondition.withAttributeValueList(startVal, endVal);
-  }
-  return rangeKeyCondition;
-  }
-
-  /**
    * Gets read consistency level
+   * 
    * @return
    */
   public boolean getConsistencyReadLevel(){
@@ -281,6 +351,7 @@ public class DynamoDBQuery<K, T extends Persistent> extends 
QueryWSBase<K, T> {
 
   /**
    * Sets read consistency level
+   * 
    * @param pConsistencyReadLevel
    */
   public void setConsistencyReadLevel(boolean pConsistencyReadLevel){
@@ -289,14 +360,16 @@ public class DynamoDBQuery<K, T extends Persistent> 
extends QueryWSBase<K, T> {
 
   /**
    * Gets key schema
+   * 
    * @return
    */
-  public KeySchema getKeySchema(){
+  public ArrayList<KeySchemaElement> getKeySchema(){
     return keySchema;
   }
 
   /**
    * Gets query expression for query
+   * 
    * @return
    */
   public Object getQueryExpression(){
@@ -305,22 +378,26 @@ public class DynamoDBQuery<K, T extends Persistent> 
extends QueryWSBase<K, T> {
 
   /**
    * Sets query key schema used for queying
-   * @param pKeySchema
+   * 
+   * @param arrayList
    */
-  public void setKeySchema(KeySchema pKeySchema){
-    this.keySchema = pKeySchema;
+  public void setKeySchema(ArrayList<KeySchemaElement> arrayList) {
+    this.keySchema = arrayList;
   }
 
   /**
    * Sets query to be performed
+   * 
    * @param pQuery
    */
   public void setQuery(Query<K, T> pQuery){
-    this.query = pQuery;
+    this.setStartKey(query.getStartKey());
+    this.setEndKey(query.getEndKey());
   }
 
   /**
    * Gets query performed
+   * 
    * @return
    */
   public Query<K, T> getQuery(){
@@ -329,6 +406,7 @@ public class DynamoDBQuery<K, T extends Persistent> extends 
QueryWSBase<K, T> {
 
   /**
    * Gets query type
+   * 
    * @return
    */
   public static String getType() {
@@ -337,6 +415,7 @@ public class DynamoDBQuery<K, T extends Persistent> extends 
QueryWSBase<K, T> {
 
   /**
    * Sets query type
+   * 
    * @param pType
    */
   public static void setType(String pType) {
@@ -345,16 +424,16 @@ public class DynamoDBQuery<K, T extends Persistent> 
extends QueryWSBase<K, T> {
 
   /**
    * Gets scan comparator operator
+   * 
    * @return
    */
   public static ComparisonOperator getScanCompOp() {
-    if (scanCompOp == null)
-      scanCompOp = ComparisonOperator.GE;
     return scanCompOp;
   }
 
   /**
    * Sets scan query comparator operator
+   * 
    * @param scanCompOp
    */
   public static void setScanCompOp(ComparisonOperator scanCompOp) {
@@ -363,6 +442,7 @@ public class DynamoDBQuery<K, T extends Persistent> extends 
QueryWSBase<K, T> {
 
   /**
    * Gets range query comparator operator
+   * 
    * @return
    */
   public static ComparisonOperator getRangeCompOp(){
@@ -379,10 +459,19 @@ public class DynamoDBQuery<K, T extends Persistent> 
extends QueryWSBase<K, T> {
     rangeCompOp = pRangeCompOp;
   }
 
+  /**
+   * Sets the keyItems that could be used.
+   * 
+   * @param items
+   */
+  public void setKeyItems(Map<String, String> items) {
+    keyItems = items;
+  }
+
   @Override
   public void setFilter(Filter<K, T> filter) {
     // TODO Auto-generated method stub
-    
+
   }
 
   @Override
@@ -394,7 +483,7 @@ public class DynamoDBQuery<K, T extends Persistent> extends 
QueryWSBase<K, T> {
   @Override
   public void setLocalFilterEnabled(boolean enable) {
     // TODO Auto-generated method stub
-    
+
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/gora/blob/655cd3aa/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBAvroStore.java
----------------------------------------------------------------------
diff --git 
a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBAvroStore.java
 
b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBAvroStore.java
new file mode 100644
index 0000000..98f64ae
--- /dev/null
+++ 
b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBAvroStore.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.dynamodb.store;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.impl.DataStoreBase;
+
+public class DynamoDBAvroStore<K, T extends PersistentBase> extends
+DataStoreBase<K, T> implements IDynamoDB<K, T> {
+
+  /**
+   * The values are Avro fields pending to be stored.
+   *
+   * We want to iterate over the keys in insertion order. We don't want to lock
+   * the entire collection before iterating over the keys, since in the 
meantime
+   * other threads are adding entries to the map.
+   */
+  private Map<K, T> buffer = Collections
+      .synchronizedMap(new LinkedHashMap<K, T>());
+
+  private DynamoDBStore<K, ? extends Persistent> dynamoDBStoreHandler;
+
+  /**
+   * Sets the handler to the main DynamoDB
+   * 
+   * @param DynamoDBStore
+   *          handler to main DynamoDB
+   */
+  @Override
+  public void setDynamoDBStoreHandler(DynamoDBStore<K, T> dynamoHandler) {
+    this.dynamoDBStoreHandler = dynamoHandler;
+  }
+
+  @Override
+  public void close() {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void createSchema() {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public boolean delete(K arg0) {
+    // TODO Auto-generated method stub
+    return false;
+  }
+
+  @Override
+  public long deleteByQuery(Query<K, T> arg0) {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  @Override
+  public void deleteSchema() {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public Result<K, T> execute(Query<K, T> arg0) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void flush() {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public T get(K arg0, String[] arg1) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> arg0)
+      throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public String getSchemaName() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public Query<K, T> newQuery() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void put(K key, T value) {
+    buffer.put(key, value);
+  }
+
+  @Override
+  public boolean schemaExists() {
+    // TODO Auto-generated method stub
+    return false;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/655cd3aa/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBFactory.java
----------------------------------------------------------------------
diff --git 
a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBFactory.java
 
b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBFactory.java
new file mode 100644
index 0000000..589367e
--- /dev/null
+++ 
b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.dynamodb.store;
+
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DynamoDBFactory {
+
+  /** Helper to write useful information into the logs. */
+  public static final Logger LOG = LoggerFactory
+      .getLogger(DynamoDBFactory.class);
+
+  @SuppressWarnings("unchecked")
+  public static <K, T extends Persistent> IDynamoDB<K, T> buildDynamoDBStore(
+      DynamoDBUtils.DynamoDBType serType) {
+    final IDynamoDB<K, T> ds;
+    switch (serType) {
+      case DYNAMO:
+        ds = new DynamoDBNativeStore<K, T>();
+        LOG.debug("Using DynamoDB based serialization mode.");
+        break;
+      case AVRO:
+        ds = (IDynamoDB<K, T>) new DynamoDBAvroStore<K, PersistentBase>();
+        LOG.debug("Using Avro based serialization mode.");
+        break;
+      default:
+        throw new IllegalStateException("Serialization mode not supported.");
+    }
+    return ds;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/655cd3aa/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBMapping.java
----------------------------------------------------------------------
diff --git 
a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBMapping.java
 
b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBMapping.java
index f6b23a9..2db5fd8 100644
--- 
a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBMapping.java
+++ 
b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBMapping.java
@@ -18,90 +18,102 @@
 
 package org.apache.gora.dynamodb.store;
 
+import static 
org.apache.gora.dynamodb.store.DynamoDBUtils.DYNAMO_KEY_HASHRANGE;
+
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.amazonaws.services.dynamodb.model.KeySchema;
-import com.amazonaws.services.dynamodb.model.KeySchemaElement;
-import com.amazonaws.services.dynamodb.model.ProvisionedThroughput;
+import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
+import com.amazonaws.services.dynamodbv2.model.KeyType;
+import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
 
 public class DynamoDBMapping {
-  
+
   /**
    * Helper to write useful information into the logs
    */
   public static final Logger LOG = 
LoggerFactory.getLogger(DynamoDBMapping.class);
-  
+
   /**
    *  a map from field name to attribute value
    */
-  private final Map<String, List<Map<String, String>>> tablesToItems;
-  
+  private final Map<String, Map<String, String>> tablesToItems;
+
   /**
    * Maps tables to their own key schemas
    */
-  private final Map<String, KeySchema> tablesToKeySchemas;
-  
+  private final Map<String, ArrayList<KeySchemaElement>> tablesToKeySchemas;
+
   /**
    * Maps tables to their provisioned throughput
    */
   private final Map<String, ProvisionedThroughput> tablesToPrTh;
-  
+
   /**
-   * Constructor for DynamoDBMapping 
-   * @param tables     Tables mapped.
-   * @param tablesToKeySchemas KeySchemas used within tables mapped.
-   * @param provisionedThroughput      Provisioned throughput used within 
tables mapped.
+   * Constructor for DynamoDBMapping
+   * 
+   * @param tablesToItems2
+   *          Tables mapped.
+   * @param tablesToKeySchemas
+   *          KeySchemas used within tables mapped.
+   * @param provisionedThroughput
+   *          Provisioned throughput used within tables mapped.
    */
-  public DynamoDBMapping(Map<String, List<Map<String, String>>> tables,
-    Map<String, KeySchema> tablesToKeySchemas,
-    Map<String, ProvisionedThroughput> provisionedThroughput) {
-    
-    this.tablesToItems = tables;
+  public DynamoDBMapping(Map<String, Map<String, String>> tablesToItems2,
+      Map<String, ArrayList<KeySchemaElement>> tablesToKeySchemas,
+      Map<String, ProvisionedThroughput> provisionedThroughput) {
+
+    this.tablesToItems = tablesToItems2;
     this.tablesToKeySchemas = tablesToKeySchemas;
     this.tablesToPrTh = provisionedThroughput;
   }
 
   /**
    * Gets the tables with their own items
-   * @return tablesToItem HashMap 
+   * 
+   * @return tablesToItem 
+   *          HashMap 
    */
-  public Map<String,List<Map<String, String>>> getTables(){
+  public Map<String, Map<String, String>> getTables() {
     return tablesToItems;
   }
-  
+
   /**
    * Gets items or attributes from a specific table
-   * @param tableName  Table name to determine which attributes to get 
+   * 
+   * @param tableName
+   *          table name to determine which attributes to get 
    * @return
    */
-  public List<Map<String, String>> getItems(String tableName){
+  public Map<String, String> getItems(String tableName) {
     return tablesToItems.get(tableName);
   }
 
   /**
    * Gets the key schema from a specific table
-   * @param tableName  Table name to determine which key schema to get
+   * @param tableName
+   *          Table name to determine which key schema to get
    * @return
    */
-  public KeySchema getKeySchema(String tableName) {
+  public ArrayList<KeySchemaElement> getKeySchema(String tableName) {
     return tablesToKeySchemas.get(tableName);
   }
-  
+
   /**
    * Gets the provisioned throughput from a specific table
-   * @param tableName  Table name to determine which provisioned throughput to 
get
+   * 
+   * @param tableName
+   *          Table name to determine which provisioned throughput to get
    * @return
    */
   public ProvisionedThroughput getProvisionedThroughput(String tableName){
     return tablesToPrTh.get(tableName);
   }
-  
+
   /**
    * A builder for creating the mapper. This will allow building a thread safe
    * {@link DynamoDBMapping} using simple immutabilty.
@@ -110,199 +122,206 @@ public class DynamoDBMapping {
   public static class DynamoDBMappingBuilder {
 
     /**
-     * Table name to be used to build the DynamoDBMapping object 
-     */
-    private String tableName;
-  
-    /**
      * This data structure can hold several tables, with their own items.
      * Map<TableName, List<Map<AttributeName,AttributeType>>
      */
-    private Map<String, List<Map<String, String>>> tablesToItems = 
-      new HashMap<String, List<Map<String, String>>>();
+    private Map<String, Map<String, String>> tablesToItems = 
+        new HashMap<String, Map<String, String>>();
 
     /**
      * Maps tables to key schemas
      */
-    private Map<String, KeySchema> tablesToKeySchemas = new HashMap<String, 
KeySchema>();
+    private Map<String, ArrayList<KeySchemaElement>> tablesToKeySchemas = 
+        new HashMap<String, ArrayList<KeySchemaElement>>();
 
     /**
      * Maps tables to provisioned throughput
      */
-    private Map<String, ProvisionedThroughput> tablesToPrTh = new 
HashMap<String, ProvisionedThroughput>();
-  
-    /**
-     * Sets table name
-     * @param tabName
-     */
-    public void setTableName(String tabName){
-      tableName = tabName;
-    }
-  
+    private Map<String, ProvisionedThroughput> tablesToPrTh = 
+        new HashMap<String, ProvisionedThroughput>();
+
     /**
      * Gets the table name for which the table is being mapped
+     * 
      * @param tableName
      * @return
      */
     public String getTableName(String tableName){
       return tableName;
     }
-  
+
     /**
      * Sets the provisioned throughput for the specified table
+     * 
      * @param tableName
      * @param readCapUnits
      * @param writeCapUnits
      */
-    public void setProvisionedThroughput(String tableName, long readCapUnits, 
long writeCapUnits){
-      ProvisionedThroughput ptDesc = 
-      new 
ProvisionedThroughput().withReadCapacityUnits(readCapUnits).withWriteCapacityUnits(writeCapUnits);
-      tablesToPrTh.put(tableName, ptDesc);
+    public void setProvisionedThroughput(String tableName, long readCapUnits,
+        long writeCapUnits) {
+      ProvisionedThroughput ptDesc = new ProvisionedThroughput()
+          .withReadCapacityUnits(readCapUnits).withWriteCapacityUnits(
+              writeCapUnits);
     }
-  
+
     /**
      * Sets the hash range key schema for the specified table
+     * 
      * @param tableName
      * @param rangeKeyName
      * @param rangeKeyType
      */
-    public void setHashRangeKeySchema(String tableName, String rangeKeyName, 
String rangeKeyType){
-      KeySchema kSchema = tablesToKeySchemas.get(tableName);
-      if ( kSchema == null)
-        kSchema = new KeySchema();
-   
-      KeySchemaElement rangeKeyElement = new 
KeySchemaElement().withAttributeName(rangeKeyName).withAttributeType(rangeKeyType);
-      kSchema.setRangeKeyElement(rangeKeyElement);
-      tablesToKeySchemas.put(tableName, kSchema);
-    }
-  
+    // public void setHashRangeKeySchema(String tableName, String rangeKeyName,
+    // String rangeKeyType){
+    // KeySchemaElement kSchema = tablesToKeySchemas.get(tableName);
+    // if ( kSchema == null)
+    // kSchema = new KeySchemaElement();
+
+    // KeySchemaElement rangeKeyElement = new
+    // 
KeySchemaElement().withAttributeName(rangeKeyName).withKeyType(KeyType.RANGE).withKeyType(rangeKeyType);
+    // kSchema.
+    // kSchema.setRangeKeyElement(rangeKeyElement);
+    // tablesToKeySchemas.put(tableName, kSchema);
+    // }
+
     /**
      * Sets the hash key schema for the specified table
      * @param tableName
      * @param keyName
      * @param keyType
+     * @param keyType2
      */
-    public void setHashKeySchema(String tableName, String keyName, String 
keyType){
-      KeySchema kSchema = tablesToKeySchemas.get(tableName);
-        if ( kSchema == null)
-          kSchema = new KeySchema();
-        KeySchemaElement hashKey = new 
KeySchemaElement().withAttributeName(keyName).withAttributeType(keyType);
-        kSchema.setHashKeyElement(hashKey);
+    public void setKeySchema(String tableName, String keyName, String keyType) 
{
+      ArrayList<KeySchemaElement> kSchema = tablesToKeySchemas.get(tableName);
+      if (kSchema == null) {
+        kSchema = new ArrayList<KeySchemaElement>();
         tablesToKeySchemas.put(tableName, kSchema);
+      }
+      KeyType type = keyType.equals(DYNAMO_KEY_HASHRANGE) ? KeyType.RANGE : 
KeyType.HASH;
+      kSchema.add(new KeySchemaElement().withAttributeName(keyName)
+          .withKeyType(type));
     }
-  
+
     /**
-     * Checks if a table exists, and if doesn't exist it creates the new 
table. 
+     * Checks if a table exists, and if doesn't exist it creates the new table.
+     * 
      * @param tableName
      * @return The table identified by the parameter
      */
-    private List<Map<String, String>> getOrCreateTable(String tableName) {
-      
-      List<Map<String, String>> items = tablesToItems.get(tableName);
+    private Map<String, String> getOrCreateTable(String tableName) {
+      Map<String, String> items = tablesToItems.get(tableName);
       if (items == null) {
-        items = new ArrayList<Map<String, String>>();
+        items = new HashMap<String, String>();
         tablesToItems.put(tableName, items);
       }
       return items;
     }
-  
+
     /**
-     * Gets the attribute for a specific item. The idea is to be able to get 
different items with different attributes.
-     * TODO This method is incomplete because the itemNumber might not be 
present and this would be a problem
+     * Gets the attribute for a specific item. The idea is to be able to get 
+     * different items with different attributes.
+     * TODO This method is incomplete because the itemNumber might not 
+     * be present and this would be a problem
+     * 
      * @param items
      * @param itemNumber
      * @return
      */
-    private HashMap<String, String> getOrCreateItemAttribs(List<Map<String, 
String>> items, int itemNumber){
+    /*private HashMap<String, String> getOrCreateItemAttribs(
+        Map<String, String> items) {
       HashMap<String, String> itemAttribs;
-     
+
       if (items.isEmpty())
         items.add(new HashMap<String, String>());
-     
+
       itemAttribs = (HashMap<String, String>) items.get(itemNumber);
-      if (itemAttribs == null)
-        items.add(new HashMap<String, String>());
-        return (HashMap<String, String>) items.get(itemNumber);
-    }
-      
+      if (itemAttribs == null) {
+        itemAttribs = new HashMap<String, String>();
+      }
+
+      items.add(itemAttribs);
+      return null;
+    }*/
+
     /**
      * Adds an attribute to an specific item
+     * 
      * @param tableName
      * @param attributeName
      * @param attrType
      * @param itemNumber
      */
-     public void addAttribute(String tableName, String attributeName, String 
attrType, int itemNumber) {
-       // selecting table
-       List<Map<String, String>> items = getOrCreateTable(tableName);
-       // add attribute to item
-       HashMap<String, String> itemAttribs = getOrCreateItemAttribs(items, 
itemNumber);
-       itemAttribs.put(attributeName, attrType);
-       //items.add(itemAttribs);
-       // add item to table
-       //tablesToItems.put(tableName, items);
-     }
-  
+    public void addAttribute(String tableName, String attributeName,
+        String attrType) {
+      // selecting table
+      Map<String, String> items = getOrCreateTable(tableName);
+      // add attribute to item
+      //HashMap<String, String> itemAttribs = getOrCreateItemAttribs(items);
+      //itemAttribs.put(attributeName, attrType);
+      // add item to table
+      items.put(attributeName, attrType);
+      // tablesToItems.put(tableName, items);
+    }
+
     /**
      * Method to verify whether or not the schemas have been initialized
+     * 
      * @return
      */
-    private String verifyAllKeySchemas(){
-  
-      String wrongTable = "";
-      // if there are not tables defined
-      if (tablesToItems.isEmpty()) return "";
-        for(String tableName : tablesToItems.keySet()){
-          // if there are not schemas defined
-          if (tablesToKeySchemas.isEmpty()) return "";
-          if (!verifyKeySchema(tableName)) return "";
+    private boolean verifyAllKeySchemas() {
+      boolean rsl = true;
+      if (tablesToItems.isEmpty() || tablesToKeySchemas.isEmpty())
+        rsl = false;
+      for (String tableName : tablesToItems.keySet()) {
+        // if there are not schemas defined
+        if (tablesToKeySchemas.get(tableName) == null) {
+          LOG.error("No schema defined for DynamoDB table '" + tableName + 
'\'');
+          rsl = false;
         }
-      return wrongTable;
+        rsl = verifyKeySchema(tableName);
+      }
+      return rsl;
     }
-  
+
     /**
      * Verifies is a table has a key schema defined
+     * 
      * @param tableName        Table name to determine which key schema to 
obtain 
      * @return
      */
-    private boolean verifyKeySchema(String tableName){
-      KeySchema kSchema = tablesToKeySchemas.get(tableName);
-  
-      if (kSchema == null) 
-        return false;
-  
-      KeySchemaElement rangeKey = kSchema.getRangeKeyElement();
-      KeySchemaElement hashKey = kSchema.getHashKeyElement();
-      // A range key must have a hash key as well
-      
-      if (rangeKey != null){
-        if (hashKey != null)   
-          return true;
-        else     
-          return false;
+    private boolean verifyKeySchema(String tableName) {
+      ArrayList<KeySchemaElement> kSchema = tablesToKeySchemas.get(tableName);
+      boolean hashPk = false;
+      if (kSchema == null) {
+        LOG.error("No keys defined for '{}'. Please check your schema!", 
tableName);
+        return hashPk;
       }
-      // A hash key may exist by itself
-      if (hashKey != null)  
-        return true;
-      return false;
+      for (KeySchemaElement ks : kSchema) {
+        if (ks.getKeyType().equals(KeyType.HASH.toString())) {
+          hashPk = true;
+        }
+      }
+      return hashPk;
     }
-  
+
     /**
      * Constructs the DynamoDBMapping object
+     * 
      * @return A newly constructed mapping.
      */
     public DynamoDBMapping build() {
 
-      if (tableName == null) throw new IllegalStateException("tableName is not 
specified");
-        // verifying items for at least a table
-        if (tablesToItems.isEmpty()) throw new IllegalStateException("No 
tables");
-      
-        // verifying if key schemas have been properly defined
-        String wrongTableName = verifyAllKeySchemas();  
-        if (!wrongTableName.equals("")) throw new IllegalStateException("no 
key schemas defined for table " + wrongTableName);
-     
-        // Return the tableDescription and all the attributes needed
-            return new DynamoDBMapping(tablesToItems,tablesToKeySchemas, 
tablesToPrTh);
+      // verifying items for at least a table
+      if (tablesToItems.isEmpty())
+        throw new IllegalStateException("No tables were defined.");
+
+      // verifying if key schemas have been properly defined
+      if (!verifyAllKeySchemas())
+        throw new IllegalStateException("no key schemas defined for table ");
+
+      // Return the tableDescription and all the attributes needed
+      return new DynamoDBMapping(tablesToItems, tablesToKeySchemas,
+          tablesToPrTh);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/655cd3aa/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBNativeStore.java
----------------------------------------------------------------------
diff --git 
a/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBNativeStore.java
 
b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBNativeStore.java
new file mode 100644
index 0000000..6fe1742
--- /dev/null
+++ 
b/gora-dynamodb/src/main/java/org/apache/gora/dynamodb/store/DynamoDBNativeStore.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.dynamodb.store;
+
+import static org.apache.gora.dynamodb.store.DynamoDBUtils.WS_PROVIDER;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.lang.NullArgumentException;
+import org.apache.gora.dynamodb.query.DynamoDBKey;
+import org.apache.gora.dynamodb.query.DynamoDBQuery;
+import org.apache.gora.dynamodb.query.DynamoDBResult;
+import org.apache.gora.persistency.BeanFactory;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.store.ws.impl.WSDataStoreBase;
+import org.apache.gora.util.GoraException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper;
+import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBScanExpression;
+
+public class DynamoDBNativeStore<K, T extends Persistent> extends 
+    WSDataStoreBase<K, T> implements IDynamoDB<K, T> {
+
+  /** Method's names for getting range and hash keys. */
+  private static final String GET_RANGE_KEY_METHOD = "getRangeKey";
+  private static final String GET_HASH_KEY_METHOD = "getHashKey";
+
+  /** Logger for {@link DynamoDBNativeStore}. */
+  public static final Logger LOG = LoggerFactory
+      .getLogger(DynamoDBNativeStore.class);
+
+  /** Handler to {@link DynamoDBStore} so common methods can be accessed. */
+  private DynamoDBStore<K, T> dynamoDBStoreHandler;
+
+  /**
+   * Deletes items using a specific query
+   * 
+   * @throws IOException
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public long deleteByQuery(Query<K, T> query) {
+    // TODO verify whether or not we are deleting a whole row
+    // String[] fields = getFieldsToQuery(query.getFields());
+    // find whether all fields are queried, which means that complete
+    // rows will be deleted
+    // boolean isAllFields = Arrays.equals(fields
+    // , getBeanFactory().getCachedPersistent().getFields());
+    Result<K, T> result = execute(query);
+    ArrayList<T> deletes = new ArrayList<T>();
+    try {
+      while (result.next()) {
+        T resultObj = result.get();
+        deletes.add(resultObj);
+
+        @SuppressWarnings("rawtypes")
+        DynamoDBKey dKey = new DynamoDBKey();
+
+        dKey.setHashKey(getHashFromObj(resultObj));
+
+        dKey.setRangeKey(getRangeKeyFromObj(resultObj));
+        delete((K) dKey);
+      }
+    } catch (IllegalArgumentException e) {
+      LOG.error("Illegal argument detected", e.getMessage());
+      throw new IllegalArgumentException(e);
+    } catch (IllegalAccessException e) {
+      LOG.error("Illegal access detected", e.getMessage());
+      throw new IllegalAccessError(e.getMessage());
+    } catch (InvocationTargetException e) {
+      LOG.error(e.getMessage());
+      throw new RuntimeException(e);
+    } catch (Exception e) {
+      LOG.error(e.getMessage());
+      throw new RuntimeException(e);
+    }
+    return deletes.size();
+  }
+
+  /**
+   * Executes a query after building a DynamoDB specific query based on the
+   * received one
+   */
+  @Override
+  public Result<K, T> execute(Query<K, T> query) {
+    DynamoDBQuery<K, T> dynamoDBQuery = buildDynamoDBQuery(query);
+    DynamoDBMapper mapper = new DynamoDBMapper(
+        dynamoDBStoreHandler.getDynamoDbClient());
+    List<T> objList = null;
+    if (DynamoDBQuery.getType().equals(DynamoDBQuery.RANGE_QUERY))
+      objList = mapper.scan(persistentClass,
+          (DynamoDBScanExpression) dynamoDBQuery.getQueryExpression());
+    if (DynamoDBQuery.getType().equals(DynamoDBQuery.SCAN_QUERY))
+      objList = mapper.scan(persistentClass,
+          (DynamoDBScanExpression) dynamoDBQuery.getQueryExpression());
+    return new DynamoDBResult<K, T>(this, query, objList);
+  }
+
+  @Override
+  public T get(K key, String[] fields) {
+    /*
+     * DynamoDBQuery<K,T> query = new DynamoDBQuery<K,T>();
+     * query.setDataStore(this); //query.setKeyRange(key, key);
+     * //query.setFields(fields); //query.setLimit(1); Result<K,T> result =
+     * execute(query); boolean hasResult = result.next(); return hasResult ?
+     * result.get() : null;
+     */
+    return null;
+  }
+
+  @Override
+  /**
+   * Gets the object with the specific key
+   * @throws IOException
+   */
+  public T get(K key) {
+    T object = null;
+    try {
+      Object rangeKey;
+      rangeKey = getRangeKeyFromKey(key);
+      Object hashKey = getHashFromKey(key);
+      if (hashKey != null) {
+        DynamoDBMapper mapper = new DynamoDBMapper(
+            dynamoDBStoreHandler.getDynamoDbClient());
+        if (rangeKey != null)
+          object = mapper.load(persistentClass, hashKey, rangeKey);
+        else
+          object = mapper.load(persistentClass, hashKey);
+      } else
+        throw new GoraException("Error while retrieving keys from object: "
+            + key.toString());
+    } catch (IllegalArgumentException e) {
+      LOG.error("Illegal argument detected", e.getMessage());
+      throw new IllegalArgumentException(e);
+    } catch (IllegalAccessException e) {
+      LOG.error("Illegal access detected", e.getMessage());
+      throw new IllegalAccessError(e.getMessage());
+    } catch (InvocationTargetException e) {
+      LOG.error(e.getMessage());
+      throw new RuntimeException(e);
+    } catch (GoraException ge) {
+      LOG.error(ge.getMessage());
+      LOG.error(ge.getStackTrace().toString());
+    }
+    return object;
+  }
+
+  /**
+   * Creates a new DynamoDBQuery
+   */
+  public Query<K, T> newQuery() {
+    Query<K, T> query = new DynamoDBQuery<K, T>(this);
+    // query.setFields(getFieldsToQuery(null));
+    return query;
+  }
+
+  /**
+   * Returns a new instance of the key object.
+   * 
+   * @throws IOException
+   */
+  @Override
+  public K newKey() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  /**
+   * Returns a new persistent object
+   * 
+   * @throws IOException
+   */
+  @Override
+  public T newPersistent() {
+    T obj = null;
+    try {
+      obj = persistentClass.newInstance();
+    } catch (InstantiationException e) {
+      LOG.error("Error instantiating " + persistentClass.getCanonicalName());
+      throw new InstantiationError(e.getMessage());
+    } catch (IllegalAccessException e) {
+      LOG.error("Error instantiating " + persistentClass.getCanonicalName());
+      throw new IllegalAccessError(e.getMessage());
+    }
+    return obj;
+  }
+
+  /**
+   * Puts an object identified by a key
+   * 
+   * @throws IOException
+   */
+  @Override
+  public void put(K key, T obj) {
+    try {
+      Object hashKey = getHashKey(key, obj);
+      Object rangeKey = getRangeKey(key, obj);
+      if (hashKey != null) {
+        DynamoDBMapper mapper = new DynamoDBMapper(
+            dynamoDBStoreHandler.getDynamoDbClient());
+        if (rangeKey != null) {
+          mapper.load(persistentClass, hashKey, rangeKey);
+        } else {
+          mapper.load(persistentClass, hashKey);
+        }
+        mapper.save(obj);
+      } else
+        throw new GoraException("No HashKey found in Key nor in Object.");
+    } catch (NullPointerException npe) {
+      LOG.error("Error while putting an item. " + npe.toString());
+      throw new NullArgumentException(npe.getMessage());
+    } catch (Exception e) {
+      LOG.error("Error while putting an item. " + obj.toString());
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Deletes the object using key
+   * 
+   * @return true for a successful process
+   * @throws IOException
+   */
+  @Override
+  public boolean delete(K key) {
+    try {
+      T object = null;
+      Object rangeKey = null, hashKey = null;
+      DynamoDBMapper mapper = new DynamoDBMapper(
+          dynamoDBStoreHandler.getDynamoDbClient());
+      for (Method met : key.getClass().getDeclaredMethods()) {
+        if (met.getName().equals(GET_RANGE_KEY_METHOD)) {
+          Object[] params = null;
+          rangeKey = met.invoke(key, params);
+          break;
+        }
+      }
+      for (Method met : key.getClass().getDeclaredMethods()) {
+        if (met.getName().equals(GET_HASH_KEY_METHOD)) {
+          Object[] params = null;
+          hashKey = met.invoke(key, params);
+          break;
+        }
+      }
+      if (hashKey == null)
+        object = (T) mapper.load(persistentClass, key);
+      if (rangeKey == null)
+        object = (T) mapper.load(persistentClass, hashKey);
+      else
+        object = (T) mapper.load(persistentClass, hashKey, rangeKey);
+
+      if (object == null)
+        return false;
+
+      // setting key for dynamodbMapper
+      mapper.delete(object);
+      return true;
+    } catch (Exception e) {
+      LOG.error("Error while deleting value with key " + key.toString());
+      LOG.error(e.getMessage());
+      return false;
+    }
+  }
+
+  /**
+   * Initialize the data store by reading the credentials, setting the cloud
+   * provider, setting the client's properties up, setting the end point and
+   * reading the mapping file
+   */
+  public void initialize(Class<K> keyClass, Class<T> pPersistentClass,
+      Properties properties) {
+    super.initialize(keyClass, pPersistentClass, properties);
+    setWsProvider(WS_PROVIDER);
+    if (autoCreateSchema) {
+      createSchema();
+    }
+  }
+
+  /**
+   * Builds a DynamoDB query from a generic Query object
+   * 
+   * @param query
+   *          Generic query object
+   * @return DynamoDBQuery
+   */
+  private DynamoDBQuery<K, T> buildDynamoDBQuery(Query<K, T> query) {
+    if (getSchemaName() == null)
+      throw new IllegalStateException("There is not a preferred schema set.");
+
+    DynamoDBQuery<K, T> dynamoDBQuery = new DynamoDBQuery<K, T>();
+    dynamoDBQuery.setKeySchema(dynamoDBStoreHandler.getDynamoDbMapping()
+        .getKeySchema(getSchemaName()));
+    
dynamoDBQuery.setKeyItems(dynamoDBStoreHandler.getDynamoDbMapping().getItems(getSchemaName()));
+    dynamoDBQuery.setQuery(query);
+    dynamoDBQuery.setConsistencyReadLevel(dynamoDBStoreHandler
+        .getConsistencyReads());
+    dynamoDBQuery.buildExpression();
+
+    return dynamoDBQuery;
+  }
+
+  @Override
+  public void close() {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void flush() {
+    LOG.warn("DynamoDBNativeStore puts and gets directly into the datastore");
+  }
+
+  @Override
+  public BeanFactory<K, T> getBeanFactory() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> arg0)
+      throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public void setBeanFactory(BeanFactory<K, T> arg0) {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public void createSchema() {
+    LOG.info("Creating Native DynamoDB Schemas.");
+    if (dynamoDBStoreHandler.getDynamoDbMapping().getTables().isEmpty()) {
+      throw new IllegalStateException("There are not tables defined.");
+    }
+    if (dynamoDBStoreHandler.getPreferredSchema() == null) {
+      LOG.debug("Creating schemas.");
+      // read the mapping object
+      for (String tableName : dynamoDBStoreHandler.getDynamoDbMapping()
+          .getTables().keySet())
+        DynamoDBUtils.executeCreateTableRequest(
+            dynamoDBStoreHandler.getDynamoDbClient(), tableName,
+            dynamoDBStoreHandler.getTableKeySchema(tableName),
+            dynamoDBStoreHandler.getTableAttributes(tableName),
+            dynamoDBStoreHandler.getTableProvisionedThroughput(tableName));
+      LOG.debug("tables created successfully.");
+    } else {
+      String tableName = dynamoDBStoreHandler.getPreferredSchema();
+      LOG.debug("Creating schema " + tableName);
+      DynamoDBUtils.executeCreateTableRequest(
+          dynamoDBStoreHandler.getDynamoDbClient(), tableName,
+          dynamoDBStoreHandler.getTableKeySchema(tableName),
+          dynamoDBStoreHandler.getTableAttributes(tableName),
+          dynamoDBStoreHandler.getTableProvisionedThroughput(tableName));
+    }
+  }
+
+  /*
+   * (non-Javadoc)
+   * 
+   * @see
+   * 
org.apache.gora.dynamodb.store.IDynamoDB#setDynamoDBStoreHandler(org.apache
+   * .gora.dynamodb.store.DynamoDBStore)
+   */
+  @Override
+  public void setDynamoDBStoreHandler(DynamoDBStore<K, T> dynamoHandler) {
+    this.dynamoDBStoreHandler = dynamoHandler;
+  }
+
+  @Override
+  public void deleteSchema() {
+    // TODO Auto-generated method stub
+
+  }
+
+  @Override
+  public String getSchemaName() {
+    return this.dynamoDBStoreHandler.getSchemaName();
+  }
+
+  @Override
+  public boolean schemaExists() {
+    return this.dynamoDBStoreHandler.schemaExists();
+  }
+
+  private Object getHashKey(K key, T obj) throws IllegalArgumentException,
+  IllegalAccessException, InvocationTargetException {
+    // try to get the hashKey from 'key'
+    Object hashKey = getHashFromKey(key);
+    // if the key does not have these attributes then try to get them from the
+    // object
+    if (hashKey == null)
+      hashKey = getHashFromObj(obj);
+    // if no key has been found, then we try with the key
+    if (hashKey == null)
+      hashKey = key;
+    return hashKey;
+  }
+
+  /**
+   * Gets a hash key from a key of type K
+   * 
+   * @param obj
+   *          Object from which we will get a hash key
+   * @return
+   * @throws IllegalArgumentException
+   * @throws IllegalAccessException
+   * @throws InvocationTargetException
+   */
+  private Object getHashFromKey(K obj) throws IllegalArgumentException,
+  IllegalAccessException, InvocationTargetException {
+    Object hashKey = null;
+    // check if it is a DynamoDBKey
+    if (obj instanceof DynamoDBKey) {
+      hashKey = ((DynamoDBKey<?, ?>) obj).getHashKey();
+    } else {
+      // maybe the class has the method defined
+      for (Method met : obj.getClass().getDeclaredMethods()) {
+        if (met.getName().equals(GET_HASH_KEY_METHOD)) {
+          Object[] params = null;
+          hashKey = met.invoke(obj, params);
+          break;
+        }
+      }
+    }
+    return hashKey;
+  }
+
+  /**
+   * Gets a hash key from an object of type T
+   * 
+   * @param obj
+   *          Object from which we will get a hash key
+   * @return
+   * @throws IllegalArgumentException
+   * @throws IllegalAccessException
+   * @throws InvocationTargetException
+   */
+  private Object getHashFromObj(T obj) throws IllegalArgumentException,
+  IllegalAccessException, InvocationTargetException {
+    Object hashKey = null;
+    // check if it is a DynamoDBKey
+    if (obj instanceof DynamoDBKey) {
+      hashKey = ((DynamoDBKey<?, ?>) obj).getHashKey();
+    } else {
+      // maybe the class has the method defined
+      for (Method met : obj.getClass().getDeclaredMethods()) {
+        if (met.getName().equals(GET_HASH_KEY_METHOD)) {
+          Object[] params = null;
+          hashKey = met.invoke(obj, params);
+          break;
+        }
+      }
+    }
+    return hashKey;
+  }
+
+  private Object getRangeKey(K key, T obj) throws IllegalArgumentException,
+  IllegalAccessException, InvocationTargetException {
+    Object rangeKey = getRangeKeyFromKey(key);
+    if (rangeKey == null)
+      rangeKey = getRangeKeyFromObj(obj);
+    return rangeKey;
+  }
+
+  /**
+   * Gets a range key from a key obj. This verifies if it is using a
+   * {@link DynamoDBKey}
+   * 
+   * @param obj
+   *          Object from which a range key will be extracted
+   * @return
+   * @throws IllegalArgumentException
+   * @throws IllegalAccessException
+   * @throws InvocationTargetException
+   */
+  private Object getRangeKeyFromKey(K obj) throws IllegalArgumentException,
+  IllegalAccessException, InvocationTargetException {
+    Object rangeKey = null;
+    // check if it is a DynamoDBKey
+    if (obj instanceof DynamoDBKey) {
+      rangeKey = ((DynamoDBKey<?, ?>) obj).getRangeKey();
+    } else {
+      // maybe the class has the method defined
+      for (Method met : obj.getClass().getDeclaredMethods()) {
+        if (met.getName().equals(GET_RANGE_KEY_METHOD)) {
+          Object[] params = null;
+          rangeKey = met.invoke(obj, params);
+          break;
+        }
+      }
+    }
+    return rangeKey;
+  }
+
+  /**
+   * Gets a range key from an object T
+   * 
+   * @param obj
+   *          Object from which a range key will be extracted
+   * @return
+   * @throws IllegalArgumentException
+   * @throws IllegalAccessException
+   * @throws InvocationTargetException
+   */
+  private Object getRangeKeyFromObj(T obj) throws IllegalArgumentException,
+  IllegalAccessException, InvocationTargetException {
+    Object rangeKey = null;
+    // check if it is a DynamoDBKey
+    if (obj instanceof DynamoDBKey) {
+      rangeKey = ((DynamoDBKey<?, ?>) obj).getRangeKey();
+    } else {
+      // maybe the class has the method defined
+      for (Method met : obj.getClass().getDeclaredMethods()) {
+        if (met.getName().equals(GET_RANGE_KEY_METHOD)) {
+          Object[] params = null;
+          rangeKey = met.invoke(obj, params);
+          break;
+        }
+      }
+    }
+    return rangeKey;
+  }
+
+}
\ No newline at end of file

Reply via email to