http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinderTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinderTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinderTest.java deleted file mode 100644 index 8b38923..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/FilterFinderTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.pcj.fluo.app; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.util.Arrays; - -import org.junit.Test; -import org.openrdf.model.impl.LiteralImpl; -import org.openrdf.model.impl.URIImpl; -import org.openrdf.model.vocabulary.XMLSchema; -import org.openrdf.query.algebra.Compare; -import org.openrdf.query.algebra.Compare.CompareOp; -import org.openrdf.query.algebra.Filter; -import org.openrdf.query.algebra.ValueConstant; -import org.openrdf.query.algebra.ValueExpr; -import org.openrdf.query.algebra.Var; - -import com.google.common.base.Optional; - -/** - * Tests the methods of {@link FilterFinder}. - */ -public class FilterFinderTest { - - @Test - public void manyFilters() throws Exception { - // The query that will be searched. - final String sparql = - "SELECT ?person ?age " + - "{" + - "FILTER(?age < 30) . " + - "FILTER(?person = <http://Alice>)" + - "?person <http://hasAge> ?age" + - "}"; - - // Create the expected result. - final ValueExpr[] expected = new ValueExpr[2]; - expected[0] = new Compare(new Var("person"), new ValueConstant( new URIImpl("http://Alice") )); - expected[1] = new Compare(new Var("age"), new ValueConstant( new LiteralImpl("30", XMLSchema.INTEGER) ), CompareOp.LT); - - // Run the test. - final FilterFinder finder = new FilterFinder(); - final ValueExpr[] conditions = new ValueExpr[2]; - conditions[0] = finder.findFilter(sparql, 0).get().getCondition(); - conditions[1] = finder.findFilter(sparql, 1).get().getCondition(); - assertTrue( Arrays.equals(expected, conditions) ); - } - - @Test - public void noFilterAtIndex() throws Exception { - // The query that will be searched. - final String sparql = - "SELECT ?person ?age " + - "{" + - "FILTER(?age < 30) . " + - "FILTER(?person = <http://Alice>)" + - "?person <http://hasAge> ?age" + - "}"; - - // Run the test. - final FilterFinder finder = new FilterFinder(); - final Optional<Filter> filter = finder.findFilter(sparql, 4); - assertFalse( filter.isPresent() ); - } -} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java deleted file mode 100644 index 99791ee..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/VisibilityBindingSetSerDeTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.rya.indexing.pcj.fluo.app; - -import static org.junit.Assert.assertEquals; - -import org.apache.fluo.api.data.Bytes; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.junit.Test; -import org.openrdf.model.ValueFactory; -import org.openrdf.model.impl.ValueFactoryImpl; -import org.openrdf.query.impl.MapBindingSet; - -/** - * Tests the methods of {@link VisibilityBindingSetSerDe}. - */ -public class VisibilityBindingSetSerDeTest { - - @Test - public void rountTrip() throws Exception { - final ValueFactory vf = new ValueFactoryImpl(); - - final MapBindingSet bs = new MapBindingSet(); - bs.addBinding("name", vf.createLiteral("Alice")); - bs.addBinding("age", vf.createLiteral(5)); - final VisibilityBindingSet original = new VisibilityBindingSet(bs, "u"); - - final VisibilityBindingSetSerDe serde = new VisibilityBindingSetSerDe(); - final Bytes bytes = serde.serialize(original); - final VisibilityBindingSet result = serde.deserialize(bytes); - - assertEquals(original, result); - } -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java new file mode 100644 index 0000000..fe89325 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/batch/serializer/BatchInformationSerializerTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.batch.serializer; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.Optional; + +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Span; +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task; +import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation; +import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.junit.Test; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +public class BatchInformationSerializerTest { + + @Test + public void testSpanBatchInformationSerialization() { + + SpanBatchDeleteInformation batch = SpanBatchDeleteInformation.builder().setBatchSize(1000) + .setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET).setSpan(Span.prefix(Bytes.of("prefix"))).build(); + System.out.println(batch); + byte[] batchBytes = BatchInformationSerializer.toBytes(batch); + Optional<BatchInformation> decodedBatch = BatchInformationSerializer.fromBytes(batchBytes); + System.out.println(decodedBatch); + assertEquals(batch, decodedBatch.get()); + } + + @Test + public void testJoinBatchInformationSerialization() { + + QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("a", new URIImpl("urn:123")); + bs.addBinding("b", new URIImpl("urn:456")); + VisibilityBindingSet vBis = new VisibilityBindingSet(bs, "FOUO"); + + JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1000).setTask(Task.Update) + .setColumn(FluoQueryColumns.PERIODIC_QUERY_BINDING_SET).setSpan(Span.prefix(Bytes.of("prefix346"))) + .setJoinType(JoinType.LEFT_OUTER_JOIN).setSide(Side.RIGHT).setVarOrder(new VariableOrder(Arrays.asList("a", "b"))) + .setBs(vBis).build(); + + byte[] batchBytes = BatchInformationSerializer.toBytes(batch); + Optional<BatchInformation> decodedBatch = BatchInformationSerializer.fromBytes(batchBytes); + assertEquals(batch, decodedBatch.get()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java new file mode 100644 index 0000000..c8ca6af --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/PeriodicQueryUtilTest.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.query; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds; +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil.PeriodicQueryNodeRelocator; +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil.PeriodicQueryNodeVisitor; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.junit.Assert; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.algebra.FunctionCall; +import org.openrdf.query.algebra.Join; +import org.openrdf.query.algebra.Projection; +import org.openrdf.query.algebra.QueryModelNode; +import org.openrdf.query.algebra.TupleExpr; +import org.openrdf.query.algebra.ValueConstant; +import org.openrdf.query.algebra.ValueExpr; +import org.openrdf.query.algebra.Var; +import org.openrdf.query.algebra.helpers.QueryModelVisitorBase; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; + +public class PeriodicQueryUtilTest { + + private static final ValueFactory vf = new ValueFactoryImpl(); + + + + @Test + public void periodicNodeNotPresentTest() throws Exception { + + List<ValueExpr> values = Arrays.asList(new Var("time"), new ValueConstant(vf.createLiteral(12.0)), new ValueConstant(vf.createLiteral(6.0)), new ValueConstant(vf.createURI(PeriodicQueryUtil.temporalNameSpace + "hours"))); + FunctionCall func = new FunctionCall("uri:func", values); + Optional<PeriodicQueryNode> node1 = PeriodicQueryUtil.getPeriodicQueryNode(func, new Join()); + Assert.assertEquals(false, node1.isPresent()); + } + + + + @Test + public void periodicNodePresentTest() throws Exception { + + List<ValueExpr> values = Arrays.asList(new Var("time"), new ValueConstant(vf.createLiteral(12.0)), new ValueConstant(vf.createLiteral(6.0)), new ValueConstant(vf.createURI(PeriodicQueryUtil.temporalNameSpace + "hours"))); + FunctionCall func = new FunctionCall(PeriodicQueryUtil.PeriodicQueryURI, values); + Optional<PeriodicQueryNode> node1 = PeriodicQueryUtil.getPeriodicQueryNode(func, new Join()); + Assert.assertEquals(true, node1.isPresent()); + + PeriodicQueryNode node2 = new PeriodicQueryNode(12*60*60*1000L, 6*3600*1000L, TimeUnit.MILLISECONDS, "time", new Join()); + + Assert.assertEquals(true, periodicNodesEqualIgnoreArg(node1.get(), node2)); + } + + + @Test + public void periodicNodeFractionalDurationTest() throws Exception { + + List<ValueExpr> values = Arrays.asList(new Var("time"), new ValueConstant(vf.createLiteral(1)), new ValueConstant(vf.createLiteral(.5)), new ValueConstant(vf.createURI(PeriodicQueryUtil.temporalNameSpace + "hours"))); + FunctionCall func = new FunctionCall(PeriodicQueryUtil.PeriodicQueryURI, values); + Optional<PeriodicQueryNode> node1 = PeriodicQueryUtil.getPeriodicQueryNode(func, new Join()); + Assert.assertEquals(true, node1.isPresent()); + + double window = 1*60*60*1000; + double period = .5*3600*1000; + + PeriodicQueryNode node2 = new PeriodicQueryNode((long) window, (long) period, TimeUnit.MILLISECONDS, "time", new Join()); + + Assert.assertEquals(true, periodicNodesEqualIgnoreArg(node1.get(), node2)); + } + + @Test + public void testPeriodicNodePlacement() throws MalformedQueryException { + String query = "prefix function: <http://org.apache.rya/function#> " //n + + "prefix time: <http://www.w3.org/2006/time#> " //n + + "prefix fn: <http://www.w3.org/2006/fn#> " //n + + "select ?obs ?time ?lat where {" //n + + "Filter(function:periodic(?time, 12.0, 6.0,time:hours)) " //n + + "Filter(fn:test(?lat, 25)) " //n + + "?obs <uri:hasTime> ?time. " //n + + "?obs <uri:hasLattitude> ?lat }"; //n + + SPARQLParser parser = new SPARQLParser(); + ParsedQuery pq = parser.parseQuery(query, null); + TupleExpr te = pq.getTupleExpr(); + te.visit(new PeriodicQueryNodeVisitor()); + + PeriodicNodeCollector collector = new PeriodicNodeCollector(); + te.visit(collector); + + PeriodicQueryNode node2 = new PeriodicQueryNode(12*60*60*1000L, 6*3600*1000L, TimeUnit.MILLISECONDS, "time", new Join()); + + Assert.assertEquals(true, periodicNodesEqualIgnoreArg(node2, collector.getPeriodicQueryNode())); + + } + + @Test + public void testPeriodicNodeLocation() throws MalformedQueryException { + String query = "prefix function: <http://org.apache.rya/function#> " //n + + "prefix time: <http://www.w3.org/2006/time#> " //n + + "prefix fn: <http://www.w3.org/2006/fn#> " //n + + "select ?obs ?time ?lat where {" //n + + "Filter(function:periodic(?time, 1,.5,time:hours)) " //n + + "Filter(fn:test(?lat, 25)) " //n + + "?obs <uri:hasTime> ?time. " //n + + "?obs <uri:hasLattitude> ?lat }"; //n + + SPARQLParser parser = new SPARQLParser(); + ParsedQuery pq = parser.parseQuery(query, null); + TupleExpr te = pq.getTupleExpr(); + te.visit(new PeriodicQueryNodeVisitor()); + + PeriodicNodeCollector collector = new PeriodicNodeCollector(); + te.visit(collector); + Assert.assertEquals(2, collector.getPos()); + + te.visit(new PeriodicQueryNodeRelocator()); + collector.resetCount(); + te.visit(collector); + Assert.assertEquals(1, collector.getPos()); + + double window = 1*60*60*1000; + double period = .5*3600*1000; + PeriodicQueryNode node2 = new PeriodicQueryNode((long) window, (long) period, TimeUnit.MILLISECONDS, "time", new Join()); + Assert.assertEquals(true, periodicNodesEqualIgnoreArg(node2, collector.getPeriodicQueryNode())); + + } + + @Test + public void testFluoQueryVarOrders() throws MalformedQueryException { + String query = "prefix function: <http://org.apache.rya/function#> " //n + + "prefix time: <http://www.w3.org/2006/time#> " //n + + "select (count(?obs) as ?total) where {" //n + + "Filter(function:periodic(?time, 12.4, 6.2,time:hours)) " //n + + "?obs <uri:hasTime> ?time. " //n + + "?obs <uri:hasLattitude> ?lat }"; //n + + SPARQLParser parser = new SPARQLParser(); + ParsedQuery pq = parser.parseQuery(query, null); + SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder(); + FluoQuery fluoQuery = builder.make(pq, new NodeIds()); + + PeriodicQueryMetadata periodicMeta = fluoQuery.getPeriodicQueryMetadata().orNull(); + Assert.assertEquals(true, periodicMeta != null); + VariableOrder periodicVars = periodicMeta.getVariableOrder(); + Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, periodicVars.getVariableOrders().get(0)); + + QueryMetadata queryMeta = fluoQuery.getQueryMetadata().get(); + VariableOrder queryVars = queryMeta.getVariableOrder(); + Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, queryVars.getVariableOrders().get(0)); + + Collection<AggregationMetadata> aggMetaCollection = fluoQuery.getAggregationMetadata(); + Assert.assertEquals(1, aggMetaCollection.size()); + AggregationMetadata aggMeta = aggMetaCollection.iterator().next(); + VariableOrder aggVars = aggMeta.getVariableOrder(); + Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, aggVars.getVariableOrders().get(0)); + + System.out.println(fluoQuery); + } + + private boolean periodicNodesEqualIgnoreArg(PeriodicQueryNode node1, PeriodicQueryNode node2) { + return new EqualsBuilder().append(node1.getPeriod(), node2.getPeriod()).append(node1.getWindowSize(), node2.getWindowSize()) + .append(node1.getTemporalVariable(), node2.getTemporalVariable()).append(node1.getUnit(), node2.getUnit()).build(); + } + + private static class PeriodicNodeCollector extends QueryModelVisitorBase<RuntimeException>{ + + private PeriodicQueryNode periodicNode; + int count = 0; + + public PeriodicQueryNode getPeriodicQueryNode() { + return periodicNode; + } + + public int getPos() { + return count; + } + + public void resetCount() { + count = 0; + } + + public void meet(Filter node) { + count++; + node.getArg().visit(this); + } + + public void meet(Projection node) { + count++; + node.getArg().visit(this); + } + + @Override + public void meetOther(QueryModelNode node) { + if(node instanceof PeriodicQueryNode) { + periodicNode = (PeriodicQueryNode) node; + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java index 99ccc58..7a73b41 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/util/QueryReportRenderer.java @@ -83,8 +83,7 @@ public class QueryReportRenderer { builder.appendItem( new ReportItem("FILTER NODE") ); builder.appendItem( new ReportItem("Node ID", filterMetadata.getNodeId()) ); builder.appendItem( new ReportItem("Variable Order", filterMetadata.getVariableOrder().toString()) ); - builder.appendItem( new ReportItem("Original SPARQL", prettyFormatSparql( filterMetadata.getOriginalSparql()) ) ); - builder.appendItem( new ReportItem("Filter Index", "" + filterMetadata.getFilterIndexWithinSparql()) ); + builder.appendItem( new ReportItem("Filter SPARQL", prettyFormatSparql( filterMetadata.getFilterSparql()))); builder.appendItem( new ReportItem("Parent Node ID", filterMetadata.getParentNodeId()) ); builder.appendItem( new ReportItem("Child Node ID", filterMetadata.getChildNodeId()) ); builder.appendItem( new ReportItem("Count", "" + queryReport.getCount(filterMetadata.getNodeId())) ); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml index 6467191..9591e55 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/pom.xml @@ -43,6 +43,11 @@ <artifactId>rya.indexing</artifactId> </dependency> <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.test.base</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.apache.fluo</groupId> <artifactId>fluo-api</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java deleted file mode 100644 index b5d9428..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/FluoITBase.java +++ /dev/null @@ -1,282 +0,0 @@ -package org.apache.rya.indexing.pcj.fluo; - -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.net.UnknownHostException; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; -import org.apache.accumulo.minicluster.MiniAccumuloCluster; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.rya.accumulo.MiniAccumuloClusterInstance; -import org.apache.rya.accumulo.MiniAccumuloSingleton; -import org.apache.rya.accumulo.RyaTestInstanceRule; -import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; -import org.apache.rya.api.client.accumulo.AccumuloInstall; -import org.apache.zookeeper.ClientCnxn; -import org.junit.After; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.openrdf.repository.RepositoryConnection; -import org.openrdf.repository.RepositoryException; -import org.openrdf.sail.Sail; -import org.openrdf.sail.SailException; - -import org.apache.fluo.api.client.FluoAdmin; -import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException; -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.FluoFactory; -import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.mini.MiniFluo; -import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import org.apache.rya.api.client.RyaClientException; -import org.apache.rya.api.client.Install; -import org.apache.rya.api.client.Install.DuplicateInstanceNameException; -import org.apache.rya.api.client.Install.InstallConfiguration; -import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; -import org.apache.rya.api.persist.RyaDAOException; -import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; -import org.apache.rya.rdftriplestore.RyaSailRepository; -import org.apache.rya.rdftriplestore.inference.InferenceEngineException; -import org.apache.rya.sail.config.RyaSailFactory; - -/** - * Integration tests that ensure the Fluo application processes PCJs results - * correctly. - * <p> - * This class is being ignored because it doesn't contain any unit tests. - */ -public abstract class FluoITBase { - private static final Logger log = Logger.getLogger(FluoITBase.class); - - // Mini Accumulo Cluster - private static MiniAccumuloClusterInstance clusterInstance = MiniAccumuloSingleton.getInstance(); - private static MiniAccumuloCluster cluster; - - private static String instanceName = null; - private static String zookeepers = null; - - protected static Connector accumuloConn = null; - - // Fluo data store and connections. - protected MiniFluo fluo = null; - protected FluoConfiguration fluoConfig = null; - protected FluoClient fluoClient = null; - - // Rya data store and connections. - protected RyaSailRepository ryaRepo = null; - protected RepositoryConnection ryaConn = null; - - @Rule - public RyaTestInstanceRule testInstance = new RyaTestInstanceRule(false); - - @BeforeClass - public static void beforeClass() throws Exception { - Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR); - - // Setup and start the Mini Accumulo. - cluster = clusterInstance.getCluster(); - - // Store a connector to the Mini Accumulo. - instanceName = cluster.getInstanceName(); - zookeepers = cluster.getZooKeepers(); - - final Instance instance = new ZooKeeperInstance(instanceName, zookeepers); - accumuloConn = instance.getConnector(clusterInstance.getUsername(), new PasswordToken(clusterInstance.getPassword())); - } - - @Before - public void setupMiniResources() throws Exception { - // Initialize the Mini Fluo that will be used to store created queries. - fluoConfig = createFluoConfig(); - preFluoInitHook(); - FluoFactory.newAdmin(fluoConfig).initialize(new FluoAdmin.InitializationOptions() - .setClearTable(true) - .setClearZookeeper(true)); - postFluoInitHook(); - fluo = FluoFactory.newMiniFluo(fluoConfig); - fluoClient = FluoFactory.newClient(fluo.getClientConfiguration()); - - // Initialize the Rya that will be used by the tests. - ryaRepo = setupRya(); - ryaConn = ryaRepo.getConnection(); - } - - @After - public void shutdownMiniResources() { - if (ryaConn != null) { - try { - log.info("Shutting down Rya Connection."); - ryaConn.close(); - log.info("Rya Connection shut down."); - } catch (final Exception e) { - log.error("Could not shut down the Rya Connection.", e); - } - } - - if (ryaRepo != null) { - try { - log.info("Shutting down Rya Repo."); - ryaRepo.shutDown(); - log.info("Rya Repo shut down."); - } catch (final Exception e) { - log.error("Could not shut down the Rya Repo.", e); - } - } - - if (fluoClient != null) { - try { - log.info("Shutting down Fluo Client."); - fluoClient.close(); - log.info("Fluo Client shut down."); - } catch (final Exception e) { - log.error("Could not shut down the Fluo Client.", e); - } - } - - if (fluo != null) { - try { - log.info("Shutting down Mini Fluo."); - fluo.close(); - log.info("Mini Fluo shut down."); - } catch (final Exception e) { - log.error("Could not shut down the Mini Fluo.", e); - } - } - } - - protected void preFluoInitHook() throws Exception { - - } - - protected void postFluoInitHook() throws Exception { - - } - - protected MiniAccumuloCluster getMiniAccumuloCluster() { - return cluster; - } - - protected MiniFluo getMiniFluo() { - return fluo; - } - - public RyaSailRepository getRyaSailRepository() { - return ryaRepo; - } - - public Connector getAccumuloConnector() { - return accumuloConn; - } - - public String getRyaInstanceName() { - return testInstance.getRyaInstanceName(); - } - - protected String getUsername() { - return clusterInstance.getUsername(); - } - - protected String getPassword() { - return clusterInstance.getPassword(); - } - - protected FluoConfiguration getFluoConfiguration() { - return fluoConfig; - } - - public AccumuloConnectionDetails createConnectionDetails() { - return new AccumuloConnectionDetails( - clusterInstance.getUsername(), - clusterInstance.getPassword().toCharArray(), - clusterInstance.getInstanceName(), - clusterInstance.getZookeepers()); - } - - private FluoConfiguration createFluoConfig() { - // Configure how the mini fluo will run. - final FluoConfiguration config = new FluoConfiguration(); - config.setMiniStartAccumulo(false); - config.setAccumuloInstance(instanceName); - config.setAccumuloUser(clusterInstance.getUsername()); - config.setAccumuloPassword(clusterInstance.getPassword()); - config.setInstanceZookeepers(zookeepers + "/fluo"); - config.setAccumuloZookeepers(zookeepers); - - config.setApplicationName(getRyaInstanceName()); - config.setAccumuloTable("fluo" + getRyaInstanceName()); - return config; - } - - /** - * Sets up a Rya instance. - */ - protected RyaSailRepository setupRya() - throws AccumuloException, AccumuloSecurityException, RepositoryException, RyaDAOException, - NumberFormatException, UnknownHostException, InferenceEngineException, AlreadyInitializedException, - RyaDetailsRepositoryException, DuplicateInstanceNameException, RyaClientException, SailException { - checkNotNull(instanceName); - checkNotNull(zookeepers); - - // Setup Rya configuration values. - final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - conf.setTablePrefix(getRyaInstanceName()); - conf.setDisplayQueryPlan(true); - conf.setBoolean(ConfigUtils.USE_MOCK_INSTANCE, false); - conf.set(ConfigUtils.CLOUDBASE_USER, clusterInstance.getUsername()); - conf.set(ConfigUtils.CLOUDBASE_PASSWORD, clusterInstance.getPassword()); - conf.set(ConfigUtils.CLOUDBASE_INSTANCE, clusterInstance.getInstanceName()); - conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, clusterInstance.getZookeepers()); - conf.set(ConfigUtils.USE_PCJ, "true"); - conf.set(ConfigUtils.FLUO_APP_NAME, getRyaInstanceName()); - conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString()); - conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); - conf.set(ConfigUtils.CLOUDBASE_AUTHS, ""); - - // Install the test instance of Rya. - final Install install = new AccumuloInstall(createConnectionDetails(), accumuloConn); - - final InstallConfiguration installConfig = InstallConfiguration.builder() - .setEnableTableHashPrefix(true) - .setEnableEntityCentricIndex(true) - .setEnableFreeTextIndex(true) - .setEnableTemporalIndex(true) - .setEnablePcjIndex(true) - .setEnableGeoIndex(true) - .setFluoPcjAppName(getRyaInstanceName()) - .build(); - install.install(getRyaInstanceName(), installConfig); - - // Connect to the instance of Rya that was just installed. - final Sail sail = RyaSailFactory.getInstance(conf); - final RyaSailRepository ryaRepo = new RyaSailRepository(sail); - - return ryaRepo; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java deleted file mode 100644 index 452dd27..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/KafkaExportITBase.java +++ /dev/null @@ -1,370 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.pcj.fluo; - -import static java.util.Objects.requireNonNull; -import static org.junit.Assert.assertEquals; - -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; - -import org.I0Itec.zkclient.ZkClient; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.minicluster.MiniAccumuloCluster; -import org.apache.fluo.api.config.ObserverSpecification; -import org.apache.fluo.recipes.test.AccumuloExportITBase; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import org.apache.rya.accumulo.AccumuloRyaDAO; -import org.apache.rya.api.client.Install.InstallConfiguration; -import org.apache.rya.api.client.RyaClient; -import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; -import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; -import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; -import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; -import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe; -import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver; -import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver; -import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; -import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; -import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; -import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; -import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.apache.rya.rdftriplestore.RyaSailRepository; -import org.apache.rya.sail.config.RyaSailFactory; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.openrdf.model.Statement; -import org.openrdf.repository.sail.SailRepositoryConnection; -import org.openrdf.sail.Sail; - - -import kafka.admin.AdminUtils; -import kafka.admin.RackAwareMode; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; -import kafka.utils.MockTime; -import kafka.utils.TestUtils; -import kafka.utils.Time; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; -import kafka.zk.EmbeddedZookeeper; - -/** - * The base Integration Test class used for Fluo applications that export to a - * Kakfa topic. - */ -public class KafkaExportITBase extends AccumuloExportITBase { - - protected static final String RYA_INSTANCE_NAME = "test_"; - - private static final String ZKHOST = "127.0.0.1"; - private static final String BROKERHOST = "127.0.0.1"; - private static final String BROKERPORT = "9092"; - private ZkUtils zkUtils; - private KafkaServer kafkaServer; - private EmbeddedZookeeper zkServer; - private ZkClient zkClient; - - // The Rya instance statements are written to that will be fed into the Fluo - // app. - private RyaSailRepository ryaSailRepo = null; - private AccumuloRyaDAO dao = null; - - /** - * Add info about the Kafka queue/topic to receive the export. - */ - @Override - protected void preFluoInitHook() throws Exception { - // Setup the observers that will be used by the Fluo PCJ Application. - final List<ObserverSpecification> observers = new ArrayList<>(); - observers.add(new ObserverSpecification(TripleObserver.class.getName())); - observers.add(new ObserverSpecification(StatementPatternObserver.class.getName())); - observers.add(new ObserverSpecification(JoinObserver.class.getName())); - observers.add(new ObserverSpecification(FilterObserver.class.getName())); - observers.add(new ObserverSpecification(AggregationObserver.class.getName())); - - // Configure the export observer to export new PCJ results to the mini - // accumulo cluster. - final HashMap<String, String> exportParams = new HashMap<>(); - - final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams); - kafkaParams.setExportToKafka(true); - - // Configure the Kafka Producer - final Properties producerConfig = new Properties(); - producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); - producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); - kafkaParams.addAllProducerConfig(producerConfig); - - final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams); - observers.add(exportObserverConfig); - - //create construct query observer and tell it not to export to Kafka - //it will only add results back into Fluo - HashMap<String, String> constructParams = new HashMap<>(); - final KafkaExportParameters kafkaConstructParams = new KafkaExportParameters(constructParams); - kafkaConstructParams.setExportToKafka(true); - - // Configure the Kafka Producer - final Properties constructProducerConfig = new Properties(); - constructProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); - constructProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - constructProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RyaSubGraphKafkaSerDe.class.getName()); - kafkaConstructParams.addAllProducerConfig(constructProducerConfig); - - final ObserverSpecification constructExportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(), - constructParams); - observers.add(constructExportObserverConfig); - - // Add the observers to the Fluo Configuration. - super.getFluoConfiguration().addObservers(observers); - } - - /** - * setup mini kafka and call the super to setup mini fluo - */ - @Before - public void setupKafka() throws Exception { - // Install an instance of Rya on the Accumulo cluster. - installRyaInstance(); - - // Setup Kafka. - zkServer = new EmbeddedZookeeper(); - final String zkConnect = ZKHOST + ":" + zkServer.port(); - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); - zkUtils = ZkUtils.apply(zkClient, false); - - // setup Broker - final Properties brokerProps = new Properties(); - brokerProps.setProperty("zookeeper.connect", zkConnect); - brokerProps.setProperty("broker.id", "0"); - brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); - brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); - final KafkaConfig config = new KafkaConfig(brokerProps); - final Time mock = new MockTime(); - kafkaServer = TestUtils.createServer(config, mock); - } - - @After - public void teardownRya() { - final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster(); - final String instanceName = cluster.getInstanceName(); - final String zookeepers = cluster.getZooKeepers(); - - // Uninstall the instance of Rya. - final RyaClient ryaClient = AccumuloRyaClientFactory.build( - new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), instanceName, zookeepers), - super.getAccumuloConnector()); - - try { - ryaClient.getUninstall().uninstall(RYA_INSTANCE_NAME); - // Shutdown the repo. - if(ryaSailRepo != null) {ryaSailRepo.shutDown();} - if(dao != null ) {dao.destroy();} - } catch (Exception e) { - System.out.println("Encountered the following Exception when shutting down Rya: " + e.getMessage()); - } - } - - private void installRyaInstance() throws Exception { - final MiniAccumuloCluster cluster = super.getMiniAccumuloCluster(); - final String instanceName = cluster.getInstanceName(); - final String zookeepers = cluster.getZooKeepers(); - - // Install the Rya instance to the mini accumulo cluster. - final RyaClient ryaClient = AccumuloRyaClientFactory.build( - new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), instanceName, zookeepers), - super.getAccumuloConnector()); - - ryaClient.getInstall().install(RYA_INSTANCE_NAME, - InstallConfiguration.builder().setEnableTableHashPrefix(false).setEnableFreeTextIndex(false) - .setEnableEntityCentricIndex(false).setEnableGeoIndex(false).setEnableTemporalIndex(false).setEnablePcjIndex(true) - .setFluoPcjAppName(super.getFluoConfiguration().getApplicationName()).build()); - - // Connect to the Rya instance that was just installed. - final AccumuloRdfConfiguration conf = makeConfig(instanceName, zookeepers); - final Sail sail = RyaSailFactory.getInstance(conf); - dao = RyaSailFactory.getAccumuloDAOWithUpdatedConfig(conf); - ryaSailRepo = new RyaSailRepository(sail); - } - - protected AccumuloRdfConfiguration makeConfig(final String instanceName, final String zookeepers) { - final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - conf.setTablePrefix(RYA_INSTANCE_NAME); - - // Accumulo connection information. - conf.setAccumuloUser(AccumuloExportITBase.ACCUMULO_USER); - conf.setAccumuloPassword(AccumuloExportITBase.ACCUMULO_PASSWORD); - conf.setAccumuloInstance(super.getAccumuloConnector().getInstance().getInstanceName()); - conf.setAccumuloZookeepers(super.getAccumuloConnector().getInstance().getZooKeepers()); - conf.setAuths(""); - - // PCJ configuration information. - conf.set(ConfigUtils.USE_PCJ, "true"); - conf.set(ConfigUtils.USE_PCJ_UPDATER_INDEX, "true"); - conf.set(ConfigUtils.FLUO_APP_NAME, super.getFluoConfiguration().getApplicationName()); - conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType.ACCUMULO.toString()); - conf.set(ConfigUtils.PCJ_UPDATER_TYPE, PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType.FLUO.toString()); - - conf.setDisplayQueryPlan(true); - - return conf; - } - - /** - * @return A {@link RyaSailRepository} that is connected to the Rya instance - * that statements are loaded into. - */ - protected RyaSailRepository getRyaSailRepository() throws Exception { - return ryaSailRepo; - } - - /** - * @return A {@link AccumuloRyaDAO} so that RyaStatements with distinct - * visibilities can be added to the Rya Instance - */ - protected AccumuloRyaDAO getRyaDAO() { - return dao; - } - - /** - * Close all the Kafka mini server and mini-zookeeper - */ - @After - public void teardownKafka() { - if(kafkaServer != null) {kafkaServer.shutdown();} - if(zkClient != null) {zkClient.close();} - if(zkServer != null) {zkServer.shutdown();} - } - - /** - * Test kafka without rya code to make sure kafka works in this environment. - * If this test fails then its a testing environment issue, not with Rya. - * Source: https://github.com/asmaier/mini-kafka - */ - @Test - public void embeddedKafkaTest() throws Exception { - // create topic - final String topic = "testTopic"; - AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$); - - // setup producer - final Properties producerProps = new Properties(); - producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); - producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); - producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); - final KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps); - - // setup consumer - final Properties consumerProps = new Properties(); - consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT); - consumerProps.setProperty("group.id", "group0"); - consumerProps.setProperty("client.id", "consumer0"); - consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); - consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - - // to make sure the consumer starts from the beginning of the topic - consumerProps.put("auto.offset.reset", "earliest"); - - final KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps); - consumer.subscribe(Arrays.asList(topic)); - - // send message - final ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(topic, 42, "test-message".getBytes(StandardCharsets.UTF_8)); - producer.send(data); - producer.close(); - - // starting consumer - final ConsumerRecords<Integer, byte[]> records = consumer.poll(3000); - assertEquals(1, records.count()); - final Iterator<ConsumerRecord<Integer, byte[]>> recordIterator = records.iterator(); - final ConsumerRecord<Integer, byte[]> record = recordIterator.next(); - assertEquals(42, (int) record.key()); - assertEquals("test-message", new String(record.value(), StandardCharsets.UTF_8)); - consumer.close(); - } - - protected KafkaConsumer<Integer, VisibilityBindingSet> makeConsumer(final String TopicName) { - // setup consumer - final Properties consumerProps = new Properties(); - consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERHOST + ":" + BROKERPORT); - consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0"); - consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); - consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - "org.apache.kafka.common.serialization.IntegerDeserializer"); - consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer"); - - // to make sure the consumer starts from the beginning of the topic - consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - final KafkaConsumer<Integer, VisibilityBindingSet> consumer = new KafkaConsumer<>(consumerProps); - consumer.subscribe(Arrays.asList(TopicName)); - return consumer; - } - - protected String loadData(final String sparql, final Collection<Statement> statements) throws Exception { - requireNonNull(sparql); - requireNonNull(statements); - - // Register the PCJ with Rya. - final Instance accInstance = super.getAccumuloConnector().getInstance(); - final Connector accumuloConn = super.getAccumuloConnector(); - - final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(ACCUMULO_USER, - ACCUMULO_PASSWORD.toCharArray(), accInstance.getInstanceName(), accInstance.getZooKeepers()), accumuloConn); - - final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql); - - // Write the data to Rya. - final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection(); - ryaConn.begin(); - ryaConn.add(statements); - ryaConn.commit(); - ryaConn.close(); - - // Wait for the Fluo application to finish computing the end result. - super.getMiniFluo().waitForObservers(); - - // The PCJ Id is the topic name the results will be written to. - return pcjId; - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java deleted file mode 100644 index 4eab0f6..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/RyaExportITBase.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.pcj.fluo; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import org.apache.fluo.api.config.ObserverSpecification; -import org.apache.log4j.BasicConfigurator; -import org.apache.log4j.Level; -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaExportParameters; -import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters; -import org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver; -import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver; -import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; -import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; -import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; -import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; -import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; -import org.junit.BeforeClass; - -/** - * The base Integration Test class used for Fluo applications that export to a Rya PCJ Index. - */ -public class RyaExportITBase extends FluoITBase { - - @BeforeClass - public static void setupLogging() { - BasicConfigurator.configure(); - Logger.getRootLogger().setLevel(Level.ERROR); - } - - @Override - protected void preFluoInitHook() throws Exception { - // Setup the observers that will be used by the Fluo PCJ Application. - final List<ObserverSpecification> observers = new ArrayList<>(); - observers.add(new ObserverSpecification(TripleObserver.class.getName())); - observers.add(new ObserverSpecification(StatementPatternObserver.class.getName())); - observers.add(new ObserverSpecification(JoinObserver.class.getName())); - observers.add(new ObserverSpecification(FilterObserver.class.getName())); - observers.add(new ObserverSpecification(AggregationObserver.class.getName())); - - // Configure the export observer to export new PCJ results to the mini accumulo cluster. - final HashMap<String, String> exportParams = new HashMap<>(); - final RyaExportParameters ryaParams = new RyaExportParameters(exportParams); - ryaParams.setExportToRya(true); - ryaParams.setRyaInstanceName(getRyaInstanceName()); - ryaParams.setAccumuloInstanceName(super.getMiniAccumuloCluster().getInstanceName()); - ryaParams.setZookeeperServers(super.getMiniAccumuloCluster().getZooKeepers()); - ryaParams.setExporterUsername(getUsername()); - ryaParams.setExporterPassword(getPassword()); - - final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams); - observers.add(exportObserverConfig); - - final KafkaExportParameters kafkaParams = new KafkaExportParameters(exportParams); - kafkaParams.setExportToKafka(false); - - final ObserverSpecification constructExportObserverConfig = new ObserverSpecification(ConstructQueryResultObserver.class.getName(), - exportParams); - observers.add(constructExportObserverConfig); - - // Add the observers to the Fluo Configuration. - super.getFluoConfiguration().addObservers(observers); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java index 3a42a23..cb34d06 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/CountStatementsIT.java @@ -29,7 +29,7 @@ import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.config.ObserverSpecification; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaURI; -import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.junit.Test; import com.google.common.base.Optional; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java index 9a1c285..d5c0e5f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java @@ -30,7 +30,6 @@ import org.apache.accumulo.core.client.Connector; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.rya.api.persist.RyaDAOException; -import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInAccumuloException; import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInFluoException; import org.apache.rya.indexing.pcj.storage.PcjException; @@ -39,6 +38,7 @@ import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.junit.Test; import org.openrdf.query.MalformedQueryException; import org.openrdf.query.QueryEvaluationException; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java index d19646e..965a7b9 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java @@ -31,13 +31,13 @@ import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaURI; -import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; import org.apache.rya.indexing.pcj.storage.PcjMetadata; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.junit.Test; import com.google.common.base.Optional; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java index ec301ba..e3914bd 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIdsIT.java @@ -29,7 +29,7 @@ import org.apache.accumulo.core.client.TableExistsException; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.client.Transaction; -import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.junit.Test; import com.beust.jcommander.internal.Lists; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java index accabbf..d403404 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAOIT.java @@ -21,12 +21,12 @@ package org.apache.rya.indexing.pcj.fluo.app.query; import static org.junit.Assert.assertEquals; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.client.Snapshot; import org.apache.fluo.api.client.Transaction; -import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.app.ConstructGraph; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationElement; import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata.AggregationType; @@ -34,6 +34,7 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery.QueryType; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.junit.Test; import org.openrdf.query.MalformedQueryException; import org.openrdf.query.algebra.StatementPattern; @@ -42,8 +43,6 @@ import org.openrdf.query.parser.ParsedQuery; import org.openrdf.query.parser.sparql.SPARQLParser; import org.openrdf.repository.RepositoryException; -import com.google.common.base.Optional; - /** * Integration tests the methods of {@link FluoQueryMetadataDAO}. */ @@ -87,8 +86,7 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { builder.setVarOrder(new VariableOrder("e;f")); builder.setParentNodeId("parentNodeId"); builder.setChildNodeId("childNodeId"); - builder.setOriginalSparql("originalSparql"); - builder.setFilterIndexWithinSparql(2); + builder.setFilterSparql("originalSparql"); final FilterMetadata originalMetadata = builder.build(); try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { @@ -232,11 +230,10 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { storedMetadata = dao.readAggregationMetadata(sx, "nodeId"); } - // Ensure the deserialized object is the same as the serialized one. - assertEquals(originalMetadata, storedMetadata); } } + @Test public void aggregationMetadataTest_noGroupByVarOrders() { final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); @@ -267,6 +264,41 @@ public class FluoQueryMetadataDAOIT extends RyaExportITBase { assertEquals(originalMetadata, storedMetadata); } } + + @Test + public void periodicQueryMetadataTest() { + final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + + // Create the object that will be serialized. + PeriodicQueryMetadata originalMetadata = PeriodicQueryMetadata.builder() + .setNodeId("nodeId") + .setParentNodeId("parentNodeId") + .setVarOrder(new VariableOrder("a","b","c")) + .setChildNodeId("childNodeId") + .setPeriod(10) + .setWindowSize(20) + .setUnit(TimeUnit.DAYS) + .setTemporalVariable("a") + .build(); + + + try(FluoClient fluoClient = FluoFactory.newClient(super.getFluoConfiguration())) { + // Write it to the Fluo table. + try(Transaction tx = fluoClient.newTransaction()) { + dao.write(tx, originalMetadata); + tx.commit(); + } + + // Read it from the Fluo table. + PeriodicQueryMetadata storedMetadata = null; + try(Snapshot sx = fluoClient.newSnapshot()) { + storedMetadata = dao.readPeriodicQueryMetadata(sx, "nodeId"); + } + + // Ensure the deserialized object is the same as the serialized one. + assertEquals(originalMetadata, storedMetadata); + } + } @Test public void fluoQueryTest() throws MalformedQueryException { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java new file mode 100644 index 0000000..0cd7cfb --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/BatchDeleteIT.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.integration; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.Transaction; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.ColumnValue; +import org.apache.fluo.api.data.Span; +import org.apache.fluo.core.client.FluoClientImpl; +import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformation.Task; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO; +import org.apache.rya.indexing.pcj.fluo.app.batch.JoinBatchInformation; +import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; +import org.junit.Test; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; + +public class BatchDeleteIT extends RyaExportITBase { + + private static final Logger log = Logger.getLogger(BatchDeleteIT.class); + private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + + @Test + public void simpleScanDelete() throws Exception { + + final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; " + + " <urn:predicate_2> ?object2 } "; + try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) { + + RyaURI subj = new RyaURI("urn:subject_1"); + RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null); + RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null); + Set<RyaStatement> statements1 = getRyaStatements(statement1, 10); + Set<RyaStatement> statements2 = getRyaStatements(statement2, 10); + + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName()); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()); + + List<String> ids = getNodeIdStrings(fluoClient, queryId); + List<String> prefixes = Arrays.asList("urn:subject_1", "urn:object", "urn:subject_1", "urn:subject_1"); + + // Stream the data into Fluo. + InsertTriples inserter = new InsertTriples(); + inserter.insert(fluoClient, statements1, Optional.<String> absent()); + inserter.insert(fluoClient, statements2, Optional.<String> absent()); + + // Verify the end results of the query match the expected results. + getMiniFluo().waitForObservers(); + + verifyCounts(fluoClient, ids, Arrays.asList(100, 100, 10, 10)); + + createSpanBatches(fluoClient, ids, prefixes, 10); + getMiniFluo().waitForObservers(); + + verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 0)); + } + } + + @Test + public void simpleJoinDelete() throws Exception { + final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; " + + " <urn:predicate_2> ?object2 } "; + try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) { + + RyaURI subj = new RyaURI("urn:subject_1"); + RyaStatement statement1 = new RyaStatement(subj, new RyaURI("urn:predicate_1"), null); + RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null); + Set<RyaStatement> statements1 = getRyaStatements(statement1, 5); + Set<RyaStatement> statements2 = getRyaStatements(statement2, 5); + + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName()); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()); + + List<String> ids = getNodeIdStrings(fluoClient, queryId); + String joinId = ids.get(1); + String rightSp = ids.get(3); + QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("subject", new URIImpl("urn:subject_1")); + bs.addBinding("object1", new URIImpl("urn:object_0")); + VisibilityBindingSet vBs = new VisibilityBindingSet(bs); + Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1")); + VariableOrder varOrder = new VariableOrder(Arrays.asList("subject", "object2")); + + // Stream the data into Fluo. + InsertTriples inserter = new InsertTriples(); + inserter.insert(fluoClient, statements1, Optional.<String> absent()); + inserter.insert(fluoClient, statements2, Optional.<String> absent()); + + getMiniFluo().waitForObservers(); + verifyCounts(fluoClient, ids, Arrays.asList(25, 25, 5, 5)); + + JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1) + .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Delete) + .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).setVarOrder(varOrder).build(); + // Verify the end results of the query match the expected results. + createSpanBatch(fluoClient, joinId, batch); + + getMiniFluo().waitForObservers(); + verifyCounts(fluoClient, ids, Arrays.asList(25, 20, 5, 5)); + } + } + + @Test + public void simpleJoinAdd() throws Exception { + final String sparql = "SELECT ?subject ?object1 ?object2 WHERE { ?subject <urn:predicate_1> ?object1; " + + " <urn:predicate_2> ?object2 } "; + try (FluoClient fluoClient = new FluoClientImpl(getFluoConfiguration())) { + + RyaURI subj = new RyaURI("urn:subject_1"); + RyaStatement statement2 = new RyaStatement(subj, new RyaURI("urn:predicate_2"), null); + Set<RyaStatement> statements2 = getRyaStatements(statement2, 5); + + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getAccumuloConnector(), getRyaInstanceName()); + final String pcjId = pcjStorage.createPcj(sparql); + + // Tell the Fluo app to maintain the PCJ. + String queryId = new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, getAccumuloConnector(), getRyaInstanceName()); + + List<String> ids = getNodeIdStrings(fluoClient, queryId); + String joinId = ids.get(1); + String rightSp = ids.get(3); + QueryBindingSet bs = new QueryBindingSet(); + bs.addBinding("subject", new URIImpl("urn:subject_1")); + bs.addBinding("object1", new URIImpl("urn:object_0")); + VisibilityBindingSet vBs = new VisibilityBindingSet(bs); + Span span = Span.prefix(Bytes.of(rightSp + IncrementalUpdateConstants.NODEID_BS_DELIM + "urn:subject_1")); + VariableOrder varOrder = new VariableOrder(Arrays.asList("subject", "object2")); + + // Stream the data into Fluo. + InsertTriples inserter = new InsertTriples(); + inserter.insert(fluoClient, statements2, Optional.<String> absent()); + + getMiniFluo().waitForObservers(); + verifyCounts(fluoClient, ids, Arrays.asList(0, 0, 0, 5)); + + JoinBatchInformation batch = JoinBatchInformation.builder().setBatchSize(1) + .setColumn(FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET).setSpan(span).setTask(Task.Add) + .setJoinType(JoinType.NATURAL_JOIN).setSide(Side.LEFT).setBs(vBs).setVarOrder(varOrder).build(); + // Verify the end results of the query match the expected results. + createSpanBatch(fluoClient, joinId, batch); + + getMiniFluo().waitForObservers(); + verifyCounts(fluoClient, ids, Arrays.asList(5, 5, 0, 5)); + } + } + + private Set<RyaStatement> getRyaStatements(RyaStatement statement, int numTriples) { + + Set<RyaStatement> statements = new HashSet<>(); + final String subject = "urn:subject_"; + final String predicate = "urn:predicate_"; + final String object = "urn:object_"; + + for (int i = 0; i < numTriples; i++) { + RyaStatement stmnt = new RyaStatement(statement.getSubject(), statement.getPredicate(), statement.getObject()); + if (stmnt.getSubject() == null) { + stmnt.setSubject(new RyaURI(subject + i)); + } + if (stmnt.getPredicate() == null) { + stmnt.setPredicate(new RyaURI(predicate + i)); + } + if (stmnt.getObject() == null) { + stmnt.setObject(new RyaURI(object + i)); + } + statements.add(stmnt); + } + return statements; + } + + private List<String> getNodeIdStrings(FluoClient fluoClient, String queryId) { + List<String> nodeStrings = new ArrayList<>(); + try (Snapshot sx = fluoClient.newSnapshot()) { + FluoQuery query = dao.readFluoQuery(sx, queryId); + nodeStrings.add(queryId); + Collection<JoinMetadata> jMeta = query.getJoinMetadata(); + for (JoinMetadata meta : jMeta) { + nodeStrings.add(meta.getNodeId()); + nodeStrings.add(meta.getLeftChildNodeId()); + nodeStrings.add(meta.getRightChildNodeId()); + } + } + return nodeStrings; + } + + private void createSpanBatches(FluoClient fluoClient, List<String> ids, List<String> prefixes, int batchSize) { + + Preconditions.checkArgument(ids.size() == prefixes.size()); + + try (Transaction tx = fluoClient.newTransaction()) { + for (int i = 0; i < ids.size(); i++) { + String id = ids.get(i); + String bsPrefix = prefixes.get(i); + NodeType type = NodeType.fromNodeId(id).get(); + Column bsCol = type.getResultColumn(); + String row = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bsPrefix; + Span span = Span.prefix(Bytes.of(row)); + BatchInformation batch = SpanBatchDeleteInformation.builder().setBatchSize(batchSize).setColumn(bsCol).setSpan(span) + .build(); + BatchInformationDAO.addBatch(tx, id, batch); + } + tx.commit(); + } + } + + private void createSpanBatch(FluoClient fluoClient, String nodeId, BatchInformation batch) { + try (Transaction tx = fluoClient.newTransaction()) { + BatchInformationDAO.addBatch(tx, nodeId, batch); + tx.commit(); + } + } + + private int countResults(FluoClient fluoClient, String nodeId, Column bsColumn) { + try (Transaction tx = fluoClient.newTransaction()) { + int count = 0; + RowScanner scanner = tx.scanner().over(Span.prefix(nodeId)).fetch(bsColumn).byRow().build(); + Iterator<ColumnScanner> colScanners = scanner.iterator(); + while (colScanners.hasNext()) { + ColumnScanner colScanner = colScanners.next(); + Iterator<ColumnValue> vals = colScanner.iterator(); + while (vals.hasNext()) { + vals.next(); + count++; + } + } + tx.commit(); + return count; + } + } + + private void verifyCounts(FluoClient fluoClient, List<String> ids, List<Integer> expectedCounts) { + Preconditions.checkArgument(ids.size() == expectedCounts.size()); + for (int i = 0; i < ids.size(); i++) { + String id = ids.get(i); + int expected = expectedCounts.get(i); + NodeType type = NodeType.fromNodeId(id).get(); + int count = countResults(fluoClient, id, type.getResultColumn()); + log.trace("NodeId: " + id + " Count: " + count + " Expected: " + expected); + switch (type) { + case STATEMENT_PATTERN: + assertEquals(expected, count); + break; + case JOIN: + assertEquals(expected, count); + break; + case QUERY: + assertEquals(expected, count); + break; + default: + break; + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java index 414fa70..0f2d892 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java @@ -35,8 +35,8 @@ import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Span; import org.apache.rya.api.client.RyaClient; import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; -import org.apache.rya.indexing.pcj.fluo.RyaExportITBase; import org.apache.rya.indexing.pcj.fluo.api.DeletePcj; +import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; import org.junit.Test; import org.openrdf.model.Statement; import org.openrdf.model.ValueFactory;