http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java index 113d0c4..1d9093d 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStore.java @@ -17,35 +17,8 @@ package org.apache.gora.cassandra.store; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.ConsistencyLevel; -import com.datastax.driver.core.HostDistance; -import com.datastax.driver.core.KeyspaceMetadata; -import com.datastax.driver.core.PoolingOptions; -import com.datastax.driver.core.ProtocolOptions; -import com.datastax.driver.core.ProtocolVersion; -import com.datastax.driver.core.QueryOptions; -import com.datastax.driver.core.Session; -import com.datastax.driver.core.SocketOptions; -import com.datastax.driver.core.TableMetadata; -import com.datastax.driver.core.TypeCodec; -import com.datastax.driver.core.policies.ConstantReconnectionPolicy; -import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; -import com.datastax.driver.core.policies.DefaultRetryPolicy; -import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy; -import com.datastax.driver.core.policies.ExponentialReconnectionPolicy; -import com.datastax.driver.core.policies.FallthroughRetryPolicy; -import com.datastax.driver.core.policies.LatencyAwarePolicy; -import com.datastax.driver.core.policies.LoggingRetryPolicy; -import com.datastax.driver.core.policies.RoundRobinPolicy; -import com.datastax.driver.core.policies.TokenAwarePolicy; -import com.datastax.driver.mapping.Mapper; -import com.datastax.driver.mapping.MappingManager; -import org.apache.gora.cassandra.bean.CassandraKey; -import org.apache.gora.cassandra.bean.ClusterKeyField; -import org.apache.gora.cassandra.bean.Field; -import org.apache.gora.cassandra.bean.KeySpace; -import org.apache.gora.cassandra.bean.PartitionKeyField; +import org.apache.gora.cassandra.persistent.CassandraNativePersistent; +import org.apache.gora.cassandra.serializers.CassandraSerializer; import org.apache.gora.persistency.BeanFactory; import org.apache.gora.persistency.Persistent; import org.apache.gora.query.PartitionQuery; @@ -53,18 +26,11 @@ import org.apache.gora.query.Query; import org.apache.gora.query.Result; import org.apache.gora.store.DataStore; import org.apache.gora.store.DataStoreFactory; -import org.jdom.Attribute; -import org.jdom.Document; -import org.jdom.Element; -import org.jdom.JDOMException; -import org.jdom.input.SAXBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; import java.util.List; -import java.util.Locale; import java.util.Properties; /** @@ -81,19 +47,23 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> private BeanFactory<K, T> beanFactory; - private Cluster cluster; + private Class<K> keyClass; - private Class keyClass; - - private Class persistentClass; + private Class<T> persistentClass; private CassandraMapping mapping; - private boolean isUseNativeSerialization; + private CassandraSerializer cassandraSerializer; + + public enum SerializerType { + AVRO("AVRO"), NATIVE("NATIVE"); + String val; - private Mapper<T> mapper; + SerializerType(String v) { + this.val = v; + } + } - private Session session; public CassandraStore() { super(); @@ -113,22 +83,11 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> this.keyClass = keyClass; this.persistentClass = persistentClass; String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE); - List<String> codecs = readCustomCodec(properties); - mapping = readMapping(mappingFile); - isUseNativeSerialization = Boolean.parseBoolean(properties.getProperty(CassandraStoreParameters.USE_CASSANDRA_NATIVE_SERIALIZATION)); - Cluster.Builder builder = Cluster.builder(); - builder = populateSettings(builder, properties); - this.cluster = builder.build(); - if (codecs != null) { - registerCustomCodecs(codecs); - } - this.session = this.cluster.connect(); - if (isUseNativeSerialization) { - this.createSchema(); - MappingManager mappingManager = new MappingManager(session); - mapper = mappingManager.mapper(persistentClass); - } - + CassandraMappingBuilder mappingBuilder = new CassandraMappingBuilder(this); + mapping = mappingBuilder.readMapping(mappingFile); + CassandraClient cassandraClient = new CassandraClient(); + cassandraClient.initialize(properties); + cassandraSerializer = CassandraSerializer.getSerializer(cassandraClient, properties.getProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE), keyClass, persistentClass, mapping); } catch (Exception e) { LOG.error("Error while initializing Cassandra store: {}", new Object[]{e.getMessage()}); @@ -136,501 +95,13 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> } } - private void registerCustomCodecs(List<String> codecs) throws Exception { - for (String codec : codecs) { - this.cluster.getConfiguration().getCodecRegistry().register((TypeCodec<?>) Class.forName(codec).newInstance()); - } - } - - /** - * In this method we reads the mapping file and creates the Cassandra Mapping. - * - * @param filename mapping file name - * @return @{@link CassandraMapping} - * @throws IOException - */ - private CassandraMapping readMapping(String filename) throws IOException { - CassandraMapping map = new CassandraMapping(); - try { - SAXBuilder builder = new SAXBuilder(); - Document doc = builder.build(getClass().getClassLoader().getResourceAsStream(filename)); - - List<Element> keyspaces = doc.getRootElement().getChildren("keyspace"); - List<Element> classes = doc.getRootElement().getChildren("class"); - List<Element> keys = doc.getRootElement().getChildren("cassandraKey"); - - boolean classMatched = false; - for (Element classElement : classes) { - if (classElement.getAttributeValue("keyClass").equals( - keyClass.getCanonicalName()) - && classElement.getAttributeValue("name").equals( - persistentClass.getCanonicalName())) { - - classMatched = true; - String tableName = classElement.getAttributeValue("table"); - map.setCoreName(tableName); - - List classAttributes = classElement.getAttributes(); - for (Object anAttributeList : classAttributes) { - Attribute attribute = (Attribute) anAttributeList; - String attributeName = attribute.getName(); - String attributeValue = attribute.getValue(); - map.addProperty(attributeName, attributeValue); - } - - List<Element> fields = classElement.getChildren("field"); - - for (Element field : fields) { - Field cassandraField = new Field(); - - List fieldAttributes = field.getAttributes(); - processAttributes(fieldAttributes, cassandraField); - map.addCassandraField(cassandraField); - } - break; - } - LOG.warn("Check that 'keyClass' and 'name' parameters in gora-solr-mapping.xml " - + "match with intended values. A mapping mismatch has been found therefore " - + "no mapping has been initialized for class mapping at position " - + " {} in mapping file.", classes.indexOf(classElement)); - } - if (!classMatched) { - LOG.error("Check that 'keyClass' and 'name' parameters in {} no mapping has been initialized for {} class mapping", filename, persistentClass); - } - - String keyspaceName = map.getProperty("keyspace"); - if (keyspaceName != null) { - KeySpace keyspace; - for (Element keyspaceElement : keyspaces) { - if (keyspaceName.equals(keyspaceElement.getAttributeValue("name"))) { - keyspace = new KeySpace(); - List fieldAttributes = keyspaceElement.getAttributes(); - for (Object attributeObject : fieldAttributes) { - Attribute attribute = (Attribute) attributeObject; - String attributeName = attribute.getName(); - String attributeValue = attribute.getValue(); - switch (attributeName) { - case "name": - keyspace.setName(attributeValue); - break; - case "durableWrite": - keyspace.setDurableWritesEnabled(Boolean.parseBoolean(attributeValue)); - break; - default: - keyspace.addProperty(attributeName, attributeValue); - break; - } - } - Element placementStrategy = keyspaceElement.getChild("placementStrategy"); - switch (KeySpace.PlacementStrategy.valueOf(placementStrategy.getAttributeValue("name"))) { - case SimpleStrategy: - keyspace.setPlacementStrategy(KeySpace.PlacementStrategy.SimpleStrategy); - keyspace.setReplicationFactor(Integer.parseInt(placementStrategy.getAttributeValue("replication_factor"))); - break; - case NetworkTopologyStrategy: - List<Element> dataCenters = placementStrategy.getChildren("datacenter"); - keyspace.setPlacementStrategy(KeySpace.PlacementStrategy.NetworkTopologyStrategy); - for (Element dataCenter : dataCenters) { - String dataCenterName = dataCenter.getAttributeValue("name"); - Integer dataCenterReplicationFactor = Integer.valueOf(dataCenter.getAttributeValue("replication_factor")); - keyspace.addDataCenter(dataCenterName, dataCenterReplicationFactor); - } - break; - } - map.setKeySpace(keyspace); - break; - } - - } - - } - - for (Element key : keys) { - if (keyClass.getName().equals(key.getAttributeValue("name"))) { - CassandraKey cassandraKey = new CassandraKey(keyClass.getName()); - Element partitionKeys = key.getChild("partitionKey"); - Element clusterKeys = key.getChild("clusterKey"); - List<Element> partitionKeyFields = partitionKeys.getChildren("field"); - List<Element> partitionCompositeKeyFields = partitionKeys.getChildren("compositeKey"); - // process non composite partition keys - for (Element partitionKeyField : partitionKeyFields) { - PartitionKeyField fieldKey = new PartitionKeyField(); - List fieldAttributes = partitionKeyField.getAttributes(); - processAttributes(fieldAttributes, fieldKey); - cassandraKey.addPartitionKeyField(fieldKey); - } - // process composite partitions keys - for (Element partitionCompositeKeyField : partitionCompositeKeyFields) { - PartitionKeyField compositeFieldKey = new PartitionKeyField(); - compositeFieldKey.setComposite(true); - List<Element> compositeKeyFields = partitionCompositeKeyField.getChildren("field"); - for (Element partitionKeyField : compositeKeyFields) { - PartitionKeyField fieldKey = new PartitionKeyField(); - List fieldAttributes = partitionKeyField.getAttributes(); - processAttributes(fieldAttributes, fieldKey); - compositeFieldKey.addField(fieldKey); - } - cassandraKey.addPartitionKeyField(compositeFieldKey); - } - - //process cluster keys - List<Element> clusterKeyFields = clusterKeys.getChildren("field"); - for (Element clusterKeyField : clusterKeyFields) { - ClusterKeyField keyField = new ClusterKeyField(); - List fieldAttributes = clusterKeyField.getAttributes(); - for (Object anAttributeList : fieldAttributes) { - Attribute attribute = (Attribute) anAttributeList; - String attributeName = attribute.getName(); - String attributeValue = attribute.getValue(); - switch (attributeName) { - case "name": - keyField.setFieldName(attributeValue); - break; - case "column": - keyField.setColumnName(attributeValue); - break; - case "type": - keyField.setType(attributeValue); - break; - case "order": - keyField.setOrder(ClusterKeyField.Order.valueOf(attributeValue.toUpperCase(Locale.ENGLISH))); - break; - default: - keyField.addProperty(attributeName, attributeValue); - break; - } - } - cassandraKey.addClusterKeyField(keyField); - } - map.setCassandraKey(cassandraKey); - } - } - } catch (Exception ex) { - throw new IOException(ex); - } - return map; - } - - private void processAttributes(List<Element> attributes, Field fieldKey) { - for (Object anAttributeList : attributes) { - Attribute attribute = (Attribute) anAttributeList; - String attributeName = attribute.getName(); - String attributeValue = attribute.getValue(); - switch (attributeName) { - case "name": - fieldKey.setFieldName(attributeValue); - break; - case "column": - fieldKey.setColumnName(attributeValue); - break; - case "type": - fieldKey.setType(attributeValue); - break; - default: - fieldKey.addProperty(attributeName, attributeValue); - break; - } - } - } - - private List<String> readCustomCodec(Properties properties) throws JDOMException, IOException { - String filename = properties.getProperty(CassandraStoreParameters.CUSTOM_CODEC_FILE); - if (filename != null) { - List<String> codecs = new ArrayList<>(); - SAXBuilder builder = new SAXBuilder(); - Document doc = builder.build(getClass().getClassLoader().getResourceAsStream(filename)); - List<Element> codecElementList = doc.getRootElement().getChildren("codec"); - for (Element codec : codecElementList) { - codecs.add(codec.getValue()); - } - return codecs; - } - return null; - } - - private Cluster.Builder populateSettings(Cluster.Builder builder, Properties properties) { - String serversParam = properties.getProperty(CassandraStoreParameters.CASSANDRA_SERVERS); - String[] servers = serversParam.split(","); - for (String server : servers) { - builder = builder.addContactPoint(server); - } - String portProp = properties.getProperty(CassandraStoreParameters.PORT); - if (portProp != null) { - builder = builder.withPort(Integer.parseInt(portProp)); - } - String clusterNameProp = properties.getProperty(CassandraStoreParameters.CLUSTER_NAME); - if (clusterNameProp != null) { - builder = builder.withClusterName(clusterNameProp); - } - String compressionProp = properties.getProperty(CassandraStoreParameters.COMPRESSION); - if (compressionProp != null) { - builder = builder.withCompression(ProtocolOptions.Compression.valueOf(compressionProp)); - } - builder = this.populateCredentials(properties, builder); - builder = this.populateLoadBalancingProp(properties, builder); - String enableJMXProp = properties.getProperty(CassandraStoreParameters.ENABLE_JMX_REPORTING); - if (enableJMXProp != null) { - if (!Boolean.parseBoolean(enableJMXProp)) { - builder = builder.withoutJMXReporting(); - } - } - String enableMetricsProp = properties.getProperty(CassandraStoreParameters.ENABLE_METRICS); - if (enableMetricsProp != null) { - if (!Boolean.parseBoolean(enableMetricsProp)) { - builder = builder.withoutMetrics(); - } - } - builder = this.populatePoolingSettings(properties, builder); - String versionProp = properties.getProperty(CassandraStoreParameters.PROTOCOL_VERSION); - if (versionProp != null) { - builder = builder.withProtocolVersion(ProtocolVersion.fromInt(Integer.parseInt(versionProp))); - } - builder = this.populateQueryOptions(properties, builder); - builder = this.populateReconnectPolicy(properties, builder); - builder = this.populateRetrytPolicy(properties, builder); - builder = this.populateSocketOptions(properties, builder); - String enableSSLProp = properties.getProperty(CassandraStoreParameters.ENABLE_SSL); - if (enableSSLProp != null) { - if (Boolean.parseBoolean(enableSSLProp)) { - builder = builder.withSSL(); - } - } - return builder; - } - - - private Cluster.Builder populateLoadBalancingProp(Properties properties, Cluster.Builder builder) { - String loadBalancingProp = properties.getProperty(CassandraStoreParameters.LOAD_BALANCING_POLICY); - if (loadBalancingProp != null) { - switch (loadBalancingProp) { - case "LatencyAwareRoundRobinPolicy": - builder = builder.withLoadBalancingPolicy(LatencyAwarePolicy.builder(new RoundRobinPolicy()).build()); - break; - case "RoundRobinPolicy": - builder = builder.withLoadBalancingPolicy(new RoundRobinPolicy()); - break; - case "DCAwareRoundRobinPolicy": { - String dataCenter = properties.getProperty(CassandraStoreParameters.DATA_CENTER); - boolean allowRemoteDCsForLocalConsistencyLevel = Boolean.parseBoolean( - properties.getProperty(CassandraStoreParameters.ALLOW_REMOTE_DCS_FOR_LOCAL_CONSISTENCY_LEVEL)); - if (dataCenter != null && !dataCenter.isEmpty()) { - if (allowRemoteDCsForLocalConsistencyLevel) { - builder = builder.withLoadBalancingPolicy( - DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter) - .allowRemoteDCsForLocalConsistencyLevel().build()); - } else { - builder = builder.withLoadBalancingPolicy( - DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter).build()); - } - } else { - if (allowRemoteDCsForLocalConsistencyLevel) { - builder = builder.withLoadBalancingPolicy( - (DCAwareRoundRobinPolicy.builder().allowRemoteDCsForLocalConsistencyLevel().build())); - } else { - builder = builder.withLoadBalancingPolicy((DCAwareRoundRobinPolicy.builder().build())); - } - } - break; - } - case "TokenAwareRoundRobinPolicy": - builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy())); - break; - case "TokenAwareDCAwareRoundRobinPolicy": { - String dataCenter = properties.getProperty(CassandraStoreParameters.DATA_CENTER); - boolean allowRemoteDCsForLocalConsistencyLevel = Boolean.parseBoolean( - properties.getProperty(CassandraStoreParameters.ALLOW_REMOTE_DCS_FOR_LOCAL_CONSISTENCY_LEVEL)); - if (dataCenter != null && !dataCenter.isEmpty()) { - if (allowRemoteDCsForLocalConsistencyLevel) { - builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy( - DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter) - .allowRemoteDCsForLocalConsistencyLevel().build())); - } else { - builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy( - DCAwareRoundRobinPolicy.builder().withLocalDc(dataCenter).build())); - } - } else { - if (allowRemoteDCsForLocalConsistencyLevel) { - builder = builder.withLoadBalancingPolicy(new TokenAwarePolicy( - DCAwareRoundRobinPolicy.builder().allowRemoteDCsForLocalConsistencyLevel().build())); - } else { - builder = builder.withLoadBalancingPolicy( - new TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().build())); - } - } - break; - } - default: - LOG.error("Unsupported Cassandra load balancing policy: {} ", loadBalancingProp); - break; - } - } - return builder; - } - - private Cluster.Builder populateCredentials(Properties properties, Cluster.Builder builder) { - String usernameProp = properties.getProperty(CassandraStoreParameters.USERNAME); - String passwordProp = properties.getProperty(CassandraStoreParameters.PASSWORD); - if (usernameProp != null) { - builder = builder.withCredentials(usernameProp, passwordProp); - } - return builder; - } - - private Cluster.Builder populatePoolingSettings(Properties properties, Cluster.Builder builder) { - String localCoreConnectionsPerHost = properties.getProperty(CassandraStoreParameters.LOCAL_CORE_CONNECTIONS_PER_HOST); - String remoteCoreConnectionsPerHost = properties.getProperty(CassandraStoreParameters.REMOTE_CORE_CONNECTIONS_PER_HOST); - String localMaxConnectionsPerHost = properties.getProperty(CassandraStoreParameters.LOCAL_MAX_CONNECTIONS_PER_HOST); - String remoteMaxConnectionsPerHost = properties.getProperty(CassandraStoreParameters.REMOTE_MAX_CONNECTIONS_PER_HOST); - String localNewConnectionThreshold = properties.getProperty(CassandraStoreParameters.LOCAL_NEW_CONNECTION_THRESHOLD); - String remoteNewConnectionThreshold = properties.getProperty(CassandraStoreParameters.REMOTE_NEW_CONNECTION_THRESHOLD); - String localMaxRequestsPerConnection = properties.getProperty(CassandraStoreParameters.LOCAL_MAX_REQUESTS_PER_CONNECTION); - String remoteMaxRequestsPerConnection = properties.getProperty(CassandraStoreParameters.REMOTE_MAX_REQUESTS_PER_CONNECTION); - PoolingOptions options = new PoolingOptions(); - if (localCoreConnectionsPerHost != null) { - options.setCoreConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(localCoreConnectionsPerHost)); - } - if (remoteCoreConnectionsPerHost != null) { - options.setCoreConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(remoteCoreConnectionsPerHost)); - } - if (localMaxConnectionsPerHost != null) { - options.setMaxConnectionsPerHost(HostDistance.LOCAL, Integer.parseInt(localMaxConnectionsPerHost)); - } - if (remoteMaxConnectionsPerHost != null) { - options.setMaxConnectionsPerHost(HostDistance.REMOTE, Integer.parseInt(remoteMaxConnectionsPerHost)); - } - if (localNewConnectionThreshold != null) { - options.setNewConnectionThreshold(HostDistance.LOCAL, Integer.parseInt(localNewConnectionThreshold)); - } - if (remoteNewConnectionThreshold != null) { - options.setNewConnectionThreshold(HostDistance.REMOTE, Integer.parseInt(remoteNewConnectionThreshold)); - } - if (localMaxRequestsPerConnection != null) { - options.setMaxRequestsPerConnection(HostDistance.LOCAL, Integer.parseInt(localMaxRequestsPerConnection)); - } - if (remoteMaxRequestsPerConnection != null) { - options.setMaxRequestsPerConnection(HostDistance.REMOTE, Integer.parseInt(remoteMaxRequestsPerConnection)); - } - builder = builder.withPoolingOptions(options); - return builder; - } - - private Cluster.Builder populateQueryOptions(Properties properties, Cluster.Builder builder) { - String consistencyLevelProp = properties.getProperty(CassandraStoreParameters.CONSISTENCY_LEVEL); - String serialConsistencyLevelProp = properties.getProperty(CassandraStoreParameters.SERIAL_CONSISTENCY_LEVEL); - String fetchSize = properties.getProperty(CassandraStoreParameters.FETCH_SIZE); - QueryOptions options = new QueryOptions(); - if (consistencyLevelProp != null) { - options.setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevelProp)); - } - if (serialConsistencyLevelProp != null) { - options.setSerialConsistencyLevel(ConsistencyLevel.valueOf(serialConsistencyLevelProp)); - } - if (fetchSize != null) { - options.setFetchSize(Integer.parseInt(fetchSize)); - } - return builder.withQueryOptions(options); - } - - private Cluster.Builder populateReconnectPolicy(Properties properties, Cluster.Builder builder) { - String reconnectionPolicy = properties.getProperty(CassandraStoreParameters.RECONNECTION_POLICY); - if (reconnectionPolicy != null) { - switch (reconnectionPolicy) { - case "ConstantReconnectionPolicy": { - String constantReconnectionPolicyDelay = properties.getProperty(CassandraStoreParameters.CONSTANT_RECONNECTION_POLICY_DELAY); - ConstantReconnectionPolicy policy = new ConstantReconnectionPolicy(Long.parseLong(constantReconnectionPolicyDelay)); - builder = builder.withReconnectionPolicy(policy); - break; - } - case "ExponentialReconnectionPolicy": { - String exponentialReconnectionPolicyBaseDelay = properties.getProperty(CassandraStoreParameters.EXPONENTIAL_RECONNECTION_POLICY_BASE_DELAY); - String exponentialReconnectionPolicyMaxDelay = properties.getProperty(CassandraStoreParameters.EXPONENTIAL_RECONNECTION_POLICY_MAX_DELAY); - - ExponentialReconnectionPolicy policy = new ExponentialReconnectionPolicy(Long.parseLong(exponentialReconnectionPolicyBaseDelay), - Long.parseLong(exponentialReconnectionPolicyMaxDelay)); - builder = builder.withReconnectionPolicy(policy); - break; - } - default: - LOG.error("Unsupported reconnection policy : {} ", reconnectionPolicy); - } - } - return builder; - } - - private Cluster.Builder populateRetrytPolicy(Properties properties, Cluster.Builder builder) { - String retryPolicy = properties.getProperty(CassandraStoreParameters.RETRY_POLICY); - if (retryPolicy != null) { - switch (retryPolicy) { - case "DefaultRetryPolicy": - builder = builder.withRetryPolicy(DefaultRetryPolicy.INSTANCE); - break; - case "DowngradingConsistencyRetryPolicy": - builder = builder.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE); - break; - case "FallthroughRetryPolicy": - builder = builder.withRetryPolicy(FallthroughRetryPolicy.INSTANCE); - break; - case "LoggingDefaultRetryPolicy": - builder = builder.withRetryPolicy(new LoggingRetryPolicy(DefaultRetryPolicy.INSTANCE)); - break; - case "LoggingDowngradingConsistencyRetryPolicy": - builder = builder.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)); - break; - case "LoggingFallthroughRetryPolicy": - builder = builder.withRetryPolicy(new LoggingRetryPolicy(FallthroughRetryPolicy.INSTANCE)); - break; - default: - LOG.error("Unsupported retry policy : {} ", retryPolicy); - break; - } - } - return builder; - } - - private Cluster.Builder populateSocketOptions(Properties properties, Cluster.Builder builder) { - String connectionTimeoutMillisProp = properties.getProperty(CassandraStoreParameters.CONNECTION_TIMEOUT_MILLIS); - String keepAliveProp = properties.getProperty(CassandraStoreParameters.KEEP_ALIVE); - String readTimeoutMillisProp = properties.getProperty(CassandraStoreParameters.READ_TIMEOUT_MILLIS); - String receiveBufferSizeProp = properties.getProperty(CassandraStoreParameters.RECEIVER_BUFFER_SIZE); - String reuseAddress = properties.getProperty(CassandraStoreParameters.REUSE_ADDRESS); - String sendBufferSize = properties.getProperty(CassandraStoreParameters.SEND_BUFFER_SIZE); - String soLinger = properties.getProperty(CassandraStoreParameters.SO_LINGER); - String tcpNoDelay = properties.getProperty(CassandraStoreParameters.TCP_NODELAY); - SocketOptions options = new SocketOptions(); - if (connectionTimeoutMillisProp != null) { - options.setConnectTimeoutMillis(Integer.parseInt(connectionTimeoutMillisProp)); - } - if (keepAliveProp != null) { - options.setKeepAlive(Boolean.parseBoolean(keepAliveProp)); - } - if (readTimeoutMillisProp != null) { - options.setReadTimeoutMillis(Integer.parseInt(readTimeoutMillisProp)); - } - if (receiveBufferSizeProp != null) { - options.setReceiveBufferSize(Integer.parseInt(receiveBufferSizeProp)); - } - if (reuseAddress != null) { - options.setReuseAddress(Boolean.parseBoolean(reuseAddress)); - } - if (sendBufferSize != null) { - options.setSendBufferSize(Integer.parseInt(sendBufferSize)); - } - if (soLinger != null) { - options.setSoLinger(Integer.parseInt(soLinger)); - } - if (tcpNoDelay != null) { - options.setTcpNoDelay(Boolean.parseBoolean(tcpNoDelay)); - } - return builder.withSocketOptions(options); - } - /** * {@inheritDoc} * <p> * This is a setter method to set the class of persistent objects. * * @param persistentClass class of persistent objects - * {@link org.apache.gora.cassandra.serializers.CassandraNativePersistent} + * {@link CassandraNativePersistent} * {@link org.apache.gora.persistency.Persistent} */ @Override @@ -638,9 +109,10 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> this.persistentClass = persistentClass; } + @SuppressWarnings("all") @Override public Class<T> getPersistentClass() { - return this.persistentClass; + return (Class<T>) this.persistentClass; } @Override @@ -650,20 +122,15 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> @Override public void createSchema() { - LOG.debug("creating Cassandra keyspace {}", mapping.getKeySpace().getName()); - this.session.execute(CassandraQueryFactory.getCreateKeySpaceQuery(mapping)); - LOG.debug("creating Cassandra column family / table {}", mapping.getCoreName()); - this.session.execute(CassandraQueryFactory.getCreateTableQuery(mapping)); + cassandraSerializer.createSchema(); } @Override public void deleteSchema() { - LOG.debug("dropping Cassandra table {}", mapping.getCoreName()); - this.session.execute(CassandraQueryFactory.getDropTableQuery(mapping)); - LOG.debug("dropping Cassandra keyspace {}", mapping.getKeySpace().getName()); - this.session.execute(CassandraQueryFactory.getDropKeySpaceQuery(mapping)); + cassandraSerializer.deleteSchema(); } + @SuppressWarnings("all") @Override public Class<K> getKeyClass() { return this.keyClass; @@ -677,16 +144,32 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> @Override public K newKey() { try { - return beanFactory.newKey(); + if (beanFactory != null) { + return beanFactory.newKey(); + } else { + LOG.warn("beanFactory is hasn't been initialized. It's recommended to initialize beanFactory."); + return keyClass.newInstance(); + } } catch (Exception ex) { LOG.error(ex.getMessage(), ex); return null; } } + @SuppressWarnings("all") @Override public T newPersistent() { - return this.beanFactory.newPersistent(); + try { + if (beanFactory != null) { + return this.beanFactory.newPersistent(); + } else { + LOG.warn("beanFactory is hasn't been initialized. It's recommended to initialize beanFactory."); + return persistentClass.newInstance(); + } + } catch (Exception ex) { + LOG.error(ex.getMessage(), ex); + return null; + } } @Override @@ -701,17 +184,12 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> @Override public void close() { - this.session.close(); - this.cluster.close(); + this.cassandraSerializer.close(); } @Override public T get(K key) { - if (isUseNativeSerialization) { - return mapper.get(key); - } else { - return null; - } + return (T) cassandraSerializer.get(key); } @Override @@ -721,21 +199,12 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> @Override public void put(K key, T obj) { - if (isUseNativeSerialization) { - mapper.save(obj); - } else { - - } + cassandraSerializer.put(key, obj); } @Override public boolean delete(K key) { - if (isUseNativeSerialization) { - mapper.delete(key); - return true; - } else { - return false; - } + return cassandraSerializer.delete(key); } @Override @@ -760,7 +229,7 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> @Override public void flush() { - + // ignore since caching has been disabled } @Override @@ -770,19 +239,12 @@ public class CassandraStore<K, T extends Persistent> implements DataStore<K, T> @Override public void truncateSchema() { - LOG.debug("truncating Cassandra table {}", mapping.getCoreName()); - this.session.execute(CassandraQueryFactory.getTruncateTableQuery(mapping)); + cassandraSerializer.truncateSchema(); } @Override public boolean schemaExists() { - KeyspaceMetadata keyspace = cluster.getMetadata().getKeyspace(mapping.getKeySpace().getName()); - if (keyspace != null) { - TableMetadata table = keyspace.getTable(mapping.getCoreName()); - return table != null; - } else { - return false; - } + return cassandraSerializer.schemaExists(); } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java index c48b1c0..95e1c0f 100644 --- a/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java +++ b/gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/store/CassandraStoreParameters.java @@ -218,7 +218,7 @@ public class CassandraStoreParameters { * Property pointing to use Native Cassandra Native Serialization. * boolean */ - public static final String USE_CASSANDRA_NATIVE_SERIALIZATION = "gora.cassandrastore.useCassandraNativeSerialization"; + public static final String CASSANDRA_SERIALIZATION_TYPE = "gora.cassandrastore.cassandraSerializationType"; /** * Property pointing to the custom codec file. * string http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml b/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml new file mode 100644 index 0000000..e99cf92 --- /dev/null +++ b/gora-cassandra-cql/src/test/conf/avro/gora-cassandra-mapping.xml @@ -0,0 +1,73 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- + The value of 'host' attribute of keyspace tag should match exactly what is in + gora.properties file. Essentially this means that if you are using port number, you should + use it every where regardless of whether it is the default port or not. + At runtime Gora will otherwise try to connect to localhost + https://issues.apache.org/jira/browse/GORA-269 + + The values of 'replication_factor' and 'placement_strategy' attribute of keyspace tag + only apply if gora create the kyespace. they have no effect if this is being used against + an existing keyspace. the default value for 'replication_factor' is '1' + + The value of 'placement_strategy' should be a fully qualifed class name that is known to + the cassansra cluster, not the application or gora. As of this writing, the classes that ship + with cassandra are: + 'org.apache.cassandra.locator.SimpleStrategy' + 'org.apache.cassandra.locator.NetworkTopologyStrategy' + gora cassandra would use SimpleStrategy by default if no value for this attribute is specified + + The default value of 'gc_grace_seconds' is '0' which is ONLY VIABLE FOR SINGLE NODE + CLUSTER. you should update this value according to your cluster configuration. + https://wiki.apache.org/cassandra/StorageConfiguration + + The value of 'ttl' (time to live) attribute of field tag should most likely always + be zero unless you want Cassandra to create Tombstones and delete portions of your + data once this period expires. Any positive value is read and bound to the number + of seconds after which the value for that field will disappear. The default value of ttl + is '0' + + More information on gora-cassandra configuration and mapping's can be found + at http://gora.apache.org/current/gora-cassandra.html +--> + +<gora-otd> + + <keyspace name="avroKeySpace" durableWrite="false"> + <placementStrategy name="SimpleStrategy" replication_factor="1"/> + </keyspace> + + <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" table="WebPage" keyspace="avroKeySpace"> + <field name="url" column="url" length="128" primarykey="true"/> + <field name="content" column="content" type="blob"/> + <field name="parsedContent" column="parsedContent" type="list"/> + <field name="outlinks" column="outlinks" type="map"/> + </class> + + <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" keyspace="avroKeySpace" + table="Employee" compactStorage="true" > + <field name="name" column="name" type="text" ttl="10"/> + <field name="dateOfBirth" column="dob" type="bigint" ttl="10"/> + <field name="ssn" column="ssn" type="int" ttl="10" primarykey="true"/> + <field name="salary" column="salary" type="int" ttl="10" /> + </class> + +</gora-otd> http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/test/conf/nativeSerialization/gora-cassandra-mapping.xml ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/conf/nativeSerialization/gora-cassandra-mapping.xml b/gora-cassandra-cql/src/test/conf/nativeSerialization/gora-cassandra-mapping.xml index 07d6590..d3dde32 100644 --- a/gora-cassandra-cql/src/test/conf/nativeSerialization/gora-cassandra-mapping.xml +++ b/gora-cassandra-cql/src/test/conf/nativeSerialization/gora-cassandra-mapping.xml @@ -55,7 +55,7 @@ <placementStrategy name="SimpleStrategy" replication_factor="1"/> </keyspace> - <class name="org.apache.gora.cassandra.test.nativeSerialization.User" keyClass="java.util.UUID" keyspace="nativeTestKeySpace" + <class name="org.apache.gora.cassandra.example.generated.nativeSerialization.User" keyClass="java.util.UUID" keyspace="nativeTestKeySpace" table="Users" compactStorage="true" > <field name="userId" column="user_id" type="uuid" ttl="10" primarykey="true"/> <field name="name" column="name" type="text" ttl="10"/> http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/test/java/.gitignore ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/java/.gitignore b/gora-cassandra-cql/src/test/java/.gitignore deleted file mode 100644 index 09697dc..0000000 --- a/gora-cassandra-cql/src/test/java/.gitignore +++ /dev/null @@ -1,15 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java index 5bb1114..9ebcae5 100644 --- a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java +++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/store/TestCassandraStoreWithNativeSerialization.java @@ -1,7 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.gora.cassandra.store; import org.apache.gora.cassandra.GoraCassandraTestDriver; -import org.apache.gora.cassandra.test.nativeSerialization.User; +import org.apache.gora.cassandra.example.generated.nativeSerialization.User; import org.apache.gora.store.DataStore; import org.junit.After; import org.junit.AfterClass; @@ -34,7 +51,7 @@ public class TestCassandraStoreWithNativeSerialization { parameter = new Properties(); parameter.setProperty(CassandraStoreParameters.CASSANDRA_SERVERS, "localhost"); parameter.setProperty(CassandraStoreParameters.PORT, "9042"); - parameter.setProperty(CassandraStoreParameters.USE_CASSANDRA_NATIVE_SERIALIZATION, "true"); + parameter.setProperty(CassandraStoreParameters.CASSANDRA_SERIALIZATION_TYPE, "native"); parameter.setProperty(CassandraStoreParameters.PROTOCOL_VERSION, "3"); parameter.setProperty(CassandraStoreParameters.CLUSTER_NAME,"Test Cluster"); parameter.setProperty("gora.cassandrastore.mapping.file", "nativeSerialization/gora-cassandra-mapping.xml"); @@ -75,11 +92,11 @@ public class TestCassandraStoreWithNativeSerialization { // storing data; userDataStore.put(id, user1); // get data; - User olduser = userDataStore.get(id); + User olduser = userDataStore.get(user1.getUserId()); Assert.assertEquals(olduser.getName(), user1.getName()); Assert.assertEquals(olduser.getDateOfBirth(), user1.getDateOfBirth()); // delete data; - userDataStore.delete(id); + userDataStore.delete(user1.getUserId()); // get data User deletedUser = userDataStore.get(id); Assert.assertNull(deletedUser); http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/DateAsStringCodec.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/DateAsStringCodec.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/DateAsStringCodec.java index f6061cf..b5ffedb 100644 --- a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/DateAsStringCodec.java +++ b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/DateAsStringCodec.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.gora.cassandra.test.nativeSerialization; import com.datastax.driver.core.TypeCodec; http://git-wip-us.apache.org/repos/asf/gora/blob/5e383ef9/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/User.java ---------------------------------------------------------------------- diff --git a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/User.java b/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/User.java deleted file mode 100644 index 0f4c7ee..0000000 --- a/gora-cassandra-cql/src/test/java/org/apache/gora/cassandra/test/nativeSerialization/User.java +++ /dev/null @@ -1,66 +0,0 @@ -package org.apache.gora.cassandra.test.nativeSerialization; - -import com.datastax.driver.mapping.annotations.Column; -import com.datastax.driver.mapping.annotations.PartitionKey; -import com.datastax.driver.mapping.annotations.Table; -import com.datastax.driver.mapping.annotations.Transient; -import org.apache.gora.cassandra.serializers.CassandraNativePersistent; - -import java.util.Date; -import java.util.UUID; - -/** - * Created by madhawa on 6/23/17. - */ - -@Table(keyspace = "nativeTestKeySpace", name = "users", - readConsistency = "QUORUM", - writeConsistency = "QUORUM", - caseSensitiveKeyspace = false, - caseSensitiveTable = false) -public class User extends CassandraNativePersistent { - @PartitionKey - @Column(name = "user_id") - private UUID userId; - @Column(name = "name") - private String name; - @Column(name = "dob") - private Date dateOfBirth; - - @Transient - private boolean dirty; - - public User() { - - } - - public User(UUID userId, String name, Date dateOfBirth) { - this.userId = userId; - this.name = name; - this.dateOfBirth = dateOfBirth; - } - - public void setUserId(UUID userId) { - this.userId = userId; - } - - public void setName(String name) { - this.name = name; - } - - public void setDateOfBirth(Date dateOfBirth) { - this.dateOfBirth = dateOfBirth; - } - - public UUID getUserId() { - return userId; - } - - public String getName() { - return name; - } - - public Date getDateOfBirth() { - return dateOfBirth; - } -}