Add updateByQuery for native serialization

Project: http://git-wip-us.apache.org/repos/asf/gora/repo
Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/cc452f8d
Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/cc452f8d
Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/cc452f8d

Branch: refs/heads/master
Commit: cc452f8dc0d864c50279dfa61d56e57e5e40a740
Parents: 4ebfabb
Author: madhawa <madhaw...@gmail.com>
Authored: Sat Jul 8 15:02:54 2017 +0530
Committer: madhawa <madhaw...@gmail.com>
Committed: Sat Jul 8 15:02:54 2017 +0530

----------------------------------------------------------------------
 .../gora/cassandra/query/CassandraQuery.java    | 35 ++++++++----
 .../cassandra/serializers/AvroSerializer.java   |  4 ++
 .../serializers/CassandraQueryFactory.java      | 60 ++++++++++++++++++--
 .../serializers/CassandraSerializer.java        |  2 +
 .../cassandra/serializers/NativeSerializer.java | 13 +++++
 .../gora/cassandra/store/CassandraStore.java    | 19 +++----
 ...stCassandraStoreWithNativeSerialization.java | 22 +++++++
 7 files changed, 129 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/gora/blob/cc452f8d/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
index 251e9df..c3f2e81 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,22 +20,21 @@ package org.apache.gora.cassandra.query;
 
 import org.apache.gora.filter.Filter;
 import org.apache.gora.persistency.Persistent;
-import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
-import org.apache.gora.query.Result;
-import org.apache.gora.query.impl.QueryBase;
 import org.apache.gora.query.ws.impl.QueryWSBase;
 import org.apache.gora.store.DataStore;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.io.Writable;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Cassandra specific implementation of the {@link Query} interface.
  */
-public class CassandraQuery<K, T extends Persistent> extends QueryWSBase<K,T> {
+public class CassandraQuery<K, T extends Persistent> extends QueryWSBase<K, T> {
 
   protected Filter<K, T> filter;
-  protected boolean localFilterEnabled=true;
+  protected boolean localFilterEnabled = true;
+  protected Map<String, Object> updateFields = new HashMap<>();
 
   public CassandraQuery(DataStore<K, T> dataStore) {
     super(dataStore);
@@ -61,7 +60,21 @@ public class CassandraQuery<K, T extends Persistent> extends QueryWSBase<K,T> {
     localFilterEnabled = enable;
   }
 
-  public void addUpdateField(String field, Object oldValue, Object newValue) {
+  public void addUpdateField(String field, Object newValue) {
+    updateFields.put(field, newValue);
+  }
 
+  public Object getUpdateFieldValue(String key) {
+    return updateFields.get(key);
+  }
+
+  @Override
+  public String[] getFields() {
+    if(updateFields.size() == 0) {
+      return super.getFields();
+    } else {
+      String[] updateFieldsArray = new String[updateFields.size()];
+      return updateFields.keySet().toArray(updateFieldsArray);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/cc452f8d/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
index 83676dc..3b626a4 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java
@@ -80,4 +80,8 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer {
     return 0;
   }
 
+  @Override
+  public boolean updateByQuery(Query query) {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/cc452f8d/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
index 2a980e5..ebb7c20 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java
@@ -19,11 +19,13 @@ package org.apache.gora.cassandra.serializers;
 import com.datastax.driver.core.querybuilder.Delete;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
 import com.datastax.driver.core.querybuilder.Select;
+import com.datastax.driver.core.querybuilder.Update;
 import org.apache.gora.cassandra.bean.CassandraKey;
 import org.apache.gora.cassandra.bean.ClusterKeyField;
 import org.apache.gora.cassandra.bean.Field;
 import org.apache.gora.cassandra.bean.KeySpace;
 import org.apache.gora.cassandra.bean.PartitionKeyField;
+import org.apache.gora.cassandra.query.CassandraQuery;
 import org.apache.gora.cassandra.query.CassandraRow;
 import org.apache.gora.cassandra.store.CassandraMapping;
 import org.apache.gora.query.Query;
@@ -249,10 +251,6 @@ class CassandraQueryFactory {
     return query;
   }
 
-//  static <T> String getUpdateDataQuery(CassandraMapping mapping, T obj) {
-////    QueryBuilder.update(mapping.getKeySpace().getName(),mapping.getCoreName()).
-//  }
-
   static <K> String getObjectWithFieldsQuery(CassandraMapping mapping, String[] fields, K key, List<Object> objects) {
     String cqlQuery = null;
     Select select = QueryBuilder.select(fields).from(mapping.getKeySpace().getName(), mapping.getCoreName());
@@ -363,7 +361,7 @@ class CassandraQueryFactory {
 
   static String getDeleteByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) {
     String[] columns = null;
-    if(!Arrays.equals(cassandraQuery.getFields(), mapping.getFieldNames())) {
+    if (!Arrays.equals(cassandraQuery.getFields(), mapping.getFieldNames())) {
       columns = getColumnNames(mapping, cassandraQuery.getFields());
     }
     Object startKey = cassandraQuery.getStartKey();
@@ -413,4 +411,56 @@ class CassandraQueryFactory {
     return query.getQueryString();
   }
 
+  static String getUpdateByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) {
+    Update update = QueryBuilder.update(mapping.getKeySpace().getName(), mapping.getCoreName());
+    Update.Assignments updateAssignments = null;
+    if (cassandraQuery instanceof CassandraQuery) {
+      String[] fields = cassandraQuery.getFields();
+      String[] columnNames = getColumnNames(mapping, fields);
+      for (String column : columnNames) {
+        updateAssignments = update.with(QueryBuilder.set(column, "?"));
+        objects.add(((CassandraQuery) cassandraQuery).getUpdateFieldValue(column));
+      }
+    }
+    String primaryKey = null;
+    Update.Where query = null;
+    Object startKey = cassandraQuery.getStartKey();
+    Object endKey = cassandraQuery.getEndKey();
+    Object key = cassandraQuery.getKey();
+    boolean isWhereNeeded = true;
+    if (key != null) {
+      primaryKey = getPKey(mapping.getFieldList());
+      query = updateAssignments.where(QueryBuilder.eq(primaryKey, "?"));
+      objects.add(key);
+    } else {
+      if (startKey != null) {
+        if (mapping.getCassandraKey() != null) {
+//todo avro serialization
+        } else {
+          primaryKey = getPKey(mapping.getFieldList());
+          query = updateAssignments.where(QueryBuilder.gte(primaryKey, "?"));
+          objects.add(startKey);
+          isWhereNeeded = false;
+        }
+      }
+      if (endKey != null) {
+        if (mapping.getCassandraKey() != null) {
+//todo avro serialization
+        } else {
+          primaryKey = primaryKey != null ? primaryKey : getPKey(mapping.getFieldList());
+          if (isWhereNeeded) {
+            query = updateAssignments.where(QueryBuilder.lte(primaryKey, "?"));
+          } else {
+            query = query.and(QueryBuilder.lte(primaryKey, "?"));
+          }
+          objects.add(endKey);
+        }
+      }
+    }
+    if (startKey == null && endKey == null && key == null) {
+      return updateAssignments.getQueryString();
+    }
+    return query.getQueryString();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/gora/blob/cc452f8d/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
index e125a33..237bd8a 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java
@@ -123,4 +123,6 @@ public abstract class CassandraSerializer<K, T extends Persistent> {
 
   public abstract long deleteByQuery(Query<K, T> query);
 
+  public abstract boolean updateByQuery(Query<K, T> query);
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/gora/blob/cc452f8d/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
index 2bac3dd..6f64fa2 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java
@@ -128,6 +128,19 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra
     mapper = mappingManager.mapper(persistentClass);
   }
 
+  @Override
+  public boolean updateByQuery(Query query) {
+    List<Object> objectArrayList = new ArrayList<>();
+    String cqlQuery = CassandraQueryFactory.getUpdateByQuery(mapping, query, objectArrayList);
+    ResultSet results;
+    if (objectArrayList.size() == 0) {
+      results = client.getSession().execute(cqlQuery);
+    } else {
+      results = client.getSession().execute(cqlQuery, objectArrayList.toArray());
+    }
+    return results.wasApplied();
+  }
+
   private K getKey(T object) {
     String keyField = null;
     for (Field field : mapping.getFieldList()) {

http://git-wip-us.apache.org/repos/asf/gora/blob/cc452f8d/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
index 8a100aa..c2ea388 100644
--- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
+++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java
@@ -25,7 +25,6 @@ import org.apache.gora.persistency.Persistent;
 import org.apache.gora.query.PartitionQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
-import org.apache.gora.query.impl.PartitionQueryImpl;
 import org.apache.gora.query.ws.impl.PartitionWSQueryImpl;
 import org.apache.gora.store.DataStore;
 import org.apache.gora.store.DataStoreFactory;
@@ -93,7 +92,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
       cassandraClient.initialize(properties);
       cassandraSerializer = CassandraSerializer.getSerializer(cassandraClient, properties.getProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE), keyClass, persistentClass, mapping);
     } catch (Exception e) {
-      throw new RuntimeException("Error while initializing Cassandra store: "+ e.getMessage(),e);
+      throw new RuntimeException("Error while initializing Cassandra store: " + e.getMessage(), e);
     }
   }
 
@@ -153,7 +152,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
         return keyClass.newInstance();
       }
     } catch (Exception ex) {
-      throw new RuntimeException("Error while instantiating a key: "+ ex.getMessage(),ex);
+      throw new RuntimeException("Error while instantiating a key: " + ex.getMessage(), ex);
     }
   }
 
@@ -168,7 +167,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
         return persistentClass.newInstance();
       }
     } catch (Exception ex) {
-      throw new RuntimeException("Error while instantiating a persistent: "+ ex.getMessage(),ex);
+      throw new RuntimeException("Error while instantiating a persistent: " + ex.getMessage(), ex);
     }
   }
 
@@ -214,23 +213,23 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
 
   @Override
   public Result<K, T> execute(Query<K, T> query) {
-    return (Result<K,T>) cassandraSerializer.execute(this, query);
+    return (Result<K, T>) cassandraSerializer.execute(this, query);
   }
 
-  public void updateByQuery(Query<K,T> query) {
-
+  public boolean updateByQuery(Query<K, T> query) {
+    return cassandraSerializer.updateByQuery(query);
   }
 
   @Override
   public Query<K, T> newQuery() {
-    Query<K,T> query = new CassandraQuery(this);
+    Query<K, T> query = new CassandraQuery(this);
     query.setFields(mapping.getFieldNames());
     return query;
   }
 
   @Override
   public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
-    List<PartitionQuery<K,T>> partitions = new ArrayList<>();
+    List<PartitionQuery<K, T>> partitions = new ArrayList<>();
     PartitionWSQueryImpl<K, T> pqi = new PartitionWSQueryImpl<>(query);
     pqi.setDataStore(this);
     partitions.add(pqi);
@@ -239,7 +238,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T>
 
   @Override
   public void flush() {
- // ignore since caching has been disabled
+    // ignore since caching has been disabled
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/gora/blob/cc452f8d/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java
----------------------------------------------------------------------
diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java
index b2ac3c1..1fc76c6 100644
--- a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java
+++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java
@@ -19,6 +19,7 @@ package org.apache.gora.cassandra.store;
 
 import org.apache.gora.cassandra.GoraCassandraTestDriver;
 import org.apache.gora.cassandra.example.generated.nativeSerialization.User;
+import org.apache.gora.cassandra.query.CassandraQuery;
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
 import org.apache.gora.store.DataStore;
@@ -237,4 +238,25 @@ public class TestCassandraStoreWithNativeSerialization {
     Assert.assertNull(partialDeletedUser.getName());
     
Assert.assertEquals(partialDeletedUser.getDateOfBirth(),user2.getDateOfBirth());
   }
+
+  @Test
+  public void testUpdateByQuery() {
+    userDataStore.truncateSchema();
+    UUID id1 = UUID.randomUUID();
+    User user1 = new User(id1, "user1", Date.from(Instant.now()));
+    userDataStore.put(id1, user1);
+    UUID id2 = UUID.randomUUID();
+    User user2 = new User(id2, "user2", Date.from(Instant.now()));
+    userDataStore.put(id2, user2);
+    Query<UUID, User> query1 = userDataStore.newQuery();
+    if(query1 instanceof CassandraQuery) {
+      ((CassandraQuery) query1).addUpdateField("name", "madhawa");
+    }
+    query1.setKey(id1);
+    if(userDataStore instanceof CassandraStore) {
+      ((CassandraStore) userDataStore).updateByQuery(query1);
+    }
+    User user = userDataStore.get(id1);
+    Assert.assertEquals(user.getName(),"madhawa");
+  }
 }

Reply via email to