http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoPCJIndexIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoPCJIndexIT.java b/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoPCJIndexIT.java new file mode 100644 index 0000000..1503b53 --- /dev/null +++ b/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoPCJIndexIT.java @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.mongo; + +import static org.junit.Assert.assertEquals; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.api.RdfCloudTripleStoreConstants; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.mongodb.MongoITBase; +import org.apache.rya.sail.config.RyaSailFactory; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.TupleQuery; +import org.openrdf.query.TupleQueryResult; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.repository.sail.SailRepository; +import org.openrdf.repository.sail.SailRepositoryConnection; +import org.openrdf.sail.Sail; + +public class MongoPCJIndexIT extends MongoITBase { + private static final ValueFactory VF = ValueFactoryImpl.getInstance(); + + @Override + protected void updateConfiguration(final MongoDBRdfConfiguration conf) { + conf.setBoolean(ConfigUtils.USE_MONGO, true); + conf.setBoolean(ConfigUtils.USE_PCJ, false); + } + + @Test + public void sparqlQuery_Test() throws Exception { + // Setup a Rya Client. + final MongoConnectionDetails connectionDetails = getConnectionDetails(); + final RyaClient ryaClient = MongoRyaClientFactory.build(connectionDetails, getMongoClient()); + final String pcjQuery = "SELECT ?name WHERE {" + + " ?name <urn:likes> <urn:icecream> ." + + " ?name <urn:hasEyeColor> <urn:blue> ." + + " }"; + + // Install an instance of Rya and load statements. + ryaClient.getInstall().install(conf.getRyaInstanceName(), InstallConfiguration.builder() + .setEnablePcjIndex(true) + .build()); + ryaClient.getLoadStatements().loadStatements(conf.getRyaInstanceName(), getStatements()); + final String pcjId = ryaClient.getCreatePCJ().createPCJ(conf.getRyaInstanceName(), pcjQuery); + ryaClient.getBatchUpdatePCJ().batchUpdate(conf.getRyaInstanceName(), pcjId); + + //purge contents of rya triples collection + getMongoClient().getDatabase(conf.getRyaInstanceName()).getCollection(conf.getTriplesCollectionName()).drop(); + + //run the query. since the triples collection is gone, if the results match, they came from the PCJ index. + conf.setBoolean(ConfigUtils.USE_PCJ, true); + conf.setBoolean(ConfigUtils.USE_OPTIMAL_PCJ, true); + conf.setBoolean(ConfigUtils.DISPLAY_QUERY_PLAN, true); + final Sail sail = RyaSailFactory.getInstance(conf); + SailRepositoryConnection conn = new SailRepository(sail).getConnection(); + conn.begin(); + final TupleQuery tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, pcjQuery); + tupleQuery.setBinding(RdfCloudTripleStoreConfiguration.CONF_QUERYPLAN_FLAG, RdfCloudTripleStoreConstants.VALUE_FACTORY.createLiteral(true)); + final TupleQueryResult rez = tupleQuery.evaluate(); + final Set<BindingSet> results = new HashSet<>(); + while(rez.hasNext()) { + final BindingSet bs = rez.next(); + results.add(bs); + } + + // Verify the correct results were loaded into the PCJ table. + final Set<BindingSet> expectedResults = new HashSet<>(); + + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("name", VF.createURI("urn:Alice")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("name", VF.createURI("urn:Bob")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("name", VF.createURI("urn:Charlie")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("name", VF.createURI("urn:David")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("name", VF.createURI("urn:Eve")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("name", VF.createURI("urn:Frank")); + expectedResults.add(bs); + + assertEquals(6, results.size()); + assertEquals(expectedResults, results); + } + + @Test + public void sparqlQuery_Test_complex() throws Exception { + // Setup a Rya Client. + final MongoConnectionDetails connectionDetails = getConnectionDetails(); + final RyaClient ryaClient = MongoRyaClientFactory.build(connectionDetails, getMongoClient()); + final String pcjQuery = "SELECT ?name WHERE {" + + " ?name <urn:likes> <urn:icecream> ." + + " ?name <urn:hasEyeColor> <urn:blue> ." + + " }"; + + final String testQuery = + "SELECT ?name WHERE {" + + " ?name <urn:hasHairColor> <urn:brown> ." + + " ?name <urn:likes> <urn:icecream> ." + + " ?name <urn:hasEyeColor> <urn:blue> ." + + " }"; + + // Install an instance of Rya and load statements. + conf.setBoolean(ConfigUtils.USE_PCJ, true); + conf.setBoolean(ConfigUtils.USE_OPTIMAL_PCJ, true); + conf.setBoolean(ConfigUtils.DISPLAY_QUERY_PLAN, true); + ryaClient.getInstall().install(conf.getRyaInstanceName(), InstallConfiguration.builder() + .setEnablePcjIndex(true) + .build()); + ryaClient.getLoadStatements().loadStatements(conf.getRyaInstanceName(), getStatements()); + final String pcjId = ryaClient.getCreatePCJ().createPCJ(conf.getRyaInstanceName(), pcjQuery); + ryaClient.getBatchUpdatePCJ().batchUpdate(conf.getRyaInstanceName(), pcjId); + + System.out.println("Triples: " + getMongoClient().getDatabase(conf.getRyaInstanceName()).getCollection(conf.getTriplesCollectionName()).count()); + System.out.println("PCJS: " + getMongoClient().getDatabase(conf.getRyaInstanceName()).getCollection("pcjs").count()); + + //run the query. since the triples collection is gone, if the results match, they came from the PCJ index. + final Sail sail = RyaSailFactory.getInstance(conf); + SailRepositoryConnection conn = new SailRepository(sail).getConnection(); + conn.begin(); + final TupleQuery tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, testQuery); + tupleQuery.setBinding(RdfCloudTripleStoreConfiguration.CONF_QUERYPLAN_FLAG, RdfCloudTripleStoreConstants.VALUE_FACTORY.createLiteral(true)); + final TupleQueryResult rez = tupleQuery.evaluate(); + + final Set<BindingSet> results = new HashSet<>(); + while(rez.hasNext()) { + final BindingSet bs = rez.next(); + results.add(bs); + } + + // Verify the correct results were loaded into the PCJ table. + final Set<BindingSet> expectedResults = new HashSet<>(); + + MapBindingSet bs = new MapBindingSet(); + bs = new MapBindingSet(); + bs.addBinding("name", VF.createURI("urn:David")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("name", VF.createURI("urn:Eve")); + expectedResults.add(bs); + + bs = new MapBindingSet(); + bs.addBinding("name", VF.createURI("urn:Frank")); + expectedResults.add(bs); + + assertEquals(3, results.size()); + assertEquals(expectedResults, results); + } + + private MongoConnectionDetails getConnectionDetails() { + final java.util.Optional<char[]> password = conf.getMongoPassword() != null ? + java.util.Optional.of(conf.getMongoPassword().toCharArray()) : + java.util.Optional.empty(); + + return new MongoConnectionDetails( + conf.getMongoHostname(), + Integer.parseInt(conf.getMongoPort()), + java.util.Optional.ofNullable(conf.getMongoUser()), + password); + } + + private Set<Statement> getStatements() throws Exception { + final Set<Statement> statements = new HashSet<>(); + statements.add(VF.createStatement(VF.createURI("urn:Alice"), VF.createURI("urn:likes"), VF.createURI("urn:icecream"))); + statements.add(VF.createStatement(VF.createURI("urn:Bob"), VF.createURI("urn:likes"), VF.createURI("urn:icecream"))); + statements.add(VF.createStatement(VF.createURI("urn:Charlie"), VF.createURI("urn:likes"), VF.createURI("urn:icecream"))); + statements.add(VF.createStatement(VF.createURI("urn:David"), VF.createURI("urn:likes"), VF.createURI("urn:icecream"))); + statements.add(VF.createStatement(VF.createURI("urn:Eve"), VF.createURI("urn:likes"), VF.createURI("urn:icecream"))); + statements.add(VF.createStatement(VF.createURI("urn:Frank"), VF.createURI("urn:likes"), VF.createURI("urn:icecream"))); + statements.add(VF.createStatement(VF.createURI("urn:George"), VF.createURI("urn:likes"), VF.createURI("urn:icecream"))); + statements.add(VF.createStatement(VF.createURI("urn:Hillary"), VF.createURI("urn:likes"), VF.createURI("urn:icecream"))); + + statements.add(VF.createStatement(VF.createURI("urn:Alice"), VF.createURI("urn:hasEyeColor"), VF.createURI("urn:blue"))); + statements.add(VF.createStatement(VF.createURI("urn:Bob"), VF.createURI("urn:hasEyeColor"), VF.createURI("urn:blue"))); + statements.add(VF.createStatement(VF.createURI("urn:Charlie"), VF.createURI("urn:hasEyeColor"), VF.createURI("urn:blue"))); + statements.add(VF.createStatement(VF.createURI("urn:David"), VF.createURI("urn:hasEyeColor"), VF.createURI("urn:blue"))); + statements.add(VF.createStatement(VF.createURI("urn:Eve"), VF.createURI("urn:hasEyeColor"), VF.createURI("urn:blue"))); + statements.add(VF.createStatement(VF.createURI("urn:Frank"), VF.createURI("urn:hasEyeColor"), VF.createURI("urn:blue"))); + statements.add(VF.createStatement(VF.createURI("urn:George"), VF.createURI("urn:hasEyeColor"), VF.createURI("urn:green"))); + statements.add(VF.createStatement(VF.createURI("urn:Hillary"), VF.createURI("urn:hasEyeColor"), VF.createURI("urn:brown"))); + + statements.add(VF.createStatement(VF.createURI("urn:Alice"), VF.createURI("urn:hasHairColor"), VF.createURI("urn:blue"))); + statements.add(VF.createStatement(VF.createURI("urn:Bob"), VF.createURI("urn:hasHairColor"), VF.createURI("urn:blue"))); + statements.add(VF.createStatement(VF.createURI("urn:Charlie"), VF.createURI("urn:hasHairColor"), VF.createURI("urn:blue"))); + statements.add(VF.createStatement(VF.createURI("urn:David"), VF.createURI("urn:hasHairColor"), VF.createURI("urn:brown"))); + statements.add(VF.createStatement(VF.createURI("urn:Eve"), VF.createURI("urn:hasHairColor"), VF.createURI("urn:brown"))); + statements.add(VF.createStatement(VF.createURI("urn:Frank"), VF.createURI("urn:hasHairColor"), VF.createURI("urn:brown"))); + statements.add(VF.createStatement(VF.createURI("urn:George"), VF.createURI("urn:hasHairColor"), VF.createURI("urn:blonde"))); + statements.add(VF.createStatement(VF.createURI("urn:Hillary"), VF.createURI("urn:hasHairColor"), VF.createURI("urn:blonde"))); + return statements; + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoPcjIntegrationTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoPcjIntegrationTest.java b/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoPcjIntegrationTest.java new file mode 100644 index 0000000..af81cf6 --- /dev/null +++ b/extras/indexing/src/test/java/org/apache/rya/indexing/mongo/MongoPcjIntegrationTest.java @@ -0,0 +1,374 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.mongo; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.rya.indexing.IndexPlanValidator.IndexPlanValidator; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.external.PcjIntegrationTestingUtil; +import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; +import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; +import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType; +import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; +import org.apache.rya.indexing.mongodb.pcj.MongoPcjIndexSetProvider; +import org.apache.rya.indexing.mongodb.pcj.MongoPcjQueryNode; +import org.apache.rya.indexing.pcj.matching.PCJOptimizer; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.mongodb.MongoITBase; +import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration; +import org.apache.rya.sail.config.RyaSailFactory; +import org.junit.Test; +import org.openrdf.model.URI; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.vocabulary.RDF; +import org.openrdf.model.vocabulary.RDFS; +import org.openrdf.query.BindingSet; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.QueryResultHandlerException; +import org.openrdf.query.TupleQueryResultHandler; +import org.openrdf.query.TupleQueryResultHandlerException; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; +import org.openrdf.repository.sail.SailRepository; +import org.openrdf.repository.sail.SailRepositoryConnection; +import org.openrdf.sail.Sail; + +import com.google.common.collect.Lists; + +public class MongoPcjIntegrationTest extends MongoITBase { + private static final URI talksTo = new URIImpl("uri:talksTo"); + private static final URI sub = new URIImpl("uri:entity"); + private static final URI sub2 = new URIImpl("uri:entity2"); + private static final URI subclass = new URIImpl("uri:class"); + private static final URI subclass2 = new URIImpl("uri:class2"); + private static final URI obj = new URIImpl("uri:obj"); + private static final URI obj2 = new URIImpl("uri:obj2"); + + private void addPCJS(final SailRepositoryConnection conn) throws Exception { + conn.add(sub, RDF.TYPE, subclass); + conn.add(sub, RDFS.LABEL, new LiteralImpl("label")); + conn.add(sub, talksTo, obj); + + conn.add(sub2, RDF.TYPE, subclass2); + conn.add(sub2, RDFS.LABEL, new LiteralImpl("label2")); + conn.add(sub2, talksTo, obj2); + } + + @Override + protected void updateConfiguration(final MongoDBRdfConfiguration conf) { + conf.set(PrecomputedJoinIndexerConfig.PCJ_STORAGE_TYPE, PrecomputedJoinStorageType.MONGO.name()); + conf.set(PrecomputedJoinIndexerConfig.PCJ_UPDATER_TYPE, PrecomputedJoinUpdaterType.NO_UPDATE.name()); + } + + @Test + public void testEvaluateSingleIndex() throws Exception { + final Sail nonPcjSail = RyaSailFactory.getInstance(conf); + final MongoDBRdfConfiguration pcjConf = conf.clone(); + pcjConf.setBoolean(ConfigUtils.USE_PCJ, true); + final Sail pcjSail = RyaSailFactory.getInstance(pcjConf); + final SailRepositoryConnection conn = new SailRepository(nonPcjSail).getConnection(); + final SailRepositoryConnection pcjConn = new SailRepository(pcjSail).getConnection(); + addPCJS(pcjConn); + try { + final String indexSparqlString = ""// + + "SELECT ?e ?l ?c " // + + "{" // + + " ?e a ?c . "// + + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "// + + "}";// + + PcjIntegrationTestingUtil.createAndPopulatePcj(conn, getMongoClient(), conf.getMongoDBName() + 1, conf.getRyaInstanceName(), indexSparqlString); + + final String queryString = ""// + + "SELECT ?e ?c ?l ?o " // + + "{" // + + " ?e a ?c . "// + + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l . "// + + " ?e <uri:talksTo> ?o . "// + + "}";// + + final CountingResultHandler crh1 = new CountingResultHandler(); + final CountingResultHandler crh2 = new CountingResultHandler(); + + conn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate(crh1); + pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, queryString).evaluate(crh2); + + assertEquals(crh1.getCount(), crh2.getCount()); + } finally { + conn.close(); + pcjConn.close(); + nonPcjSail.shutDown(); + pcjSail.shutDown(); + } + } + + @Test + public void testEvaluateOneIndex() throws Exception { + final Sail nonPcjSail = RyaSailFactory.getInstance(conf); + final MongoDBRdfConfiguration pcjConf = conf.clone(); + pcjConf.setBoolean(ConfigUtils.USE_PCJ, true); + final Sail pcjSail = RyaSailFactory.getInstance(pcjConf); + final SailRepositoryConnection conn = new SailRepository(nonPcjSail).getConnection(); + final SailRepositoryConnection pcjConn = new SailRepository(pcjSail).getConnection(); + addPCJS(pcjConn); + try { + final URI superclass = new URIImpl("uri:superclass"); + final URI superclass2 = new URIImpl("uri:superclass2"); + + conn.add(subclass, RDF.TYPE, superclass); + conn.add(subclass2, RDF.TYPE, superclass2); + conn.add(obj, RDFS.LABEL, new LiteralImpl("label")); + conn.add(obj2, RDFS.LABEL, new LiteralImpl("label2")); + + final String indexSparqlString = ""// + + "SELECT ?dog ?pig ?duck " // + + "{" // + + " ?pig a ?dog . "// + + " ?pig <http://www.w3.org/2000/01/rdf-schema#label> ?duck "// + + "}";// + + final CountingResultHandler crh1 = new CountingResultHandler(); + final CountingResultHandler crh2 = new CountingResultHandler(); + + PcjIntegrationTestingUtil.createAndPopulatePcj(conn, getMongoClient(), conf.getMongoDBName() + 1, conf.getRyaInstanceName(), indexSparqlString); + + conn.prepareTupleQuery(QueryLanguage.SPARQL, indexSparqlString).evaluate(crh1); + PcjIntegrationTestingUtil.deleteCoreRyaTables(getMongoClient(), conf.getRyaInstanceName(), conf.getTriplesCollectionName()); + pcjConn.prepareTupleQuery(QueryLanguage.SPARQL, indexSparqlString).evaluate(crh2); + + assertEquals(crh1.count, crh2.count); + } finally { + conn.close(); + pcjConn.close(); + nonPcjSail.shutDown(); + pcjSail.shutDown(); + } + } + + @Test + public void testEvaluateTwoIndexValidate() throws Exception { + final Sail nonPcjSail = RyaSailFactory.getInstance(conf); + final MongoDBRdfConfiguration pcjConf = conf.clone(); + pcjConf.setBoolean(ConfigUtils.USE_PCJ, true); + final Sail pcjSail = RyaSailFactory.getInstance(pcjConf); + final SailRepositoryConnection conn = new SailRepository(nonPcjSail).getConnection(); + final SailRepositoryConnection pcjConn = new SailRepository(pcjSail).getConnection(); + addPCJS(pcjConn); + try { + final URI superclass = new URIImpl("uri:superclass"); + final URI superclass2 = new URIImpl("uri:superclass2"); + + conn.add(subclass, RDF.TYPE, superclass); + conn.add(subclass2, RDF.TYPE, superclass2); + conn.add(obj, RDFS.LABEL, new LiteralImpl("label")); + conn.add(obj2, RDFS.LABEL, new LiteralImpl("label2")); + + final String indexSparqlString = ""// + + "SELECT ?dog ?pig ?duck " // + + "{" // + + " ?pig a ?dog . "// + + " ?pig <http://www.w3.org/2000/01/rdf-schema#label> ?duck "// + + "}";// + + final String indexSparqlString2 = ""// + + "SELECT ?o ?f ?e ?c ?l " // + + "{" // + + " ?e <uri:talksTo> ?o . "// + + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l. "// + + " ?c a ?f . " // + + "}";// + + final String queryString = ""// + + "SELECT ?e ?c ?l ?f ?o " // + + "{" // + + " ?e a ?c . "// + + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l. "// + + " ?e <uri:talksTo> ?o . "// + + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l. "// + + " ?c a ?f . " // + + "}";// + + PcjIntegrationTestingUtil.createAndPopulatePcj(conn, getMongoClient(), conf.getMongoDBName() + 1, conf.getRyaInstanceName(), indexSparqlString); + final MongoPcjQueryNode ais1 = new MongoPcjQueryNode(conf, conf.getMongoDBName() + 1); + + PcjIntegrationTestingUtil.createAndPopulatePcj(conn, getMongoClient(), conf.getMongoDBName() + 2, conf.getRyaInstanceName(), indexSparqlString2); + final MongoPcjQueryNode ais2 = new MongoPcjQueryNode(conf, conf.getMongoDBName() + 2); + + final List<ExternalTupleSet> index = new ArrayList<>(); + index.add(ais1); + index.add(ais2); + + ParsedQuery pq = null; + final SPARQLParser sp = new SPARQLParser(); + pq = sp.parseQuery(queryString, null); + final List<TupleExpr> teList = Lists.newArrayList(); + final TupleExpr te = pq.getTupleExpr(); + + final PCJOptimizer pcj = new PCJOptimizer(index, false, new MongoPcjIndexSetProvider(new StatefulMongoDBRdfConfiguration(conf, getMongoClient()))); + pcj.optimize(te, null, null); + teList.add(te); + + final IndexPlanValidator ipv = new IndexPlanValidator(false); + + assertTrue(ipv.isValid(te)); + } finally { + conn.close(); + pcjConn.close(); + nonPcjSail.shutDown(); + pcjSail.shutDown(); + } + } + + @Test + public void testEvaluateThreeIndexValidate() throws Exception { + final Sail nonPcjSail = RyaSailFactory.getInstance(conf); + final MongoDBRdfConfiguration pcjConf = conf.clone(); + pcjConf.setBoolean(ConfigUtils.USE_PCJ, true); + final Sail pcjSail = RyaSailFactory.getInstance(pcjConf); + final SailRepositoryConnection conn = new SailRepository(nonPcjSail).getConnection(); + final SailRepositoryConnection pcjConn = new SailRepository(pcjSail).getConnection(); + addPCJS(pcjConn); + try { + final URI superclass = new URIImpl("uri:superclass"); + final URI superclass2 = new URIImpl("uri:superclass2"); + + final URI howlsAt = new URIImpl("uri:howlsAt"); + final URI subType = new URIImpl("uri:subType"); + final URI superSuperclass = new URIImpl("uri:super_superclass"); + + conn.add(subclass, RDF.TYPE, superclass); + conn.add(subclass2, RDF.TYPE, superclass2); + conn.add(obj, RDFS.LABEL, new LiteralImpl("label")); + conn.add(obj2, RDFS.LABEL, new LiteralImpl("label2")); + conn.add(sub, howlsAt, superclass); + conn.add(superclass, subType, superSuperclass); + + final String indexSparqlString = ""// + + "SELECT ?dog ?pig ?duck " // + + "{" // + + " ?pig a ?dog . "// + + " ?pig <http://www.w3.org/2000/01/rdf-schema#label> ?duck "// + + "}";// + + final String indexSparqlString2 = ""// + + "SELECT ?o ?f ?e ?c ?l " // + + "{" // + + " ?e <uri:talksTo> ?o . "// + + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l. "// + + " ?c a ?f . " // + + "}";// + + final String indexSparqlString3 = ""// + + "SELECT ?wolf ?sheep ?chicken " // + + "{" // + + " ?wolf <uri:howlsAt> ?sheep . "// + + " ?sheep <uri:subType> ?chicken. "// + + "}";// + + final String queryString = ""// + + "SELECT ?e ?c ?l ?f ?o " // + + "{" // + + " ?e a ?c . "// + + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l. "// + + " ?e <uri:talksTo> ?o . "// + + " ?o <http://www.w3.org/2000/01/rdf-schema#label> ?l. "// + + " ?c a ?f . " // + + " ?e <uri:howlsAt> ?f. "// + + " ?f <uri:subType> ?o. "// + + "}";// + + PcjIntegrationTestingUtil.createAndPopulatePcj(conn, getMongoClient(), conf.getMongoDBName() + 1, conf.getRyaInstanceName(), indexSparqlString); + final MongoPcjQueryNode ais1 = new MongoPcjQueryNode(conf, conf.getMongoDBName() + 1); + + PcjIntegrationTestingUtil.createAndPopulatePcj(conn, getMongoClient(), conf.getMongoDBName() + 2, conf.getRyaInstanceName(), indexSparqlString2); + final MongoPcjQueryNode ais2 = new MongoPcjQueryNode(conf, conf.getMongoDBName() + 2); + + PcjIntegrationTestingUtil.createAndPopulatePcj(conn, getMongoClient(), conf.getMongoDBName() + 3, conf.getRyaInstanceName(), indexSparqlString3); + final MongoPcjQueryNode ais3 = new MongoPcjQueryNode(conf, conf.getMongoDBName() + 3); + + final List<ExternalTupleSet> index = new ArrayList<>(); + index.add(ais1); + index.add(ais3); + index.add(ais2); + + ParsedQuery pq = null; + final SPARQLParser sp = new SPARQLParser(); + pq = sp.parseQuery(queryString, null); + final List<TupleExpr> teList = Lists.newArrayList(); + final TupleExpr te = pq.getTupleExpr(); + + final PCJOptimizer pcj = new PCJOptimizer(index, false, new MongoPcjIndexSetProvider(new StatefulMongoDBRdfConfiguration(conf, getMongoClient()))); + pcj.optimize(te, null, null); + + teList.add(te); + + final IndexPlanValidator ipv = new IndexPlanValidator(false); + + assertTrue(ipv.isValid(te)); + } finally { + conn.close(); + pcjConn.close(); + nonPcjSail.shutDown(); + pcjSail.shutDown(); + } + } + + public static class CountingResultHandler implements TupleQueryResultHandler { + private int count = 0; + + public int getCount() { + return count; + } + + public void resetCount() { + count = 0; + } + + @Override + public void startQueryResult(final List<String> arg0) throws TupleQueryResultHandlerException { + } + + @Override + public void handleSolution(final BindingSet arg0) throws TupleQueryResultHandlerException { + count++; + System.out.println(arg0); + } + + @Override + public void endQueryResult() throws TupleQueryResultHandlerException { + } + + @Override + public void handleBoolean(final boolean arg0) throws QueryResultHandlerException { + + } + + @Override + public void handleLinks(final List<String> arg0) throws QueryResultHandlerException { + + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/indexing/src/test/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerTest.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerTest.java b/extras/indexing/src/test/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerTest.java index 31f4e7b..d28d826 100644 --- a/extras/indexing/src/test/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerTest.java +++ b/extras/indexing/src/test/java/org/apache/rya/indexing/pcj/matching/PCJOptimizerTest.java @@ -19,17 +19,24 @@ package org.apache.rya.indexing.pcj.matching; * under the License. */ import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.hadoop.conf.Configuration; import org.apache.rya.indexing.IndexPlanValidator.IndexedExecutionPlanGenerator; import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; import org.apache.rya.indexing.external.tupleSet.SimpleExternalTupleSet; - +import org.apache.rya.indexing.mongodb.pcj.MongoPcjIndexSetProvider; +import org.apache.rya.indexing.pcj.matching.provider.AbstractPcjIndexSetProvider; +import org.apache.rya.indexing.pcj.matching.provider.AccumuloIndexSetProvider; +import org.apache.rya.mongodb.EmbeddedMongoSingleton; +import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration; import org.junit.Assert; import org.junit.Test; -import org.openrdf.query.MalformedQueryException; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.openrdf.query.algebra.Projection; import org.openrdf.query.algebra.QueryModelNode; import org.openrdf.query.algebra.StatementPattern; @@ -38,14 +45,30 @@ import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; import org.openrdf.query.parser.ParsedQuery; import org.openrdf.query.parser.sparql.SPARQLParser; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; +@RunWith(Parameterized.class) public class PCJOptimizerTest { + private final AbstractPcjIndexSetProvider provider; + + @Parameterized.Parameters + public static Collection providers() throws Exception { + final StatefulMongoDBRdfConfiguration conf = new StatefulMongoDBRdfConfiguration(new Configuration(), EmbeddedMongoSingleton.getNewMongoClient()); + return Lists.<AbstractPcjIndexSetProvider> newArrayList( + new AccumuloIndexSetProvider(new Configuration()), + new MongoPcjIndexSetProvider(conf) + ); + } + + public PCJOptimizerTest(final AbstractPcjIndexSetProvider provider) { + this.provider = provider; + } @Test public void testBasicSegment() throws Exception { - String query1 = ""// + final String query1 = ""// + "SELECT ?e ?c ?l" // + "{" // + " ?e a ?c . "// @@ -53,29 +76,30 @@ public class PCJOptimizerTest { + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "// + "}";// - String query2 = ""// + final String query2 = ""// + "SELECT ?a ?b ?m" // + "{" // + " ?a a ?b . "// + " OPTIONAL {?a <uri:talksTo> ?m} . "// + "}";// - SPARQLParser parser = new SPARQLParser(); - ParsedQuery pq1 = parser.parseQuery(query1, null); - ParsedQuery pq2 = parser.parseQuery(query2, null); - TupleExpr te1 = pq1.getTupleExpr(); - TupleExpr te2 = pq2.getTupleExpr(); + final SPARQLParser parser = new SPARQLParser(); + final ParsedQuery pq1 = parser.parseQuery(query1, null); + final ParsedQuery pq2 = parser.parseQuery(query2, null); + final TupleExpr te1 = pq1.getTupleExpr(); + final TupleExpr te2 = pq2.getTupleExpr(); - TupleExpr unOpt = te1.clone(); - List<QueryModelNode> remainingNodes = getNodes(te1); - Set<QueryModelNode> unMatchedNodes = new HashSet<>(); + final TupleExpr unOpt = te1.clone(); + final List<QueryModelNode> remainingNodes = getNodes(te1); + final Set<QueryModelNode> unMatchedNodes = new HashSet<>(); unMatchedNodes.add(remainingNodes.get(2)); - SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2); - List<ExternalTupleSet> externalList = new ArrayList<>(); + final SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2); + final List<ExternalTupleSet> externalList = new ArrayList<>(); externalList.add(pcj); - PCJOptimizer optimizer = new PCJOptimizer(externalList, false); + provider.setIndices(externalList); + final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider); optimizer.optimize(te1, null, null); Assert.assertEquals(true, validatePcj(te1, unOpt, externalList, unMatchedNodes)); @@ -86,7 +110,7 @@ public class PCJOptimizerTest { @Test public void testSegmentWithUnion() throws Exception { - String query1 = ""// + final String query1 = ""// + "SELECT ?e ?c ?l" // + "{" // + " {?e <uri:p1> <uri:o1>. } UNION { ?e a ?c. OPTIONAL {?e <uri:talksTo> ?l}. ?e <uri:p5> <uri:o4>. ?e <uri:p4> <uri:o3> } . "// @@ -94,7 +118,7 @@ public class PCJOptimizerTest { + " ?e <uri:p3> <uri:o2> . "// + "}";// - String query2 = ""// + final String query2 = ""// + "SELECT ?a ?b ?m" // + "{" // + " ?a <uri:p5> <uri:o4> ." // @@ -103,33 +127,34 @@ public class PCJOptimizerTest { + " ?a a ?b . "// + "}";// - String query3 = ""// + final String query3 = ""// + "SELECT ?h ?i" // + "{" // + " ?h <uri:p2> ?i . "// + " ?h <uri:p3> <uri:o2> . "// + "}";// - SPARQLParser parser = new SPARQLParser(); - ParsedQuery pq1 = parser.parseQuery(query1, null); - ParsedQuery pq2 = parser.parseQuery(query2, null); - ParsedQuery pq3 = parser.parseQuery(query3, null); - TupleExpr te1 = pq1.getTupleExpr(); - TupleExpr te2 = pq2.getTupleExpr(); - TupleExpr te3 = pq3.getTupleExpr(); - - TupleExpr unOpt = te1.clone(); - List<QueryModelNode> remainingNodes = getNodes(te1); - Set<QueryModelNode> unMatchedNodes = new HashSet<>(); + final SPARQLParser parser = new SPARQLParser(); + final ParsedQuery pq1 = parser.parseQuery(query1, null); + final ParsedQuery pq2 = parser.parseQuery(query2, null); + final ParsedQuery pq3 = parser.parseQuery(query3, null); + final TupleExpr te1 = pq1.getTupleExpr(); + final TupleExpr te2 = pq2.getTupleExpr(); + final TupleExpr te3 = pq3.getTupleExpr(); + + final TupleExpr unOpt = te1.clone(); + final List<QueryModelNode> remainingNodes = getNodes(te1); + final Set<QueryModelNode> unMatchedNodes = new HashSet<>(); unMatchedNodes.add(remainingNodes.get(0)); - SimpleExternalTupleSet pcj1 = new SimpleExternalTupleSet((Projection) te2); - SimpleExternalTupleSet pcj2 = new SimpleExternalTupleSet((Projection) te3); - List<ExternalTupleSet> externalList = new ArrayList<>(); + final SimpleExternalTupleSet pcj1 = new SimpleExternalTupleSet((Projection) te2); + final SimpleExternalTupleSet pcj2 = new SimpleExternalTupleSet((Projection) te3); + final List<ExternalTupleSet> externalList = new ArrayList<>(); externalList.add(pcj1); externalList.add(pcj2); - PCJOptimizer optimizer = new PCJOptimizer(externalList, false); + provider.setIndices(externalList); + final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider); optimizer.optimize(te1, null, null); Assert.assertEquals(true, validatePcj(te1, unOpt, externalList, unMatchedNodes)); @@ -142,7 +167,7 @@ public class PCJOptimizerTest { public void testExactMatchLargeReOrdered() throws Exception { - String query1 = ""// + final String query1 = ""// + "SELECT ?a ?b ?c ?d ?e ?f ?g ?h" // + "{" // + " ?a <uri:p0> ?b ." // @@ -155,7 +180,7 @@ public class PCJOptimizerTest { + " OPTIONAL{?b <uri:p4> ?o. ?o <uri:p1> ?p} . "// + "}";// - String query2 = ""// + final String query2 = ""// + "SELECT ?a ?b ?c ?d ?e ?f ?g ?h" // + "{" // + " ?a <uri:p0> ?b ." // @@ -168,20 +193,21 @@ public class PCJOptimizerTest { + " OPTIONAL{?b <uri:p3> ?e. ?e <uri:p1> ?f} . "// + "}";// - SPARQLParser parser = new SPARQLParser(); - ParsedQuery pq1 = parser.parseQuery(query1, null); - ParsedQuery pq2 = parser.parseQuery(query2, null); - TupleExpr te1 = pq1.getTupleExpr(); - TupleExpr te2 = pq2.getTupleExpr(); + final SPARQLParser parser = new SPARQLParser(); + final ParsedQuery pq1 = parser.parseQuery(query1, null); + final ParsedQuery pq2 = parser.parseQuery(query2, null); + final TupleExpr te1 = pq1.getTupleExpr(); + final TupleExpr te2 = pq2.getTupleExpr(); - TupleExpr unOpt = te1.clone(); + final TupleExpr unOpt = te1.clone(); - SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2); - List<ExternalTupleSet> externalList = new ArrayList<>(); + final SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2); + final List<ExternalTupleSet> externalList = new ArrayList<>(); externalList.add(pcj); - PCJOptimizer optimizer = new PCJOptimizer(externalList, false); + provider.setIndices(externalList); + final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider); optimizer.optimize(te1, null, null); Assert.assertEquals(true, validatePcj(te1, unOpt, externalList, new HashSet<QueryModelNode>())); @@ -190,7 +216,7 @@ public class PCJOptimizerTest { @Test public void testSubsetMatchLargeReOrdered() throws Exception { - String query1 = ""// + final String query1 = ""// + "SELECT ?a ?b ?c ?d ?e ?f ?g ?h" // + "{" // + " ?a <uri:p0> ?b ." // @@ -203,7 +229,7 @@ public class PCJOptimizerTest { + " OPTIONAL{?b <uri:p4> ?o. ?o <uri:p1> ?p} . "// + "}";// - String query2 = ""// + final String query2 = ""// + "SELECT ?a ?b ?c ?d ?e ?f ?g ?h" // + "{" // + " ?a <uri:p0> ?b ." // @@ -213,15 +239,15 @@ public class PCJOptimizerTest { + " OPTIONAL{?b <uri:p3> ?e. ?e <uri:p1> ?f} . "// + "}";// - SPARQLParser parser = new SPARQLParser(); - ParsedQuery pq1 = parser.parseQuery(query1, null); - ParsedQuery pq2 = parser.parseQuery(query2, null); - TupleExpr te1 = pq1.getTupleExpr(); - TupleExpr te2 = pq2.getTupleExpr(); + final SPARQLParser parser = new SPARQLParser(); + final ParsedQuery pq1 = parser.parseQuery(query1, null); + final ParsedQuery pq2 = parser.parseQuery(query2, null); + final TupleExpr te1 = pq1.getTupleExpr(); + final TupleExpr te2 = pq2.getTupleExpr(); - TupleExpr unOpt = te1.clone(); - List<QueryModelNode> remainingNodes = getNodes(te1); - Set<QueryModelNode> unMatchedNodes = new HashSet<>(); + final TupleExpr unOpt = te1.clone(); + final List<QueryModelNode> remainingNodes = getNodes(te1); + final Set<QueryModelNode> unMatchedNodes = new HashSet<>(); unMatchedNodes.add(remainingNodes.get(8)); unMatchedNodes.add(remainingNodes.get(9)); unMatchedNodes.add(remainingNodes.get(10)); @@ -229,11 +255,12 @@ public class PCJOptimizerTest { unMatchedNodes.add(remainingNodes.get(12)); unMatchedNodes.add(remainingNodes.get(7)); - SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2); - List<ExternalTupleSet> externalList = new ArrayList<>(); + final SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2); + final List<ExternalTupleSet> externalList = new ArrayList<>(); externalList.add(pcj); - PCJOptimizer optimizer = new PCJOptimizer(externalList, false); + provider.setIndices(externalList); + final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider); optimizer.optimize(te1, null, null); Assert.assertEquals(true, validatePcj(te1, unOpt, externalList, unMatchedNodes)); @@ -242,7 +269,7 @@ public class PCJOptimizerTest { @Test public void testSwitchTwoBoundVars() throws Exception { - String query1 = ""// + final String query1 = ""// + "SELECT ?a ?b ?c " // + "{" // + " ?a <uri:p0> ?c ." // @@ -254,7 +281,7 @@ public class PCJOptimizerTest { + " ?b <uri:p3> <uri:o3> " // + "}";// - String query2 = ""// + final String query2 = ""// + "SELECT ?a ?b ?c " // + "{" // + " ?a <uri:p2> <uri:o2>. " // @@ -264,23 +291,24 @@ public class PCJOptimizerTest { + " ?b<uri:p1> ?c " // + "}";// - SPARQLParser parser = new SPARQLParser(); - ParsedQuery pq1 = parser.parseQuery(query1, null); - ParsedQuery pq2 = parser.parseQuery(query2, null); - TupleExpr te1 = pq1.getTupleExpr(); - TupleExpr te2 = pq2.getTupleExpr(); + final SPARQLParser parser = new SPARQLParser(); + final ParsedQuery pq1 = parser.parseQuery(query1, null); + final ParsedQuery pq2 = parser.parseQuery(query2, null); + final TupleExpr te1 = pq1.getTupleExpr(); + final TupleExpr te2 = pq2.getTupleExpr(); - TupleExpr unOpt = te1.clone(); - List<QueryModelNode> remainingNodes = getNodes(te1); - Set<QueryModelNode> unMatchedNodes = new HashSet<>(); + final TupleExpr unOpt = te1.clone(); + final List<QueryModelNode> remainingNodes = getNodes(te1); + final Set<QueryModelNode> unMatchedNodes = new HashSet<>(); unMatchedNodes.add(remainingNodes.get(1)); unMatchedNodes.add(remainingNodes.get(2)); - SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2); - List<ExternalTupleSet> externalList = new ArrayList<>(); + final SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2); + final List<ExternalTupleSet> externalList = new ArrayList<>(); externalList.add(pcj); - PCJOptimizer optimizer = new PCJOptimizer(externalList, false); + provider.setIndices(externalList); + final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider); optimizer.optimize(te1, null, null); Assert.assertEquals(true, validatePcj(te1, unOpt, externalList, unMatchedNodes)); @@ -289,7 +317,7 @@ public class PCJOptimizerTest { @Test public void testSegmentWithLargeUnion() throws Exception { - String query1 = ""// + final String query1 = ""// + "SELECT ?e ?c ?l" // + "{" // + " {?e <uri:p1> <uri:o1>. } UNION { " // @@ -304,7 +332,7 @@ public class PCJOptimizerTest { + " ?e <uri:p3> <uri:o2> . "// + "}";// - String query2 = ""// + final String query2 = ""// + "SELECT ?a ?b ?c " // + "{" // + " ?a <uri:p2> <uri:o2>. " // @@ -314,35 +342,36 @@ public class PCJOptimizerTest { + " ?b<uri:p1> ?c " // + "}";// - String query3 = ""// + final String query3 = ""// + "SELECT ?h ?i" // + "{" // + " ?h <uri:p2> ?i . "// + " ?h <uri:p3> <uri:o2> . "// + "}";// - SPARQLParser parser = new SPARQLParser(); - ParsedQuery pq1 = parser.parseQuery(query1, null); - ParsedQuery pq2 = parser.parseQuery(query2, null); - ParsedQuery pq3 = parser.parseQuery(query3, null); - TupleExpr te1 = pq1.getTupleExpr(); - TupleExpr te2 = pq2.getTupleExpr(); - TupleExpr te3 = pq3.getTupleExpr(); - - TupleExpr unOpt = te1.clone(); - List<QueryModelNode> remainingNodes = getNodes(te1); - Set<QueryModelNode> unMatchedNodes = new HashSet<>(); + final SPARQLParser parser = new SPARQLParser(); + final ParsedQuery pq1 = parser.parseQuery(query1, null); + final ParsedQuery pq2 = parser.parseQuery(query2, null); + final ParsedQuery pq3 = parser.parseQuery(query3, null); + final TupleExpr te1 = pq1.getTupleExpr(); + final TupleExpr te2 = pq2.getTupleExpr(); + final TupleExpr te3 = pq3.getTupleExpr(); + + final TupleExpr unOpt = te1.clone(); + final List<QueryModelNode> remainingNodes = getNodes(te1); + final Set<QueryModelNode> unMatchedNodes = new HashSet<>(); unMatchedNodes.add(remainingNodes.get(0)); unMatchedNodes.add(remainingNodes.get(2)); unMatchedNodes.add(remainingNodes.get(3)); - SimpleExternalTupleSet pcj1 = new SimpleExternalTupleSet((Projection) te2); - SimpleExternalTupleSet pcj2 = new SimpleExternalTupleSet((Projection) te3); - List<ExternalTupleSet> externalList = new ArrayList<>(); + final SimpleExternalTupleSet pcj1 = new SimpleExternalTupleSet((Projection) te2); + final SimpleExternalTupleSet pcj2 = new SimpleExternalTupleSet((Projection) te3); + final List<ExternalTupleSet> externalList = new ArrayList<>(); externalList.add(pcj1); externalList.add(pcj2); - PCJOptimizer optimizer = new PCJOptimizer(externalList, false); + provider.setIndices(externalList); + final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider); optimizer.optimize(te1, null, null); Assert.assertEquals(true, validatePcj(te1, unOpt, externalList, unMatchedNodes)); @@ -352,7 +381,7 @@ public class PCJOptimizerTest { @Test public void testSegmentWithUnionAndFilters() throws Exception { - String query1 = ""// + final String query1 = ""// + "SELECT ?e ?c ?l" // + "{" // + " Filter(?e = <uri:s1>) " // @@ -362,7 +391,7 @@ public class PCJOptimizerTest { + " ?e <uri:p3> <uri:o2> . "// + "}";// - String query2 = ""// + final String query2 = ""// + "SELECT ?a ?b ?m" // + "{" // + " Filter(?b = <uri:s2>) " // @@ -372,7 +401,7 @@ public class PCJOptimizerTest { + " ?a a ?b . "// + "}";// - String query3 = ""// + final String query3 = ""// + "SELECT ?h ?i" // + "{" // + " Filter(?h = <uri:s1>) " // @@ -380,26 +409,27 @@ public class PCJOptimizerTest { + " ?h <uri:p3> <uri:o2> . "// + "}";// - SPARQLParser parser = new SPARQLParser(); - ParsedQuery pq1 = parser.parseQuery(query1, null); - ParsedQuery pq2 = parser.parseQuery(query2, null); - ParsedQuery pq3 = parser.parseQuery(query3, null); - TupleExpr te1 = pq1.getTupleExpr(); - TupleExpr te2 = pq2.getTupleExpr(); - TupleExpr te3 = pq3.getTupleExpr(); - - TupleExpr unOpt = te1.clone(); - List<QueryModelNode> remainingNodes = getNodes(te1); - Set<QueryModelNode> unMatchedNodes = new HashSet<>(); + final SPARQLParser parser = new SPARQLParser(); + final ParsedQuery pq1 = parser.parseQuery(query1, null); + final ParsedQuery pq2 = parser.parseQuery(query2, null); + final ParsedQuery pq3 = parser.parseQuery(query3, null); + final TupleExpr te1 = pq1.getTupleExpr(); + final TupleExpr te2 = pq2.getTupleExpr(); + final TupleExpr te3 = pq3.getTupleExpr(); + + final TupleExpr unOpt = te1.clone(); + final List<QueryModelNode> remainingNodes = getNodes(te1); + final Set<QueryModelNode> unMatchedNodes = new HashSet<>(); unMatchedNodes.add(remainingNodes.get(0)); - SimpleExternalTupleSet pcj1 = new SimpleExternalTupleSet((Projection) te2); - SimpleExternalTupleSet pcj2 = new SimpleExternalTupleSet((Projection) te3); - List<ExternalTupleSet> externalList = new ArrayList<>(); + final SimpleExternalTupleSet pcj1 = new SimpleExternalTupleSet((Projection) te2); + final SimpleExternalTupleSet pcj2 = new SimpleExternalTupleSet((Projection) te3); + final List<ExternalTupleSet> externalList = new ArrayList<>(); externalList.add(pcj1); externalList.add(pcj2); - PCJOptimizer optimizer = new PCJOptimizer(externalList, false); + provider.setIndices(externalList); + final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider); optimizer.optimize(te1, null, null); Assert.assertEquals(true, validatePcj(te1, unOpt, externalList, unMatchedNodes)); @@ -409,7 +439,7 @@ public class PCJOptimizerTest { @Test public void testSegmentWithLeftJoinsAndFilters() throws Exception { - String query1 = ""// + final String query1 = ""// + "SELECT ?e ?c ?l" // + "{" // + " Filter(?e = <uri:s1>) " // @@ -419,14 +449,14 @@ public class PCJOptimizerTest { + " OPTIONAL {?e <uri:p2> ?c } . "// + "}";// - String query2 = ""// + final String query2 = ""// + "SELECT ?e ?c ?l" // + "{" // + " Filter(?c = <uri:s2>) " // + " ?e <uri:p1> <uri:o1>. " + " OPTIONAL {?e <uri:p2> ?l}. " + " ?c <uri:p3> <uri:o3> . "// + "}";// - String query3 = ""// + final String query3 = ""// + "SELECT ?e ?c" // + "{" // + " Filter(?e = <uri:s1>) " // @@ -434,23 +464,24 @@ public class PCJOptimizerTest { + " OPTIONAL {?e <uri:p2> ?c } . "// + "}";// - SPARQLParser parser = new SPARQLParser(); - ParsedQuery pq1 = parser.parseQuery(query1, null); - ParsedQuery pq2 = parser.parseQuery(query2, null); - ParsedQuery pq3 = parser.parseQuery(query3, null); - TupleExpr te1 = pq1.getTupleExpr(); - TupleExpr te2 = pq2.getTupleExpr(); - TupleExpr te3 = pq3.getTupleExpr(); + final SPARQLParser parser = new SPARQLParser(); + final ParsedQuery pq1 = parser.parseQuery(query1, null); + final ParsedQuery pq2 = parser.parseQuery(query2, null); + final ParsedQuery pq3 = parser.parseQuery(query3, null); + final TupleExpr te1 = pq1.getTupleExpr(); + final TupleExpr te2 = pq2.getTupleExpr(); + final TupleExpr te3 = pq3.getTupleExpr(); - TupleExpr unOpt = te1.clone(); + final TupleExpr unOpt = te1.clone(); - SimpleExternalTupleSet pcj1 = new SimpleExternalTupleSet((Projection) te2); - SimpleExternalTupleSet pcj2 = new SimpleExternalTupleSet((Projection) te3); - List<ExternalTupleSet> externalList = new ArrayList<>(); + final SimpleExternalTupleSet pcj1 = new SimpleExternalTupleSet((Projection) te2); + final SimpleExternalTupleSet pcj2 = new SimpleExternalTupleSet((Projection) te3); + final List<ExternalTupleSet> externalList = new ArrayList<>(); externalList.add(pcj1); externalList.add(pcj2); - PCJOptimizer optimizer = new PCJOptimizer(externalList, false); + provider.setIndices(externalList); + final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider); optimizer.optimize(te1, null, null); Assert.assertEquals(true, validatePcj(te1, unOpt, externalList, new HashSet<QueryModelNode>())); @@ -459,7 +490,7 @@ public class PCJOptimizerTest { @Test public void testJoinMatcherRejectsLeftJoinPcj() throws Exception { - String query1 = ""// + final String query1 = ""// + "SELECT ?e ?c ?l" // + "{" // + " ?e a ?c . "// @@ -467,7 +498,7 @@ public class PCJOptimizerTest { + " ?e <http://www.w3.org/2000/01/rdf-schema#label> ?l "// + "}";// - String query2 = ""// + final String query2 = ""// + "SELECT ?a ?b ?m" // + "{" // + " ?a a ?b . "// @@ -475,43 +506,44 @@ public class PCJOptimizerTest { + " OPTIONAL {?a <http://www.w3.org/2000/01/rdf-schema#label> ?m} . "// + "}";// - SPARQLParser parser = new SPARQLParser(); - ParsedQuery pq1 = parser.parseQuery(query1, null); - ParsedQuery pq2 = parser.parseQuery(query2, null); - TupleExpr te1 = pq1.getTupleExpr(); - TupleExpr te2 = pq2.getTupleExpr(); - TupleExpr expected = te1.clone(); + final SPARQLParser parser = new SPARQLParser(); + final ParsedQuery pq1 = parser.parseQuery(query1, null); + final ParsedQuery pq2 = parser.parseQuery(query2, null); + final TupleExpr te1 = pq1.getTupleExpr(); + final TupleExpr te2 = pq2.getTupleExpr(); + final TupleExpr expected = te1.clone(); - SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2); - List<ExternalTupleSet> externalList = new ArrayList<>(); + final SimpleExternalTupleSet pcj = new SimpleExternalTupleSet((Projection) te2); + final List<ExternalTupleSet> externalList = new ArrayList<>(); externalList.add(pcj); - PCJOptimizer optimizer = new PCJOptimizer(externalList, false); + provider.setIndices(externalList); + final PCJOptimizer optimizer = new PCJOptimizer(externalList, false, provider); optimizer.optimize(te1, null, null); Assert.assertEquals(expected, te1); } - private List<QueryModelNode> getNodes(TupleExpr te) { - NodeCollector collector = new NodeCollector(); + private List<QueryModelNode> getNodes(final TupleExpr te) { + final NodeCollector collector = new NodeCollector(); te.visit(collector); return collector.getNodes(); } - private boolean validatePcj(TupleExpr optTupleExp, TupleExpr unOptTup, List<ExternalTupleSet> pcjs, Set<QueryModelNode> expUnmatchedNodes) { + private boolean validatePcj(final TupleExpr optTupleExp, final TupleExpr unOptTup, final List<ExternalTupleSet> pcjs, final Set<QueryModelNode> expUnmatchedNodes) { - IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator( + final IndexedExecutionPlanGenerator iep = new IndexedExecutionPlanGenerator( unOptTup, pcjs); - List<ExternalTupleSet> indexList = iep.getNormalizedIndices(); - Set<QueryModelNode> indexSet = new HashSet<>(); - for(ExternalTupleSet etup: indexList) { + final List<ExternalTupleSet> indexList = iep.getNormalizedIndices(); + final Set<QueryModelNode> indexSet = new HashSet<>(); + for(final ExternalTupleSet etup: indexList) { indexSet.add(etup); } - Set<QueryModelNode> tupNodes = Sets.newHashSet(getNodes(optTupleExp)); + final Set<QueryModelNode> tupNodes = Sets.newHashSet(getNodes(optTupleExp)); - Set<QueryModelNode> diff = Sets.difference(tupNodes, indexSet); + final Set<QueryModelNode> diff = Sets.difference(tupNodes, indexSet); return diff.equals(expUnmatchedNodes); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java index e19e965..e75d499 100644 --- a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java +++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/KafkaLatencyBenchmark.java @@ -113,8 +113,8 @@ public class KafkaLatencyBenchmark implements AutoCloseable { public KafkaLatencyBenchmark(final CommonOptions options, final BenchmarkOptions benchmarkOptions) throws AccumuloException, AccumuloSecurityException { this.options = Objects.requireNonNull(options); this.benchmarkOptions = Objects.requireNonNull(benchmarkOptions); - this.client = Objects.requireNonNull(options.buildRyaClient()); - this.startTime = LocalDateTime.now(); + client = Objects.requireNonNull(options.buildRyaClient()); + startTime = LocalDateTime.now(); logger.info("Running {} with the following input parameters:\n{}\n{}", this.getClass(), options, benchmarkOptions); } @@ -172,7 +172,7 @@ public class KafkaLatencyBenchmark implements AutoCloseable { + "group by ?type"; logger.info("Query: {}", sparql); - return client.getCreatePCJ().get().createPCJ(options.getRyaInstance(), sparql, ImmutableSet.of(ExportStrategy.KAFKA)); + return client.getCreatePCJ().createPCJ(options.getRyaInstance(), sparql, ImmutableSet.of(ExportStrategy.KAFKA)); } private String issuePeriodicQuery(final PeriodicQueryCommand periodicOptions) throws InstanceDoesNotExistException, RyaClientException { @@ -352,7 +352,7 @@ public class KafkaLatencyBenchmark implements AutoCloseable { logger.info("Publishing {} Observations", observationsPerIteration); final long t1 = System.currentTimeMillis(); loadStatements.loadStatements(ryaInstanceName, statements); - logger.info("Published {} observations in in {}s", observationsPerIteration, ((System.currentTimeMillis() - t1)/1000.0)); + logger.info("Published {} observations in in {}s", observationsPerIteration, (System.currentTimeMillis() - t1)/1000.0); logger.info("Updating published totals..."); for(int typeId = 0; typeId < numTypes; typeId++) { typeToStatMap.get(typePrefix + typeId).total.addAndGet(observationsPerTypePerIteration); @@ -367,7 +367,7 @@ public class KafkaLatencyBenchmark implements AutoCloseable { } public void setShutdownOperation(final Runnable f) { - this.shutdownOperation = f; + shutdownOperation = f; } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java index 6841048..cf4ca8f 100644 --- a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java +++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java @@ -28,9 +28,11 @@ import java.util.Map; import java.util.Objects; import java.util.Queue; +import org.apache.hadoop.conf.Configuration; import org.apache.rya.indexing.external.tupleSet.ExternalTupleSet; import org.apache.rya.indexing.external.tupleSet.SimpleExternalTupleSet; import org.apache.rya.indexing.pcj.matching.PCJOptimizer; +import org.apache.rya.indexing.pcj.matching.provider.AccumuloIndexSetProvider; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; @@ -106,12 +108,12 @@ public class PCJOptimizerBenchmark { final BenchmarkValues chainedValues = new BenchmarkValues( makeChainedQuery(benchmarkParams), makeChainedPCJOptimizer(benchmarkParams)); - this.chainedBenchmarkValues.put(benchmarkParams, chainedValues); + chainedBenchmarkValues.put(benchmarkParams, chainedValues); final BenchmarkValues unchainedValues = new BenchmarkValues( makeUnchainedQuery(benchmarkParams), makeUnchainedPCJOptimizer(benchmarkParams)); - this.unchainedBenchmarkValues.put(benchmarkParams, unchainedValues); + unchainedBenchmarkValues.put(benchmarkParams, unchainedValues); } } } @@ -215,7 +217,7 @@ public class PCJOptimizerBenchmark { } // Create the optimizer. - return new PCJOptimizer(indices, false); + return new PCJOptimizer(indices, false, new AccumuloIndexSetProvider(new Configuration())); } private static PCJOptimizer makeChainedPCJOptimizer(final BenchmarkParams params) throws Exception { @@ -252,7 +254,7 @@ public class PCJOptimizerBenchmark { } // Create the optimizer. - return new PCJOptimizer(indices, false); + return new PCJOptimizer(indices, false, new AccumuloIndexSetProvider(new Configuration())); } private static String buildUnchainedSPARQL(final List<String> vars) { @@ -274,8 +276,8 @@ public class PCJOptimizerBenchmark { } return "select " + Joiner.on(" ").join(vars) + " where { " + - Joiner.on(" . ").join(statementPatterns) + - " . }" ; + Joiner.on(" . ").join(statementPatterns) + + " . }" ; } private static String buildChainedSPARQL(final List<String> vars) { @@ -298,8 +300,8 @@ public class PCJOptimizerBenchmark { // Build the SPARQL query from the pieces. return "select " + Joiner.on(" ").join(vars) + " where { " + - Joiner.on(" . ").join(statementPatterns) + - " . }" ; + Joiner.on(" . ").join(statementPatterns) + + " . }" ; } /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java index cc5ba8b..dd5fe68 100644 --- a/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java +++ b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java @@ -148,7 +148,7 @@ public class QueryBenchmarkRunIT { final String pcjId = pcjs.createPcj(SPARQL_QUERY); // Batch update the PCJ using the Rya Client. - ryaClient.getBatchUpdatePCJ().get().batchUpdate(RYA_INSTANCE_NAME, pcjId); + ryaClient.getBatchUpdatePCJ().batchUpdate(RYA_INSTANCE_NAME, pcjId); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.indexing.pcj/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/pom.xml b/extras/rya.indexing.pcj/pom.xml index 797c2fb..1d4a4b6 100644 --- a/extras/rya.indexing.pcj/pom.xml +++ b/extras/rya.indexing.pcj/pom.xml @@ -48,6 +48,10 @@ under the License. <groupId>org.apache.rya</groupId> <artifactId>accumulo.rya</artifactId> </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>mongodb.rya</artifactId> + </dependency> <!-- Accumulo support dependencies. --> <dependency> @@ -90,5 +94,11 @@ under the License. <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>mongodb.rya</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoBindingSetConverter.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoBindingSetConverter.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoBindingSetConverter.java new file mode 100644 index 0000000..010f8bc --- /dev/null +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoBindingSetConverter.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage.mongo; + +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter; +import org.bson.Document; +import org.openrdf.query.BindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Converts {@link BindingSet}s into other representations. This library is + * intended to convert between BindingSet and {@link Document}. + */ +@DefaultAnnotation(NonNull.class) +public interface MongoBindingSetConverter extends BindingSetConverter<Document> { + + /** + * Converts a {@link BindingSet} into a MongoDB model. + * + * @param bindingSet - The BindingSet that will be converted. (not null) + * @return The BindingSet formatted as Mongo Bson object. + * @throws BindingSetConversionException The BindingSet was unable to be + * converted. This will happen if one of the values could not be + * converted into the target model. + */ + public Document convert(BindingSet bindingSet) throws BindingSetConversionException; + + /** + * Converts a MongoDB model into a {@link BindingSet}. + * + * @param bindingSet - The bson that will be converted. (not null) + * @return The BindingSet created from a Mongo Bson object. + * @throws BindingSetConversionException The Bson was unable to be + * converted. This will happen if one of the values could not be + * converted into a BindingSet. + */ + public BindingSet convert(Document bindingSet) throws BindingSetConversionException; +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocuments.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocuments.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocuments.java new file mode 100644 index 0000000..ecfbc1c --- /dev/null +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocuments.java @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage.mongo; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.api.utils.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.PcjVarOrderFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.QueryLanguage; +import org.openrdf.query.TupleQuery; +import org.openrdf.query.TupleQueryResult; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.RepositoryException; + +import com.mongodb.MongoClient; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCollection; +import com.mongodb.util.JSON; + +/** + * Creates and modifies PCJs in MongoDB. PCJ's are stored as follows: + * + * <pre> + * <code> + * ----- PCJ Metadata Doc ----- + * { + * _id: [pcj_ID]_METADATA, + * sparql: [sparql query to match results], + * varOrders: [varOrder1, VarOrder2, ..., VarOrdern] + * cardinality: [number of results] + * } + * + * ----- PCJ Results Doc ----- + * { + * pcjId: [pcj_ID], + * visibilities: [visibilities] + * [binding_var1]: { + * uri: [type_uri], + * value: [value] + * } + * . + * . + * . + * [binding_varn]: { + * uri: [type_uri], + * value: [value] + * } + * } + * </code> + * </pre> + */ +public class MongoPcjDocuments { + public static final String PCJ_COLLECTION_NAME = "pcjs"; + + // metadata fields + public static final String CARDINALITY_FIELD = "cardinality"; + public static final String SPARQL_FIELD = "sparql"; + public static final String PCJ_METADATA_ID = "_id"; + public static final String VAR_ORDER_FIELD = "varOrders"; + + // pcj results fields + private static final String BINDING_VALUE = "value"; + private static final String BINDING_TYPE = "rdfType"; + private static final String VISIBILITIES_FIELD = "visibilities"; + private static final String PCJ_ID = "pcjId"; + + private final MongoCollection<Document> pcjCollection; + private static final PcjVarOrderFactory pcjVarOrderFactory = new ShiftVarOrderFactory(); + + /** + * Creates a new {@link MongoPcjDocuments}. + * @param client - The {@link MongoClient} to use to connect to mongo. + * @param ryaInstanceName - The rya instance to connect to. + */ + public MongoPcjDocuments(final MongoClient client, final String ryaInstanceName) { + requireNonNull(client); + requireNonNull(ryaInstanceName); + pcjCollection = client.getDatabase(ryaInstanceName).getCollection(PCJ_COLLECTION_NAME); + } + + private String makeMetadataID(final String pcjId) { + return pcjId + "_METADATA"; + } + + /** + * Creates a {@link Document} containing the metadata defining the PCj. + * + * @param pcjId - Uniquely identifies a PCJ within Rya. (not null) + * @param sparql - The sparql query the PCJ will use. + * @return The document built around the provided metadata. + * @throws PCJStorageException - Thrown when the sparql query is malformed. + */ + public Document makeMetadataDocument(final String pcjId, final String sparql) throws PCJStorageException { + requireNonNull(pcjId); + requireNonNull(sparql); + + final Set<VariableOrder> varOrders; + try { + varOrders = pcjVarOrderFactory.makeVarOrders(sparql); + } catch (final MalformedQueryException e) { + throw new PCJStorageException("Can not create the PCJ. The SPARQL is malformed.", e); + } + + return new Document() + .append(PCJ_METADATA_ID, makeMetadataID(pcjId)) + .append(SPARQL_FIELD, sparql) + .append(CARDINALITY_FIELD, 0) + .append(VAR_ORDER_FIELD, varOrders); + + } + + /** + * Creates a new PCJ based on the provided metadata. The initial pcj results + * will be empty. + * + * @param pcjId - Uniquely identifies a PCJ within Rya. + * @param sparql - The query the pcj is assigned to. + * @throws PCJStorageException - Thrown when the sparql query is malformed. + */ + public void createPcj(final String pcjId, final String sparql) throws PCJStorageException { + pcjCollection.insertOne(makeMetadataDocument(pcjId, sparql)); + } + + /** + * Creates a new PCJ document and populates it by scanning an instance of + * Rya for historic matches. + * <p> + * If any portion of this operation fails along the way, the partially + * create PCJ documents will be left in Mongo. + * + * @param ryaConn - Connects to the Rya that will be scanned. (not null) + * @param pcjId - Uniquely identifies a PCJ within Rya. (not null) + * @param sparql - The SPARQL query whose results will be loaded into the PCJ results document. (not null) + * @throws PCJStorageException The PCJ documents could not be create or the + * values from Rya were not able to be loaded into it. + */ + public void createAndPopulatePcj( + final RepositoryConnection ryaConn, + final String pcjId, + final String sparql) throws PCJStorageException { + checkNotNull(ryaConn); + checkNotNull(pcjId); + checkNotNull(sparql); + + // Create the PCJ document in Mongo. + createPcj(pcjId, sparql); + + // Load historic matches from Rya into the PCJ results document. + populatePcj(pcjId, ryaConn); + } + + /** + * Gets the {@link PcjMetadata} from a provided PCJ Id. + * + * @param pcjId - The Id of the PCJ to get from MongoDB. (not null) + * @return - The {@link PcjMetadata} of the Pcj specified. + * @throws PCJStorageException The PCJ metadata document does not exist. + */ + public PcjMetadata getPcjMetadata(final String pcjId) throws PCJStorageException { + requireNonNull(pcjId); + + // since query by ID, there will only be one. + final Document result = pcjCollection.find(new Document(PCJ_METADATA_ID, makeMetadataID(pcjId))).first(); + + if(result == null) { + throw new PCJStorageException("The PCJ: " + pcjId + " does not exist."); + } + + final String sparql = result.getString(SPARQL_FIELD); + final int cardinality = result.getInteger(CARDINALITY_FIELD, 0); + final List<List<String>> varOrders= (List<List<String>>) result.get(VAR_ORDER_FIELD); + final Set<VariableOrder> varOrder = new HashSet<>(); + for(final List<String> vars : varOrders) { + varOrder.add(new VariableOrder(vars)); + } + + return new PcjMetadata(sparql, cardinality, varOrder); + } + + /** + * Adds binding set results to a specific PCJ. + * + * @param pcjId - Uniquely identifies a PCJ within Rya. (not null) + * @param results - The binding set results. (not null) + */ + public void addResults(final String pcjId, final Collection<VisibilityBindingSet> results) { + checkNotNull(pcjId); + checkNotNull(results); + + final List<Document> pcjDocs = new ArrayList<>(); + for (final VisibilityBindingSet vbs : results) { + // each binding gets it's own doc. + final Document bindingDoc = new Document(PCJ_ID, pcjId); + vbs.forEach(binding -> { + final RyaType type = RdfToRyaConversions.convertValue(binding.getValue()); + bindingDoc.append(binding.getName(), + new Document() + .append(BINDING_TYPE, type.getDataType().stringValue()) + .append(BINDING_VALUE, type.getData()) + ); + }); + bindingDoc.append(VISIBILITIES_FIELD, vbs.getVisibility()); + pcjDocs.add(bindingDoc); + } + pcjCollection.insertMany(pcjDocs); + + // update cardinality in the metadata doc. + final int appendCardinality = pcjDocs.size(); + final Bson query = new Document(PCJ_METADATA_ID, makeMetadataID(pcjId)); + final Bson update = new Document("$inc", new Document(CARDINALITY_FIELD, appendCardinality)); + pcjCollection.updateOne(query, update); + } + + /** + * Purges all results from the PCJ results document with the provided Id. + * + * @param pcjId - The Id of the PCJ to purge. (not null) + */ + public void purgePcjs(final String pcjId) { + requireNonNull(pcjId); + + // remove every doc for the pcj, except the metadata + final Bson filter = new Document(PCJ_ID, pcjId); + pcjCollection.deleteMany(filter); + + // reset cardinality + final Bson query = new Document(PCJ_METADATA_ID, makeMetadataID(pcjId)); + final Bson update = new Document("$set", new Document(CARDINALITY_FIELD, 0)); + pcjCollection.updateOne(query, update); + } + + /** + * Scan Rya for results that solve the PCJ's query and store them in the PCJ + * document. + * <p> + * This method assumes the PCJ document has already been created. + * + * @param pcjId - The Id of the PCJ that will receive the results. (not null) + * @param ryaConn - A connection to the Rya store that will be queried to find results. (not null) + * @throws PCJStorageException If results could not be written to the PCJ results document, + * the PCJ results document does not exist, or the query that is being execute was malformed. + */ + public void populatePcj(final String pcjId, final RepositoryConnection ryaConn) throws PCJStorageException { + checkNotNull(pcjId); + checkNotNull(ryaConn); + + try { + // Fetch the query that needs to be executed from the PCJ metadata document. + final PcjMetadata pcjMetadata = getPcjMetadata(pcjId); + final String sparql = pcjMetadata.getSparql(); + + // Query Rya for results to the SPARQL query. + final TupleQuery query = ryaConn.prepareTupleQuery(QueryLanguage.SPARQL, sparql); + final TupleQueryResult results = query.evaluate(); + + // Load batches of 1000 of them at a time into the PCJ results document. + final Set<VisibilityBindingSet> batch = new HashSet<>(1000); + while(results.hasNext()) { + final VisibilityBindingSet bs = new VisibilityBindingSet(results.next()); + batch.add( bs ); + if(batch.size() == 1000) { + addResults(pcjId, batch); + batch.clear(); + } + } + + if(!batch.isEmpty()) { + addResults(pcjId, batch); + } + + } catch (RepositoryException | MalformedQueryException | QueryEvaluationException e) { + throw new PCJStorageException( + "Could not populate a PCJ document with Rya results for the pcj with Id: " + pcjId, e); + } + } + + /** + * List the document Ids of the PCJs that are stored in MongoDB + * for this instance of Rya. + * + * @return A list of pcj document Ids that hold PCJ index data for the current + * instance of Rya. + */ + public List<String> listPcjDocuments() { + final List<String> pcjIds = new ArrayList<>(); + + //This Bson string reads as: + //{} - no search criteria: find all + //{ _id: 1 } - only return the _id, which is the PCJ Id. + final FindIterable<Document> rez = pcjCollection.find((Bson) JSON.parse("{ }, { " + PCJ_METADATA_ID + ": 1 , _id: 0}")); + final Iterator<Document> iter = rez.iterator(); + while(iter.hasNext()) { + pcjIds.add(iter.next().get(PCJ_METADATA_ID).toString().replace("_METADATA", "")); + } + + return pcjIds; + } + + /** + * Returns all of the results of a PCJ. + * + * @param pcjId + * - The PCJ to get the results for. (not null) + * @return The authorized PCJ results. + */ + public CloseableIterator<BindingSet> listResults(final String pcjId) { + requireNonNull(pcjId); + + // get all results based on pcjId + return queryForBindings(new Document(PCJ_ID, pcjId)); + } + + /** + * Retrieves the stored {@link BindingSet} results for the provided pcjId. + * + * @param pcjId - The Id of the PCJ to retrieve results from. + * @param restrictionBindings - The collection of {@link BindingSet}s to restrict results. + * <p> + * Note: the result restrictions from {@link BindingSet}s are an OR + * over ANDS in that: <code> + * [ + * bindingset: binding AND binding AND binding, + * OR + * bindingset: binding AND binding AND binding, + * . + * . + * . + * OR + * bindingset: binding + * ] + * </code> + * @return + */ + public CloseableIterator<BindingSet> getResults(final String pcjId, final Collection<BindingSet> restrictionBindings) { + // empty bindings return all results. + if (restrictionBindings.size() == 1 && restrictionBindings.iterator().next().size() == 0) { + return listResults(pcjId); + } + + final Document query = new Document(PCJ_ID, pcjId); + final Document bindingSetDoc = new Document(); + final List<Document> bindingSetList = new ArrayList<>(); + restrictionBindings.forEach(bindingSet -> { + final Document bindingDoc = new Document(); + final List<Document> bindings = new ArrayList<>(); + bindingSet.forEach(binding -> { + final RyaType type = RdfToRyaConversions.convertValue(binding.getValue()); + final Document typeDoc = new Document() + .append(BINDING_TYPE, type.getDataType().stringValue()) + .append(BINDING_VALUE, type.getData()); + final Document bind = new Document(binding.getName(), typeDoc); + bindings.add(bind); + }); + bindingDoc.append("$and", bindings); + bindingSetList.add(bindingDoc); + }); + bindingSetDoc.append("$or", bindingSetList); + return queryForBindings(query); + } + + private CloseableIterator<BindingSet> queryForBindings(final Document query) { + final FindIterable<Document> rez = pcjCollection.find(query); + final Iterator<Document> resultsIter = rez.iterator(); + return new CloseableIterator<BindingSet>() { + @Override + public boolean hasNext() { + return resultsIter.hasNext(); + } + + @Override + public BindingSet next() { + final Document bs = resultsIter.next(); + final MapBindingSet binding = new MapBindingSet(); + for (final String key : bs.keySet()) { + if (key.equals(VISIBILITIES_FIELD)) { + // has auths, is a visibility binding set. + } else if (!key.equals("_id") && !key.equals(PCJ_ID)) { + // is the binding value. + final Document typeDoc = (Document) bs.get(key); + final URI dataType = new URIImpl(typeDoc.getString(BINDING_TYPE)); + final RyaType type = new RyaType(dataType, typeDoc.getString(BINDING_VALUE)); + final Value value = RyaToRdfConversions.convertValue(type); + binding.addBinding(key, value); + } + } + return binding; + } + + @Override + public void close() throws Exception { + } + }; + } + + /** + * Drops a pcj based on the PCJ Id. Removing the entire document from Mongo. + * + * @param pcjId - The identifier for the PCJ to remove. + */ + public void dropPcj(final String pcjId) { + purgePcjs(pcjId); + pcjCollection.deleteOne(new Document(PCJ_METADATA_ID, makeMetadataID(pcjId))); + } +} \ No newline at end of file