// File: gora-cassandra/src/main/java/org/apache/gora/cassandra/compiler/GoraCassandraNativeCompiler.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.gora.cassandra.compiler;

import org.apache.commons.io.FilenameUtils;
import org.apache.gora.cassandra.bean.Field;
import org.apache.gora.cassandra.store.CassandraMapping;
import org.apache.gora.cassandra.store.CassandraMappingBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Locale;

/**
 * Generates Java classes for Cassandra Native Serialization.
 * <p>
 * One annotated Java bean is generated for every mapping found in a Gora
 * Cassandra mapping file. Unlike {@code org.apache.gora.compiler.GoraCompiler},
 * which uses an .avsc or .json schema definition, this compiler expects an
 * XML mapping file as input.
 */
public class GoraCassandraNativeCompiler {

  private static final Logger log = LoggerFactory.getLogger(GoraCassandraNativeCompiler.class);

  /** Root directory below which the package directories and sources are written. */
  private final File dest;

  /** Writer of the source file currently being generated; set by {@link #startFile}. */
  private Writer out;

  GoraCassandraNativeCompiler(File dest) {
    this.dest = dest;
  }

  /**
   * Start point of the compiler program.
   *
   * @param args the mapping file to be compiled and the directory the
   *             generated sources should be written to
   */
  public static void main(String[] args) {
    try {
      if (args.length < 2) {
        log.error("Usage: Compiler <mapping file> <output dir>");
        System.exit(1);
      }
      compileSchema(new File(args[0]), new File(args[1]));
    } catch (Exception e) {
      // Hand the throwable itself to the logger: the message has no "{}"
      // placeholder, so passing e.getMessage() silently dropped the cause.
      log.error("Something went wrong. Please check the input file.", e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Generates Java classes for every mapping declared in a mapping file.
   *
   * @param src  mapping file to read
   * @param dest root directory for the generated sources
   * @throws Exception if the mapping file cannot be read
   */
  private static void compileSchema(File src, File dest) throws Exception {
    log.info("Compiling {} to {}", src, dest);
    GoraCassandraNativeCompiler compiler = new GoraCassandraNativeCompiler(dest);
    for (CassandraMapping mapping : readMappingFile(src)) {
      compiler.compile(mapping);
    }
  }

  /**
   * Reads all mappings declared in the given mapping file.
   *
   * @param src mapping file to read
   * @return the mappings produced by {@link CassandraMappingBuilder}
   * @throws Exception if the builder fails to read the file
   */
  private static List<CassandraMapping> readMappingFile(File src) throws Exception {
    return new CassandraMappingBuilder().readMappingFile(src);
  }

  /**
   * Returns the string received with its first letter in upper case.
   * <p>
   * The conversion uses {@link Locale#ROOT} so that generated identifiers do
   * not depend on the platform locale (e.g. Turkish dotted/dotless "i").
   *
   * @param name to be converted
   * @return the name with an upper-case first letter; {@code null} and empty
   *         input are returned unchanged
   */
  static String cap(String name) {
    if (name == null || name.isEmpty()) {
      return name;
    }
    return name.substring(0, 1).toUpperCase(Locale.ROOT) + name.substring(1);
  }

  /**
   * Generates the Java source of the persistent class described by one mapping.
   * <p>
   * The output file is always closed, also when generation fails half way;
   * in that case a partially written file may remain on disk.
   *
   * @param mapping Cassandra Mapping
   * @throws IllegalArgumentException if the mapping declares no class name
   * @throws RuntimeException         if the file cannot be written or a field
   *                                  uses an unsupported Cassandra data type
   */
  private void compile(CassandraMapping mapping) {
    String fullQualifiedName = mapping.getProperty("name");
    if (fullQualifiedName == null || fullQualifiedName.isEmpty()) {
      throw new IllegalArgumentException("Mapping does not define the \"name\" of the persistent class");
    }
    // A name without a package is generated into the default package
    // (setHeaders and startFile both accept a null package name).
    int lastDot = fullQualifiedName.lastIndexOf('.');
    String packageName = lastDot <= 0 ? null : fullQualifiedName.substring(0, lastDot);
    String className = fullQualifiedName.substring(lastDot + 1);
    String tableName = mapping.getCoreName();
    String keySpace = mapping.getKeySpace().getName();

    try {
      // NOTE(review): startFile capitalises the file name while the class is
      // declared with className as-is; the two only agree for names that
      // already start with an upper-case letter.
      startFile(className, packageName);
      try (Writer writer = out) {
        setHeaders(packageName);
        line(0, "");
        line(0, "@Table(keyspace = \"" + keySpace + "\", name = \"" + tableName + "\","
                + "readConsistency = \"QUORUM\","
                + "writeConsistency = \"QUORUM\","
                + "caseSensitiveKeyspace = false,"
                + "caseSensitiveTable = false)");
        line(0, "public class " + className + " implements Persistent {");
        for (Field field : mapping.getFieldList()) {
          processFields(field);
          processGetterAndSetters(field);
          line(2, "");
        }

        setDefaultMethods(2, className);
        line(0, "}");
        writer.flush();
      } finally {
        // The writer is closed at this point; never leave a stale reference.
        out = null;
      }
    } catch (IOException e) {
      log.error("Error while compiling table {}", className, e);
      throw new RuntimeException(e);
    }
  }

  /**
   * Writes the package declaration and the imports the generated class needs.
   *
   * @param namespace package of the generated class, or {@code null} for the
   *                  default package
   * @throws IOException if writing to the output file fails
   */
  private void setHeaders(String namespace) throws IOException {
    if (namespace != null) {
      line(0, "package " + namespace + ";\n");
    }
    line(0, "import java.util.List;");
    line(0, "import java.util.Set;");
    line(0, "import java.util.Map;");
    line(0, "import java.util.UUID;");
    line(0, "import java.math.BigDecimal;");
    line(0, "import java.math.BigInteger;");
    line(0, "import java.net.InetAddress;");
    line(0, "import java.nio.ByteBuffer;");
    line(0, "import java.util.Date;");
    line(0, "");
    line(0, "import org.apache.avro.Schema.Field;");
    line(0, "import org.apache.gora.persistency.Persistent;");
    line(0, "import org.apache.gora.persistency.Tombstone;");
    line(0, "import com.datastax.driver.mapping.annotations.Column;");
    line(0, "import com.datastax.driver.mapping.annotations.PartitionKey;");
    line(0, "import com.datastax.driver.mapping.annotations.Table;");
    line(0, "import com.datastax.driver.mapping.annotations.Transient;");
  }

  /**
   * Creates the package directory (if needed) and opens the source file of
   * the generated class for writing.
   *
   * @param name        Class name
   * @param packageName package of the class, or {@code null} for the default package
   * @throws IOException if the directory cannot be resolved or created, or the
   *                     file cannot be opened
   */
  private void startFile(String name, String packageName) throws IOException {
    String packagePath = packageName == null ? "" : packageName.replace('.', File.separatorChar);
    String fullDest = FilenameUtils.normalize(dest.getAbsolutePath() + File.separatorChar + packagePath);
    if (fullDest == null) {
      // normalize() answers null for paths it considers invalid.
      throw new IOException("Unable to resolve output directory for package " + packageName
              + " under " + dest);
    }
    File dir = new File(fullDest);
    if (!dir.exists() && !dir.mkdirs()) {
      throw new IOException("Unable to create " + dir);
    }
    File sourceFile = new File(dir, cap(name) + ".java");
    out = new OutputStreamWriter(new FileOutputStream(sourceFile), Charset.defaultCharset());
  }

  /**
   * Writes no-op implementations of the {@code Persistent} methods the
   * generated class declares, each marked {@code @Transient} and {@code @Override}.
   *
   * @param pIden     number of spaces used for indentation
   * @param className class Name
   * @throws IOException if writing to the output file fails
   */
  private void setDefaultMethods(int pIden, String className) throws IOException {
    String[] defaultMethods = {
        "public void clear() { }",
        "public boolean isDirty() { return false; }",
        "public boolean isDirty(int fieldIndex) { return false; }",
        "public boolean isDirty(String field) { return false; }",
        "public void setDirty() { }",
        "public void setDirty(int fieldIndex) { }",
        "public void setDirty(String field) { }",
        "public void clearDirty(int fieldIndex) { }",
        "public void clearDirty(String field) { }",
        "public void clearDirty() { }",
        "public Tombstone getTombstone() { return null; }",
        "public List<Field> getUnmanagedFields() { return null; }",
        "public Persistent newInstance() { return new " + className + "(); }"
    };
    for (String method : defaultMethods) {
      line(pIden, "@Transient");
      line(pIden, "@Override");
      line(pIden, method);
    }
  }

  /**
   * Writes the annotated field declaration for one mapped column.
   *
   * @param field mapped field
   * @throws IOException if writing to the output file fails
   */
  private void processFields(Field field) throws IOException {
    String fieldName = field.getFieldName();
    String columnName = field.getColumnName();
    if (Boolean.parseBoolean(field.getProperty("primarykey"))) {
      line(2, "@PartitionKey");
    }
    line(2, "@Column(name = \"" + columnName + "\")");
    line(2, "private " + getDataType(field.getType(), false) + " " + fieldName + ";");
  }

  /**
   * Writes the getter and the setter of one mapped field.
   *
   * @param field mapped field
   * @throws IOException if writing to the output file fails
   */
  private void processGetterAndSetters(Field field) throws IOException {
    String dataType = getDataType(field.getType(), false);
    String fieldName = field.getFieldName();
    String accessorSuffix = cap(fieldName);
    line(2, "public " + dataType + " get" + accessorSuffix + "() {");
    line(4, "return " + fieldName + ";");
    line(2, "}");
    line(2, "public void set" + accessorSuffix + "(" + dataType + " field) {");
    // Qualify with "this.": the setter parameter is called "field", so a
    // mapped field of the same name would otherwise be assigned to itself.
    line(4, "this." + fieldName + " = field;");
    line(2, "}");
  }

  /**
   * Translates a Cassandra (CQL) data type into the Java type used in the
   * generated class.
   * <p>
   * Type names are matched case-insensitively and surrounding white space is
   * ignored, so {@code map<text, int>} and {@code MAP<text,int>} are accepted.
   *
   * @param dbType  Cassandra data type as written in the mapping
   * @param isInner {@code true} when the type is a type argument of a
   *                collection, in which case primitives are boxed
   * @return Java type name
   * @throws RuntimeException if the type is a user defined (frozen) type or
   *                          is not a supported Cassandra data type
   */
  private String getDataType(String dbType, boolean isInner) {
    if (dbType == null) {
      throw new RuntimeException("Invalid Cassandra DataType : null");
    }
    String type = dbType.trim().toLowerCase(Locale.ROOT);
    switch (type) {
      case "uuid":
        return "UUID";
      case "text":
      case "ascii":
      case "varchar":
        return "String";
      case "timestamp":
        return "Date";
      case "blob":
        return "ByteBuffer";
      case "int":
        return isInner ? "Integer" : "int";
      case "float":
        return isInner ? "Float" : "float";
      case "double":
        return isInner ? "Double" : "double";
      case "decimal":
        return "BigDecimal";
      case "bigint":
      case "counter":
        return "Long";
      case "boolean":
        return isInner ? "Boolean" : "boolean";
      case "varint":
        return "BigInteger";
      case "inet":
        return "InetAddress";
      default:
        break;
    }
    if (type.contains("frozen")) {
      throw new RuntimeException("Compiler Doesn't support user define types");
    }
    if (type.startsWith("list")) {
      return "List<" + getDataType(typeArguments(type, dbType), true) + ">";
    }
    if (type.startsWith("set")) {
      return "Set<" + getDataType(typeArguments(type, dbType), true) + ">";
    }
    if (type.startsWith("map")) {
      String[] types = typeArguments(type, dbType).split(",");
      if (types.length != 2) {
        throw new RuntimeException("Invalid Cassandra DataType : " + dbType);
      }
      return "Map<" + getDataType(types[0], true) + "," + getDataType(types[1], true) + ">";
    }
    throw new RuntimeException("Invalid Cassandra DataType : " + dbType);
  }

  /**
   * Extracts the text between the first {@code <} and the last {@code >} of a
   * collection type.
   *
   * @param type         normalised collection type
   * @param originalType type as written in the mapping, used for the error message
   * @return the type argument list
   * @throws RuntimeException if the angle brackets are missing or misplaced
   */
  private static String typeArguments(String type, String originalType) {
    int open = type.indexOf('<');
    int close = type.lastIndexOf('>');
    if (open < 0 || close < open) {
      throw new RuntimeException("Invalid Cassandra DataType : " + originalType);
    }
    return type.substring(open + 1, close);
  }

  /**
   * Writes a line within the output stream.
   *
   * @param indent Number of spaces used for indentation
   * @param text   Text to be written
   * @throws IOException if writing to the output file fails
   */
  private void line(int indent, String text) throws IOException {
    for (int i = 0; i < indent; i++) {
      out.append(" ");
    }
    out.append(text);
    out.append("\n");
  }
}
http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/package-info.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/package-info.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/package-info.java new file mode 100644 index 0000000..3ad9186 --- /dev/null +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains all Cassandra datastore related classes. 
+ */ +package org.apache.gora.cassandra; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java new file mode 100644 index 0000000..e24822d --- /dev/null +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraQuery.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.cassandra.query; + +import org.apache.gora.filter.Filter; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.query.Query; +import org.apache.gora.query.ws.impl.QueryWSBase; +import org.apache.gora.store.DataStore; + +import java.util.HashMap; +import java.util.Map; + +/** + * Cassandra specific implementation of the {@link Query} interface. 
+ */ +public class CassandraQuery<K, T extends Persistent> extends QueryWSBase<K, T> { + + private Filter<K, T> filter; + + private boolean localFilterEnabled = true; + + private Map<String, Object> updateFields = new HashMap<>(); + + public CassandraQuery(DataStore<K, T> dataStore) { + super(dataStore); + } + + /** + * {@inheritDoc} + */ + @Override + public Filter<K, T> getFilter() { + return filter; + } + + /** + * {@inheritDoc} + */ + @Override + public void setFilter(Filter<K, T> filter) { + this.filter = filter; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean isLocalFilterEnabled() { + return localFilterEnabled; + } + + /** + * {@inheritDoc} + */ + @Override + public void setLocalFilterEnabled(boolean enable) { + localFilterEnabled = enable; + } + + /** + * This method adds Update field with the relevant Value + * + * @param field field Name + * @param newValue New Value of the field + */ + public void addUpdateField(String field, Object newValue) { + updateFields.put(field, newValue); + } + + /** + * This method returns the updated field value of the particular field. 
+ * + * @param key Field Name + * @return Object value + */ + public Object getUpdateFieldValue(String key) { + return updateFields.get(key); + } + + /** + * {@inheritDoc} + */ + @Override + public String[] getFields() { + if (updateFields.size() == 0) { + return super.getFields(); + } else { + String[] updateFieldsArray = new String[updateFields.size()]; + return updateFields.keySet().toArray(updateFieldsArray); + } + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java new file mode 100644 index 0000000..4e44d0d --- /dev/null +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/CassandraResultSet.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gora.cassandra.query; + +import org.apache.gora.persistency.Persistent; +import org.apache.gora.query.Query; +import org.apache.gora.query.impl.ResultBase; +import org.apache.gora.store.DataStore; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * CassandraResult specific implementation of the {@link org.apache.gora.query.Result} + * interface. + */ +public class CassandraResultSet<K, T extends Persistent> extends ResultBase<K, T> { + + private List<T> persistentObject = new ArrayList<T>(); + + private List<K> persistentKey = new ArrayList<K>(); + + private int size = 0; + + private int position = 0; + + /** + * Constructor of the Cassandra Result + * @param dataStore Cassandra Data Store + * @param query Cassandra Query + */ + public CassandraResultSet(DataStore<K, T> dataStore, Query<K, T> query) { + super(dataStore, query); + } + + /** + * {@inheritDoc} + * + * @return + * @throws IOException + */ + @Override + protected boolean nextInner() throws IOException { + if (offset < size) { + persistent = persistentObject.get(position); + key = persistentKey.get(position); + position++; + return true; + } + return false; + } + + /** + * {@inheritDoc} + * + * @return + * @throws IOException + * @throws InterruptedException + */ + @Override + public float getProgress() throws IOException, InterruptedException { + return ((float) position) / size; + } + + /** + * {@inheritDoc} + * + * @return + */ + @Override + public T get() { + return super.get(); + } + + /** + * {@inheritDoc} + * + * @return + */ + @Override + public K getKey() { + return super.getKey(); + } + + /** + * This method adds Result Element into result lists, So when user retrieves values from the Result these objects will be passed. 
+ * + * @param key key + * @param token persistent Object + */ + public void addResultElement(K key, T token) { + this.persistentKey.add(key); + this.persistentObject.add(token); + this.size++; + } + + @Override + /** + * Returns whether the limit for the query is reached. + * @return true if result limit is reached + */ + protected boolean isLimitReached() { + return (limit > 0 && offset >= limit) || (offset >= size); + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/package-info.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/package-info.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/package-info.java new file mode 100644 index 0000000..275c8d9 --- /dev/null +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/query/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains all the Cassandra store query representation class as well as Result set representing class + * when query is executed over the Cassandra dataStore. 
+ */ +package org.apache.gora.cassandra.query; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java new file mode 100644 index 0000000..9c33bf6 --- /dev/null +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java @@ -0,0 +1,354 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gora.cassandra.serializers; + +import org.apache.avro.Schema; +import org.apache.avro.util.Utf8; +import org.apache.gora.cassandra.bean.CassandraKey; +import org.apache.gora.cassandra.bean.Field; +import org.apache.gora.cassandra.store.CassandraMapping; +import org.apache.gora.hbase.util.HBaseByteInterface; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.impl.DirtyListWrapper; +import org.apache.gora.persistency.impl.DirtyMapWrapper; +import org.apache.gora.persistency.impl.PersistentBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * This class is Utils class for Avro serialization. + */ +class AvroCassandraUtils { + + /** + * Default schema index with value "0" used when AVRO Union data types are stored. 
+ */ + private static final int DEFAULT_UNION_SCHEMA = 0; + + private static final Logger LOG = LoggerFactory.getLogger(AvroCassandraUtils.class); + + static void processKeys(CassandraMapping cassandraMapping, Object key, List<String> keys, List<Object> values) { + CassandraKey cassandraKey = cassandraMapping.getCassandraKey(); + if (cassandraKey != null) { + if (key instanceof PersistentBase) { + PersistentBase keyBase = (PersistentBase) key; + for (Schema.Field field : keyBase.getSchema().getFields()) { + Field mappedField = cassandraKey.getFieldFromFieldName(field.name()); + if (mappedField != null) { + keys.add(field.name()); + Object value = keyBase.get(field.pos()); + value = getFieldValueFromAvroBean(field.schema(), field.schema().getType(), value, mappedField); + values.add(value); + } else { + LOG.debug("Ignoring field {}, Since field couldn't find in the {} mapping", new Object[]{field.name(), cassandraMapping.getPersistentClass()}); + } + } + } else { + LOG.error("Key bean isn't extended by {} .", new Object[]{cassandraMapping.getKeyClass(), PersistentBase.class}); + } + } else { + keys.add(cassandraMapping.getInlinedDefinedPartitionKey().getFieldName()); + values.add(key); + } + } + + /** + * For every field within an object, we pass in a field schema, Type and value. + * This enables us to process fields (based on their characteristics) + * preparing them for persistence. + * + * @param fieldSchema the associated field schema + * @param type the field type + * @param fieldValue the field value. 
+ * @return field value + */ + static Object getFieldValueFromAvroBean(Schema fieldSchema, Schema.Type type, Object fieldValue, Field field) { + switch (type) { + // Record can be persist with two ways, udt and bytes + case RECORD: + if (field.getType().contains("blob")) { + try { + byte[] serializedBytes = HBaseByteInterface.toBytes(fieldValue, fieldSchema); + fieldValue = ByteBuffer.wrap(serializedBytes); + } catch (IOException e) { + LOG.error("Error occurred when serializing {} field. {}", new Object[]{field.getFieldName(), e.getMessage()}); + } + } else { + throw new RuntimeException("Unsupported Data Type for Record, Currently Supported Data Types are blob and UDT for Records"); + } + break; + case MAP: + Schema valueSchema = fieldSchema.getValueType(); + Schema.Type valuetype = valueSchema.getType(); + Map<String, Object> map = new HashMap<>(); + for (Map.Entry<CharSequence, ?> e : ((Map<CharSequence, ?>) fieldValue).entrySet()) { + String mapKey = e.getKey().toString(); + Object mapValue = e.getValue(); + mapValue = getFieldValueFromAvroBean(valueSchema, valuetype, mapValue, field); + map.put(mapKey, mapValue); + } + fieldValue = map; + break; + case ARRAY: + valueSchema = fieldSchema.getElementType(); + valuetype = valueSchema.getType(); + ArrayList<Object> list = new ArrayList<>(); + for (Object item : (Collection<?>) fieldValue) { + Object value = getFieldValueFromAvroBean(valueSchema, valuetype, item, field); + list.add(value); + } + fieldValue = list; + break; + case UNION: + // storing the union selected schema, the actual value will + // be stored as soon as we get break out. 
+ if (fieldValue != null) { + int schemaPos = getUnionSchema(fieldValue, fieldSchema); + Schema unionSchema = fieldSchema.getTypes().get(schemaPos); + Schema.Type unionType = unionSchema.getType(); + fieldValue = getFieldValueFromAvroBean(unionSchema, unionType, fieldValue, field); + } + break; + case STRING: + fieldValue = fieldValue.toString(); + break; + default: + break; + } + return fieldValue; + } + + /** + * Given an object and the object schema this function obtains, + * from within the UNION schema, the position of the type used. + * If no data type can be inferred then we return a default value + * of position 0. + * + * @param pValue Object + * @param pUnionSchema avro Schema + * @return the unionSchemaPosition. + */ + private static int getUnionSchema(Object pValue, Schema pUnionSchema) { + int unionSchemaPos = 0; +// String valueType = pValue.getClass().getSimpleName(); + for (Schema currentSchema : pUnionSchema.getTypes()) { + Schema.Type schemaType = currentSchema.getType(); + if (pValue instanceof CharSequence && schemaType.equals(Schema.Type.STRING)) + return unionSchemaPos; + else if (pValue instanceof ByteBuffer && schemaType.equals(Schema.Type.BYTES)) + return unionSchemaPos; + else if (pValue instanceof Integer && schemaType.equals(Schema.Type.INT)) + return unionSchemaPos; + else if (pValue instanceof Long && schemaType.equals(Schema.Type.LONG)) + return unionSchemaPos; + else if (pValue instanceof Double && schemaType.equals(Schema.Type.DOUBLE)) + return unionSchemaPos; + else if (pValue instanceof Float && schemaType.equals(Schema.Type.FLOAT)) + return unionSchemaPos; + else if (pValue instanceof Boolean && schemaType.equals(Schema.Type.BOOLEAN)) + return unionSchemaPos; + else if (pValue instanceof Map && schemaType.equals(Schema.Type.MAP)) + return unionSchemaPos; + else if (pValue instanceof List && schemaType.equals(Schema.Type.ARRAY)) + return unionSchemaPos; + else if (pValue instanceof Persistent && 
schemaType.equals(Schema.Type.RECORD)) + return unionSchemaPos; + else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.STRING)) + return unionSchemaPos; + else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.INT)) + return unionSchemaPos; + else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.LONG)) + return unionSchemaPos; + else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.DOUBLE)) + return unionSchemaPos; + else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.FLOAT)) + return unionSchemaPos; + else if (pValue != null && ByteBuffer.class.isAssignableFrom(pValue.getClass()) && schemaType.equals(Schema.Type.RECORD)) + return unionSchemaPos; + unionSchemaPos++; + } + // if we weren't able to determine which data type it is, then we return the default + return DEFAULT_UNION_SCHEMA; + } + + static Object getAvroFieldValue(Object value, Schema schema) { + Object result; + switch (schema.getType()) { + + case MAP: + Map<String, Object> rawMap = (Map<String, Object>) value; + Map<Utf8, Object> utf8ObjectHashMap = new HashMap<>(); + if (rawMap == null) { + result = new DirtyMapWrapper(utf8ObjectHashMap); + break; + } + for (Map.Entry<?, ?> e : rawMap.entrySet()) { + Schema innerSchema = schema.getValueType(); + Object obj = getAvroFieldValue(e.getValue(), innerSchema); + if (e.getKey().getClass().getSimpleName().equalsIgnoreCase("Utf8")) { + utf8ObjectHashMap.put((Utf8) e.getKey(), obj); + } else { + utf8ObjectHashMap.put(new Utf8((String) e.getKey()), obj); + } + } + result = new DirtyMapWrapper<>(utf8ObjectHashMap); + break; + + case ARRAY: + List<Object> rawList = (List<Object>) value; + List<Object> objectArrayList = new ArrayList<>(); + if (rawList == null) { + return 
new DirtyListWrapper(objectArrayList); + } + for (Object item : rawList) { + Object obj = getAvroFieldValue(item, schema.getElementType()); + objectArrayList.add(obj); + } + result = new DirtyListWrapper<>(objectArrayList); + break; + + case RECORD: + if (value != null && ByteBuffer.class.isAssignableFrom(value.getClass())) { + ByteBuffer buffer = (ByteBuffer) value; + byte[] arr = new byte[buffer.remaining()]; + buffer.get(arr); + try { + result = (PersistentBase) HBaseByteInterface.fromBytes(schema, arr); + } catch (IOException e) { + LOG.error("Error occurred while deserialize the Record. :" + e.getMessage()); + result = null; + } + } else { + result = (PersistentBase) value; + } + break; + + case UNION: + int index = getUnionSchema(value, schema); + Schema resolvedSchema = schema.getTypes().get(index); + result = getAvroFieldValue(value, resolvedSchema); + break; + + case ENUM: + result = org.apache.gora.util.AvroUtils.getEnumValue(schema, (String) value); + break; + + case BYTES: + if (ByteBuffer.class.isAssignableFrom(value.getClass())) { + result = value; + } else { + result = ByteBuffer.wrap((byte[]) value); + } + break; + + case STRING: + if (value instanceof org.apache.avro.util.Utf8) { + result = value; + } else if (ByteBuffer.class.isAssignableFrom(value.getClass())) { + result = new Utf8(((ByteBuffer) value).array()); + } else { + result = new Utf8((String) value); + } + break; + + case INT: + if (ByteBuffer.class.isAssignableFrom(value.getClass())) { + result = ((ByteBuffer) value).getInt(); + } else { + result = value; + } + break; + + case FLOAT: + if (ByteBuffer.class.isAssignableFrom(value.getClass())) { + result = ((ByteBuffer) value).getFloat(); + } else { + result = value; + } + break; + + case DOUBLE: + if (ByteBuffer.class.isAssignableFrom(value.getClass())) { + result = ((ByteBuffer) value).getDouble(); + } else { + result = value; + } + break; + + case LONG: + if (ByteBuffer.class.isAssignableFrom(value.getClass())) { + result = 
((ByteBuffer) value).getLong(); + } else { + result = value; + } + break; + + default: + result = value; + } + return result; + } + + static Class getRelevantClassForCassandraDataType(String dataType) { + switch (dataType) { + case "ascii": + case "text": + case "varchar": + return String.class; + case "blob": + return ByteBuffer.class; + case "int": + return Integer.class; + case "double": + return Double.class; + case "bigint": + case "counter": + return Long.class; + case "decimal": + return BigDecimal.class; + case "float": + return Float.class; + case "boolean": + return Boolean.class; + case "inet": + return InetAddress.class; + case "varint": + return BigInteger.class; + case "uuid": + return UUID.class; + case "timestamp": + return Date.class; + default: + throw new RuntimeException("Invalid Cassandra DataType"); + } + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/89683c74/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java ---------------------------------------------------------------------- diff --git a/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java new file mode 100644 index 0000000..204ae52 --- /dev/null +++ b/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/AvroSerializer.java @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.cassandra.serializers; + +import com.datastax.driver.core.AbstractGettableData; +import com.datastax.driver.core.ColumnDefinitions; +import com.datastax.driver.core.ConsistencyLevel; +import com.datastax.driver.core.DataType; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.SimpleStatement; +import com.datastax.driver.core.UDTValue; +import com.datastax.driver.core.UserType; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificData; +import org.apache.commons.lang.ArrayUtils; +import org.apache.gora.cassandra.bean.CassandraKey; +import org.apache.gora.cassandra.bean.Field; +import org.apache.gora.cassandra.query.CassandraResultSet; +import org.apache.gora.cassandra.store.CassandraClient; +import org.apache.gora.cassandra.store.CassandraMapping; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.Query; +import org.apache.gora.query.Result; +import org.apache.gora.store.DataStore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * This class contains the operations relates to Avro Serialization. 
+ */ +class AvroSerializer<K, T extends PersistentBase> extends CassandraSerializer { + + + private static final Logger LOG = LoggerFactory.getLogger(AvroSerializer.class); + + private DataStore<K, T> cassandraDataStore; + + private Schema persistentSchema; + + AvroSerializer(CassandraClient cassandraClient, DataStore<K, T> dataStore, CassandraMapping mapping) { + super(cassandraClient, dataStore.getKeyClass(), dataStore.getPersistentClass(), mapping); + if (PersistentBase.class.isAssignableFrom(dataStore.getPersistentClass())) { + persistentSchema = ((PersistentBase) dataStore.getBeanFactory().getCachedPersistent()).getSchema(); + } else { + persistentSchema = null; + } + this.cassandraDataStore = dataStore; + try { + analyzePersistent(); + } catch (Exception e) { + throw new RuntimeException("Error occurred while analyzing the persistent class, :" + e.getMessage()); + } + } + + /** + * {@inheritDoc} + * + * @throws Exception + */ + protected void analyzePersistent() throws Exception { + userDefineTypeMaps = new HashMap<>(); + for (Field field : mapping.getFieldList()) { + String fieldType = field.getType(); + if (fieldType.contains("frozen")) { + String udtType = fieldType.substring(fieldType.indexOf("<") + 1, fieldType.indexOf(">")); + if (PersistentBase.class.isAssignableFrom(persistentClass)) { + Schema fieldSchema = persistentSchema.getField(field.getFieldName()).schema(); + if (fieldSchema.getType().equals(Schema.Type.UNION)) { + for (Schema currentSchema : fieldSchema.getTypes()) { + if (currentSchema.getType().equals(Schema.Type.RECORD)) { + fieldSchema = currentSchema; + break; + } + } + } + String createQuery = CassandraQueryFactory.getCreateUDTTypeForAvro(mapping, udtType, fieldSchema); + userDefineTypeMaps.put(udtType, createQuery); + } else { + throw new RuntimeException("Unsupported Class for User Define Types, Please use PersistentBase class. 
field : " + udtType); + } + } + } + } + + /** + * {@inheritDoc} + * + * @param query + * @return + */ + @Override + public boolean updateByQuery(Query query) { + List<Object> objectArrayList = new ArrayList<>(); + String cqlQuery = CassandraQueryFactory.getUpdateByQueryForAvro(mapping, query, objectArrayList, persistentSchema); + ResultSet results; + SimpleStatement statement; + if (objectArrayList.size() == 0) { + statement = new SimpleStatement(cqlQuery); + } else { + statement = new SimpleStatement(cqlQuery, objectArrayList.toArray()); + } + if (writeConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); + } + results = client.getSession().execute(statement); + return results.wasApplied(); + } + + /** + * {@inheritDoc} + * + * @param key + * @param fields + * @return + */ + @Override + public Persistent get(Object key, String[] fields) { + if (fields == null) { + fields = getFields(); + } + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); + String cqlQuery = CassandraQueryFactory.getSelectObjectWithFieldsQuery(mapping, fields, cassandraKeys); + SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray()); + if (readConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel)); + } + ResultSet resultSet = this.client.getSession().execute(statement); + Iterator<Row> iterator = resultSet.iterator(); + ColumnDefinitions definitions = resultSet.getColumnDefinitions(); + T obj = null; + if (iterator.hasNext()) { + obj = cassandraDataStore.newPersistent(); + AbstractGettableData row = (AbstractGettableData) iterator.next(); + populateValuesToPersistent(row, definitions, obj, fields); + } + return obj; + } + + /** + * {@inheritDoc} + * + * @param key + * @param persistent + */ + @Override + public void 
put(Object key, Persistent persistent) { + if (persistent instanceof PersistentBase) { + if (persistent.isDirty()) { + PersistentBase persistentBase = (PersistentBase) persistent; + ArrayList<String> fields = new ArrayList<>(); + ArrayList<Object> values = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, key, fields, values); + for (Schema.Field f : persistentBase.getSchema().getFields()) { + String fieldName = f.name(); + Field field = mapping.getFieldFromFieldName(fieldName); + if (field == null) { + LOG.debug("Ignoring {} adding field, {} field can't find in {} mapping", new Object[]{fieldName, fieldName, persistentClass}); + continue; + } + if (persistent.isDirty(f.pos()) || mapping.getInlinedDefinedPartitionKey().equals(mapping.getFieldFromFieldName(fieldName))) { + Object value = persistentBase.get(f.pos()); + String fieldType = field.getType(); + if (fieldType.contains("frozen")) { + fieldType = fieldType.substring(fieldType.indexOf("<") + 1, fieldType.indexOf(">")); + UserType userType = client.getSession().getCluster().getMetadata().getKeyspace(mapping.getKeySpace().getName()).getUserType(fieldType); + UDTValue udtValue = userType.newValue(); + Schema udtSchema = f.schema(); + if (udtSchema.getType().equals(Schema.Type.UNION)) { + for (Schema schema : udtSchema.getTypes()) { + if (schema.getType().equals(Schema.Type.RECORD)) { + udtSchema = schema; + break; + } + } + } + PersistentBase udtObjectBase = (PersistentBase) value; + for (Schema.Field udtField : udtSchema.getFields()) { + Object udtFieldValue = AvroCassandraUtils.getFieldValueFromAvroBean(udtField.schema(), udtField.schema().getType(), udtObjectBase.get(udtField.name()), field); + if (udtField.schema().getType().equals(Schema.Type.MAP)) { + udtValue.setMap(udtField.name(), (Map) udtFieldValue); + } else if (udtField.schema().getType().equals(Schema.Type.ARRAY)) { + udtValue.setList(udtField.name(), (List) udtFieldValue); + } else { + udtValue.set(udtField.name(), udtFieldValue, 
(Class) udtFieldValue.getClass()); + } + } + value = udtValue; + } else { + value = AvroCassandraUtils.getFieldValueFromAvroBean(f.schema(), f.schema().getType(), value, field); + } + values.add(value); + fields.add(fieldName); + } + } + String cqlQuery = CassandraQueryFactory.getInsertDataQuery(mapping, fields); + SimpleStatement statement = new SimpleStatement(cqlQuery, values.toArray()); + if (writeConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); + } + client.getSession().execute(statement); + } else { + LOG.info("Ignored putting persistent bean {} in the store as it is neither " + + "new, neither dirty.", new Object[]{persistent}); + } + } else { + LOG.error("{} Persistent bean isn't extended by {} .", new Object[]{this.persistentClass, PersistentBase.class}); + } + } + + /** + * {@inheritDoc} + * + * @param key + * @return + */ + @Override + public Persistent get(Object key) { + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); + String cqlQuery = CassandraQueryFactory.getSelectObjectQuery(mapping, cassandraKeys); + SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray()); + if (readConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel)); + } + ResultSet resultSet = client.getSession().execute(statement); + Iterator<Row> iterator = resultSet.iterator(); + ColumnDefinitions definitions = resultSet.getColumnDefinitions(); + T obj = null; + if (iterator.hasNext()) { + obj = cassandraDataStore.newPersistent(); + AbstractGettableData row = (AbstractGettableData) iterator.next(); + populateValuesToPersistent(row, definitions, obj, mapping.getFieldNames()); + } + return obj; + } + + /** + * This method wraps result set data in to DataEntry and creates a list of DataEntry. 
+ **/ + private void populateValuesToPersistent(AbstractGettableData row, ColumnDefinitions columnDefinitions, PersistentBase base, String[] fields) { + Object paramValue; + for (String fieldName : fields) { + Schema.Field avroField = base.getSchema().getField(fieldName); + Field field = mapping.getFieldFromFieldName(fieldName); + //to ignore unspecified fields in the mapping + if (field == null || avroField == null) { + continue; + } + Schema fieldSchema = avroField.schema(); + String columnName = field.getColumnName(); + paramValue = getValue(row, columnDefinitions.getType(columnName), columnName, fieldSchema); + Object value = AvroCassandraUtils.getAvroFieldValue(paramValue, fieldSchema); + base.put(avroField.pos(), value); + } + } + + private Object getValue(AbstractGettableData row, DataType columnType, String columnName, Schema schema) { + Object paramValue; + String dataType; + switch (columnType.getName()) { + case ASCII: + paramValue = row.getString(columnName); + break; + case BIGINT: + paramValue = row.isNull(columnName) ? null : row.getLong(columnName); + break; + case BLOB: + paramValue = row.isNull(columnName) ? null : row.getBytes(columnName); + break; + case BOOLEAN: + paramValue = row.isNull(columnName) ? null : row.getBool(columnName); + break; + case COUNTER: + paramValue = row.isNull(columnName) ? null : row.getLong(columnName); + break; + case DECIMAL: + paramValue = row.isNull(columnName) ? null : row.getDecimal(columnName); + break; + case DOUBLE: + paramValue = row.isNull(columnName) ? null : row.getDouble(columnName); + break; + case FLOAT: + paramValue = row.isNull(columnName) ? null : row.getFloat(columnName); + break; + case INET: + paramValue = row.isNull(columnName) ? null : row.getInet(columnName).toString(); + break; + case INT: + paramValue = row.isNull(columnName) ? null : row.getInt(columnName); + break; + case TEXT: + paramValue = row.getString(columnName); + break; + case TIMESTAMP: + paramValue = row.isNull(columnName) ? 
null : row.getDate(columnName); + break; + case UUID: + paramValue = row.isNull(columnName) ? null : row.getUUID(columnName); + break; + case VARCHAR: + paramValue = row.getString(columnName); + break; + case VARINT: + paramValue = row.isNull(columnName) ? null : row.getVarint(columnName); + break; + case TIMEUUID: + paramValue = row.isNull(columnName) ? null : row.getUUID(columnName); + break; + case LIST: + dataType = columnType.getTypeArguments().get(0).toString(); + paramValue = row.isNull(columnName) ? null : row.getList(columnName, AvroCassandraUtils.getRelevantClassForCassandraDataType(dataType)); + break; + case SET: + dataType = columnType.getTypeArguments().get(0).toString(); + paramValue = row.isNull(columnName) ? null : row.getList(columnName, AvroCassandraUtils.getRelevantClassForCassandraDataType(dataType)); + break; + case MAP: + dataType = columnType.getTypeArguments().get(1).toString(); + // Avro supports only String for keys + paramValue = row.isNull(columnName) ? null : row.getMap(columnName, String.class, AvroCassandraUtils.getRelevantClassForCassandraDataType(dataType)); + break; + case UDT: + paramValue = row.isNull(columnName) ? null : row.getUDTValue(columnName); + if (paramValue != null) { + try { + PersistentBase udtObject = (PersistentBase) SpecificData.newInstance(Class.forName(schema.getFullName()), schema); + for (Schema.Field f : udtObject.getSchema().getFields()) { + DataType dType = ((UDTValue) paramValue).getType().getFieldType(f.name()); + Object fieldValue = getValue((UDTValue) paramValue, dType, f.name(), f.schema()); + udtObject.put(f.pos(), fieldValue); + } + paramValue = udtObject; + } catch (ClassNotFoundException e) { + throw new RuntimeException("Error occurred while populating data to " + schema.getFullName() + " : " + e.getMessage()); + } + } + break; + case TUPLE: + paramValue = row.isNull(columnName) ? null : row.getTupleValue(columnName).toString(); + break; + case CUSTOM: + paramValue = row.isNull(columnName) ? 
null : row.getBytes(columnName); + break; + default: + paramValue = row.getString(columnName); + break; + } + return paramValue; + } + + /** + * {@inheritDoc} + * + * @param key + * @return + */ + @Override + public boolean delete(Object key) { + ArrayList<String> cassandraKeys = new ArrayList<>(); + ArrayList<Object> cassandraValues = new ArrayList<>(); + AvroCassandraUtils.processKeys(mapping, key, cassandraKeys, cassandraValues); + String cqlQuery = CassandraQueryFactory.getDeleteDataQuery(mapping, cassandraKeys); + SimpleStatement statement = new SimpleStatement(cqlQuery, cassandraValues.toArray()); + if (writeConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(writeConsistencyLevel)); + } + ResultSet resultSet = client.getSession().execute(statement); + return resultSet.wasApplied(); + } + + /** + * {@inheritDoc} + * + * @param dataStore + * @param query + * @return + */ + @Override + public Result execute(DataStore dataStore, Query query) { + List<Object> objectArrayList = new ArrayList<>(); + String[] fields = query.getFields(); + if (fields != null) { + fields = (String[]) ArrayUtils.addAll(fields, mapping.getAllKeys()); + } else { + fields = mapping.getAllFieldsIncludingKeys(); + } + CassandraResultSet<K, T> cassandraResult = new CassandraResultSet<>(dataStore, query); + String cqlQuery = CassandraQueryFactory.getExecuteQuery(mapping, query, objectArrayList, fields); + ResultSet results; + SimpleStatement statement; + if (objectArrayList.size() == 0) { + statement = new SimpleStatement(cqlQuery); + } else { + statement = new SimpleStatement(cqlQuery, objectArrayList.toArray()); + } + if (readConsistencyLevel != null) { + statement.setConsistencyLevel(ConsistencyLevel.valueOf(readConsistencyLevel)); + } + results = client.getSession().execute(statement); + Iterator<Row> iterator = results.iterator(); + ColumnDefinitions definitions = results.getColumnDefinitions(); + T obj; + K keyObject; + CassandraKey cassandraKey = 
mapping.getCassandraKey(); + while (iterator.hasNext()) { + AbstractGettableData row = (AbstractGettableData) iterator.next(); + obj = cassandraDataStore.newPersistent(); + keyObject = cassandraDataStore.newKey(); + populateValuesToPersistent(row, definitions, obj, fields); + if (cassandraKey != null) { + populateValuesToPersistent(row, definitions, (PersistentBase) keyObject, cassandraKey.getFieldNames()); + } else { + Field key = mapping.getInlinedDefinedPartitionKey(); + keyObject = (K) getValue(row, definitions.getType(key.getColumnName()), key.getColumnName(), null); + } + cassandraResult.addResultElement(keyObject, obj); + } + return cassandraResult; + } + +}