Add initial query model for gora-aerospike
Project: http://git-wip-us.apache.org/repos/asf/gora/repo Commit: http://git-wip-us.apache.org/repos/asf/gora/commit/2d667edb Tree: http://git-wip-us.apache.org/repos/asf/gora/tree/2d667edb Diff: http://git-wip-us.apache.org/repos/asf/gora/diff/2d667edb Branch: refs/heads/master Commit: 2d667edb08ac241e857c9922d05f13a00c91fdc3 Parents: ecd3c59 Author: nishadi <ndime...@gmail.com> Authored: Wed Jul 19 20:44:57 2017 +0530 Committer: nishadi <ndime...@gmail.com> Committed: Wed Jul 19 20:44:57 2017 +0530 ---------------------------------------------------------------------- .../gora/aerospike/query/AerospikeQuery.java | 33 +++++++ .../aerospike/query/AerospikeQueryResult.java | 80 ++++++++++++++++ .../aerospike/query/AerospikeResultRecord.java | 51 ++++++++++ .../gora/aerospike/store/AerospikeStore.java | 99 +++++++++++++++++++- .../src/test/conf/gora-aerospike-mapping.xml | 2 +- .../aerospike/store/TestAerospikeStore.java | 23 +---- 6 files changed, 262 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/gora/blob/2d667edb/gora-aerospike/src/main/java/org/apache/gora/aerospike/query/AerospikeQuery.java ---------------------------------------------------------------------- diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/query/AerospikeQuery.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/query/AerospikeQuery.java new file mode 100644 index 0000000..d87fe6a --- /dev/null +++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/query/AerospikeQuery.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.aerospike.query; + +import org.apache.gora.persistency.impl.PersistentBase; +import org.apache.gora.query.Query; +import org.apache.gora.query.impl.QueryBase; +import org.apache.gora.store.DataStore; + +/** + * Aerospike specific implementation of the {@link Query} interface. + */ +public class AerospikeQuery<K, T extends PersistentBase> extends QueryBase<K, T> { + public AerospikeQuery(DataStore<K, T> dataStore) { + super(dataStore); + } + +} http://git-wip-us.apache.org/repos/asf/gora/blob/2d667edb/gora-aerospike/src/main/java/org/apache/gora/aerospike/query/AerospikeQueryResult.java ---------------------------------------------------------------------- diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/query/AerospikeQueryResult.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/query/AerospikeQueryResult.java new file mode 100644 index 0000000..8858f34 --- /dev/null +++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/query/AerospikeQueryResult.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.aerospike.query; + +import com.aerospike.client.Record; +import com.aerospike.client.query.RecordSet; +import org.apache.gora.aerospike.store.AerospikeStore; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.query.Query; +import org.apache.gora.query.impl.ResultBase; +import org.apache.gora.store.DataStore; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Aerospike specific implementation of the {@link org.apache.gora.query.Result} + * interface. + */ +public class AerospikeQueryResult<K, T extends Persistent> extends ResultBase<K, T> { + + private List<AerospikeResultRecord> resultRecords; + + private String[] fields; + + public AerospikeQueryResult(DataStore<K, T> dataStore, Query<K, T> query, + List<AerospikeResultRecord> recordsList, String[] fields) { + super(dataStore, query); + this.resultRecords = recordsList; + this.fields = fields; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + //ToDo: to be implemented + return 0; + } + + /** + * Method to get the Aerospike specific data store + * + * @return the Aerospike data store instance + */ + @Override + public AerospikeStore getDataStore() { + return (AerospikeStore) super.getDataStore(); + } + + /** + * {@inheritDoc} + * + * @return true if more elements exist + * @throws IOException if we reach non-existent result + */ + @Override + protected boolean nextInner() throws IOException { + if (offset < 0 || offset > (resultRecords.size() - 1)) { + return false; + } + persistent = (T) getDataStore() + .createPersistentInstance(resultRecords.get((int) this.offset).getRecord(), fields); + return true; + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/2d667edb/gora-aerospike/src/main/java/org/apache/gora/aerospike/query/AerospikeResultRecord.java ---------------------------------------------------------------------- diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/query/AerospikeResultRecord.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/query/AerospikeResultRecord.java new file mode 100644 index 0000000..07d4309 --- /dev/null +++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/query/AerospikeResultRecord.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gora.aerospike.query; + +import com.aerospike.client.Key; +import com.aerospike.client.Record; + +/** + * Class to hold Aerospike result records + */ +public class AerospikeResultRecord { + + private Key key; + private Record record; + + public AerospikeResultRecord(Key key, Record record) { + this.key = key; + this.record = record; + } + + public Key getKey() { + return key; + } + + public void setKey(Key key) { + this.key = key; + } + + public Record getRecord() { + return record; + } + + public void setRecord(Record record) { + this.record = record; + } +} http://git-wip-us.apache.org/repos/asf/gora/blob/2d667edb/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java ---------------------------------------------------------------------- diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java index 01fdd93..7fe7573 100644 --- a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java +++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeStore.java @@ -30,9 +30,14 @@ import com.aerospike.client.Bin; import com.aerospike.client.Record; import com.aerospike.client.AerospikeClient; import com.aerospike.client.policy.ClientPolicy; +import com.aerospike.client.query.RecordSet; +import com.aerospike.client.query.Statement; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.util.Utf8; +import org.apache.gora.aerospike.query.AerospikeQuery; +import org.apache.gora.aerospike.query.AerospikeQueryResult; +import org.apache.gora.aerospike.query.AerospikeResultRecord; import org.apache.gora.persistency.Persistent; import org.apache.gora.persistency.impl.DirtyListWrapper; import org.apache.gora.persistency.impl.DirtyMapWrapper; @@ -86,7 +91,11 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K policy.writePolicyDefault = aerospikeParameters.getAerospikeMapping().getWritePolicy(); policy.readPolicyDefault = aerospikeParameters.getAerospikeMapping().getReadPolicy(); - aerospikeClient = new AerospikeClient(aerospikeParameters.getHost(), + // 'SendKey' property is enabled by default as the key is needed in query execution + policy.readPolicyDefault.sendKey = true; + policy.writePolicyDefault.sendKey = true; + + aerospikeClient = new AerospikeClient(policy, aerospikeParameters.getHost(), aerospikeParameters.getPort()); aerospikeParameters.setServerSpecificParameters(aerospikeClient); aerospikeParameters.validateServerBinConfiguration(persistentClass.getFields()); @@ -207,14 +216,83 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K return 0; } + /** + * {@inheritDoc} + * + * @param query the query to execute. + * @return the query result + */ @Override public Result<K, T> execute(Query<K, T> query) { - return null; + + List<AerospikeResultRecord> resultRecords = new ArrayList<>(); + String namespace = aerospikeParameters.getAerospikeMapping().getNamespace(); + String set = aerospikeParameters.getAerospikeMapping().getSet(); + + // Query execution without any keys + if (query.getStartKey() == null && query.getEndKey() == null){ + + try (RecordSet recordSet = aerospikeClient.query(null, getStatement(namespace, set))) { + while (recordSet.next()) { + AerospikeResultRecord aerospikeRecord = new AerospikeResultRecord(recordSet.getKey(), + recordSet.getRecord()); + resultRecords.add(aerospikeRecord); + } + } + } + + // Query execution for single key + else if (query.getKey()!= null) { + Key key = getAerospikeKey(query.getKey()); + Record record = aerospikeClient.get(null, key); + if(record != null){ + resultRecords.add(new AerospikeResultRecord(key, record)); + } + } + + // Query execution for key ranges + // ToDo: Implement for other scenarios + else if (query.getStartKey() != null && query.getEndKey() != null) { +// Key startKey = null, endKey = null; +// if (query.getStartKey() != null) { +// startKey = getAerospikeKey(query.getStartKey()); +// } +// if (query.getEndKey() != null) { +// endKey = getAerospikeKey(query.getEndKey()); +// } + +// boolean isSpecifiedRange = false; +// try (RecordSet recordSet = aerospikeClient.query(null, getStatement(namespace, set))) { +// while (recordSet.next()) { +// Key key = recordSet.getKey(); +// Record record = recordSet.getRecord(); +// +// if(key.userKey == getAerospikeKey(query.getStartKey()).userKey){ +// isSpecifiedRange = true; +// } +// if(key.userKey == getAerospikeKey(query.getEndKey()).userKey){ +// isSpecifiedRange = false; +// } +// +// if(isSpecifiedRange){ +// AerospikeResultRecord aerospikeRecord = new AerospikeResultRecord(key,record); +// resultRecords.add(aerospikeRecord); +// } +// +// } + } + + return new AerospikeQueryResult<>(this, query, resultRecords, getFieldsToQuery(null)); } + /** + * {@inheritDoc} + * + * @return the new query corresponding to aerospike + */ @Override public Query<K, T> newQuery() { - return null; + return new AerospikeQuery<>(this); } @Override @@ -227,6 +305,19 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K } /** + * Method to create a statement + * + * @param namespace the namespace + * @param set the set + * @return the statement + */ + private Statement getStatement(String namespace, String set){ + Statement stmt = new Statement(); + stmt.setNamespace(namespace); + stmt.setSetName(set); + return stmt; + } + /** * Method to close aerospike client connections to database server nodes */ @Override @@ -309,7 +400,7 @@ public class AerospikeStore<K, T extends PersistentBase> extends DataStoreBase<K * @param fields fields * @return persistent object created */ - private T createPersistentInstance(Record record, String[] fields) { + public T createPersistentInstance(Record record, String[] fields) { T persistent = newPersistent(); for (String field : fields) { http://git-wip-us.apache.org/repos/asf/gora/blob/2d667edb/gora-aerospike/src/test/conf/gora-aerospike-mapping.xml ---------------------------------------------------------------------- diff --git a/gora-aerospike/src/test/conf/gora-aerospike-mapping.xml b/gora-aerospike/src/test/conf/gora-aerospike-mapping.xml index f3f6eee..d57edb0 100644 --- a/gora-aerospike/src/test/conf/gora-aerospike-mapping.xml +++ b/gora-aerospike/src/test/conf/gora-aerospike-mapping.xml @@ -30,7 +30,7 @@ </class> <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" - table="WebPage" namespace = "test"> + set="WebPage" namespace = "test"> <field name="url" bin="url"/> <field name="content" bin="content"/> <field name="parsedContent" bin="parsedContent"/> http://git-wip-us.apache.org/repos/asf/gora/blob/2d667edb/gora-aerospike/src/test/java/org/apache/gora/aerospike/store/TestAerospikeStore.java ---------------------------------------------------------------------- diff --git a/gora-aerospike/src/test/java/org/apache/gora/aerospike/store/TestAerospikeStore.java b/gora-aerospike/src/test/java/org/apache/gora/aerospike/store/TestAerospikeStore.java index 9e93fb0..317fb12 100644 --- a/gora-aerospike/src/test/java/org/apache/gora/aerospike/store/TestAerospikeStore.java +++ b/gora-aerospike/src/test/java/org/apache/gora/aerospike/store/TestAerospikeStore.java @@ -88,6 +88,8 @@ public class TestAerospikeStore extends DataStoreTestBase { @Ignore("Functionality is to be implemented in the next iteration") @Override public void testQuery() throws Exception { + //done + //ToDo: clear the data before executing super.testQuery(); } @@ -113,27 +115,6 @@ public class TestAerospikeStore extends DataStoreTestBase { } @Test - @Ignore("Functionality is to be implemented in the next iteration") - @Override - public void testQueryWebPageSingleKey() throws Exception { - super.testQueryWebPageSingleKey(); - } - - @Test - @Ignore("Functionality is to be implemented in the next iteration") - @Override - public void testQueryWebPageSingleKeyDefaultFields() throws Exception { - super.testQueryWebPageSingleKeyDefaultFields(); - } - - @Test - @Ignore("Functionality is to be implemented in the next iteration") - @Override - public void testQueryWebPageQueryEmptyResults() throws Exception { - super.testQueryWebPageQueryEmptyResults(); - } - - @Test @Ignore("Functionality is to be implemented in the next iteration as this incurs query execution") @Override public void testDelete() throws Exception {