RYA-377 Implement the StatementPatternProcessor for the Rya Streams project.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/653e4b83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/653e4b83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/653e4b83

Branch: refs/heads/master
Commit: 653e4b83ac2fecac0c1b2107fbcdffe7bc357371
Parents: 0ad2c51
Author: kchilton2 <kevin.e.chil...@gmail.com>
Authored: Mon Nov 6 16:39:35 2017 -0500
Committer: caleb <caleb.me...@parsons.com>
Committed: Tue Jan 9 15:13:00 2018 -0500

----------------------------------------------------------------------
 common/rya.api.function/pom.xml                 |  11 +-
 .../function/sp/StatementPatternMatcher.java    | 142 ++++++++++
 .../sp/StatementPatternMatcherTest.java         | 272 +++++++++++++++++++
 .../streams/api/interactor/LoadStatements.java  |  11 +-
 .../client/command/LoadStatementsCommand.java   |   2 +-
 extras/rya.streams/kafka/pom.xml                |   4 +
 .../kafka/interactor/KafkaLoadStatements.java   |  13 +-
 .../RyaStreamsSinkFormatterSupplier.java        |  83 ++++++
 .../StatementPatternProcessorSupplier.java      | 121 +++++++++
 .../apache/rya/streams/kafka/KafkaTestUtil.java | 127 +++++++++
 .../apache/rya/streams/kafka/RdfTestUtil.java   |  62 +++++
 .../interactor/KafkaGetQueryResultStreamIT.java |  20 +-
 .../kafka/interactor/KafkaLoadStatementsIT.java |  61 ++---
 .../processors/StatementPatternProcessorIT.java | 135 +++++++++
 .../kafka/queries/KafkaQueryChangeLogIT.java    |  26 +-
 .../VisibilityBindingSetKafkaIT.java            |  44 ++-
 .../VisibilityStatementKafkaIT.java             |  44 ++-
 17 files changed, 1039 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/common/rya.api.function/pom.xml
----------------------------------------------------------------------
diff --git a/common/rya.api.function/pom.xml b/common/rya.api.function/pom.xml
index f05dd6f..ce88e36 100644
--- a/common/rya.api.function/pom.xml
+++ b/common/rya.api.function/pom.xml
@@ -31,6 +31,7 @@ under the License.
     <name>Apache Rya Common API - Functions</name>
 
     <dependencies>
+        <!-- Rya dependencies. -->        
         <dependency>
             <groupId>org.apache.rya</groupId>
             <artifactId>rya.api.model</artifactId>
@@ -38,15 +39,23 @@ under the License.
 
         <!-- Third Party Dependencies -->
         <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryalgebra-model</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.openrdf.sesame</groupId>
+            <artifactId>sesame-queryalgebra-evaluation</artifactId>
+        </dependency>
+        <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
-
         <dependency>
             <groupId>com.github.stephenc.findbugs</groupId>
             <artifactId>findbugs-annotations</artifactId>
         </dependency>
 
+        <!-- Testing dependencies. -->
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/common/rya.api.function/src/main/java/org/apache/rya/api/function/sp/StatementPatternMatcher.java
----------------------------------------------------------------------
diff --git 
a/common/rya.api.function/src/main/java/org/apache/rya/api/function/sp/StatementPatternMatcher.java
 
b/common/rya.api.function/src/main/java/org/apache/rya/api/function/sp/StatementPatternMatcher.java
new file mode 100644
index 0000000..208f8d1
--- /dev/null
+++ 
b/common/rya.api.function/src/main/java/org/apache/rya/api/function/sp/StatementPatternMatcher.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.sp;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+
+import org.openrdf.model.Statement;
+import org.openrdf.model.Value;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Matches {@link Statement}s against a {@link StatementPattern} and returns 
{@link BindingSet}s
+ * when the statement matched the pattern.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementPatternMatcher {
+
+    private final StatementPattern pattern;
+
+    /**
+     * Constructs an instance of {@link StatementPatternMatcher}.
+     *
+     * @param pattern - The pattern that will be matched against. (not null)
+     */
+    public StatementPatternMatcher(final StatementPattern pattern) {
+        this.pattern = requireNonNull(pattern);
+    }
+
+    /**
+     * Matches a {@link Statement} against the provided {@link 
StatementPattern} and returns a {@link BindingSet}
+     * if the statement matched the pattern.
+     *
+     * @param statement - The statement that will be matched against the 
pattern. (not null)
+     * @return A {@link BindingSet} containing the statement's values filled 
in for the pattern's variables if
+     *   the statement's values match the pattern's constants; otherwise empty.
+     */
+    public Optional<BindingSet> match(final Statement statement) {
+        requireNonNull(statement);
+
+        // Setup the resulting binding set that could be built from this 
Statement.
+        final QueryBindingSet bs = new QueryBindingSet();
+
+        if(matchesValue(pattern.getSubjectVar(), statement.getSubject(), bs) &&
+                matchesValue(pattern.getPredicateVar(), 
statement.getPredicate(), bs) &&
+                matchesValue(pattern.getObjectVar(), statement.getObject(), 
bs) &&
+                matchesContext(pattern.getContextVar(), 
statement.getContext(), bs)) {
+            return Optional.of(bs);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * The following table describes how a Subject, Predicate, and Object Var 
may be handled for a Statement and a
+     * Statement Pattern:
+     * <table border=1>
+     *     <tr> <th>Pattern's var is constant</th> <th>Effect on resulting 
BS</th> </tr>
+     *     <tr> <td>yes</td> <td>Emit a BS if they match, no Context 
binding</td> </tr>
+     *     <tr> <td>no</td>  <td>Emit a BS with a binding for the 
variable</td> </tr>
+     * </table>
+     *
+     * @param var - The statement pattern variable that is being matched. (not 
null)
+     * @param stmtValue - The statement's value for the variable. (not null)
+     * @param bs - The binding set that may be updated to include a binding 
for the variable. (not null)
+     * @return {@code true} if the variable and the statement value match, 
otherwise {@code false}.
+     */
+    private boolean matchesValue(final Var var, final Value stmtValue, final 
QueryBindingSet bs) {
+        requireNonNull(var);
+        requireNonNull(stmtValue);
+        requireNonNull(bs);
+
+        // If the var is a constant, statement's value must match the var's 
value.
+        if(var.isConstant()) {
+            if(!stmtValue.equals(var.getValue())) {
+                return false;
+            }
+        } else {
+            // Otherwise it is a variable to be filled in.
+            bs.addBinding(var.getName(), stmtValue);
+        }
+
+        // Either the value matched the constant or the binding set was 
updated.
+        return true;
+    }
+
+    /**
+     * The following table describes how Context may be handled for a 
Statement and a Statement Pattern:
+     * <table border=1>
+     *   <tr> <th>Pattern's context state</th> <th>Statement has a context 
value</th> <th>Effect on resulting BS</th></tr>
+     *   <tr> <td>not mentioned</td>  <td>yes</td> <td>Emit BS without a 
Context binding</td> </tr>
+     *   <tr> <td>not mentioned</td>  <td>no</td>  <td>Emit BS without a 
Context binding</td> </tr>
+     *   <tr> <td>has a constant</td> <td>yes</td> <td>Emit BS if they match, 
no Context binding</td> </tr>
+     *   <tr> <td>has a constant</td> <td>no</td>  <td>Do not emit a BS</td> 
</tr>
+     *   <tr> <td>has a variable</td> <td>yes</td> <td>Emit BS with Context 
binding</td> </tr>
+     *   <tr> <td>has a variable</td> <td>no</td>  <td>Do not emit a BS</td> 
</tr>
+     * </table>
+     *
+     * @param cntxVar - The statement pattern's context variable. This may be 
{@code null} when there is no context
+     *   specified for the pattern.
+     * @param stmtCntx - The statement's context value. This may be {@code 
null} when there was no context
+     *   specified within the statement.
+     * @param bs - The binding set that may be updated to include a context 
binding. (not null)
+     * @return {@code true} if the pattern's context variable and 
statement's context matched, otherwise {@code false}.
+     */
+    private boolean matchesContext(@Nullable final Var cntxVar, @Nullable 
final Value stmtCntx, final QueryBindingSet bs) {
+        if(cntxVar == null) {
+            // If there is no context, automatically matches.
+            return true;
+        } else if(stmtCntx == null) {
+            // If no value was provided within the statement, then it does not 
match.
+            return false;
+        } else {
+            // Otherwise handle it like a normal variable.
+            return matchesValue(cntxVar, stmtCntx, bs);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/common/rya.api.function/src/test/java/org/apache/rya/api/function/sp/StatementPatternMatcherTest.java
----------------------------------------------------------------------
diff --git 
a/common/rya.api.function/src/test/java/org/apache/rya/api/function/sp/StatementPatternMatcherTest.java
 
b/common/rya.api.function/src/test/java/org/apache/rya/api/function/sp/StatementPatternMatcherTest.java
new file mode 100644
index 0000000..78a5418
--- /dev/null
+++ 
b/common/rya.api.function/src/test/java/org/apache/rya/api/function/sp/StatementPatternMatcherTest.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.function.sp;
+
+import static java.util.Objects.requireNonNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Unit tests the methods of {@link StatementPatternMatcher}.
+ */
+public class StatementPatternMatcherTest {
+
+    @Test
+    public void matchesSubject() throws Exception {
+        // Create the matcher against a pattern that matches a specific 
subject.
+        final StatementPatternMatcher matcher = new 
StatementPatternMatcher(getSp(
+                "SELECT * WHERE {" +
+                    "<urn:Alice> ?p ?o ." +
+                "}"));
+
+        // Create a statement that matches the pattern.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Statement statement = 
vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), 
vf.createURI("urn:Bob"), vf.createURI("urn:testGraph"));
+
+        // Create the expected resulting Binding Set.
+        final QueryBindingSet expected = new QueryBindingSet();
+        expected.addBinding("p", vf.createURI("urn:talksTo"));
+        expected.addBinding("o", vf.createURI("urn:Bob"));
+
+        // Show the expected Binding Set matches the resulting Binding Set.
+        final Optional<BindingSet> bs = matcher.match(statement);
+        assertEquals(expected, bs.get());
+    }
+
+    @Test
+    public void doesNotMatchSubject() throws Exception {
+        // Create the matcher against a pattern that matches a specific 
subject.
+        final StatementPatternMatcher matcher = new 
StatementPatternMatcher(getSp(
+                "SELECT * WHERE {" +
+                    "<urn:Alice> ?p ?o ." +
+                "}"));
+
+        // Create a statement that does not match the pattern.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Statement statement = 
vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:talksTo"), 
vf.createURI("urn:Bob"), vf.createURI("urn:testGraph"));
+
+        // Show the statement did not match.
+        final Optional<BindingSet> bs = matcher.match(statement);
+        assertFalse(bs.isPresent());
+    }
+
+    @Test
+    public void matchesPredicate() throws Exception {
+        // Create the matcher against a pattern that matches a specific 
predicate.
+        final StatementPatternMatcher matcher = new 
StatementPatternMatcher(getSp(
+                "SELECT * WHERE {" +
+                    "?s <urn:talksTo> ?o ." +
+                "}"));
+
+        // Create a statement that matches the pattern.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Statement statement = 
vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), 
vf.createURI("urn:Bob"), vf.createURI("urn:testGraph"));
+
+        // Create the expected resulting Binding Set.
+        final QueryBindingSet expected = new QueryBindingSet();
+        expected.addBinding("s", vf.createURI("urn:Alice"));
+        expected.addBinding("o", vf.createURI("urn:Bob"));
+
+        // Show the expected Binding Set matches the resulting Binding Set.
+        final Optional<BindingSet> bs = matcher.match(statement);
+        assertEquals(expected, bs.get());
+    }
+
+    @Test
+    public void doesNotMatchPredicate() throws Exception {
+        // Create the matcher against a pattern that matches a specific 
predicate.
+        final StatementPatternMatcher matcher = new 
StatementPatternMatcher(getSp(
+                "SELECT * WHERE {" +
+                    "?s <urn:talksTo> ?o ." +
+                "}"));
+
+        // Create a statement that does not match the pattern.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Statement statement = 
vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:knows"), 
vf.createURI("urn:Bob"), vf.createURI("urn:testGraph"));
+
+        // Show the statement did not match.
+        final Optional<BindingSet> bs = matcher.match(statement);
+        assertFalse(bs.isPresent());
+    }
+
+    @Test
+    public void matchesObject() throws Exception {
+        // Create the matcher against a pattern that matches a specific object.
+        final StatementPatternMatcher matcher = new 
StatementPatternMatcher(getSp(
+                "SELECT * WHERE {" +
+                    "?s ?p <urn:Bob> ." +
+                "}"));
+
+        // Create a statement that matches the pattern.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Statement statement = 
vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), 
vf.createURI("urn:Bob"), vf.createURI("urn:testGraph"));
+
+        // Create the expected resulting Binding Set.
+        final QueryBindingSet expected = new QueryBindingSet();
+        expected.addBinding("s", vf.createURI("urn:Alice"));
+        expected.addBinding("p", vf.createURI("urn:talksTo"));
+
+        // Show the expected Binding Set matches the resulting Binding Set.
+        final Optional<BindingSet> bs = matcher.match(statement);
+        assertEquals(expected, bs.get());
+    }
+
+    @Test
+    public void doesNotMatchObject() throws Exception {
+        // Create the matcher against a pattern that matches a specific object.
+        final StatementPatternMatcher matcher = new 
StatementPatternMatcher(getSp(
+                "SELECT * WHERE {" +
+                    "?s ?p <urn:Bob> ." +
+                "}"));
+
+        // Create a statement that does not match the pattern.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Statement statement = 
vf.createStatement(vf.createURI("urn:Charlie"), vf.createURI("urn:knows"), 
vf.createURI("urn:Alice"), vf.createURI("urn:testGraph"));
+
+        // Show the statement did not match.
+        final Optional<BindingSet> bs = matcher.match(statement);
+        assertFalse(bs.isPresent());
+    }
+
+    @Test
+    public void matchesContext() throws Exception {
+        // Create a matcher against a pattern that matches a specific context.
+        final StatementPatternMatcher matcher = new 
StatementPatternMatcher(getSp(
+                "SELECT * WHERE {" +
+                    "GRAPH <urn:testGraph> {" +
+                        "?s ?p ?o ." +
+                    "}" +
+                "}"));
+
+        // Create a statement that matches the pattern.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Statement statement = 
vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), 
vf.createURI("urn:Bob"), vf.createURI("urn:testGraph"));
+
+        // Create the expected resulting Binding Set.
+        final QueryBindingSet expected = new QueryBindingSet();
+        expected.addBinding("s", vf.createURI("urn:Alice"));
+        expected.addBinding("p", vf.createURI("urn:talksTo"));
+        expected.addBinding("o", vf.createURI("urn:Bob"));
+
+        // Show the expected Binding Set matches the resulting Binding Set.
+        final Optional<BindingSet> bs = matcher.match(statement);
+        assertEquals(expected, bs.get());
+    }
+
+    @Test
+    public void doesNotMatchContext() throws Exception {
+        // Create a matcher against a pattern that matches a specific context.
+        final StatementPatternMatcher matcher = new 
StatementPatternMatcher(getSp(
+                "SELECT * WHERE {" +
+                    "GRAPH <urn:testGraph> {" +
+                        "?s ?p ?o ." +
+                    "}" +
+                "}"));
+
+        // Create a statement that does not match the pattern.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Statement statement = 
vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), 
vf.createURI("urn:Bob"), vf.createURI("urn:wrong"));
+
+        // Show the statement did not match.
+        final Optional<BindingSet> bs = matcher.match(statement);
+        assertFalse(bs.isPresent());
+    }
+
+    @Test
+    public void variableContext() throws Exception {
+        // Create a matcher against a pattern that matches a variable context.
+        final StatementPatternMatcher matcher = new 
StatementPatternMatcher(getSp(
+                "SELECT * WHERE {" +
+                    "GRAPH ?c {" +
+                        "?s ?p ?o ." +
+                    "}" +
+                "}"));
+
+        // Create a statement that matches the pattern.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Statement statement = 
vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), 
vf.createURI("urn:Bob"), vf.createURI("urn:testGraph"));
+
+        // Create the expected resulting Binding Set.
+        final QueryBindingSet expected = new QueryBindingSet();
+        expected.addBinding("s", vf.createURI("urn:Alice"));
+        expected.addBinding("p", vf.createURI("urn:talksTo"));
+        expected.addBinding("o", vf.createURI("urn:Bob"));
+        expected.addBinding("c", vf.createURI("urn:testGraph"));
+
+        // Show the expected Binding Set matches the resulting Binding Set.
+        final Optional<BindingSet> bs = matcher.match(statement);
+        assertEquals(expected, bs.get());
+    }
+
+    @Test
+    public void variableContext_contextFreeStatement() throws Exception {
+        // Create a matcher against a pattern that matches a variable context.
+        final StatementPatternMatcher matcher = new 
StatementPatternMatcher(getSp(
+                "SELECT * WHERE {" +
+                    "GRAPH ?c {" +
+                        "?s ?p ?o ." +
+                    "}" +
+                "}"));
+
+        // Create a statement that does not have a context value.
+        final ValueFactory vf = new ValueFactoryImpl();
+        final Statement statement = 
vf.createStatement(vf.createURI("urn:Alice"), vf.createURI("urn:talksTo"), 
vf.createURI("urn:Bob"));
+
+        // Show the statement did not match.
+        final Optional<BindingSet> bs = matcher.match(statement);
+        assertFalse(bs.isPresent());
+    }
+
+    /**
+     * Fetch the {@link StatementPattern} from a SPARQL string.
+     *
+     * @param sparql - A SPARQL query that contains only a single Statement 
Pattern. (not null)
+     * @return The {@link StatementPattern} that was in the query, if it could 
be found. Otherwise {@code null}.
+     * @throws Exception The statement pattern could not be found in the 
parsed SPARQL query.
+     */
+    public static @Nullable StatementPattern getSp(final String sparql) throws 
Exception {
+        requireNonNull(sparql);
+
+        final AtomicReference<StatementPattern> statementPattern = new 
AtomicReference<>();
+        final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
+        parsed.getTupleExpr().visitChildren(new 
QueryModelVisitorBase<Exception>() {
+            @Override
+            public void meet(final StatementPattern node) throws Exception {
+                statementPattern.set(node);
+            }
+        });
+        return statementPattern.get();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/LoadStatements.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/LoadStatements.java
 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/LoadStatements.java
index babe914..c64a08d 100644
--- 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/LoadStatements.java
+++ 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/LoadStatements.java
@@ -19,6 +19,7 @@
 package org.apache.rya.streams.api.interactor;
 
 import java.nio.file.Path;
+import java.util.Collection;
 
 import org.apache.rya.api.model.VisibilityStatement;
 import org.apache.rya.streams.api.exception.RyaStreamsException;
@@ -45,5 +46,13 @@ public interface LoadStatements {
      * @throws RyaStreamsException Thrown when the format of the file provided 
is unknown,
      *         or not a valid RDF format.
      */
-    public void load(final Path statementsPath, final String visibilities) 
throws RyaStreamsException;
+    public void fromFile(final Path statementsPath, final String visibilities) 
throws RyaStreamsException;
+
+    /**
+     * Loads a series of {@link VisibilityStatement}s from a collection into 
the RyaStreams system.
+     *
+     * @param statements - The statements that will be loaded. (not null)
+     * @throws RyaStreamsException The statements could not be loaded.
+     */
+    public void fromCollection(Collection<VisibilityStatement> statements) 
throws RyaStreamsException;
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
index 6ae63da..9414b28 100644
--- 
a/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
+++ 
b/extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/LoadStatementsCommand.java
@@ -127,7 +127,7 @@ public class LoadStatementsCommand implements 
RyaStreamsCommand {
         final Properties producerProps = buildProperties(params);
         try (final Producer<Object, VisibilityStatement> producer = new 
KafkaProducer<>(producerProps)) {
             final LoadStatements statements = new 
KafkaLoadStatements(KafkaTopics.statementsTopic(params.ryaInstance), producer);
-            statements.load(statementsPath, params.visibilities);
+            statements.fromFile(statementsPath, params.visibilities);
         } catch (final Exception e) {
             System.err.println("Unable to parse statements file: " + 
statementsPath.toString());
         }

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/kafka/pom.xml b/extras/rya.streams/kafka/pom.xml
index d5fffe0..33cc985 100644
--- a/extras/rya.streams/kafka/pom.xml
+++ b/extras/rya.streams/kafka/pom.xml
@@ -41,6 +41,10 @@ under the License.
         </dependency>
         <dependency>
             <groupId>org.apache.rya</groupId>
+            <artifactId>rya.api.function</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
             <artifactId>rya.streams.api</artifactId>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java
index 4cf8f9b..8ab3ab6 100644
--- 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatements.java
@@ -23,6 +23,7 @@ import static java.util.Objects.requireNonNull;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.Collection;
 
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -65,7 +66,7 @@ public class KafkaLoadStatements implements LoadStatements {
 
 
     @Override
-    public void load(final Path statementsPath, final String visibilities) 
throws RyaStreamsException {
+    public void fromFile(final Path statementsPath, final String visibilities) 
throws RyaStreamsException {
         requireNonNull(statementsPath);
         requireNonNull(visibilities);
 
@@ -99,4 +100,14 @@ public class KafkaLoadStatements implements LoadStatements {
             throw new RyaStreamsException("Could not load the RDF file's 
Statements into Rya Streams.", e);
         }
     }
+
+    @Override
+    public void fromCollection(final Collection<VisibilityStatement> 
statements) throws RyaStreamsException {
+        requireNonNull(statements);
+
+        for(final VisibilityStatement statement : statements) {
+            producer.send(new ProducerRecord<>(topic, statement));
+        }
+        producer.flush();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsSinkFormatterSupplier.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsSinkFormatterSupplier.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsSinkFormatterSupplier.java
new file mode 100644
index 0000000..d6a8d2d
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/RyaStreamsSinkFormatterSupplier.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.rya.api.model.VisibilityBindingSet;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Supplies {@link RyaStreamsSinkFormatter} instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class RyaStreamsSinkFormatterSupplier implements 
ProcessorSupplier<Object, ProcessorResult> {
+
+    @Override
+    public Processor<Object, ProcessorResult> get() {
+        return new RyaStreamsSinkFormatter();
+    }
+
+    /**
+     * Accepts {@link ProcessorResult}s and forwards just their {@link 
VisibilityBindingSet} so that it may be
+     * written to a sink.
+     */
+    @DefaultAnnotation(NonNull.class)
+    public static final class RyaStreamsSinkFormatter implements 
Processor<Object, ProcessorResult> {
+
+        private ProcessorContext processorContext;
+
+        @Override
+        public void init(final ProcessorContext context) {
+            processorContext = context;
+        }
+
+        @Override
+        public void process(final Object key, final ProcessorResult value) {
+
+            VisibilityBindingSet result = null;
+            switch(value.getType()) {
+                case UNARY:
+                    result = value.getUnary().getResult();
+                    break;
+
+                case BINARY:
+                    result = value.getBinary().getResult();
+                    break;
+            }
+
+            if(result != null) {
+                processorContext.forward(key, result);
+            }
+        }
+
+        @Override
+        public void punctuate(final long timestamp) {
+            // Does nothing.
+        }
+
+        @Override
+        public void close() {
+            // Does nothing.
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java
new file mode 100644
index 0000000..6991783
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorSupplier.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Optional;
+
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.rya.api.function.sp.StatementPatternMatcher;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.StatementPattern;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Supplies {@link StatementPatternProcessor} instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementPatternProcessorSupplier implements 
ProcessorSupplier<String, VisibilityStatement> {
+
+    private final StatementPattern sp;
+    private final ProcessorResultFactory resultFactory;
+
+    /**
+     * Constructs an instance of {@link StatementPatternProcessorSupplier}.
+     *
+     * @param sp - The statement pattern that the supplied processors will 
match against. (not null)
+     * @param resultFactory - The factory that the supplied processors will 
use to create results. (not null)
+     */
+    public StatementPatternProcessorSupplier(
+            final StatementPattern sp,
+            final ProcessorResultFactory resultFactory) {
+        this.sp = requireNonNull(sp);
+        this.resultFactory = requireNonNull(resultFactory);
+    }
+
+    @Override
+    public Processor<String, VisibilityStatement> get() {
+        return new StatementPatternProcessor(sp, resultFactory);
+    }
+
+    /**
+     * Evaluates {@link VisibilityStatement}s against a {@link 
StatementPattern}. Any that match the pattern
+     * will be wrapped as a {@link VisibilityBindingSet} and forwarded to the 
downstream processor.
+     */
+    @DefaultAnnotation(NonNull.class)
+    public static final class StatementPatternProcessor implements 
Processor<String, VisibilityStatement> {
+
+        private final StatementPatternMatcher spMatcher;
+        private final ProcessorResultFactory resultFactory;
+
+        private ProcessorContext context;
+
+        /**
+         * Constructs an instance of {@link StatementPatternProcessor}.
+         *
+         * @param sp - The statement pattern that the processor will match 
statements against. (not null)
+         * @param resultFactory - The factory that the processor will use to 
create results. (not null)
+         */
+        public StatementPatternProcessor(
+                final StatementPattern sp,
+                final ProcessorResultFactory resultFactory) {
+            this.spMatcher = new StatementPatternMatcher( requireNonNull(sp) );
+            this.resultFactory = requireNonNull(resultFactory);
+        }
+
+        @Override
+        public void init(final ProcessorContext context) {
+            this.context = context;
+        }
+
+        @Override
+        public void process(final String key, final VisibilityStatement 
statement) {
+            // Check to see if the Statement matches the Statement Pattern.
+            final Optional<BindingSet> bs = spMatcher.match(statement);
+
+            if(bs.isPresent()) {
+                // If it does, wrap the Binding Set with the Statement's 
visibility expression.
+                final VisibilityBindingSet visBs = new 
VisibilityBindingSet(bs.get(), statement.getVisibility());
+
+                // Wrap the binding set as a result and forward it to the 
downstream processor.
+                final ProcessorResult resultValue = resultFactory.make(visBs);
+                context.forward(key, resultValue);
+            }
+        }
+
+        @Override
+        public void punctuate(final long timestamp) {
+            // Nothing to do.
+        }
+
+        @Override
+        public void close() {
+            // Nothing to do.
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
new file mode 100644
index 0000000..bff4fdb
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/KafkaTestUtil.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A set of utility functions that are useful when writing tests against a 
Kafka instance.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class KafkaTestUtil {
+
+    private KafkaTestUtil() { }
+
+    /**
+     * Create a {@link Producer} that is able to write to a topic that is 
hosted within an embedded instance of Kafka.
+     *
+     * @param kafka - The Kafka rule used to connect to the embedded Kafka 
instance. (not null)
+     * @param keySerializerClass - Serializes the keys. (not null)
+     * @param valueSerializerClass - Serializes the values. (not null)
+     * @return A {@link Producer} that can be used to write records to a topic.
+     */
+    public static <K, V> Producer<K, V> makeProducer(
+            final KafkaTestInstanceRule kafka,
+            final Class<? extends Serializer<K>> keySerializerClass,
+            final Class<? extends Serializer<V>> valueSerializerClass) {
+        requireNonNull(kafka);
+        requireNonNull(keySerializerClass);
+        requireNonNull(valueSerializerClass);
+
+        final Properties props = kafka.createBootstrapServerConfig();
+        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
keySerializerClass.getName());
+        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
valueSerializerClass.getName());
+        return new KafkaProducer<>(props);
+    }
+
+    /**
+     * Create a {@link Consumer} that has a unique group ID and reads 
everything from a topic that is hosted within an
+     * embedded instance of Kafka starting at the earliest point by default.
+     *
+     * @param kafka - The Kafka rule used to connect to the embedded Kafka 
instance. (not null)
+     * @param keyDeserializerClass - Deserializes the keys. (not null)
+     * @param valueDeserializerClass - Deserializes the values. (not null)
+     * @return A {@link Consumer} that can be used to read records from a 
topic.
+     */
+    public static <K, V> Consumer<K, V> fromStartConsumer(
+            final KafkaTestInstanceRule kafka,
+            final Class<? extends Deserializer<K>> keyDeserializerClass,
+            final Class<? extends Deserializer<V>> valueDeserializerClass) {
+        requireNonNull(kafka);
+        requireNonNull(keyDeserializerClass);
+        requireNonNull(valueDeserializerClass);
+
+        final Properties props = kafka.createBootstrapServerConfig();
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
+        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass.getName());
+        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass.getName());
+        return new KafkaConsumer<>(props);
+    }
+
+    /**
+     * Polls a {@link Consumer} until it has either polled too many times 
without hitting the target number
+     * of results, or it hits the target number of results.
+     *
+     * @param pollMs - How long each poll could take.
+     * @param pollIterations - The maximum number of polls that will be 
attempted.
+     * @param targetSize - The number of results to read before stopping.
+     * @param consumer - The consumer that will be polled.
+     * @return The results that were read from the consumer.
+     */
+    public static <K, V> List<V> pollForResults(
+            final int pollMs,
+            final int pollIterations,
+            final int targetSize,
+            final Consumer<K, V> consumer) throws Exception {
+        requireNonNull(consumer);
+
+        final List<V> values = new ArrayList<>();
+
+        int i = 0;
+        while(values.size() < targetSize && i < pollIterations) {
+            for(final ConsumerRecord<K, V> record : consumer.poll(pollMs)) {
+                values.add( record.value() );
+            }
+            i++;
+        }
+
+        return values;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java
new file mode 100644
index 0000000..109e40d
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/RdfTestUtil.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A set of utility functions that are useful when writing tests against RDF functions.
+ */
+@DefaultAnnotation(NonNull.class)
+public final class RdfTestUtil {
+
+    private RdfTestUtil() { }
+
+    /**
+     * Fetch the {@link StatementPattern} from a SPARQL string.
+     *
+     * @param sparql - A SPARQL query that contains only a single Statement 
Pattern. (not null)
+     * @return The {@link StatementPattern} that was in the query, if it could 
be found. Otherwise {@code null}
+     * @throws Exception The statement pattern could not be found in the 
parsed SPARQL query.
+     */
+    public static @Nullable StatementPattern getSp(final String sparql) throws 
Exception {
+        requireNonNull(sparql);
+
+        final AtomicReference<StatementPattern> statementPattern = new 
AtomicReference<>();
+        final ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
+        parsed.getTupleExpr().visitChildren(new 
QueryModelVisitorBase<Exception>() {
+            @Override
+            public void meet(final StatementPattern node) throws Exception {
+                statementPattern.set(node);
+            }
+        });
+        return statementPattern.get();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
index 3343f76..67889e9 100644
--- 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaGetQueryResultStreamIT.java
@@ -23,17 +23,15 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Properties;
 import java.util.UUID;
 
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.api.model.VisibilityBindingSet;
 import org.apache.rya.streams.api.entity.QueryResultStream;
 import org.apache.rya.streams.api.interactor.GetQueryResultStream;
+import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.streams.kafka.KafkaTopics;
 import 
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
@@ -52,16 +50,6 @@ public class KafkaGetQueryResultStreamIT {
     public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
 
     /**
-     * @return A {@link Producer} that is able to write {@link 
VisibilityBindingSet}s.
-     */
-    private Producer<?, VisibilityBindingSet> makeProducer() {
-        final Properties producerProps = kafka.createBootstrapServerConfig();
-        producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-        
producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
VisibilityBindingSetSerializer.class.getName());
-        return new KafkaProducer<>(producerProps);
-    }
-
-    /**
      * Polls a {@link QueryResultStream} until it has either polled too many 
times without hitting
      * the target number of results, or it hits the target number of results.
      *
@@ -112,7 +100,8 @@ public class KafkaGetQueryResultStreamIT {
         original.add(new VisibilityBindingSet(bs, "b|c"));
 
         // Write some entries to the query result topic in Kafka.
-        try(final Producer<?, VisibilityBindingSet> producer = makeProducer()) 
{
+        try(final Producer<?, VisibilityBindingSet> producer =
+                KafkaTestUtil.makeProducer(kafka, StringSerializer.class, 
VisibilityBindingSetSerializer.class)) {
             final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
             for(final VisibilityBindingSet visBs : original) {
                 producer.send(new ProducerRecord<>(resultTopic, visBs));
@@ -132,7 +121,8 @@ public class KafkaGetQueryResultStreamIT {
         // Create an ID for the query.
         final UUID queryId = UUID.randomUUID();
 
-        try(final Producer<?, VisibilityBindingSet> producer = makeProducer()) 
{
+        try(final Producer<?, VisibilityBindingSet> producer =
+                KafkaTestUtil.makeProducer(kafka, StringSerializer.class, 
VisibilityBindingSetSerializer.class)) {
             final String resultTopic = KafkaTopics.queryResultsTopic(queryId);
 
             // Write a single visibility binding set to the query's result 
topic. This will not appear in the expected results.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java
index 5a81d23..b48addd 100644
--- 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/interactor/KafkaLoadStatementsIT.java
@@ -24,21 +24,14 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Properties;
-import java.util.UUID;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTestUtil;
 import 
org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
 import 
org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
 import org.apache.rya.test.kafka.KafkaITBase;
@@ -62,48 +55,30 @@ public class KafkaLoadStatementsIT extends KafkaITBase {
 
     @Test(expected = UnsupportedRDFormatException.class)
     public void test_invalidFile() throws Exception {
-        final String topic = rule.getKafkaTopicName();
-        final String visibilities = "a|b|c";
-        final Properties props = rule.createBootstrapServerConfig();
-        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
VisibilityStatementSerializer.class.getName());
-        try (final Producer<Object, VisibilityStatement> producer = new 
KafkaProducer<>(props)) {
-            final KafkaLoadStatements command = new KafkaLoadStatements(topic, 
producer);
-            command.load(INVALID, visibilities);
+        try(final Producer<?, VisibilityStatement> producer =
+                KafkaTestUtil.makeProducer(rule, StringSerializer.class, 
VisibilityStatementSerializer.class)) {
+            final KafkaLoadStatements command = new 
KafkaLoadStatements(rule.getKafkaTopicName(), producer);
+            command.fromFile(INVALID, "a|b|c");
         }
     }
 
     @Test
     public void testTurtle() throws Exception {
-        final String topic = rule.getKafkaTopicName();
         final String visibilities = "a|b|c";
-        final Properties props = rule.createBootstrapServerConfig();
-        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
VisibilityStatementSerializer.class.getName());
-        try (final Producer<Object, VisibilityStatement> producer = new 
KafkaProducer<>(props)) {
-            final KafkaLoadStatements command = new KafkaLoadStatements(topic, 
producer);
-            command.load(TURTLE_FILE, visibilities);
-        }
-
-        // Read a VisibilityBindingSet from the test topic.
-        final List<VisibilityStatement> read = new ArrayList<>();
 
-        final Properties consumerProps = rule.createBootstrapServerConfig();
-        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
-        consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
-        
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
-        
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
VisibilityStatementDeserializer.class.getName());
+        // Load the statements into the kafka topic.
+        try(final Producer<?, VisibilityStatement> producer =
+                KafkaTestUtil.makeProducer(rule, StringSerializer.class, 
VisibilityStatementSerializer.class)) {
+            final KafkaLoadStatements command = new 
KafkaLoadStatements(rule.getKafkaTopicName(), producer);
+            command.fromFile(TURTLE_FILE, visibilities);
+        }
 
-        try (final KafkaConsumer<String, VisibilityStatement> consumer = new 
KafkaConsumer<>(consumerProps)) {
+        // Read the VisibilityStatements from the test topic.
+        final List<VisibilityStatement> read;
+        try(Consumer<String, VisibilityStatement> consumer =
+                KafkaTestUtil.fromStartConsumer(rule, 
StringDeserializer.class, VisibilityStatementDeserializer.class)) {
             consumer.subscribe(Arrays.asList(rule.getKafkaTopicName()));
-            final ConsumerRecords<String, VisibilityStatement> records = 
consumer.poll(2000);
-
-            assertEquals(3, records.count());
-            final Iterator<ConsumerRecord<String, VisibilityStatement>> iter = 
records.iterator();
-            while(iter.hasNext()) {
-                final VisibilityStatement visiSet = iter.next().value();
-                read.add(visiSet);
-            }
+            read = KafkaTestUtil.pollForResults(500, 6, 3, consumer);
         }
 
         final List<VisibilityStatement> original = new ArrayList<>();
@@ -121,4 +96,4 @@ public class KafkaLoadStatementsIT extends KafkaITBase {
         // Show the written statement matches the read one.
         assertEquals(original, read);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
new file mode 100644
index 0000000..1b58b42
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/processors/StatementPatternProcessorIT.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.processors;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTestUtil;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.RdfTestUtil;
+import org.apache.rya.streams.kafka.interactor.KafkaLoadStatements;
+import org.apache.rya.streams.kafka.processors.ProcessorResult.UnaryResult;
+import 
org.apache.rya.streams.kafka.processors.RyaStreamsSinkFormatterSupplier.RyaStreamsSinkFormatter;
+import 
org.apache.rya.streams.kafka.processors.StatementPatternProcessorSupplier.StatementPatternProcessor;
+import 
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
+import 
org.apache.rya.streams.kafka.serialization.VisibilityBindingSetSerializer;
+import 
org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
+import 
org.apache.rya.streams.kafka.serialization.VisibilityStatementSerializer;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+/**
+ * Integration tests the methods of {@link StatementPatternProcessor}.
+ */
+public class StatementPatternProcessorIT {
+
+    @Rule
+    public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
+
+    @Test
+    public void statementPatternMatches() throws Exception {
+        // Enumerate some topics that will be re-used
+        final String ryaInstance = UUID.randomUUID().toString();
+        final UUID queryId = UUID.randomUUID();
+        final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(queryId);
+
+        // Get the StatementPattern object that will be evaluated.
+        final StatementPattern sp = RdfTestUtil.getSp("SELECT * WHERE { 
?person <urn:talksTo> ?otherPerson }");
+
+        // Setup a topology.
+        final TopologyBuilder builder = new TopologyBuilder();
+
+        // The topic that Statements are written to is used as a source.
+        builder.addSource("STATEMENTS", new StringDeserializer(), new 
VisibilityStatementDeserializer(), statementsTopic);
+
+        // Add a processor that handles the first statement pattern.
+        builder.addProcessor("SP1", new StatementPatternProcessorSupplier(sp, 
result -> ProcessorResult.make( new UnaryResult(result) )), "STATEMENTS");
+
+        // Add a processor that formats the VisibilityBindingSet for output.
+        builder.addProcessor("SINK_FORMATTER", RyaStreamsSinkFormatter::new, 
"SP1");
+
+        // Add a sink that writes the data out to a new Kafka topic.
+        builder.addSink("QUERY_RESULTS", resultsTopic, new StringSerializer(), 
new VisibilityBindingSetSerializer(), "SINK_FORMATTER");
+
+        // Start the streams program.
+        final Properties props = kafka.createBootstrapServerConfig();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"StatementPatternProcessorIT");
+
+        final KafkaStreams streams = new KafkaStreams(builder, new 
StreamsConfig(props));
+        streams.cleanUp();
+        try {
+            streams.start();
+
+            // Wait for the streams application to start. Streams only see 
data after their consumers are connected.
+            Thread.sleep(2000);
+
+            // Load some data into the input topic.
+            final ValueFactory vf = new ValueFactoryImpl();
+            final List<VisibilityStatement> statements = new ArrayList<>();
+            statements.add( new 
VisibilityStatement(vf.createStatement(vf.createURI("urn:Alice"), 
vf.createURI("urn:talksTo"), vf.createURI("urn:Bob")), "a") );
+
+            try(Producer<String, VisibilityStatement> producer = 
KafkaTestUtil.makeProducer(
+                    kafka, StringSerializer.class, 
VisibilityStatementSerializer.class)) {
+                new KafkaLoadStatements(statementsTopic, 
producer).fromCollection(statements);
+            }
+
+            // Wait for the final results to appear in the output topic and 
verify the expected Binding Set was found.
+            try(Consumer<String, VisibilityBindingSet> consumer = 
KafkaTestUtil.fromStartConsumer(
+                    kafka, StringDeserializer.class, 
VisibilityBindingSetDeserializer.class)) {
+                // Register the topic.
+                consumer.subscribe(Arrays.asList(resultsTopic));
+
+                // Poll for the result.
+                final List<VisibilityBindingSet> results = 
KafkaTestUtil.pollForResults(500, 6, 1, consumer);
+
+                // Show the correct binding set results from the job.
+                final QueryBindingSet bs = new QueryBindingSet();
+                bs.addBinding("person", vf.createURI("urn:Alice"));
+                bs.addBinding("otherPerson", vf.createURI("urn:Bob"));
+                final VisibilityBindingSet expected = new 
VisibilityBindingSet(bs, "a");
+
+                final VisibilityBindingSet result = results.iterator().next();
+                assertEquals(expected, result);
+            }
+        } finally {
+            streams.close();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
index 9e89ca7..ff2b59b 100644
--- 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
@@ -22,22 +22,18 @@ import static org.junit.Assert.assertEquals;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Properties;
 import java.util.UUID;
 
 import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.streams.api.queries.ChangeLogEntry;
 import org.apache.rya.streams.api.queries.QueryChange;
 import 
org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException;
+import org.apache.rya.streams.kafka.KafkaTestUtil;
 import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
 import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
 import org.apache.rya.test.kafka.KafkaITBase;
@@ -55,11 +51,10 @@ import info.aduna.iteration.CloseableIteration;
  * Integration tests the {@link KafkaQueryChangeLog}.
  */
 public class KafkaQueryChangeLogIT extends KafkaITBase {
-    KafkaQueryChangeLog changeLog;
 
+    private KafkaQueryChangeLog changeLog;
     private Producer<?, QueryChange> producer;
     private Consumer<?, QueryChange> consumer;
-
     private String topic;
 
     @Rule
@@ -68,25 +63,14 @@ public class KafkaQueryChangeLogIT extends KafkaITBase {
     @Before
     public void setup() {
         topic = rule.getKafkaTopicName();
-        final Properties producerProperties = 
rule.createBootstrapServerConfig();
-        
producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-        
producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
QueryChangeSerializer.class.getName());
-
-        final Properties consumerProperties = 
rule.createBootstrapServerConfig();
-        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
-        
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
-        
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
-        
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
QueryChangeDeserializer.class.getName());
-        producer = new KafkaProducer<>(producerProperties);
-        consumer = new KafkaConsumer<>(consumerProperties);
+        producer = KafkaTestUtil.makeProducer(rule, StringSerializer.class, 
QueryChangeSerializer.class);
+        consumer = KafkaTestUtil.fromStartConsumer(rule, 
StringDeserializer.class, QueryChangeDeserializer.class);
         changeLog = new KafkaQueryChangeLog(producer, consumer, topic);
     }
 
     @After
     public void cleanup() {
-        producer.flush();
         producer.close();
-
         consumer.close();
     }
 
@@ -202,4 +186,4 @@ public class KafkaQueryChangeLogIT extends KafkaITBase {
         }
         return changes;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java
index 6104578..f9129ff 100644
--- 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetKafkaIT.java
@@ -21,18 +21,15 @@ package org.apache.rya.streams.kafka.serialization;
 import static org.junit.Assert.assertEquals;
 
 import java.util.Arrays;
-import java.util.Properties;
-import java.util.UUID;
+import java.util.List;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.api.model.VisibilityBindingSet;
+import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -49,7 +46,7 @@ public class VisibilityBindingSetKafkaIT {
     public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
 
     @Test
-    public void readAndWrite() {
+    public void readAndWrite() throws Exception {
         // Create the object that will be written to the topic.
         final ValueFactory vf = new ValueFactoryImpl();
 
@@ -59,32 +56,23 @@ public class VisibilityBindingSetKafkaIT {
         final VisibilityBindingSet original = new VisibilityBindingSet(bs, 
"a|b|c");
 
         // Write a VisibilityBindingSet to the test topic.
-        final Properties producerProps = kafka.createBootstrapServerConfig();
-        producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-        
producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
VisibilityBindingSetSerializer.class.getName());
-
-        try(final KafkaProducer<String, VisibilityBindingSet> producer = new 
KafkaProducer<>(producerProps)) {
+        try(Producer<String, VisibilityBindingSet> producer = 
KafkaTestUtil.makeProducer(
+                kafka, StringSerializer.class, 
VisibilityBindingSetSerializer.class)) {
             producer.send( new ProducerRecord<String, 
VisibilityBindingSet>(kafka.getKafkaTopicName(), original) );
         }
 
         // Read a VisibilityBindingSet from the test topic.
-        VisibilityBindingSet read;
-
-        final Properties consumerProps = kafka.createBootstrapServerConfig();
-        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
-        consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
-        
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
-        
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
VisibilityBindingSetDeserializer.class.getName());
-
-        try(final KafkaConsumer<String, VisibilityBindingSet> consumer = new 
KafkaConsumer<>(consumerProps)) {
+        try(Consumer<String, VisibilityBindingSet> consumer = 
KafkaTestUtil.fromStartConsumer(
+                kafka, StringDeserializer.class, 
VisibilityBindingSetDeserializer.class)) {
+            // Register the topic.
             consumer.subscribe(Arrays.asList(kafka.getKafkaTopicName()));
-            final ConsumerRecords<String, VisibilityBindingSet> records = 
consumer.poll(1000);
 
-            assertEquals(1, records.count());
-            read = records.iterator().next().value();
-        }
+            // Poll for the result.
+            final List<VisibilityBindingSet> results = 
KafkaTestUtil.pollForResults(500, 6, 1, consumer);
 
-        // Show the written statement matches the read one.
-        assertEquals(original, read);
+            // Show the written statement matches the read one.
+            final VisibilityBindingSet read = results.iterator().next();
+            assertEquals(original, read);
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/653e4b83/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java
index 62122bd..b85eb0c 100644
--- 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementKafkaIT.java
@@ -21,18 +21,15 @@ package org.apache.rya.streams.kafka.serialization;
 import static org.junit.Assert.assertEquals;
 
 import java.util.Arrays;
-import java.util.Properties;
-import java.util.UUID;
+import java.util.List;
 
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.rya.api.model.VisibilityStatement;
+import org.apache.rya.streams.kafka.KafkaTestUtil;
 import org.apache.rya.test.kafka.KafkaTestInstanceRule;
 import org.junit.Rule;
 import org.junit.Test;
@@ -48,7 +45,7 @@ public class VisibilityStatementKafkaIT {
     public KafkaTestInstanceRule kafka = new KafkaTestInstanceRule(true);
 
     @Test
-    public void readAndWrite() {
+    public void readAndWrite() throws Exception {
         // Create the object that will be written to the topic.
         final ValueFactory vf = new ValueFactoryImpl();
         final VisibilityStatement original = new VisibilityStatement(
@@ -60,32 +57,23 @@ public class VisibilityStatementKafkaIT {
                 "a|b|c");
 
         // Write a VisibilityStatement to the test topic.
-        final Properties producerProps = kafka.createBootstrapServerConfig();
-        producerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
-        
producerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
VisibilityStatementSerializer.class.getName());
-
-        try(final KafkaProducer<String, VisibilityStatement> producer = new 
KafkaProducer<>(producerProps)) {
+        try(Producer<String, VisibilityStatement> producer = 
KafkaTestUtil.makeProducer(
+                kafka, StringSerializer.class, 
VisibilityStatementSerializer.class)) {
             producer.send( new ProducerRecord<String, 
VisibilityStatement>(kafka.getKafkaTopicName(), original) );
         }
 
         // Read a VisibilityStatement from the test topic.
-        VisibilityStatement read;
-
-        final Properties consumerProps = kafka.createBootstrapServerConfig();
-        consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
-        consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
-        
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
-        
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
VisibilityStatementDeserializer.class.getName());
-
-        try(final KafkaConsumer<String, VisibilityStatement> consumer = new 
KafkaConsumer<>(consumerProps)) {
+        try(Consumer<String, VisibilityStatement> consumer = 
KafkaTestUtil.fromStartConsumer(
+                kafka, StringDeserializer.class, 
VisibilityStatementDeserializer.class)) {
+            // Register the topic.
             consumer.subscribe(Arrays.asList(kafka.getKafkaTopicName()));
-            final ConsumerRecords<String, VisibilityStatement> records = 
consumer.poll(1000);
 
-            assertEquals(1, records.count());
-            read = records.iterator().next().value();
-        }
+            // Poll for the result.
+            final List<VisibilityStatement> results = 
KafkaTestUtil.pollForResults(500, 6, 1, consumer);
 
-        // Show the written statement matches the read one.
-        assertEquals(original, read);
+            // Show the written statement matches the read one.
+            final VisibilityStatement read = results.iterator().next();
+            assertEquals(original, read);
+        }
     }
 }
\ No newline at end of file

Reply via email to