Add execute method, get by fields for native serialization
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/4ce6a6e4 Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/4ce6a6e4 Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/4ce6a6e4 Branch: refs/heads/master Commit: 4ce6a6e423700c72b99d28c7419aacc1dfe9e1ab Parents: 5e383ef Author: madhawa <madhaw...@gmail.com> Authored: Fri Jul 7 22:18:36 2017 +0530 Committer: madhawa <madhaw...@gmail.com> Committed: Fri Jul 7 22:18:36 2017 +0530 ---------------------------------------------------------------------- .../generated/nativeSerialization/User.java | 20 +++- .../gora/cassandra/query/CassandraColumn.java | 51 ++++++++ .../gora/cassandra/query/CassandraQuery.java | 67 +++++++++++ .../cassandra/query/CassandraResultSet.java | 82 +++++++++++++ .../gora/cassandra/query/CassandraRow.java | 79 ++++++++++++ .../cassandra/serializers/AvroSerializer.java | 35 +++++- .../serializers/CassandraQueryFactory.java | 119 +++++++++++++++++-- .../serializers/CassandraSerializer.java | 39 ++++-- .../cassandra/serializers/NativeSerializer.java | 80 ++++++++++++- .../gora/cassandra/store/CassandraClient.java | 5 +- .../gora/cassandra/store/CassandraMapping.java | 18 +++ .../store/CassandraMappingBuilder.java | 6 +- .../gora/cassandra/store/CassandraStore.java | 13 +- .../test/conf/avro/gora-cassandra-mapping.xml | 2 +- .../cassandra/store/TestCassandraStore.java | 95 +++++++++++++++ ...stCassandraStoreWithNativeSerialization.java | 79 +++++++++++- 16 files changed, 746 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/4ce6a6e4/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java index 1e810a0..105dfb7 100644 --- a/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java +++ b/gora-cassandra-cql/src/examples/java/org/apache/gora/cassandra/example/generated/nativeSerialization/User.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.gora.cassandra.example.generated.nativeSerialization; import com.datastax.driver.mapping.annotations.Column; @@ -10,9 +27,8 @@ import java.util.Date; import java.util.UUID; /** - * Created by madhawa on 6/23/17. + * Sample class for native cassandra persistent example. */ - @Table(keyspace = "nativeTestKeySpace", name = "users", readConsistency = "QUORUM", writeConsistency = "QUORUM", http://git-wip-us.apache.org/repos/asf/gora/blob/4ce6a6e4/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java new file mode 100644 index 0000000..e95b815 --- /dev/null +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraColumn.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.cassandra.query; + +import org.apache.avro.Schema.Field; + +/** + * Created by madhawa on 7/1/17. + */ +public class CassandraColumn { + + public CassandraColumn(Field field, Object value) { + this.field = field; + this.name = field.name(); + this.value = value; + } + + public String getName() { + return name; + } + + public Field getField() { + return field; + } + + private String name; + + private Field field; + + public Object getValue() { + return value; + } + + private Object value; +} http://git-wip-us.apache.org/repos/asf/gora/blob/4ce6a6e4/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java new file mode 100644 index 0000000..251e9df --- /dev/null +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.cassandra.query; + +import org.apache.gora.filter.Filter; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.query.impl.QueryBase; +import org.apache.gora.query.ws.impl.QueryWSBase; +import org.apache.gora.store.DataStore; +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.io.Writable; + +/** + * Cassandra specific implementation of the {@link Query} interface. + */ +public class CassandraQuery<K, T extends Persistent> extends QueryWSBase<K,T> { + + protected Filter<K, T> filter; + protected boolean localFilterEnabled=true; + + public CassandraQuery(DataStore<K, T> dataStore) { + super(dataStore); + } + + @Override + public Filter<K, T> getFilter() { + return filter; + } + + @Override + public void setFilter(Filter<K, T> filter) { + this.filter = filter; + } + + @Override + public boolean isLocalFilterEnabled() { + return localFilterEnabled; + } + + @Override + public void setLocalFilterEnabled(boolean enable) { + localFilterEnabled = enable; + } + + public void addUpdateField(String field, Object oldValue, Object newValue) { + + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/4ce6a6e4/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java new file mode 100644 index 0000000..c23a11c --- /dev/null +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.cassandra.query; + +import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.Query; +import org.apache.gora.query.impl.ResultBase; +import org.apache.gora.store.DataStore; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * CassandraResult specific implementation of the {@link org.apache.gora.query.Result} + * interface. + */ +public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T> { + + private List<T> persistentObject = new ArrayList<T>(); + + private List<K> persistentKey = new ArrayList<K>(); + + private int size = 0; + + private int position = 0; + + @Override + protected boolean nextInner() throws IOException { + if(offset < size) { + persistent = persistentObject.get(position); + key = persistentKey.get(position); + position ++; + return true; + } + return false; + } + + public CassandraResultSet(DataStore<K, T> dataStore, Query<K, T> query) { + super(dataStore, query); + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return ((float)position)/size; + } + + @Override + public T get() { + return super.get(); + } + + @Override + public K getKey() { + return super.getKey(); + } + + public void addResultElement(K key, T token) { + this.persistentKey.add(key); + this.persistentObject.add(token); + this.size++; + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/4ce6a6e4/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java new file mode 100644 index 0000000..857e1b2 --- /dev/null +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/query/CassandraRow.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.cassandra.query; + +import java.util.ArrayList; +import java.util.List; + +/** + * Created by madhawa on 7/1/17. + */ +public class CassandraRow<K> extends ArrayList<CassandraColumn> { + private static final long serialVersionUID = -7620939600192859652L; + private K key; + + public K getKey() { + return this.key; + } + + public void setKey(K key) { + this.key = key; + } + + /** + * Gets a specific CassandraColumn within a row using its name + * + * @param cassandraColumnName columnName + * @return CassandraColumn + */ + public CassandraColumn getCassandraColumn(String cassandraColumnName) { + for (CassandraColumn cColumn : this) { + if (cassandraColumnName.equals(cColumn.getName())) { + return cColumn; + } + } + return null; + } + + /** + * + * @return + */ + public String[] getFields() { + List<String> columnNames = new ArrayList<>(); + for (CassandraColumn cColumn : this) { + columnNames.add(cColumn.getName()); + } + String[] columnNameArray = new String[columnNames.size()]; + columnNameArray = columnNames.toArray(columnNameArray); + return columnNameArray; + } + + /** + * + * @return + */ + public Object[] getValues() { + List<Object> columnValues = new ArrayList<>(); + for (CassandraColumn cColumn : this) { + columnValues.add(cColumn.getValue()); + } + return columnValues.toArray(); + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/4ce6a6e4/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java index bb9d99c..d5dd548 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java @@ -17,35 +17,62 @@ package org.apache.gora.cassandra.serializers; +import com.datastax.driver.core.ColumnMetadata; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificData; +import org.apache.gora.cassandra.query.CassandraColumn; +import org.apache.gora.cassandra.query.CassandraRow; import org.apache.gora.cassandra.store.CassandraClient; import org.apache.gora.cassandra.store.CassandraMapping; +import org.apache.gora.persistency.Persistent; import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.store.DataStore; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; /** - * Created by madhawa on 6/26/17. + * This class contains the operations relates to Avro Serialization */ public class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { + /** + * Default schema index with value "0" used when AVRO Union data types are stored + */ + public static final int DEFAULT_UNION_SCHEMA = 0; + public AvroSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) { super(cassandraClient, keyClass, persistentClass, mapping); } - @Override - public PersistentBase get(Object key) { + public Persistent get(Object key, String[] fields) { return null; } @Override - public void put(Object key, Object value) { + public void put(Object key, Persistent value) { } @Override + public Persistent get(Object key) { + return null; + } + + @Override public boolean delete(Object key) { return false; } + @Override + public Result execute(DataStore dataStore,Query query) { + return null; + } + } http://git-wip-us.apache.org/repos/asf/gora/blob/4ce6a6e4/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java index 84f5ccb..3939c34 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraQueryFactory.java @@ -16,13 +16,16 @@ */ package org.apache.gora.cassandra.serializers; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.querybuilder.Select; import org.apache.gora.cassandra.bean.CassandraKey; import org.apache.gora.cassandra.bean.ClusterKeyField; import org.apache.gora.cassandra.bean.Field; import org.apache.gora.cassandra.bean.KeySpace; import org.apache.gora.cassandra.bean.PartitionKeyField; +import org.apache.gora.cassandra.query.CassandraRow; import org.apache.gora.cassandra.store.CassandraMapping; -import org.apache.gora.persistency.Persistent; +import org.apache.gora.query.Query; import java.util.List; import java.util.Map; @@ -235,14 +238,116 @@ class CassandraQueryFactory { } /** - * * refer : http://docs.datastax.com/en/cql/3.1/cql/cql_reference/insert_r.html + * * @return */ - static String getInsertDataQuery(CassandraMapping mapping, Object obj) { -// ( (Persistent) obj).getS - StringBuilder stringBuffer = new StringBuilder(); -// o -return null; + static String getInsertDataQuery(CassandraMapping mapping, CassandraRow row) { + String query = QueryBuilder.insertInto(mapping.getKeySpace().getName(), mapping.getCoreName()).values(row.getFields(), row.getValues()).getQueryString(); + return query; + } + +// static <T> String getUpdateDataQuery(CassandraMapping mapping, T obj) { +//// QueryBuilder.update(mapping.getKeySpace().getName(),mapping.getCoreName()). +// } + + static <K> String getObjectWithFieldsQuery(CassandraMapping mapping, String[] fields, K key, List<Object> objects) { + String cqlQuery = null; + Select select = QueryBuilder.select(fields).from(mapping.getKeySpace().getName(), mapping.getCoreName()); + CassandraKey cKey = mapping.getCassandraKey(); + if (cKey != null) { + Select.Where query = null; + boolean isWhereNeeded = true; + for (PartitionKeyField field : cKey.getPartitionKeyFields()) { + if (field.isComposite()) { + for (Field compositeField : field.getFields()) { + if (isWhereNeeded) { + query = select.where(QueryBuilder.eq(compositeField.getColumnName(), "?")); + isWhereNeeded = false; + } + query = query.and(QueryBuilder.eq(compositeField.getColumnName(), "?")); + } + } else { + if (isWhereNeeded) { + query = select.where(QueryBuilder.eq(field.getColumnName(), "?")); + isWhereNeeded = false; + } + query = query.and(QueryBuilder.eq(field.getColumnName(), "?")); + } + } + cqlQuery = query != null ? query.getQueryString() : null; + } else { + for (Field field : mapping.getFieldList()) { + boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey")); + if (isPrimaryKey) { + cqlQuery = select.where(QueryBuilder.eq(field.getColumnName(), "?")).getQueryString(); + objects.add(key); + break; + } + } + } + return cqlQuery; + } + + + static<K> String getExecuteQuery(CassandraMapping mapping, Query cassandraQuery, List<Object> objects ) { + String[] fields = cassandraQuery.getFields(); + fields = fields != null ? fields : mapping.getFieldNames(); + Object startKey = cassandraQuery.getStartKey(); + Object endKey = cassandraQuery.getEndKey(); + long limit = cassandraQuery.getLimit(); + Select select = QueryBuilder.select(getColumnNames(mapping,fields)).from(mapping.getKeySpace().getName(), mapping.getCoreName()); + if(limit > 0) { + select = select.limit((int)limit); + } + Select.Where query = null; + boolean isWhereNeeded = true; + if(startKey != null) { + if (mapping.getCassandraKey() != null) { +//todo avro serialization + } else { + for (Field field : mapping.getFieldList()) { + boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey")); + if (isPrimaryKey) { + query = select.where(QueryBuilder.gte(field.getColumnName(), "?")); + objects.add(startKey); + isWhereNeeded = false; + break; + } + } + } + } + if(endKey != null) { + if (mapping.getCassandraKey() != null) { +//todo avro serialization + } else { + for (Field field : mapping.getFieldList()) { + boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey")); + if (isPrimaryKey) { + if(isWhereNeeded) { + query = select.where(QueryBuilder.lte(field.getColumnName(), "?")); + } else { + query = query.and(QueryBuilder.lte(field.getColumnName(), "?")); + } + objects.add(endKey); + break; + } + } + } + } + if(startKey == null && endKey == null) { + return select.getQueryString(); + } + return query.getQueryString(); + } + + private static String[] getColumnNames(CassandraMapping mapping, String[] fields) { + String[] columnNames = new String[fields.length]; + int i = 0; + for(String field : fields) { + columnNames[i] = mapping.getField(field).getColumnName(); + i++; + } + return columnNames; } } http://git-wip-us.apache.org/repos/asf/gora/blob/4ce6a6e4/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java index 09272ce..cafc1be 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/CassandraSerializer.java @@ -22,22 +22,26 @@ import com.datastax.driver.core.TableMetadata; import org.apache.gora.cassandra.store.CassandraClient; import org.apache.gora.cassandra.store.CassandraMapping; import org.apache.gora.cassandra.store.CassandraStore; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.store.DataStore; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Locale; /** - * Created by madhawa on 6/26/17. + * This is the abstract Cassandra Serializer class. */ -public abstract class CassandraSerializer<K, T> { +public abstract class CassandraSerializer<K, T extends Persistent> { CassandraClient client; - private Class<K> keyClass; + protected Class<K> keyClass; - private Class<T> persistentClass; + protected Class<T> persistentClass; - private CassandraMapping mapping; + protected CassandraMapping mapping; private static final Logger LOG = LoggerFactory.getLogger(CassandraStore.class); @@ -81,19 +85,30 @@ public abstract class CassandraSerializer<K, T> { } } + /** + * This method returns the Cassandra Serializer according the Casssandra serializer property. + * + * @param cc Cassandra Client + * @param type Serialization type + * @param keyClass key class + * @param persistentClass persistent class + * @param mapping Cassandra Mapping + * @param <K> key class + * @param <T> persistent class + * @return Serializer + */ public static <K, T> CassandraSerializer getSerializer(CassandraClient cc, String type, final Class<K> keyClass, final Class<T> persistentClass, CassandraMapping mapping) { CassandraStore.SerializerType serType = type.isEmpty() ? CassandraStore.SerializerType.NATIVE : CassandraStore.SerializerType.valueOf(type.toUpperCase(Locale.ENGLISH)); - CassandraSerializer ser; + CassandraSerializer serializer; switch (serType) { case AVRO: - ser = new AvroSerializer(cc,keyClass, persistentClass, mapping); + serializer = new AvroSerializer(cc, keyClass, persistentClass, mapping); break; case NATIVE: default: - ser = new NativeSerializer(cc, keyClass, persistentClass, mapping); - + serializer = new NativeSerializer(cc, keyClass, persistentClass, mapping); } - return ser; + return serializer; } public abstract void put(K key, T value); @@ -102,4 +117,8 @@ public abstract class CassandraSerializer<K, T> { public abstract boolean delete(K key); + public abstract T get(K key, String[] fields); + + public abstract Result<K, T> execute(DataStore<K, T> dataStore,Query<K, T> query); + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/4ce6a6e4/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java index 0c30cba..c0c8d15 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/NativeSerializer.java @@ -17,25 +17,34 @@ package org.apache.gora.cassandra.serializers; +import com.datastax.driver.core.ResultSet; import com.datastax.driver.mapping.Mapper; import com.datastax.driver.mapping.MappingManager; +import com.datastax.driver.mapping.Result; +import org.apache.gora.cassandra.bean.Field; import org.apache.gora.cassandra.persistent.CassandraNativePersistent; +import org.apache.gora.cassandra.query.CassandraResultSet; import org.apache.gora.cassandra.store.CassandraClient; import org.apache.gora.cassandra.store.CassandraMapping; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.query.Query; +import org.apache.gora.store.DataStore; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; /** - * Created by madhawa on 6/26/17. + * This Class contains the operation relates to Native Serialization. */ public class NativeSerializer<K, T extends CassandraNativePersistent> extends CassandraSerializer { private Mapper<T> mapper; - - - @Override - public void put(Object key, Object value) { - mapper.save((T)value); + public void put(Object key, Persistent value) { + mapper.save((T) value); } @Override @@ -49,13 +58,72 @@ public class NativeSerializer<K, T extends CassandraNativePersistent> extends Ca return true; } + @Override + public Persistent get(Object key, String[] fields) { + List<Object> objectArrayList = new ArrayList<>(); + String cqlQuery = CassandraQueryFactory.getObjectWithFieldsQuery(mapping, fields, key, objectArrayList); + ResultSet results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); + Result<T> objects = mapper.map(results); + List<T> objectList = objects.all(); + if (objectList != null) { + return objectList.get(0); + } + return null; + } + @Override + public org.apache.gora.query.Result execute(DataStore dataStore, Query query) { + List<Object> objectArrayList = new ArrayList<>(); + CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<K, T>(dataStore, query); + String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList); + ResultSet results; + if (objectArrayList.size() == 0) { + results = client.getSession().execute(cqlQuery); + } else { + results = client.getSession().execute(cqlQuery, objectArrayList.toArray()); + } + Result<T> objects = mapper.map(results); + Iterator iterator = objects.iterator(); + while (iterator.hasNext()) { + T result = (T) iterator.next(); + K key = getKey(result); + cassandraResult.addResultElement(key, result); + } + return cassandraResult; + } public NativeSerializer(CassandraClient cassandraClient, Class<K> keyClass, Class<T> persistentClass, CassandraMapping mapping) { super(cassandraClient, keyClass, persistentClass, mapping); this.createSchema(); MappingManager mappingManager = new MappingManager(cassandraClient.getSession()); mapper = mappingManager.mapper(persistentClass); + } + private K getKey(T object) { + String keyField = null; + for (Field field : mapping.getFieldList()) { + boolean isPrimaryKey = Boolean.parseBoolean(field.getProperty("primarykey")); + if (isPrimaryKey) { + keyField = field.getFieldName(); + break; + } + } + K key = null; + Method keyMethod = null; + try { + for (Method method : this.persistentClass.getMethods()) { + if (method.getName().equalsIgnoreCase("get" + keyField)) { + keyMethod = method; + } + } + key = (K) keyMethod.invoke(object); + } catch (Exception e) { + try { + key = (K) this.persistentClass.getField(keyField).get(object); + } catch (Exception e1) { + throw new RuntimeException("Field" + keyField + " is not accessible in " + persistentClass + " : " + e1.getMessage()); + } + } + return key; } } http://git-wip-us.apache.org/repos/asf/gora/blob/4ce6a6e4/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java index 847343e..7f89bbb 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java @@ -33,11 +33,12 @@ import java.util.List; import java.util.Properties; /** - * Created by madhawa on 6/28/17. + * This class provides the Cassandra Client Connection. + * Initialize the Cassandra Connection according to the Properties. */ public class CassandraClient { - public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class); + private static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class); private Cluster cluster; http://git-wip-us.apache.org/repos/asf/gora/blob/4ce6a6e4/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java index 7acc99c..7b5d265 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMapping.java @@ -53,6 +53,24 @@ public class CassandraMapping { return fieldList; } + public Field getField(String field) { + for (Field field1 : fieldList) { + if (field1.getFieldName().equals(field)) { + return field1; + } + } + return null; + } + + public String[] getFieldNames() { + List<String> fieldNames = new ArrayList<>(fieldList.size()); + for (Field field : fieldList) { + fieldNames.add(field.getFieldName()); + } + String[] fieldNameArray = new String[fieldNames.size()]; + return fieldNames.toArray(fieldNameArray); + } + public CassandraKey getCassandraKey() { return cassandraKey; } http://git-wip-us.apache.org/repos/asf/gora/blob/4ce6a6e4/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java index bb8c6ad..3302bcb 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraMappingBuilder.java @@ -18,20 +18,20 @@ import java.util.List; import java.util.Locale; /** - * Created by madhawa on 6/28/17. + * This Class reads the Cassandra Mapping file and create tha Cassandra Mapping object. + * {@link org.apache.gora.cassandra.store.CassandraMapping} */ public class CassandraMappingBuilder<K, T extends Persistent> { private static final Logger LOG = LoggerFactory.getLogger(CassandraMappingBuilder.class); - private CassandraStore dataStore; /** * Constructor for builder to create the mapper. * - * @param store + * @param store Cassandra Store */ public CassandraMappingBuilder(final CassandraStore<K, T> store) { this.dataStore = store; http://git-wip-us.apache.org/repos/asf/gora/blob/4ce6a6e4/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java index 1d9093d..7c24175 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java @@ -18,6 +18,7 @@ package org.apache.gora.cassandra.store; import org.apache.gora.cassandra.persistent.CassandraNativePersistent; +import org.apache.gora.cassandra.query.CassandraQuery; import org.apache.gora.cassandra.serializers.CassandraSerializer; import org.apache.gora.persistency.BeanFactory; import org.apache.gora.persistency.Persistent; @@ -194,7 +195,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> @Override public T get(K key, String[] fields) { - return null; + return (T) cassandraSerializer.get(key, fields); } @Override @@ -214,12 +215,18 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> @Override public Result<K, T> execute(Query<K, T> query) { - return null; + return (Result<K,T>) cassandraSerializer.execute(this, query); + } + + public void updateByQuery(Query<K,T> query) { + } @Override public Query<K, T> newQuery() { - return null; + Query<K,T> query = new CassandraQuery(this); + query.setFields(mapping.getFieldNames()); + return query; } @Override http://git-wip-us.apache.org/repos/asf/gora/blob/4ce6a6e4/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml b/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml index e99cf92..8c970dc 100644 --- a/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml +++ b/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml @@ -66,7 +66,7 @@ table="Employee" compactStorage="true" > <field name="name" column="name" type="text" ttl="10"/> <field name="dateOfBirth" column="dob" type="bigint" ttl="10"/> - <field name="ssn" column="ssn" type="int" ttl="10" primarykey="true"/> + <field name="ssn" column="ssn" type="text" ttl="10" primarykey="true"/> <field name="salary" column="salary" type="int" ttl="10" /> </class> http://git-wip-us.apache.org/repos/asf/gora/blob/4ce6a6e4/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java new file mode 100644 index 0000000..34255a6 --- /dev/null +++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStore.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Testing class for all standard gora-cassandra functionality. + * We extend DataStoreTestBase enabling us to run the entire base test + * suite for Gora. + */ +package org.apache.gora.cassandra.store; + +import org.apache.gora.cassandra.GoraCassandraTestDriver; +import org.apache.gora.store.DataStoreTestBase; +import org.junit.Before; +import org.junit.Ignore; + +import java.io.IOException; +import java.util.Properties; + +/** + * Test for CassandraStore. + */ +public class TestCassandraStore extends DataStoreTestBase{ + private static Properties properties; + + static { + GoraCassandraTestDriver testDriver = new GoraCassandraTestDriver(); + setProperties(); + testDriver.setParameters(properties); + setTestDriver(testDriver); + } + + @Before + public void setUp() throws Exception { + super.setUp(); + } + + + private static void setProperties() { + properties = new Properties(); + properties.setProperty(CassandraStoreParameters.CASSANDRA_SERVERS, "localhost"); + properties.setProperty(CassandraStoreParameters.PORT, "9042"); + properties.setProperty(CassandraStoreParameters.ENABLE_JMX_REPORTING, "false"); + properties.setProperty(CassandraStoreParameters.PROTOCOL_VERSION, "3"); + properties.setProperty(CassandraStoreParameters.CLUSTER_NAME,"Test Cluster"); + properties.setProperty("gora.cassandrastore.mapping.file", "avro/gora-cassandra-mapping.xml"); + } + @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));") + @Override + public void testQuery() throws IOException {} + @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));") + @Override + public void testQueryStartKey() throws IOException {} + @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));") + @Override + public void testQueryEndKey() throws IOException {} + @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));") + @Override + public void testQueryKeyRange() throws IOException {} + @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));") + @Override + public void testQueryWebPageSingleKey() throws IOException {} + @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));") + @Override + public void testQueryWebPageSingleKeyDefaultFields() throws IOException {} + @Ignore("GORA-299 o.a.g.cassandra.CassandraStore#newQuery() should not use query.setFields(getFieldsToQuery(null));") + @Override + public void testQueryWebPageQueryEmptyResults() throws IOException {} + @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented at CassandraStore, and always returns false or 0") + @Override + public void testDelete() throws IOException {} + @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented at CassandraStore, and always returns false or 0") + @Override + public void testDeleteByQuery() throws IOException {} + @Ignore("GORA-154 delete() and deleteByQuery() methods are not implemented at CassandraStore, and always returns false or 0") + @Override + public void testDeleteByQueryFields() throws IOException {} + @Ignore("GORA-298 Implement CassandraStore#getPartitions") + @Override + public void testGetPartitions() throws IOException {} +} http://git-wip-us.apache.org/repos/asf/gora/blob/4ce6a6e4/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java index 9ebcae5..160a5ae 100644 --- a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java +++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java @@ -19,6 +19,8 @@ package org.apache.gora.cassandra.store; import org.apache.gora.cassandra.GoraCassandraTestDriver; import org.apache.gora.cassandra.example.generated.nativeSerialization.User; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; import org.apache.gora.store.DataStore; import org.junit.After; import org.junit.AfterClass; @@ -28,6 +30,8 @@ import org.junit.Test; import java.time.Instant; import java.util.Date; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.UUID; @@ -53,7 +57,7 @@ public class TestCassandraStoreWithNativeSerialization { parameter.setProperty(CassandraStoreParameters.PORT, "9042"); parameter.setProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE, "native"); parameter.setProperty(CassandraStoreParameters.PROTOCOL_VERSION, "3"); - parameter.setProperty(CassandraStoreParameters.CLUSTER_NAME,"Test Cluster"); + parameter.setProperty(CassandraStoreParameters.CLUSTER_NAME, "Test Cluster"); parameter.setProperty("gora.cassandrastore.mapping.file", "nativeSerialization/gora-cassandra-mapping.xml"); } @@ -92,7 +96,7 @@ public class TestCassandraStoreWithNativeSerialization { // storing data; userDataStore.put(id, user1); // get data; - User olduser = userDataStore.get(user1.getUserId()); + User olduser = userDataStore.get(user1.getUserId()); Assert.assertEquals(olduser.getName(), user1.getName()); Assert.assertEquals(olduser.getDateOfBirth(), user1.getDateOfBirth()); // delete data; @@ -105,7 +109,7 @@ public class TestCassandraStoreWithNativeSerialization { /** * In this test case, schema exists method behavior of the data store is testing. */ - @Test + @Test() public void testSchemaExists() { userDataStore.deleteSchema(); Assert.assertFalse(userDataStore.schemaExists()); @@ -118,17 +122,80 @@ public class TestCassandraStoreWithNativeSerialization { */ @Test public void testTruncateSchema() { - if(!userDataStore.schemaExists()) { + if (!userDataStore.schemaExists()) { userDataStore.createSchema(); } UUID id = UUID.randomUUID(); User user1 = new User(id, "Madhawa Kasun", Date.from(Instant.now())); userDataStore.put(id, user1); - User olduser = userDataStore.get(id); + User olduser = userDataStore.get(id); Assert.assertEquals(olduser.getName(), user1.getName()); Assert.assertEquals(olduser.getDateOfBirth(), user1.getDateOfBirth()); userDataStore.truncateSchema(); - olduser = userDataStore.get(id); + olduser = userDataStore.get(id); Assert.assertNull(olduser); } + + /** + * In this test case, get with fields method behavior of the data store is testing. + */ + @Test + public void testGetWithFields() { + UUID id = UUID.randomUUID(); + User user1 = new User(id, "Madhawa Kasun Gunasekara", Date.from(Instant.now())); + userDataStore.put(id, user1); + // get data; + User olduser = userDataStore.get(user1.getUserId()); + Assert.assertEquals(olduser.getName(), user1.getName()); + Assert.assertEquals(olduser.getDateOfBirth(), user1.getDateOfBirth()); + User olduserWithFields = userDataStore.get(id, new String[]{"name"}); + Assert.assertNull(olduserWithFields.getDateOfBirth()); + } + + /** + * In this test case, get with fields method behavior of the data store is testing. + */ + @Test + public void testExecute() throws Exception { + userDataStore.truncateSchema(); + Map<UUID, User> users = new HashMap<>(); + UUID id1 = UUID.randomUUID(); + User user1 = new User(id1, "user1", Date.from(Instant.now())); + users.put(id1, user1); + userDataStore.put(id1, user1); + UUID id2 = UUID.randomUUID(); + User user2 = new User(id2, "user2", Date.from(Instant.now())); + users.put(id2, user2); + userDataStore.put(id2, user2); + UUID id3 = UUID.randomUUID(); + User user3 = new User(id3, "user3", Date.from(Instant.now())); + users.put(id3, user3); + userDataStore.put(id3, user3); + Query<UUID, User> query1 = userDataStore.newQuery(); + Result<UUID, User> result1 = userDataStore.execute(query1); + int i = 0; + Assert.assertEquals(result1.getProgress(),0.0,0.0); + while (result1.next()) { + // check objects values + Assert.assertEquals(result1.get().getName(), users.get(result1.getKey()).getName()); + Assert.assertEquals(result1.get().getDateOfBirth(), users.get(result1.getKey()).getDateOfBirth()); + Assert.assertEquals(result1.get().getUserId(), users.get(result1.getKey()).getUserId()); + i++; + } + Assert.assertEquals(result1.getProgress(),1.0,0.0); + Assert.assertEquals(i, 3); + + // Check limit query + Query<UUID, User> query2 = userDataStore.newQuery(); + query2.setLimit(2); + Result<UUID, User> result2 = userDataStore.execute(query2); + i = 0; + while (result2.next()) { + Assert.assertEquals(result2.get().getName(), users.get(result2.getKey()).getName()); + Assert.assertEquals(result2.get().getDateOfBirth(), users.get(result2.getKey()).getDateOfBirth()); + Assert.assertEquals(result2.get().getUserId(), users.get(result2.getKey()).getUserId()); + i++; + } + Assert.assertEquals(i, 2); + } }