RYA-377 Fixed the project layout so that it builds even when the geoindexing profile is not enabled.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/92c85ee1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/92c85ee1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/92c85ee1 Branch: refs/heads/master Commit: 92c85ee11030712289df48faed4710359b1b0601 Parents: 923448f Author: kchilton2 <kevin.e.chil...@gmail.com> Authored: Wed Jan 3 16:41:08 2018 -0500 Committer: caleb <caleb.me...@parsons.com> Committed: Tue Jan 9 15:13:01 2018 -0500 ---------------------------------------------------------------------- extras/rya.streams/geo/pom.xml | 71 +++ .../kafka/processors/filter/GeoFilterIT.java | 137 ++++++ extras/rya.streams/integration/pom.xml | 69 +++ .../aggregation/AggregationProcessorIT.java | 457 +++++++++++++++++++ .../processors/filter/FilterProcessorIT.java | 86 ++++ .../processors/filter/FilterProcessorTest.java | 75 +++ .../processors/filter/TemporalFilterIT.java | 231 ++++++++++ .../kafka/processors/join/JoinProcessorIT.java | 316 +++++++++++++ .../projection/MultiProjectionProcessorIT.java | 93 ++++ .../projection/ProjectionProcessorIT.java | 86 ++++ .../sp/StatementPatternProcessorIT.java | 196 ++++++++ extras/rya.streams/kafka-test/pom.xml | 51 +++ .../rya/streams/kafka/RyaStreamsTestUtil.java | 122 +++++ extras/rya.streams/kafka/pom.xml | 32 +- .../apache/rya/streams/kafka/RdfTestUtil.java | 131 ------ .../rya/streams/kafka/RyaStreamsTestUtil.java | 122 ----- .../processors/StatementPatternProcessorIT.java | 195 -------- .../aggregation/AggregationProcessorIT.java | 457 ------------------- .../processors/filter/FilterProcessorIT.java | 86 ---- .../processors/filter/FilterProcessorTest.java | 75 --- .../kafka/processors/filter/GeoFilterIT.java | 137 ------ .../processors/filter/TemporalFilterIT.java | 231 ---------- .../kafka/processors/join/JoinProcessorIT.java | 316 ------------- .../projection/MultiProjectionProcessorIT.java | 92 ---- .../projection/ProjectionProcessorIT.java | 85 ---- extras/rya.streams/pom.xml | 3 + pom.xml | 10 + test/pom.xml | 1 + test/rdf/pom.xml | 59 +++ .../apache/rya/streams/kafka/RdfTestUtil.java | 131 ++++++ 30 files changed, 2212 insertions(+), 1941 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/geo/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/geo/pom.xml b/extras/rya.streams/geo/pom.xml new file mode 100644 index 0000000..2f179d0 --- /dev/null +++ b/extras/rya.streams/geo/pom.xml @@ -0,0 +1,71 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.streams.parent</artifactId> + <version>3.2.12-incubating-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>rya.streams.geo-test</artifactId> + + <name>Apache Rya Streams Geo Test</name> + <description> + A module that contains Geo integration tests with Rya Streams. + </description> + + <dependencies> + <!-- Rya dependencies --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.functions.geo</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.geo.common</artifactId> + <version>${project.version}</version> + </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.test.kafka</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.streams.kafka-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java new file mode 100644 index 0000000..c090afa --- /dev/null +++ b/extras/rya.streams/geo/src/test/java/org/apache/rya/streams/kafka/processors/filter/GeoFilterIT.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.filter; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.RandomUUIDFactory; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.indexing.GeoConstants; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RyaStreamsTestUtil; +import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.Resource; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.StatementImpl; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.algebra.evaluation.function.Function; +import org.openrdf.query.algebra.evaluation.function.FunctionRegistry; +import org.openrdf.query.impl.MapBindingSet; + +import com.vividsolutions.jts.geom.Coordinate; +import com.vividsolutions.jts.geom.Geometry; +import com.vividsolutions.jts.geom.GeometryFactory; +import com.vividsolutions.jts.io.WKTWriter; + +/** + * Integration tests the geo methods of {@link FilterProcessor}. + */ +public class GeoFilterIT { + private static final String GEO = "http://www.opengis.net/def/function/geosparql/"; + private static final GeometryFactory GF = new GeometryFactory(); + private static final Geometry ZERO = GF.createPoint(new Coordinate(0, 0)); + private static final Geometry ONE = GF.createPoint(new Coordinate(1, 1)); + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); + + @Test + public void showGeoFunctionsRegistered() { + int count = 0; + final Collection<Function> funcs = FunctionRegistry.getInstance().getAll(); + for (final Function fun : funcs) { + if (fun.getURI().startsWith(GEO)) { + count++; + } + } + + // There are 30 geo functions registered, ensure that there are 30. + assertEquals(30, count); + } + + @Test + public void showProcessorWorks() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Get the RDF model objects that will be used to build the query. + final String sparql = + "PREFIX geo: <http://www.opengis.net/ont/geosparql#>\n" + + "PREFIX geof: <" + GEO + ">\n" + + "SELECT * \n" + + "WHERE { \n" + + " <urn:event1> geo:asWKT ?point .\n" + + " FILTER(geof:sfWithin(?point, \"POLYGON((-3 -2, -3 2, 1 2, 1 -2, -3 -2))\"^^geo:wktLiteral)) " + + "}"; + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create the statements that will be input into the query. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = getStatements(); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + final MapBindingSet bs = new MapBindingSet(); + final WKTWriter w = new WKTWriter(); + bs.addBinding("point", vf.createLiteral(w.write(ZERO), GeoConstants.XMLSCHEMA_OGC_WKT)); + expected.add( new VisibilityBindingSet(bs, "a") ); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + private List<VisibilityStatement> getStatements() throws Exception { + final List<VisibilityStatement> statements = new ArrayList<>(); + // geo 2x2 points + statements.add(new VisibilityStatement(statement(ZERO), "a")); + statements.add(new VisibilityStatement(statement(ONE), "a")); + return statements; + } + + private static Statement statement(final Geometry geo) { + final ValueFactory vf = new ValueFactoryImpl(); + final Resource subject = vf.createURI("urn:event1"); + final URI predicate = GeoConstants.GEO_AS_WKT; + final WKTWriter w = new WKTWriter(); + final Value object = vf.createLiteral(w.write(geo), GeoConstants.XMLSCHEMA_OGC_WKT); + return new StatementImpl(subject, predicate, object); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/integration/pom.xml b/extras/rya.streams/integration/pom.xml new file mode 100644 index 0000000..26ec9f7 --- /dev/null +++ b/extras/rya.streams/integration/pom.xml @@ -0,0 +1,69 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.streams.parent</artifactId> + <version>3.2.12-incubating-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>rya.streams.integration</artifactId> + + <name>Apache Rya Streams Kafka Integration Tests</name> + <description> + A module that contains Kafka Integration tests for Rya Streams. + </description> + + <dependencies> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.streams.kafka</artifactId> + </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.test.rdf</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.test.kafka</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.streams.kafka-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java new file mode 100644 index 0000000..072469a --- /dev/null +++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/aggregation/AggregationProcessorIT.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.aggregation; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.RandomUUIDFactory; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RyaStreamsTestUtil; +import org.apache.rya.streams.kafka.processors.aggregation.AggregationProcessorSupplier.AggregationProcessor; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.impl.MapBindingSet; + +/** + * Integration tests {@link AggregationProcessor}. + */ +public class AggregationProcessorIT { + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false); + + @Test + public void count() throws Exception { + // A query that figures out how many books each person has. + final String sparql = + "SELECT ?person (count(?book) as ?bookCount) " + + "WHERE { " + + "?person <urn:hasBook> ?book " + + "} GROUP BY ?person"; + + // Create the statements that will be input into the query.. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), "a")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 1")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasBook"), vf.createLiteral("Book 2")), "b")); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "a")); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Bob")); + bs.addBinding("bookCount", vf.createLiteral("1", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("bookCount", vf.createLiteral("2", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "a&b")); + + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void sum() throws Exception { + // A query that figures out how much food each person has. + final String sparql = + "SELECT ?person (sum(?foodCount) as ?totalFood) " + + "WHERE { " + + "?person <urn:hasFoodType> ?food . " + + "?food <urn:count> ?foodCount . " + + "} GROUP BY ?person"; + + // Create the statements that will be input into the query.. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:corn")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasFoodType"), vf.createURI("urn:apple")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:corn"), vf.createURI("urn:count"), vf.createLiteral(4)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:apple"), vf.createURI("urn:count"), vf.createLiteral(3)), "")); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("totalFood", vf.createLiteral("4", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("totalFood", vf.createLiteral("7", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "")); + + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void average() throws Exception { + // A query that figures out the average age across all people. + final String sparql = + "SELECT (avg(?age) as ?avgAge) " + + "WHERE { " + + "?person <urn:age> ?age " + + "}"; + + // Create the statements that will be input into the query.. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(3)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(7)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(2)), "")); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("avgAge", vf.createLiteral("3", XMLSchema.DECIMAL)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("avgAge", vf.createLiteral("5", XMLSchema.DECIMAL)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("avgAge", vf.createLiteral("4", XMLSchema.DECIMAL)); + expected.add(new VisibilityBindingSet(bs, "")); + + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void min() throws Exception { + // A query that figures out what the youngest age is across all people. + final String sparql = + "SELECT (min(?age) as ?youngest) " + + "WHERE { " + + "?person <urn:age> ?age " + + "}"; + + // Create the statements that will be input into the query.. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), "")); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("youngest", vf.createLiteral(13)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("youngest", vf.createLiteral(7)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("youngest", vf.createLiteral(5)); + expected.add(new VisibilityBindingSet(bs, "")); + + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void max() throws Exception { + // A query that figures out what the oldest age is across all people. + final String sparql = + "SELECT (max(?age) as ?oldest) " + + "WHERE { " + + "?person <urn:age> ?age " + + "}"; + + // Create the statements that will be input into the query.. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), "")); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("oldest", vf.createLiteral(13)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("oldest", vf.createLiteral(14)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("oldest", vf.createLiteral(25)); + expected.add(new VisibilityBindingSet(bs, "")); + + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void multipleGroupByVars() throws Exception { + // A query that contains more than one group by variable. + final String sparql = + "SELECT ?business ?employee (sum(?hours) AS ?totalHours) " + + "WHERE {" + + "?employee <urn:worksAt> ?business . " + + "?business <urn:hasTimecardId> ?timecardId . " + + "?employee <urn:hasTimecardId> ?timecardId . " + + "?timecardId <urn:hours> ?hours . " + + "} GROUP BY ?business ?employee"; + + // Create the statements that will be input into the query. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard1")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:timecard1"), vf.createURI("urn:hours"), vf.createLiteral(40)), "")); + + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard2")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:timecard2"), vf.createURI("urn:hours"), vf.createLiteral(25)), "")); + + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoJoint")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard3")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:timecard3"), vf.createURI("urn:hours"), vf.createLiteral(28)), "")); + + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:TacoJoint"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard4")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:timecard4"), vf.createURI("urn:hours"), vf.createLiteral(28)), "")); + + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:CoffeeShop"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:hasTimecardId"), vf.createURI("urn:timecard5")), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:timecard5"), vf.createURI("urn:hours"), vf.createLiteral(12)), "")); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("business", vf.createURI("urn:TacoJoint")); + bs.addBinding("employee", vf.createURI("urn:Alice")); + bs.addBinding("totalHours", vf.createLiteral("40", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("business", vf.createURI("urn:TacoJoint")); + bs.addBinding("employee", vf.createURI("urn:Alice")); + bs.addBinding("totalHours", vf.createLiteral("65", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("business", vf.createURI("urn:TacoJoint")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("totalHours", vf.createLiteral("28", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("business", vf.createURI("urn:TacoJoint")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("totalHours", vf.createLiteral("56", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("business", vf.createURI("urn:CoffeeShop")); + bs.addBinding("employee", vf.createURI("urn:Alice")); + bs.addBinding("totalHours", vf.createLiteral("12", XMLSchema.INTEGER)); + expected.add(new VisibilityBindingSet(bs, "")); + + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void multipleAggregations() throws Exception { + // A query that figures out what the youngest and oldest ages are across all people. + final String sparql = + "SELECT (min(?age) as ?youngest) (max(?age) as ?oldest) " + + "WHERE { " + + "?person <urn:age> ?age " + + "}"; + + // Create the statements that will be input into the query.. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(13)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(14)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:age"), vf.createLiteral(7)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:age"), vf.createLiteral(5)), "")); + statements.add(new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:age"), vf.createLiteral(25)), "")); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("youngest", vf.createLiteral(13)); + bs.addBinding("oldest", vf.createLiteral(13)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("youngest", vf.createLiteral(13)); + bs.addBinding("oldest", vf.createLiteral(14)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("youngest", vf.createLiteral(7)); + bs.addBinding("oldest", vf.createLiteral(14)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("youngest", vf.createLiteral(5)); + bs.addBinding("oldest", vf.createLiteral(14)); + expected.add(new VisibilityBindingSet(bs, "")); + + bs = new MapBindingSet(); + bs.addBinding("youngest", vf.createLiteral(5)); + bs.addBinding("oldest", vf.createLiteral(25)); + expected.add(new VisibilityBindingSet(bs, "")); + + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java new file mode 100644 index 0000000..aaa67ea --- /dev/null +++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorIT.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.filter; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.RandomUUIDFactory; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RyaStreamsTestUtil; +import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.impl.MapBindingSet; + +/** + * Integration tests the methods of {@link FilterProcessor}. + */ +public class FilterProcessorIT { + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); + + @Test + public void showProcessorWorks() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Get the RDF model objects that will be used to build the query. + final String sparql = + "SELECT * " + + "WHERE { " + + "FILTER(?age < 10)" + + "?person <urn:age> ?age " + + "}"; + + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create the statements that will be input into the query. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement(vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:age"), vf.createLiteral(11)), "a")); + statements.add(new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:age"), vf.createLiteral(9)), "a")); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + final MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("age", vf.createLiteral(9)); + expected.add( new VisibilityBindingSet(bs, "a") ); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java new file mode 100644 index 0000000..3ff8e8d --- /dev/null +++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/FilterProcessorTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.filter; + +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.rya.api.function.filter.FilterEvaluator; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.streams.kafka.RdfTestUtil; +import org.apache.rya.streams.kafka.processors.ProcessorResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; +import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.algebra.Filter; +import org.openrdf.query.impl.MapBindingSet; + +/** + * Unit tests the methods of {@link FilterProcessor}. + */ +public class FilterProcessorTest { + + @Test + public void showFilterFunctionIsCalled() throws Exception { + // Read the filter object from a SPARQL query. + final Filter filter = RdfTestUtil.getFilter( + "SELECT * " + + "WHERE { " + + "FILTER(?age < 10)" + + "?person <urn:age> ?age " + + "}"); + + // Create a Binding Set that will be passed into the Filter function based on the where clause. + final ValueFactory vf = new ValueFactoryImpl(); + final MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("age", vf.createLiteral(9)); + final VisibilityBindingSet inputVisBs = new VisibilityBindingSet(bs, "a"); + + // Mock the processor context that will be invoked. + final ProcessorContext context = mock(ProcessorContext.class); + + // Run the test. + final FilterProcessor processor = new FilterProcessor( + FilterEvaluator.make(filter), + result -> ProcessorResult.make(new UnaryResult(result))); + processor.init(context); + processor.process("key", ProcessorResult.make(new UnaryResult(inputVisBs))); + + // Verify the binding set was passed through. + verify(context, times(1)).forward(eq("key"), eq(ProcessorResult.make(new UnaryResult(inputVisBs)))); + + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java new file mode 100644 index 0000000..22a883b --- /dev/null +++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/filter/TemporalFilterIT.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.filter; + +import static org.junit.Assert.assertEquals; + +import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.RandomUUIDFactory; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RyaStreamsTestUtil; +import org.apache.rya.streams.kafka.processors.filter.FilterProcessorSupplier.FilterProcessor; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.Resource; +import org.openrdf.model.Statement; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.StatementImpl; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.algebra.evaluation.function.Function; +import org.openrdf.query.algebra.evaluation.function.FunctionRegistry; +import org.openrdf.query.impl.MapBindingSet; + +/** + * Integration tests the temporal methods of {@link FilterProcessor}. + */ +public class TemporalFilterIT { + private static final ValueFactory vf = new ValueFactoryImpl(); + private static final String TEMPORAL = "http://rya.apache.org/ns/temporal"; + private static final ZonedDateTime TIME = ZonedDateTime.parse("2015-12-30T12:00:00Z"); + private static final ZonedDateTime TIME_10 = ZonedDateTime.parse("2015-12-30T12:00:10Z"); + private static final ZonedDateTime TIME_20 = ZonedDateTime.parse("2015-12-30T12:00:20Z"); + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(false); + + @Test + public void temporalFunctionsRegistered() { + int count = 0; + final Collection<Function> funcs = FunctionRegistry.getInstance().getAll(); + for (final Function fun : funcs) { + if (fun.getURI().startsWith(TEMPORAL)) { + count++; + } + } + + // There are 4 temporal functions registered, ensure that there are 4. + assertEquals(4, count); + } + + @Test + public void showEqualsWorks() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Get the RDF model objects that will be used to build the query. + final String sparql = + "PREFIX time: <http://www.w3.org/2006/time/> \n" + + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n" + + "SELECT * \n" + + "WHERE { \n" + + " <urn:time> time:atTime ?date .\n" + + " FILTER(tempf:equals(?date, \"" + TIME.toString() + "\")) " + + "}"; + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create the statements that will be input into the query. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = getStatements(); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + final MapBindingSet bs = new MapBindingSet(); + bs.addBinding("date", vf.createLiteral(TIME.toString())); + expected.add( new VisibilityBindingSet(bs, "a") ); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void showBeforeWorks() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Get the RDF model objects that will be used to build the query. + final String sparql = + "PREFIX time: <http://www.w3.org/2006/time/> \n" + + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n" + + "SELECT * \n" + + "WHERE { \n" + + " <urn:time> time:atTime ?date .\n" + + " FILTER(tempf:before(?date, \"" + TIME_10.toString() + "\")) " + + "}"; + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create the statements that will be input into the query. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = getStatements(); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + final MapBindingSet bs = new MapBindingSet(); + bs.addBinding("date", vf.createLiteral(TIME.toString())); + expected.add( new VisibilityBindingSet(bs, "a") ); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void showAfterWorks() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Get the RDF model objects that will be used to build the query. + final String sparql = + "PREFIX time: <http://www.w3.org/2006/time/> \n" + + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n" + + "SELECT * \n" + + "WHERE { \n" + + " <urn:time> time:atTime ?date .\n" + + " FILTER(tempf:after(?date, \"" + TIME_10.toString() + "\")) " + + "}"; + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create the statements that will be input into the query. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = getStatements(); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + final MapBindingSet bs = new MapBindingSet(); + bs.addBinding("date", vf.createLiteral(TIME_20.toString())); + expected.add( new VisibilityBindingSet(bs, "a") ); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void showWithinWorks() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Get the RDF model objects that will be used to build the query. + final String sparql = + "PREFIX time: <http://www.w3.org/2006/time/> \n" + + "PREFIX tempf: <http://rya.apache.org/ns/temporal/>\n" + + "SELECT * \n" + + "WHERE { \n" + + " <urn:time> time:atTime ?date .\n" + + " FILTER(tempf:within(?date, \"" + TIME.toString() + "/" + TIME_20.toString() + "\")) " + + "}"; + // Setup a topology. + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create the statements that will be input into the query. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = getStatements(); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + final MapBindingSet bs = new MapBindingSet(); + bs.addBinding("date", vf.createLiteral(TIME_10.toString())); + expected.add( new VisibilityBindingSet(bs, "a") ); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + private List<VisibilityStatement> getStatements() throws Exception { + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add(new VisibilityStatement(statement(TIME), "a")); + statements.add(new VisibilityStatement(statement(TIME_10), "a")); + statements.add(new VisibilityStatement(statement(TIME_20), "a")); + return statements; + } + + private static Statement statement(final ZonedDateTime time) { + final Resource subject = vf.createURI("urn:time"); + final URI predicate = vf.createURI("http://www.w3.org/2006/time/atTime"); + final Value object = vf.createLiteral(time.toString()); + return new StatementImpl(subject, predicate, object); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java new file mode 100644 index 0000000..bdb9be6 --- /dev/null +++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/join/JoinProcessorIT.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.join; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.join.NaturalJoin; +import org.apache.rya.api.function.projection.RandomUUIDFactory; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RyaStreamsTestUtil; +import org.apache.rya.streams.kafka.processors.ProcessorResult; +import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult; +import org.apache.rya.streams.kafka.processors.join.JoinProcessorSupplier.JoinProcessor; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.Lists; + +/** + * Integration tests the methods of {@link JoinProcessor}. + */ +public class JoinProcessorIT { + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); + + @Test(expected = IllegalArgumentException.class) + public void badAllVars() throws IllegalArgumentException { + new JoinProcessorSupplier( + "NATURAL_JOIN", + new NaturalJoin(), + Lists.newArrayList("employee"), + Lists.newArrayList("person", "employee", "business"), + result -> ProcessorResult.make( new UnaryResult(result) )); + } + + @Test + public void newLeftResult() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final String query = + "SELECT * WHERE { " + + "?person <urn:talksTo> ?employee ." + + "?employee <urn:worksAt> ?business" + + " }"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create some statements that generate a bunch of right SP results. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") ); + + // Add a statement that will generate a left result that joins with some of those right results. + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") ); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:TacoPlace")); + expected.add( new VisibilityBindingSet(bs, "a&b&c") ); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:BurgerJoint")); + expected.add( new VisibilityBindingSet(bs, "c&(b|c)") ); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void newRightResult() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final String query = + "SELECT * WHERE { " + + "?person <urn:talksTo> ?employee ." + + "?employee <urn:worksAt> ?business" + + " }"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create some statements that generate a bunch of right SP results. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") ); + + // Add a statement that will generate a left result that joins with some of those right results. + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") ); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:TacoPlace")); + expected.add( new VisibilityBindingSet(bs, "a&b&c") ); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:BurgerJoint")); + expected.add( new VisibilityBindingSet(bs, "c&(b|c)") ); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void newResultsBothSides() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final String query = + "SELECT * WHERE { " + + "?person <urn:talksTo> ?employee ." + + "?employee <urn:worksAt> ?business" + + " }"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create some statements that generate a bunch of right SP results. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "a&b") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "c") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Eve"), vf.createURI("urn:worksAt"), vf.createURI("urn:CoffeeShop")), "b") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "b|c") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "c") ); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:TacoPlace")); + expected.add( new VisibilityBindingSet(bs, "a&b&c") ); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:BurgerJoint")); + expected.add( new VisibilityBindingSet(bs, "c&(b|c)") ); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Bob")); + bs.addBinding("employee", vf.createURI("urn:Charlie")); + bs.addBinding("business", vf.createURI("urn:BurgerJoint")); + expected.add( new VisibilityBindingSet(bs, "a&c") ); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void manyJoins() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final String query = + "SELECT * WHERE { " + + "?person <urn:talksTo> ?employee ." + + "?employee <urn:worksAt> ?business ." + + "?employee <urn:hourlyWage> ?wage ." + + " }"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create some statements that generate a bunch of right SP results. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "a") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:hourlyWage"), vf.createLiteral(7.25)), "a") ); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + final MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:BurgerJoint")); + bs.addBinding("wage", vf.createLiteral(7.25)); + expected.add( new VisibilityBindingSet(bs, "a") ); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } + + @Test + public void leftJoin() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Setup a topology. + final String query = + "SELECT * WHERE { " + + "?person <urn:talksTo> ?employee ." + + "OPTIONAL{ ?employee <urn:worksAt> ?business } " + + " }"; + final TopologyFactory factory = new TopologyFactory(); + final TopologyBuilder builder = factory.build(query, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Create some statements that generate a result that includes the optional value as well as one that does not. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:worksAt"), vf.createURI("urn:TacoPlace")), "b") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:Bob"), vf.createURI("urn:talksTo"), vf.createURI("urn:Charlie")), "c") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:David"), vf.createURI("urn:worksAt"), vf.createURI("urn:BurgerJoint")), "d") ); + + // Make the expected results. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + MapBindingSet bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + expected.add( new VisibilityBindingSet(bs, "a") ); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Alice")); + bs.addBinding("employee", vf.createURI("urn:Bob")); + bs.addBinding("business", vf.createURI("urn:TacoPlace")); + expected.add( new VisibilityBindingSet(bs, "a&b") ); + + bs = new MapBindingSet(); + bs.addBinding("person", vf.createURI("urn:Bob")); + bs.addBinding("employee", vf.createURI("urn:Charlie")); + expected.add( new VisibilityBindingSet(bs, "c") ); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityBindingSetDeserializer.class); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java new file mode 100644 index 0000000..a560294 --- /dev/null +++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/MultiProjectionProcessorIT.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.projection; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RyaStreamsTestUtil; +import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier; +import org.apache.rya.streams.kafka.processors.projection.MultiProjectionProcessorSupplier.MultiProjectionProcessor; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.BNode; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.model.vocabulary.RDF; + +/** + * Integration tests the methods of {@link MultiProjectionProcessor}. + */ +public class MultiProjectionProcessorIT { + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); + + @Test + public void showProcessorWorks() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Create a topology for the Query that will be tested. + final String sparql = + "CONSTRUCT {" + + "_:b a <urn:movementObservation> ; " + + "<urn:location> ?location ; " + + "<urn:direction> ?direction ; " + + "}" + + "WHERE {" + + "?thing <urn:corner> ?location ." + + "?thing <urn:compass> ?direction." + + "}"; + + final String bNodeId = UUID.randomUUID().toString(); + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, () -> bNodeId); + + // Create the statements that will be input into the query. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:car1"), vf.createURI("urn:compass"), vf.createURI("urn:NW")), "a") ); + statements.add( new VisibilityStatement( + vf.createStatement(vf.createURI("urn:car1"), vf.createURI("urn:corner"), vf.createURI("urn:corner1")), "a") ); + + // Make the expected results. + final Set<VisibilityStatement> expected = new HashSet<>(); + final BNode blankNode = vf.createBNode(bNodeId); + + expected.add(new VisibilityStatement(vf.createStatement(blankNode, RDF.TYPE, vf.createURI("urn:movementObservation")), "a")); + expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:direction"), vf.createURI("urn:NW")), "a")); + expected.add(new VisibilityStatement(vf.createStatement(blankNode, vf.createURI("urn:location"), vf.createURI("urn:corner1")), "a")); + + // Run the test. + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, expected, VisibilityStatementDeserializer.class); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/92c85ee1/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java new file mode 100644 index 0000000..322bba9 --- /dev/null +++ b/extras/rya.streams/integration/src/test/java/org/apache/rya/streams/kafka/processors/projection/ProjectionProcessorIT.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.processors.projection; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.RandomUUIDFactory; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.model.VisibilityStatement; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.RyaStreamsTestUtil; +import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier; +import org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.Rule; +import org.junit.Test; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.Sets; + +/** + * Integration tests the methods of {@link StatementPatternProcessor}. + */ +public class ProjectionProcessorIT { + + @Rule + public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true); + + @Test + public void showProcessorWorks() throws Exception { + // Enumerate some topics that will be re-used + final String ryaInstance = UUID.randomUUID().toString(); + final UUID queryId = UUID.randomUUID(); + final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); + final String resultsTopic = KafkaTopics.queryResultsTopic(queryId); + + // Create a topology for the Query that will be tested. + final String sparql = + "SELECT (?person AS ?p) ?otherPerson " + + "WHERE { " + + "?person <urn:talksTo> ?otherPerson . " + + "}"; + + final TopologyBuilder builder = new TopologyFactory().build(sparql, statementsTopic, resultsTopic, new RandomUUIDFactory()); + + // Load some data into the input topic. + final ValueFactory vf = new ValueFactoryImpl(); + final List<VisibilityStatement> statements = new ArrayList<>(); + statements.add( new VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") ); + + // Show the correct binding set results from the job. + final Set<VisibilityBindingSet> expected = new HashSet<>(); + + final MapBindingSet expectedBs = new MapBindingSet(); + expectedBs.addBinding("p", vf.createURI("urn:Alice")); + expectedBs.addBinding("otherPerson", vf.createURI("urn:Bob")); + expected.add(new VisibilityBindingSet(expectedBs, "a")); + + RyaStreamsTestUtil.runStreamProcessingTest(kafka, statementsTopic, resultsTopic, builder, statements, Sets.newHashSet(expected), VisibilityBindingSetDeserializer.class); + } +} \ No newline at end of file