Add updateByQuery for native serialization
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/cc452f8d Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/cc452f8d Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/cc452f8d Branch: refs/heads/master Commit: cc452f8dc0d864c50279dfa61d56e57e5e40a740 Parents: 4ebfabb Author: madhawa <madhaw...@gmail.com> Authored: Sat Jul 8 15:02:54 2017 +0530 Committer: madhawa <madhaw...@gmail.com> Committed: Sat Jul 8 15:02:54 2017 +0530 ---------------------------------------------------------------------- .../gora/cassandra/query/CassandraQuery.java | 35 ++++++++---- .../cassandra/serializers/AvroSerializer.java | 4 ++ .../serializers/CassandraQueryFactory.java | 60 ++++++++++++++++++-- .../serializers/CassandraSerializer.java | 2 + .../cassandra/serializers/NativeSerializer.java | 13 +++++ .../gora/cassandra/store/CassandraStore.java | 19 +++---- ...stCassandraStoreWithNativeSerialization.java | 22 +++++++ 7 files changed, 129 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/cc452f8d/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java index 251e9df..c3f2e81 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,22 +20,21 @@ package org.apache.gora.cassandra.query; import org.apache.gora.filter.Filter; import org.apache.gora.persistency.Persistent; -import org.apache.gora.persistency.impl.PersistentBase; import org.apache.gora.query.Query; -import org.apache.gora.query.Result; -import org.apache.gora.query.impl.QueryBase; import org.apache.gora.query.ws.impl.QueryWSBase; import org.apache.gora.store.DataStore; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.io.Writable; + +import java.util.HashMap; +import java.util.Map; /** * Cassandra specific implementation of the {@link Query} interface. */ -public class CassandraQuery<K, T extends Persistent> extends QueryWSBase<K,T> { +public class CassandraQuery<K, T extends Persistent> extends QueryWSBase<K, T> { protected Filter<K, T> filter; - protected boolean localFilterEnabled=true; + protected boolean localFilterEnabled = true; + protected Map<String, Object> updateFields = new HashMap<>(); public CassandraQuery(DataStore<K, T> dataStore) { super(dataStore); @@ -61,7 +60,21 @@ public class CassandraQuery<K, T extends Persistent> extends QueryWSBase<K,T> { localFilterEnabled = enable; } - public void addUpdateField(String field, Object oldValue, Object newValue) { + public void addUpdateField(String field, Object newValue) { + updateFields.put(field, newValue); + } + public Object getUpdateFieldValue(String key) { + return updateFields.get(key); + } + + @Override + public String[] getFields() { + if(updateFields.size() == 0) { + return super.getFields(); + } else { + String[] updateFieldsArray = new String[updateFields.size()]; + return updateFields.keySet().toArray(updateFieldsArray); + } } } http://git-wip-us.apache.org/repos/asf/gora/blob/cc452f8d/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java index 83676dc..3b626a4 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java @@ -80,4 +80,8 @@ class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { return 0; } + @Override + public boolean updateByQuery(Query query) { + return false; + } } http://git-wip-us.apache.org/repos/asf/gora/blob/cc452f8d/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java index 2a980e5..ebb7c20 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java @@ -19,11 +19,13 @@ package org.apache.gora.cassandra.serializers; import com.datastax.driver.core.querybuilder.Delete; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.core.querybuilder.Select; +import com.datastax.driver.core.querybuilder.Update; import org.apache.gora.cassandra.bean.CassandraKey; import org.apache.gora.cassandra.bean.ClusterKeyField; import org.apache.gora.cassandra.bean.Field; import org.apache.gora.cassandra.bean.KeySpace; import org.apache.gora.cassandra.bean.PartitionKeyField; +import org.apache.gora.cassandra.query.CassandraQuery; import org.apache.gora.cassandra.query.CassandraRow; import org.apache.gora.cassandra.store.CassandraMapping; import org.apache.gora.query.Query; @@ -249,10 +251,6 @@ class CassandraQueryFactory { return query; } -// static <T> String getUpdateDataQuery(CassandraMapping mapping, T obj) { -//// QueryBuilder.update(mapping.getKeySpace().getName(),mapping.getCoreName()). -// } - static <K> String getObjectWithFieldsQuery(CassandraMapping mapping, String[] fields, K key, List<Object> objects) { String cqlQuery = null; Select select = QueryBuilder.select(fields).from(mapping.getKeySpace().getName(), mapping.getCoreName()); @@ -363,7 +361,7 @@ class CassandraQueryFactory { static String getDeleteByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) { String[] columns = null; - if(!Arrays.equals(cassandraQuery.getFields(), mapping.getFieldNames())) { + if (!Arrays.equals(cassandraQuery.getFields(), mapping.getFieldNames())) { columns = getColumnNames(mapping, cassandraQuery.getFields()); } Object startKey = cassandraQuery.getStartKey(); @@ -413,4 +411,56 @@ class CassandraQueryFactory { return query.getQueryString(); } + static String getUpdateByQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects) { + Update update = QueryBuilder.update(mapping.getKeySpace().getName(), mapping.getCoreName()); + Update.Assignments updateAssignments = null; + if (cassandraQuery instanceof CassandraQuery) { + String[] fields = cassandraQuery.getFields(); + String[] columnNames = getColumnNames(mapping, fields); + for (String column : columnNames) { + updateAssignments = update.with(QueryBuilder.set(column, "?")); + objects.add(((CassandraQuery) cassandraQuery).getUpdateFieldValue(column)); + } + } + String primaryKey = null; + Update.Where query = null; + Object startKey = cassandraQuery.getStartKey(); + Object endKey = cassandraQuery.getEndKey(); + Object key = cassandraQuery.getKey(); + boolean isWhereNeeded = true; + if (key != null) { + primaryKey = getPKey(mapping.getFieldList()); + query = updateAssignments.where(QueryBuilder.eq(primaryKey, "?")); + objects.add(key); + } else { + if (startKey != null) { + if (mapping.getCassandraKey() != null) { +//todo avro serialization + } else { + primaryKey = getPKey(mapping.getFieldList()); + query = updateAssignments.where(QueryBuilder.gte(primaryKey, "?")); + objects.add(startKey); + isWhereNeeded = false; + } + } + if (endKey != null) { + if (mapping.getCassandraKey() != null) { +//todo avro serialization + } else { + primaryKey = primaryKey != null ? primaryKey : getPKey(mapping.getFieldList()); + if (isWhereNeeded) { + query = updateAssignments.where(QueryBuilder.lte(primaryKey, "?")); + } else { + query = query.and(QueryBuilder.lte(primaryKey, "?")); + } + objects.add(endKey); + } + } + } + if (startKey == null && endKey == null && key == null) { + return updateAssignments.getQueryString(); + } + return query.getQueryString(); + } + } http://git-wip-us.apache.org/repos/asf/gora/blob/cc452f8d/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java index e125a33..237bd8a 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java @@ -123,4 +123,6 @@ public abstract class CassandraSerializer<K, T extends Persistent> { public abstract long deleteByQuery(Query<K, T> query); + public abstract boolean updateByQuery(Query<K, T> query); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/cc452f8d/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java index 2bac3dd..6f64fa2 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java @@ -128,6 +128,19 @@ class NativeSerializer<K, T extends CassandraNativePersistent> extends Cassandra mapper = mappingManager.mapper(persistentClass); } + @Override + public boolean updateByQuery(Query query) { + List<Object> objectArrayList = new ArrayList<>(); + String cqlQuery = CassandraQueryFactory.getUpdateByQuery(mapping, query, objectArrayList); + ResultSet results; + if (objectArrayList.size() == 0) { + results = client.getSession().execute(cqlQuery); + } else { + results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); + } + return results.wasApplied(); + } + private K getKey(T object) { String keyField = null; for (Field field : mapping.getFieldList()) { http://git-wip-us.apache.org/repos/asf/gora/blob/cc452f8d/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java index 8a100aa..c2ea388 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java @@ -25,7 +25,6 @@ import org.apache.gora.persistency.Persistent; import org.apache.gora.query.PartitionQuery; import org.apache.gora.query.Query; import org.apache.gora.query.Result; -import org.apache.gora.query.impl.PartitionQueryImpl; import org.apache.gora.query.ws.impl.PartitionWSQueryImpl; import org.apache.gora.store.DataStore; import org.apache.gora.store.DataStoreFactory; @@ -93,7 +92,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> cassandraClient.initialize(properties); cassandraSerializer = CassandraSerializer.getSerializer(cassandraClient, properties.getProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE), keyClass, persistentClass, mapping); } catch (Exception e) { - throw new RuntimeException("Error while initializing Cassandra store: "+ e.getMessage(),e); + throw new RuntimeException("Error while initializing Cassandra store: " + e.getMessage(), e); } } @@ -153,7 +152,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> return keyClass.newInstance(); } } catch (Exception ex) { - throw new RuntimeException("Error while instantiating a key: "+ ex.getMessage(),ex); + throw new RuntimeException("Error while instantiating a key: " + ex.getMessage(), ex); } } @@ -168,7 +167,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> return persistentClass.newInstance(); } } catch (Exception ex) { - throw new RuntimeException("Error while instantiating a persistent: "+ ex.getMessage(),ex); + throw new RuntimeException("Error while instantiating a persistent: " + ex.getMessage(), ex); } } @@ -214,23 +213,23 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> @Override public Result<K, T> execute(Query<K, T> query) { - return (Result<K,T>) cassandraSerializer.execute(this, query); + return (Result<K, T>) cassandraSerializer.execute(this, query); } - public void updateByQuery(Query<K,T> query) { - + public boolean updateByQuery(Query<K, T> query) { + return cassandraSerializer.updateByQuery(query); } @Override public Query<K, T> newQuery() { - Query<K,T> query = new CassandraQuery(this); + Query<K, T> query = new CassandraQuery(this); query.setFields(mapping.getFieldNames()); return query; } @Override public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException { - List<PartitionQuery<K,T>> partitions = new ArrayList<>(); + List<PartitionQuery<K, T>> partitions = new ArrayList<>(); PartitionWSQueryImpl<K, T> pqi = new PartitionWSQueryImpl<>(query); pqi.setDataStore(this); partitions.add(pqi); @@ -239,7 +238,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> @Override public void flush() { - // ignore since caching has been disabled + // ignore since caching has been disabled } @Override http://git-wip-us.apache.org/repos/asf/gora/blob/cc452f8d/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java index b2ac3c1..1fc76c6 100644 --- a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java +++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java @@ -19,6 +19,7 @@ package org.apache.gora.cassandra.store; import org.apache.gora.cassandra.GoraCassandraTestDriver; import org.apache.gora.cassandra.example.generated.nativeSerialization.User; +import org.apache.gora.cassandra.query.CassandraQuery; import org.apache.gora.query.Query; import org.apache.gora.query.Result; import org.apache.gora.store.DataStore; @@ -237,4 +238,25 @@ public class TestCassandraStoreWithNativeSerialization { Assert.assertNull(partialDeletedUser.getName()); Assert.assertEquals(partialDeletedUser.getDateOfBirth(),user2.getDateOfBirth()); } + + @Test + public void testUpdateByQuery() { + userDataStore.truncateSchema(); + UUID id1 = UUID.randomUUID(); + User user1 = new User(id1, "user1", Date.from(Instant.now())); + userDataStore.put(id1, user1); + UUID id2 = UUID.randomUUID(); + User user2 = new User(id2, "user2", Date.from(Instant.now())); + userDataStore.put(id2, user2); + Query<UUID, User> query1 = userDataStore.newQuery(); + if(query1 instanceof CassandraQuery) { + ((CassandraQuery) query1).addUpdateField("name", "madhawa"); + } + query1.setKey(id1); + if(userDataStore instanceof CassandraStore) { + ((CassandraStore) userDataStore).updateByQuery(query1); + } + User user = userDataStore.get(id1); + Assert.assertEquals(user.getName(),"madhawa"); + } }