http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/PeriodicQueryCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/PeriodicQueryCommand.java
 
b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/PeriodicQueryCommand.java
new file mode 100644
index 0000000..37993bf
--- /dev/null
+++ 
b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/PeriodicQueryCommand.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.periodic;
+
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Objects;
+
+@Parameters(commandNames = { "periodic" }, commandDescription = "Run benchmark 
with a PeriodicQuery that uses Filter(function:periodic(?temporalVariable, 
<windowSize>, <updatePeriod>, <timeUnits>)).  This requires the Rya Periodic 
Notification Twill YARN Application to be running in addition to the Rya PCJ 
Updater Incremental Join Application.")
+public class PeriodicQueryCommand extends BenchmarkOptions {
+
+    @Parameter(names = { "-pqw", "--periodic-query-window" }, description = 
"The window size, in --periodic-query-time-units, for returning query 
results.", required = true)
+    private double periodicQueryWindow;
+
+    @Parameter(names = { "-pqp", "--periodic-query-period" }, description = 
"The period, in --periodic-query-time-units, for results of the windowed query 
to be returned.", required = true)
+    private double periodicQueryPeriod;
+
+    @Parameter(names = { "-pqtu", "--periodic-query-time-units" }, description 
= "The unit in time (days,hours,minutes)", required = true)
+    private PeriodicQueryTimeUnits periodicQueryTimeUnits;
+
+    @Parameter(names = { "-pqrt", "--periodic-query-registration-topic" }, 
description = "The kafka topic which periodic notification registration 
requests are published to.  (typically 'notifications')", required = true)
+    private String periodicQueryRegistrationTopic;
+
+    public enum PeriodicQueryTimeUnits {
+        days, hours, minutes;
+    }
+
+    public PeriodicQueryTimeUnits getPeriodicQueryTimeUnits() {
+        return periodicQueryTimeUnits;
+    }
+
+    public double getPeriodicQueryWindow() {
+        return periodicQueryWindow;
+    }
+
+    public double getPeriodicQueryPeriod() {
+        return periodicQueryPeriod;
+    }
+
+    public String getPeriodicQueryRegistrationTopic() {
+        return periodicQueryRegistrationTopic;
+    }
+
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this)
+                .add("periodicQueryWindow", periodicQueryWindow)
+                .add("periodicQueryPeriod", periodicQueryPeriod)
+                .add("periodicQueryTimeUnits", periodicQueryTimeUnits)
+                .add("periodicQueryRegistrationTopic", 
periodicQueryRegistrationTopic)
+                .toString() + super.toString();
+    }
+}
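
For context, the commandDescription above registers a query whose SPARQL contains a Filter of the form function:periodic(?temporalVariable, <windowSize>, <updatePeriod>, <timeUnits>). The snippet below is a minimal sketch of such a query assembled as a Java string; the function: and time: prefixes and the urn: predicates are illustrative assumptions, not taken from this commit.

// Hedged sketch of a periodic SPARQL query string; prefixes and predicates are assumed.
public class PeriodicSparqlExample {
    public static void main(final String[] args) {
        final String sparql =
                "PREFIX function: <http://org.apache.rya/function#> "    // assumed periodic function namespace
              + "PREFIX time: <http://www.w3.org/2006/time#> "           // assumed time-unit namespace
              + "SELECT ?obs ?id WHERE { "
              + "  FILTER(function:periodic(?time, 12, 6, time:hours)) " // window = 12, period = 6, units = hours
              + "  ?obs <urn:hasTime> ?time . "                          // hypothetical statement patterns
              + "  ?obs <urn:hasId> ?id . "
              + "}";
        System.out.println(sparql);
    }
}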

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/ProjectionQueryCommand.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/ProjectionQueryCommand.java
 
b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/ProjectionQueryCommand.java
new file mode 100644
index 0000000..472b649
--- /dev/null
+++ 
b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/periodic/ProjectionQueryCommand.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.benchmark.periodic;
+
+import com.beust.jcommander.Parameters;
+import com.google.common.base.Objects;
+
+@Parameters(commandNames = { "projection" }, commandDescription = "Run 
benchmark with a simple projection query.  This requires the Rya PCJ Updater 
Incremental Join Application to be running.")
+public class ProjectionQueryCommand extends BenchmarkOptions {
+
+    @Override
+    public String toString() {
+        return Objects.toStringHelper(this).toString() + super.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/scripts/periodicNotificationBenchmark.sh
----------------------------------------------------------------------
diff --git 
a/extras/rya.benchmark/src/main/scripts/periodicNotificationBenchmark.sh 
b/extras/rya.benchmark/src/main/scripts/periodicNotificationBenchmark.sh
new file mode 100644
index 0000000..a69b71d
--- /dev/null
+++ b/extras/rya.benchmark/src/main/scripts/periodicNotificationBenchmark.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# navigate to the project directory
+PROJECT_HOME=$(dirname $(cd $(dirname $0) && pwd))
+cd $PROJECT_HOME
+
+
+# run the program
+$JAVA_HOME/bin/java -cp .:lib/* \
+  -Dlog4j.configuration=conf/log4j.properties \
+  org.apache.rya.benchmark.periodic.KafkaLatencyBenchmark \
+  @conf/common.options \
+  periodic \
+  @conf/periodic.options \
+  "$@"

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.benchmark/src/main/scripts/projectionNotificationBenchmark.sh
----------------------------------------------------------------------
diff --git 
a/extras/rya.benchmark/src/main/scripts/projectionNotificationBenchmark.sh 
b/extras/rya.benchmark/src/main/scripts/projectionNotificationBenchmark.sh
new file mode 100644
index 0000000..6039e60
--- /dev/null
+++ b/extras/rya.benchmark/src/main/scripts/projectionNotificationBenchmark.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# navigate to the project directory
+PROJECT_HOME=$(dirname $(cd $(dirname $0) && pwd))
+cd $PROJECT_HOME
+
+
+# run the program
+$JAVA_HOME/bin/java -cp .:lib/* \
+  -Dlog4j.configuration=conf/log4j.properties \
+  org.apache.rya.benchmark.periodic.KafkaLatencyBenchmark \
+  @conf/common.options \
+  projection \
+  @conf/projection.options \
+  "$@"

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.export/export.client/conf/config.xml
----------------------------------------------------------------------
diff --git a/extras/rya.export/export.client/conf/config.xml 
b/extras/rya.export/export.client/conf/config.xml
index 57787b1..f2a7fdd 100644
--- a/extras/rya.export/export.client/conf/config.xml
+++ b/extras/rya.export/export.client/conf/config.xml
@@ -16,18 +16,18 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License. -->
<MergeToolConfiguration xmlns="http://mergeconfig">
-    <parentHostname>10.63.8.102</parentHostname>
-    <parentUsername>SPEAR</parentUsername>
-    <parentPassword>spear</parentPassword>
-    <parentRyaInstanceName>spear_instance</parentRyaInstanceName>
-    <parentTablePrefix>asmith_demo_export_</parentTablePrefix>
-    <parentTomcatUrl>http://10.63.8.102:8080</parentTomcatUrl>
+    <parentHostname>10.10.10.100</parentHostname>
+    <parentUsername>accumuloUsername</parentUsername>
+    <parentPassword>accumuloPassword</parentPassword>
+    <parentRyaInstanceName>accumuloInstance</parentRyaInstanceName>
+    <parentTablePrefix>rya_demo_export_</parentTablePrefix>
+    <parentTomcatUrl>http://10.10.10.100:8080</parentTomcatUrl>
     <parentDBType>accumulo</parentDBType>
     <parentPort>1111</parentPort>
-    <childHostname>localhost</childHostname>
+    <childHostname>10.10.10.101</childHostname>
     <childRyaInstanceName>rya_demo_child</childRyaInstanceName>
-    <childTablePrefix>asmith_demo_export_</childTablePrefix>
-    <childTomcatUrl>http://localhost:8080</childTomcatUrl>
+    <childTablePrefix>rya_demo_export_</childTablePrefix>
+    <childTomcatUrl>http://10.10.10.101:8080</childTomcatUrl>
     <childDBType>mongo</childDBType>
     <childPort>27017</childPort>
     <mergePolicy>timestamp</mergePolicy>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
 
b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
index d7a50a7..309cb1f 100644
--- 
a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
+++ 
b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPeriodicQueryResultStorage.java
@@ -62,8 +62,8 @@ import com.google.common.base.Preconditions;
  */
 public class AccumuloPeriodicQueryResultStorage implements 
PeriodicQueryResultStorage {
 
-    private String ryaInstance;
-    private Connector accumuloConn;
+    private final String ryaInstance;
+    private final Connector accumuloConn;
     private Authorizations auths;
     private final PCJIdFactory pcjIdFactory = new PCJIdFactory();
     private final AccumuloPcjSerializer converter = new 
AccumuloPcjSerializer();
@@ -75,10 +75,10 @@ public class AccumuloPeriodicQueryResultStorage implements 
PeriodicQueryResultSt
      * @param accumuloConn - Accumulo Connector for connecting to an Accumulo 
instance
      * @param ryaInstance - Rya Instance name for connecting to Rya
      */
-    public AccumuloPeriodicQueryResultStorage(Connector accumuloConn, String 
ryaInstance) {
+    public AccumuloPeriodicQueryResultStorage(final Connector accumuloConn, 
final String ryaInstance) {
         this.accumuloConn = Preconditions.checkNotNull(accumuloConn);
         this.ryaInstance = Preconditions.checkNotNull(ryaInstance);
-        String user = accumuloConn.whoami();
+        final String user = accumuloConn.whoami();
         try {
             this.auths = 
accumuloConn.securityOperations().getUserAuthorizations(user);
         } catch (AccumuloException | AccumuloSecurityException e) {
@@ -87,21 +87,21 @@ public class AccumuloPeriodicQueryResultStorage implements 
PeriodicQueryResultSt
     }
 
     @Override
-    public String createPeriodicQuery(String sparql) throws 
PeriodicQueryStorageException {
+    public String createPeriodicQuery(final String sparql) throws 
PeriodicQueryStorageException {
         Preconditions.checkNotNull(sparql);
-        String queryId = pcjIdFactory.nextId();
+        final String queryId = pcjIdFactory.nextId();
         return createPeriodicQuery(queryId, sparql);
     }
-    
+
     @Override
-    public String createPeriodicQuery(String queryId, String sparql) throws 
PeriodicQueryStorageException {
+    public String createPeriodicQuery(final String queryId, final String 
sparql) throws PeriodicQueryStorageException {
         Set<String> bindingNames;
         try {
             bindingNames = new 
AggregateVariableRemover().getNonAggregationVariables(sparql);
-        } catch (MalformedQueryException e) {
+        } catch (final MalformedQueryException e) {
             throw new PeriodicQueryStorageException(e.getMessage());
         }
-        List<String> varOrderList = new ArrayList<>();
+        final List<String> varOrderList = new ArrayList<>();
         varOrderList.add(PeriodicQueryResultStorage.PeriodicBinId);
         varOrderList.addAll(bindingNames);
         createPeriodicQuery(queryId, sparql, new VariableOrder(varOrderList));
@@ -109,79 +109,88 @@ public class AccumuloPeriodicQueryResultStorage 
implements PeriodicQueryResultSt
     }
 
     @Override
-    public void createPeriodicQuery(String queryId, String sparql, 
VariableOrder order) throws PeriodicQueryStorageException {
+    public void createPeriodicQuery(final String queryId, final String sparql, 
final VariableOrder order) throws PeriodicQueryStorageException {
         Preconditions.checkNotNull(sparql);
         Preconditions.checkNotNull(queryId);
         Preconditions.checkNotNull(order);
         
Preconditions.checkArgument(PeriodicQueryResultStorage.PeriodicBinId.equals(order.getVariableOrders().get(0)),
                 "periodicBinId binding name must occur first in 
VariableOrder.");
-        String tableName = tableNameFactory.makeTableName(ryaInstance, 
queryId);
-        Set<VariableOrder> varOrders = new HashSet<>();
+        final String tableName = tableNameFactory.makeTableName(ryaInstance, 
queryId);
+        final Set<VariableOrder> varOrders = new HashSet<>();
         varOrders.add(order);
         try {
             pcjTables.createPcjTable(accumuloConn, tableName, varOrders, 
sparql);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new PeriodicQueryStorageException(e.getMessage());
         }
     }
 
     @Override
-    public PeriodicQueryStorageMetadata getPeriodicQueryMetadata(String 
queryId) throws PeriodicQueryStorageException {
+    public PeriodicQueryStorageMetadata getPeriodicQueryMetadata(final String 
queryId) throws PeriodicQueryStorageException {
         try {
             return new PeriodicQueryStorageMetadata(
                     pcjTables.getPcjMetadata(accumuloConn, 
tableNameFactory.makeTableName(ryaInstance, queryId)));
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new PeriodicQueryStorageException(e.getMessage());
         }
     }
 
     @Override
-    public void addPeriodicQueryResults(String queryId, 
Collection<VisibilityBindingSet> results) throws PeriodicQueryStorageException {
+    public void addPeriodicQueryResults(final String queryId, final 
Collection<VisibilityBindingSet> results) throws PeriodicQueryStorageException {
         results.forEach(x -> 
Preconditions.checkArgument(x.hasBinding(PeriodicQueryResultStorage.PeriodicBinId),
                 "BindingSet must contain periodBinId binding."));
         try {
             pcjTables.addResults(accumuloConn, 
tableNameFactory.makeTableName(ryaInstance, queryId), results);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new PeriodicQueryStorageException(e.getMessage());
         }
     }
 
     @Override
-    public void deletePeriodicQueryResults(String queryId, long binId) throws 
PeriodicQueryStorageException {
-        String tableName = tableNameFactory.makeTableName(ryaInstance, 
queryId);
+    public void deletePeriodicQueryResults(final String queryId, final long 
binId) throws PeriodicQueryStorageException {
+        final String tableName = tableNameFactory.makeTableName(ryaInstance, 
queryId);
+        BatchDeleter deleter = null;
         try {
-            Text prefix = getRowPrefix(binId);
-            BatchDeleter deleter = accumuloConn.createBatchDeleter(tableName, 
auths, 1, new BatchWriterConfig());
+            final Text prefix = getRowPrefix(binId);
+            deleter = accumuloConn.createBatchDeleter(tableName, auths, 1, new 
BatchWriterConfig());
             deleter.setRanges(Collections.singleton(Range.prefix(prefix)));
             deleter.delete();
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new PeriodicQueryStorageException(e.getMessage());
+        } finally {
+            try {
+                if(deleter != null) {
+                    deleter.close();
+                }
+            } catch (final Exception e) {
+                throw new PeriodicQueryStorageException(e.getMessage());
+            }
         }
     }
 
-    public void deletePeriodicQueryResults(String queryId) throws 
PeriodicQueryStorageException {
+    public void deletePeriodicQueryResults(final String queryId) throws 
PeriodicQueryStorageException {
         try {
             pcjTables.purgePcjTable(accumuloConn, 
tableNameFactory.makeTableName(ryaInstance, queryId));
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new PeriodicQueryStorageException(e.getMessage());
         }
     }
 
     @Override
-    public void deletePeriodicQuery(String queryId) throws 
PeriodicQueryStorageException {
+    public void deletePeriodicQuery(final String queryId) throws 
PeriodicQueryStorageException {
         try {
             pcjTables.dropPcjTable(accumuloConn, 
tableNameFactory.makeTableName(ryaInstance, queryId));
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new PeriodicQueryStorageException(e.getMessage());
         }
     }
 
     @Override
-    public CloseableIterator<BindingSet> listResults(String queryId, 
Optional<Long> binId)
+    public CloseableIterator<BindingSet> listResults(final String queryId, 
final Optional<Long> binId)
             throws PeriodicQueryStorageException {
         requireNonNull(queryId);
 
-        String tableName = tableNameFactory.makeTableName(ryaInstance, 
queryId);
+        final String tableName = tableNameFactory.makeTableName(ryaInstance, 
queryId);
         // Fetch the Variable Orders for the binding sets and choose one of
         // them. It
         // doesn't matter which one we choose because they all result in the
@@ -199,15 +208,15 @@ public class AccumuloPeriodicQueryResultStorage 
implements PeriodicQueryResultSt
             }
             return new AccumuloValueBindingSetIterator(scanner);
 
-        } catch (Exception e) {
+        } catch (final Exception e) {
             throw new PeriodicQueryStorageException(String.format("PCJ Table 
does not exist for name '%s'.", tableName), e);
         }
     }
-    
-    private Text getRowPrefix(long binId) throws BindingSetConversionException 
{
-        QueryBindingSet bs = new QueryBindingSet();
+
+    private Text getRowPrefix(final long binId) throws 
BindingSetConversionException {
+        final QueryBindingSet bs = new QueryBindingSet();
         bs.addBinding(PeriodicQueryResultStorage.PeriodicBinId, new 
LiteralImpl(Long.toString(binId), XMLSchema.LONG));
-        
+
         return new Text(converter.convert(bs, new 
VariableOrder(PeriodicQueryResultStorage.PeriodicBinId)));
     }
 
@@ -236,35 +245,35 @@ public class AccumuloPeriodicQueryResultStorage 
implements PeriodicQueryResultSt
         }
         return periodicTables;
     }
-    
+
     /**
      * Class for removing any aggregate variables from the 
ProjectionElementList
      * of the parsed SPARQL queries. This ensures that only non-aggregation
      * values are contained in the Accumulo row.  The non-aggregation variables
      * are not updated while the aggregation variables are, so they are 
included in
      * the serialized BindingSet in the Accumulo Value field, which is 
overwritten
-     * if an entry with the same Key and different Value (updated aggregation) 
is 
+     * if an entry with the same Key and different Value (updated aggregation) 
is
      * written to the table.
      *
      */
     static class AggregateVariableRemover extends 
QueryModelVisitorBase<RuntimeException> {
-        
+
         private Set<String> bindingNames;
-        
-        public Set<String> getNonAggregationVariables(String sparql) throws 
MalformedQueryException {
-            TupleExpr te = new SPARQLParser().parseQuery(sparql, 
null).getTupleExpr();
+
+        public Set<String> getNonAggregationVariables(final String sparql) 
throws MalformedQueryException {
+            final TupleExpr te = new SPARQLParser().parseQuery(sparql, 
null).getTupleExpr();
             bindingNames = te.getBindingNames();
             te.visit(this);
             return bindingNames;
         }
-        
+
         @Override
-        public void meet(ExtensionElem node) {
+        public void meet(final ExtensionElem node) {
             if(node.getExpr() instanceof AggregateOperatorBase) {
                 bindingNames.remove(node.getName());
             }
         }
-        
+
     }
 
 }
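
A hedged usage sketch of the storage class above, assuming an existing Accumulo Connector and Rya instance name; the SPARQL string and the wrapper class are illustrative only and not part of this commit.

import org.apache.accumulo.core.client.Connector;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;

// Hedged sketch: register a periodic query and clean it up later.
public class PeriodicStorageUsageSketch {

    public static String registerQuery(final Connector conn, final String ryaInstance, final String sparql)
            throws Exception {
        final AccumuloPeriodicQueryResultStorage storage =
                new AccumuloPeriodicQueryResultStorage(conn, ryaInstance);

        // Creates the result table for the query; the returned id is used to look results up later.
        final String queryId = storage.createPeriodicQuery(sparql);

        // Old bins can be purged one at a time:
        //     storage.deletePeriodicQueryResults(queryId, someBinId);
        // or the whole query can be dropped:
        //     storage.deletePeriodicQuery(queryId);
        return queryId;
    }
}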

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.manual/src/site/markdown/pcj-updater.md
----------------------------------------------------------------------
diff --git a/extras/rya.manual/src/site/markdown/pcj-updater.md 
b/extras/rya.manual/src/site/markdown/pcj-updater.md
index 8f3c27f..11cb560 100644
--- a/extras/rya.manual/src/site/markdown/pcj-updater.md
+++ b/extras/rya.manual/src/site/markdown/pcj-updater.md
@@ -211,14 +211,16 @@ Add the following entries under Observer properties in the
 # fluo.observer.0=com.foo.Observer1
 # Can optionally have configuration key values
 # fluo.observer.1=com.foo.Observer2,configKey1=configVal1,configKey2=configVal2
-fluo.observer.0=org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver
-fluo.observer.1=org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver
-fluo.observer.2=org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver
-fluo.observer.3=org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver
-fluo.observer.4=org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver
-fluo.observer.5=org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver
-#fluo.observer.5=org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver
-fluo.observer.6=org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver,pcj.fluo.export.rya.enabled=true,pcj.fluo.export.rya.ryaInstanceName=rya_,pcj.fluo.export.rya.accumuloInstanceName=myAccumuloInstance,pcj.fluo.export.rya.zookeeperServers=zoo1;zoo2;zoo3,pcj.fluo.export.rya.exporterUsername=myUserName,pcj.fluo.export.rya.exporterPassword=myPassword,pcj.fluo.export.kafka.enabled=true,bootstrap.servers=myKafkaBroker:9092,key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer,value.serializer=org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer
+fluo.observer.0=org.apache.rya.indexing.pcj.fluo.app.batch.BatchObserver
+fluo.observer.1=org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver
+fluo.observer.2=org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver
+fluo.observer.3=org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver
+fluo.observer.4=org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver
+fluo.observer.5=org.apache.rya.indexing.pcj.fluo.app.observers.AggregationObserver
+fluo.observer.6=org.apache.rya.indexing.pcj.fluo.app.observers.PeriodicQueryObserver
+fluo.observer.7=org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver
+#fluo.observer.8=org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver
+fluo.observer.8=org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver,pcj.fluo.export.rya.enabled=true,pcj.fluo.export.rya.ryaInstanceName=rya_,pcj.fluo.export.rya.fluo.application.name=rya_pcj_updater,pcj.fluo.export.rya.accumuloInstanceName=myAccumuloInstance,pcj.fluo.export.rya.zookeeperServers=zoo1;zoo2;zoo3;zoo4;zoo5,pcj.fluo.export.rya.exporterUsername=myUserName,pcj.fluo.export.rya.exporterPassword=myPassword,pcj.fluo.export.rya.bindingset.enabled=true,pcj.fluo.export.periodic.bindingset.enabled=true,pcj.fluo.export.kafka.subgraph.enabled=true,pcj.fluo.export.kafka.bindingset.enabled=true,bootstrap.servers=kafka1:9092
 ```
 
 Description of configuration keys for the 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml 
b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
index e2f3a22..01da2dc 100644
--- a/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
+++ b/extras/rya.pcj.fluo/pcj.fluo.app/pom.xml
@@ -35,9 +35,7 @@
         A Fluo implementation of Rya Precomputed Join Indexing. This module 
produces
         a jar that may be executed by the 'fluo' command line tool as a YARN 
job.
     </description>
-    <properties>
-        <kryo.version>3.0.3</kryo.version>
-    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.slf4j</groupId>
@@ -81,7 +79,6 @@
         <dependency>
             <groupId>com.esotericsoftware</groupId>
             <artifactId>kryo</artifactId>
-            <version>${kryo.version}</version>
         </dependency>
 
         <!-- Testing dependencies. -->

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
index 17ed158..2cc9f77 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java
@@ -22,7 +22,6 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 
 import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
-import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
 import org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer;
@@ -46,6 +45,8 @@ import 
org.openrdf.query.algebra.evaluation.ValueExprEvaluationException;
 import org.openrdf.query.algebra.evaluation.function.FunctionRegistry;
 import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl;
 import org.openrdf.query.algebra.evaluation.util.QueryEvaluationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -58,7 +59,7 @@ import info.aduna.iteration.CloseableIteration;
 @DefaultAnnotation(NonNull.class)
 public class FilterResultUpdater {
 
-    private static final Logger log = 
Logger.getLogger(FilterResultUpdater.class);
+    private static final Logger log = 
LoggerFactory.getLogger(FilterResultUpdater.class);
 
     private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
 
@@ -101,14 +102,11 @@ public class FilterResultUpdater {
         checkNotNull(childBindingSet);
         checkNotNull(filterMetadata);
 
-        log.trace(
-                "Transaction ID: " + tx.getStartTimestamp() + "\n" +
-                "Filter Node ID: " + filterMetadata.getNodeId() + "\n" +
-                "Binding Set:\n" + childBindingSet + "\n");
+        log.trace("Transaction ID: {}\nFilter Node ID: {}\nBinding 
Set:\n{}\n", tx.getStartTimestamp(), filterMetadata.getNodeId(), 
childBindingSet);
 
         // Parse the original query and find the Filter that represents 
filterId.
         final String sparql = filterMetadata.getFilterSparql();
-        Filter filter = FilterSerializer.deserialize(sparql);
+        final Filter filter = FilterSerializer.deserialize(sparql);
 
         // Evaluate whether the child BindingSet satisfies the filter's 
condition.
         final ValueExpr condition = filter.getCondition();
@@ -120,7 +118,7 @@ public class FilterResultUpdater {
 
             // Serialize and emit BindingSet
             final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet);
-            log.trace("Transaction ID: " + tx.getStartTimestamp() + "\n" + 
"New Binding Set: " + childBindingSet + "\n");
+            log.trace("Transaction ID: {}\nNew Binding Set: {}\n", 
tx.getStartTimestamp(), childBindingSet);
 
             tx.set(resultRow, FluoQueryColumns.FILTER_BINDING_SET, 
nodeValueBytes);
         }
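
The change above swaps log4j string concatenation for slf4j parameterized logging; with '{}' placeholders the message is only assembled when the level is enabled. A minimal sketch of the pattern (the class and values are stand-ins, not from this commit):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Minimal slf4j sketch: '{}' placeholders defer message formatting until the level is enabled.
public class Slf4jPatternSketch {
    private static final Logger log = LoggerFactory.getLogger(Slf4jPatternSketch.class);

    public static void main(final String[] args) {
        final long txId = 42L;               // stand-in for tx.getStartTimestamp()
        final String nodeId = "FILTER_node"; // stand-in for filterMetadata.getNodeId()
        log.trace("Transaction ID: {}\nFilter Node ID: {}\n", txId, nodeId);
    }
}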

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
index 4933d57..2da3e39 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/AbstractSpanBatchInformation.java
@@ -23,8 +23,6 @@ import java.util.Objects;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.Span;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Abstract class for generating span based notifications.  A spanned 
notification
  * uses a {@link Span} to begin processing a Fluo Column at the position 
designated by the Span.
@@ -43,7 +41,7 @@ public abstract class AbstractSpanBatchInformation extends 
BasicBatchInformation
      */
     public AbstractSpanBatchInformation(int batchSize, Task task, Column 
column, Span span) {
         super(batchSize, task, column);
-        this.span = Preconditions.checkNotNull(span);
+        this.span = Objects.requireNonNull(span);
     }
 
     public AbstractSpanBatchInformation(Task task, Column column, Span span) {
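
The change above replaces Guava's Preconditions.checkNotNull with the JDK's Objects.requireNonNull; both throw a NullPointerException and return the checked argument, so field assignments stay one-liners while the Guava import is dropped. A small sketch of the pattern, using a hypothetical class:

import java.util.Objects;

// Sketch of the null-check swap shown above; the class is hypothetical.
public class NullCheckSketch {
    private final String span;

    public NullCheckSketch(final String span) {
        // Equivalent in behaviour to: this.span = Preconditions.checkNotNull(span);
        this.span = Objects.requireNonNull(span, "span must not be null");
    }

    public String getSpan() {
        return span;
    }
}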

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
index 3354fdc..989a8e5 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/batch/JoinBatchInformation.java
@@ -23,12 +23,9 @@ import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.Span;
 import org.apache.rya.indexing.pcj.fluo.app.JoinResultUpdater.Side;
 import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata.JoinType;
-import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.openrdf.query.Binding;
 
-import com.google.common.base.Preconditions;
-
 /**
  * This class updates join results based on parameters specified for the join's
  * children. The join has two children, and for one child a 
VisibilityBindingSet
@@ -66,9 +63,9 @@ public class JoinBatchInformation extends 
AbstractSpanBatchInformation {
      */
     public JoinBatchInformation(int batchSize, Task task, Column column, Span 
span, VisibilityBindingSet bs, Side side, JoinType join) {
         super(batchSize, task, column, span);
-        this.bs = Preconditions.checkNotNull(bs);
-        this.side = Preconditions.checkNotNull(side);
-        this.join = Preconditions.checkNotNull(join);
+        this.bs = Objects.requireNonNull(bs);
+        this.side = Objects.requireNonNull(side);
+        this.join = Objects.requireNonNull(join);
     }
     
     public JoinBatchInformation(Task task, Column column, Span span, 
VisibilityBindingSet bs, Side side, JoinType join) {

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
index 0c26d65..61b3aa2 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java
@@ -27,11 +27,12 @@ import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.log4j.Logger;
 import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
 import org.apache.rya.api.client.CreatePCJ.QueryType;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 
@@ -39,8 +40,8 @@ import com.google.common.collect.Sets;
  * Incrementally exports SPARQL query results to Kafka topics.
  */
 public class KafkaBindingSetExporter implements IncrementalBindingSetExporter {
-    
-    private static final Logger log = 
Logger.getLogger(KafkaBindingSetExporter.class);
+
+    private static final Logger log = 
LoggerFactory.getLogger(KafkaBindingSetExporter.class);
     private final KafkaProducer<String, VisibilityBindingSet> producer;
 
 
@@ -50,7 +51,7 @@ public class KafkaBindingSetExporter implements 
IncrementalBindingSetExporter {
      * @param producer for sending result set alerts to a broker. (not null) 
Can be created and configured by
      *            {@link KafkaBindingSetExporterFactory}
      */
-    public KafkaBindingSetExporter(KafkaProducer<String, VisibilityBindingSet> 
producer) {
+    public KafkaBindingSetExporter(final KafkaProducer<String, 
VisibilityBindingSet> producer) {
         super();
         checkNotNull(producer, "Producer is required.");
         this.producer = producer;
@@ -64,9 +65,6 @@ public class KafkaBindingSetExporter implements 
IncrementalBindingSetExporter {
         checkNotNull(queryId);
         checkNotNull(result);
         try {
-            final String msg = "Out to Kafka topic: " + queryId + ", Result: " 
+ result;
-            log.trace(msg);
-
             // Send the result to the topic whose name matches the PCJ ID.
             final ProducerRecord<String, VisibilityBindingSet> rec = new 
ProducerRecord<>(queryId, result);
             final Future<RecordMetadata> future = producer.send(rec);
@@ -74,7 +72,7 @@ public class KafkaBindingSetExporter implements 
IncrementalBindingSetExporter {
             // Don't let the export return until the result has been written 
to the topic. Otherwise we may lose results.
             future.get();
 
-            log.debug("producer.send(rec) completed");
+            log.debug("Producer successfully sent record with queryId: {} and 
visbilityBindingSet: \n{}", queryId, result);
 
         } catch (final Throwable e) {
             throw new ResultExportException("A result could not be exported to 
Kafka.", e);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
index b796a6f..c25cde6 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterFactory.java
@@ -21,11 +21,12 @@ package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
 import org.apache.fluo.api.observer.Observer.Context;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.log4j.Logger;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 
@@ -42,18 +43,19 @@ import com.google.common.base.Optional;
  *     producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.ByteArraySerializer");
  *     producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
"org.apache.kafka.common.serialization.StringSerializer");
  * </pre>
- * 
+ *
  * @see ProducerConfig
  */
 public class KafkaBindingSetExporterFactory implements 
IncrementalResultExporterFactory {
-    private static final Logger log = 
Logger.getLogger(KafkaBindingSetExporterFactory.class);
+    private static final Logger log = 
LoggerFactory.getLogger(KafkaBindingSetExporterFactory.class);
+
     @Override
-    public Optional<IncrementalResultExporter> build(Context context) throws 
IncrementalExporterFactoryException, ConfigurationException {
+    public Optional<IncrementalResultExporter> build(final Context context) 
throws IncrementalExporterFactoryException, ConfigurationException {
         final KafkaBindingSetExporterParameters exportParams = new 
KafkaBindingSetExporterParameters(context.getObserverConfiguration().toMap());
-        log.debug("KafkaResultExporterFactory.build(): 
params.isExportToKafka()=" + exportParams.getUseKafkaBindingSetExporter());
         if (exportParams.getUseKafkaBindingSetExporter()) {
+            log.info("Exporter is enabled.");
             // Setup Kafka connection
-            KafkaProducer<String, VisibilityBindingSet> producer = new 
KafkaProducer<String, VisibilityBindingSet>(exportParams.listAllConfig());
+            final KafkaProducer<String, VisibilityBindingSet> producer = new 
KafkaProducer<>(exportParams.listAllConfig());
             // Create the exporter
             final IncrementalBindingSetExporter exporter = new 
KafkaBindingSetExporter(producer);
             return Optional.of(exporter);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
index 8686c85..2d25c35 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaExportParameterBase.java
@@ -19,6 +19,7 @@
 package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
 
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -26,8 +27,6 @@ import org.apache.fluo.api.observer.Observer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.rya.indexing.pcj.fluo.app.export.ParametersBase;
 
-import com.google.common.base.Preconditions;
-
 /**
  * Provides read/write functions to the parameters map that is passed into an
  * {@link Observer#init(io.fluo.api.observer.Observer.Context)} method related
@@ -46,7 +45,7 @@ public class KafkaExportParameterBase extends ParametersBase {
      * @param bootstrapServers - connect string for Kafka brokers
      */
     public void setKafkaBootStrapServers(String bootstrapServers) {
-        params.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
Preconditions.checkNotNull(bootstrapServers));
+        params.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
Objects.requireNonNull(bootstrapServers));
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
index da26329..df0e387 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporter.java
@@ -26,37 +26,42 @@ import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.log4j.Logger;
 import org.apache.rya.api.client.CreatePCJ.ExportStrategy;
 import org.apache.rya.api.client.CreatePCJ.QueryType;
 import org.apache.rya.api.domain.RyaSubGraph;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException;
+import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 
-import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
-
 /**
- * Exports {@link RyaSubGraph}s to Kafka from Rya Fluo Application 
+ * Exports {@link RyaSubGraph}s to Kafka from Rya Fluo Application
  *
  */
 public class KafkaRyaSubGraphExporter implements 
IncrementalRyaSubGraphExporter {
 
     private final KafkaProducer<String, RyaSubGraph> producer;
-    private static final Logger log = 
Logger.getLogger(KafkaRyaSubGraphExporter.class);
+    private static final Logger log = 
LoggerFactory.getLogger(KafkaRyaSubGraphExporter.class);
+
 
-    public KafkaRyaSubGraphExporter(KafkaProducer<String, RyaSubGraph> 
producer) {
+    /**
+     *
+     * @param producer - The producer used by this exporter.
+     */
+    public KafkaRyaSubGraphExporter(final KafkaProducer<String, RyaSubGraph> 
producer) {
         checkNotNull(producer);
         this.producer = producer;
     }
-    
+
     /**
      * Exports the RyaSubGraph to a Kafka topic equivalent to the result 
returned by {@link RyaSubGraph#getId()}
      * @param subgraph - RyaSubGraph exported to Kafka
      * @param constructID - rowID of result that is exported. Used for logging purposes.
      */
     @Override
-    public void export(String constructID, RyaSubGraph subGraph) throws 
ResultExportException {
+    public void export(final String constructID, final RyaSubGraph subGraph) 
throws ResultExportException {
         checkNotNull(constructID);
         checkNotNull(subGraph);
         try {
@@ -67,7 +72,7 @@ public class KafkaRyaSubGraphExporter implements 
IncrementalRyaSubGraphExporter
             // Don't let the export return until the result has been written 
to the topic. Otherwise we may lose results.
             future.get();
 
-            log.debug("Producer successfully sent record with id: " + 
constructID + " and statements: " + subGraph.getStatements());
+            log.debug("Producer successfully sent record with id: {} and 
statements: {}", constructID, subGraph.getStatements());
 
         } catch (final Throwable e) {
             throw new ResultExportException("A result could not be exported to 
Kafka.", e);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
index 60e9294..e0c4190 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaRyaSubGraphExporterFactory.java
@@ -19,11 +19,12 @@ package org.apache.rya.indexing.pcj.fluo.app.export.kafka;
  */
 import org.apache.fluo.api.observer.Observer.Context;
 import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.log4j.Logger;
 import org.apache.rya.api.domain.RyaSubGraph;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalRyaSubGraphExporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 
@@ -34,10 +35,10 @@ import com.google.common.base.Optional;
  */
 public class KafkaRyaSubGraphExporterFactory implements 
IncrementalResultExporterFactory {
 
-    private static final Logger log = 
Logger.getLogger(KafkaRyaSubGraphExporterFactory.class);
+    private static final Logger log = 
LoggerFactory.getLogger(KafkaRyaSubGraphExporterFactory.class);
     public static final String CONF_USE_KAFKA_SUBGRAPH_EXPORTER = 
"pcj.fluo.export.kafka.subgraph.enabled";
     public static final String CONF_KAFKA_SUBGRAPH_SERIALIZER = 
"pcj.fluo.export.kafka.subgraph.serializer";
-    
+
     /**
      * Builds a {@link KafkaRyaSubGraphExporter}.
      * @param context - {@link Context} object used to pass configuration 
parameters
@@ -46,12 +47,12 @@ public class KafkaRyaSubGraphExporterFactory implements 
IncrementalResultExporte
      * @throws ConfigurationException
      */
     @Override
-    public Optional<IncrementalResultExporter> build(Context context) throws 
IncrementalExporterFactoryException, ConfigurationException {
+    public Optional<IncrementalResultExporter> build(final Context context) 
throws IncrementalExporterFactoryException, ConfigurationException {
         final KafkaSubGraphExporterParameters exportParams = new 
KafkaSubGraphExporterParameters(context.getObserverConfiguration().toMap());
-        log.debug("KafkaRyaSubGraphExporterFactory.build(): 
params.isExportToKafka()=" + exportParams.getUseKafkaSubgraphExporter());
+        log.info("Exporter is enabled: {}", 
exportParams.getUseKafkaSubgraphExporter());
         if (exportParams.getUseKafkaSubgraphExporter()) {
             // Setup Kafka connection
-            KafkaProducer<String, RyaSubGraph> producer = new 
KafkaProducer<String, RyaSubGraph>(exportParams.listAllConfig());
+            final KafkaProducer<String, RyaSubGraph> producer = new 
KafkaProducer<String, RyaSubGraph>(exportParams.listAllConfig());
             // Create the exporter
             final IncrementalRyaSubGraphExporter exporter = new 
KafkaRyaSubGraphExporter(producer);
             return Optional.of(exporter);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java
index d12233a..c67527d 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KryoVisibilityBindingSetSerializer.java
@@ -25,7 +25,6 @@ import java.util.Map;
 
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
-import org.apache.log4j.Logger;
 import org.apache.rya.api.domain.RyaType;
 import org.apache.rya.api.resolver.RdfToRyaConversions;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
@@ -51,7 +50,7 @@ public class KryoVisibilityBindingSetSerializer implements 
Serializer<Visibility
     private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>() {
         @Override
         protected Kryo initialValue() {
-            Kryo kryo = new Kryo();
+            final Kryo kryo = new Kryo();
             return kryo;
         };
     };
@@ -60,7 +59,7 @@ public class KryoVisibilityBindingSetSerializer implements 
Serializer<Visibility
      * Deserialize a VisibilityBindingSet using Kryo lib. Exporting results of queries.
      * If you don't want to use Kryo, here is an alternative:
      * return (new VisibilityBindingSetStringConverter()).convert(new String(data, StandardCharsets.UTF_8), null);
-     * 
+     *
      * @param topic
      *            ignored
      * @param data
@@ -68,9 +67,9 @@ public class KryoVisibilityBindingSetSerializer implements 
Serializer<Visibility
      * @return deserialized instance of VisibilityBindingSet
      */
     @Override
-    public VisibilityBindingSet deserialize(String topic, byte[] data) {
-        KryoInternalSerializer internalSerializer = new 
KryoInternalSerializer();
-        Input input = new Input(new ByteArrayInputStream(data));
+    public VisibilityBindingSet deserialize(final String topic, final byte[] 
data) {
+        final KryoInternalSerializer internalSerializer = new 
KryoInternalSerializer();
+        final Input input = new Input(new ByteArrayInputStream(data));
         return internalSerializer.read(kryos.get(), input, 
VisibilityBindingSet.class);
     }
 
@@ -78,7 +77,7 @@ public class KryoVisibilityBindingSetSerializer implements 
Serializer<Visibility
      * Ignored. Nothing to configure.
      */
     @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
         // Do nothing.
     }
 
@@ -86,7 +85,7 @@ public class KryoVisibilityBindingSetSerializer implements 
Serializer<Visibility
      * Serialize a VisibilityBindingSet using Kryo lib. Exporting results of queries.
      * If you don't want to use Kryo, here is an alternative:
      * return (new VisibilityBindingSetStringConverter()).convert(data, null).getBytes(StandardCharsets.UTF_8);
-     * 
+     *
      * @param topic
      *            ignored
      * @param data
@@ -94,13 +93,13 @@ public class KryoVisibilityBindingSetSerializer implements 
Serializer<Visibility
      * @return Serialized form of VisibilityBindingSet
      */
     @Override
-    public byte[] serialize(String topic, VisibilityBindingSet data) {
-        KryoInternalSerializer internalSerializer = new 
KryoInternalSerializer();
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        Output output = new Output(baos);
+    public byte[] serialize(final String topic, final VisibilityBindingSet 
data) {
+        final KryoInternalSerializer internalSerializer = new 
KryoInternalSerializer();
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        final Output output = new Output(baos);
         internalSerializer.write(kryos.get(), output, data);
         output.flush();
-        byte[] array = baos.toByteArray();
+        final byte[] array = baos.toByteArray();
         return array;
     }
 
@@ -127,14 +126,12 @@ public class KryoVisibilityBindingSetSerializer 
implements Serializer<Visibility
      *
      */
     private static class KryoInternalSerializer extends 
com.esotericsoftware.kryo.Serializer<VisibilityBindingSet> {
-        private static final Logger log = 
Logger.getLogger(KryoVisibilityBindingSetSerializer.class);
         @Override
-        public void write(Kryo kryo, Output output, VisibilityBindingSet 
visBindingSet) {
-            log.debug("Serializer writing visBindingSet" + visBindingSet);
+        public void write(final Kryo kryo, final Output output, final 
VisibilityBindingSet visBindingSet) {
             output.writeString(visBindingSet.getVisibility());
             // write the number count for the reader.
             output.writeInt(visBindingSet.size());
-            for (Binding binding : visBindingSet) {
+            for (final Binding binding : visBindingSet) {
                 output.writeString(binding.getName());
                 final RyaType ryaValue = 
RdfToRyaConversions.convertValue(binding.getValue());
                 final String valueString = ryaValue.getData();
@@ -145,19 +142,18 @@ public class KryoVisibilityBindingSetSerializer 
implements Serializer<Visibility
         }
 
         @Override
-        public VisibilityBindingSet read(Kryo kryo, Input input, 
Class<VisibilityBindingSet> aClass) {
-            log.debug("Serializer reading visBindingSet");
-            String visibility = input.readString();
-            int bindingCount = input.readInt();
-            ArrayList<String> namesList = new ArrayList<String>(bindingCount);
-            ArrayList<Value> valuesList = new ArrayList<Value>(bindingCount);
+        public VisibilityBindingSet read(final Kryo kryo, final Input input, 
final Class<VisibilityBindingSet> aClass) {
+            final String visibility = input.readString();
+            final int bindingCount = input.readInt();
+            final ArrayList<String> namesList = new 
ArrayList<String>(bindingCount);
+            final ArrayList<Value> valuesList = new 
ArrayList<Value>(bindingCount);
             for (int i = bindingCount; i > 0; i--) {
                 namesList.add(input.readString());
-                String valueString = input.readString();
+                final String valueString = input.readString();
                 final URI type = new URIImpl(input.readString());
                 valuesList.add(makeValue(valueString, type));
             }
-            BindingSet bindingSet = new ListBindingSet(namesList, valuesList);
+            final BindingSet bindingSet = new ListBindingSet(namesList, 
valuesList);
             return new VisibilityBindingSet(bindingSet, visibility);
         }
     }

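A minimal sketch (not part of this patch) of how a Kafka consumer could be wired to the serializer above, which appears to implement both the Kafka Serializer and Deserializer interfaces. The property keys and client calls are the standard Kafka consumer API; the broker address, group id, topic name, and the serializer's package name are assumptions made only for illustration.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;

public class VisibilityBindingSetConsumerSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "rya-result-readers");         // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // The patched class doubles as the value deserializer; its package is assumed here.
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.rya.indexing.pcj.fluo.app.export.kafka.KryoVisibilityBindingSetSerializer");

        try (KafkaConsumer<String, VisibilityBindingSet> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("query-results"));      // hypothetical topic
            final ConsumerRecords<String, VisibilityBindingSet> records = consumer.poll(1000L);
            for (final ConsumerRecord<String, VisibilityBindingSet> record : records) {
                // Each record's value was deserialized by the Kryo-backed deserializer above.
                System.out.println(record.value());
            }
        }
    }
}
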
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
index a87243e..901f351 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/RyaBindingSetExporterFactory.java
@@ -30,10 +30,8 @@ import org.apache.fluo.api.observer.Observer.Context;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporterFactory;
-import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
 import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
 import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
-import 
org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
 
 import com.google.common.base.Optional;
 
@@ -45,7 +43,6 @@ public class RyaBindingSetExporterFactory implements 
IncrementalResultExporterFa
     @Override
     public Optional<IncrementalResultExporter> build(final Context context) 
throws IncrementalExporterFactoryException, ConfigurationException {
         checkNotNull(context);
-
         // Wrap the context's parameters for parsing.
         final RyaExportParameters params = new RyaExportParameters( 
context.getObserverConfiguration().toMap() );
 
@@ -64,7 +61,7 @@ public class RyaBindingSetExporterFactory implements 
IncrementalResultExporterFa
                 // Setup Rya PCJ Storage.
                 final String ryaInstanceName = 
params.getRyaInstanceName().get();
                 final PrecomputedJoinStorage pcjStorage = new 
AccumuloPcjStorage(accumuloConn, ryaInstanceName);
-                
+
                 // Make the exporter.
                 final IncrementalBindingSetExporter exporter = new 
RyaBindingSetExporter(pcjStorage);
                 return Optional.of(exporter);

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
index e07c514..9514932 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java
@@ -25,7 +25,6 @@ import org.apache.fluo.api.client.TransactionBase;
 import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.observer.AbstractObserver;
-import org.apache.log4j.Logger;
 import org.apache.rya.indexing.pcj.fluo.app.export.ExporterManager;
 import 
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter;
 import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalResultExporter;
@@ -38,6 +37,8 @@ import 
org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFact
 import 
org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaSubGraphExporterFactory;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableSet;
@@ -46,14 +47,14 @@ import com.google.common.collect.ImmutableSet;
  * Performs incremental result exporting to the configured destinations.
  */
 public class QueryResultObserver extends AbstractObserver {
-    
-    private static final Logger log = 
Logger.getLogger(QueryResultObserver.class);
-    private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();
-    
+
+    private static final Logger log = 
LoggerFactory.getLogger(QueryResultObserver.class);
+    private static final FluoQueryMetadataDAO DAO = new FluoQueryMetadataDAO();
+
     /**
      * Builders for each type of {@link IncrementalBindingSetExporter} we 
support.
      */
-    private static final ImmutableSet<IncrementalResultExporterFactory> 
factories =
+    private static final ImmutableSet<IncrementalResultExporterFactory> 
FACTORIES =
             ImmutableSet.<IncrementalResultExporterFactory>builder()
                 .add(new RyaBindingSetExporterFactory())
                 .add(new KafkaBindingSetExporterFactory())
@@ -61,7 +62,7 @@ public class QueryResultObserver extends AbstractObserver {
                 .add(new RyaSubGraphExporterFactory())
                 .add(new PeriodicBindingSetExporterFactory())
                 .build();
-    
+
     private ExporterManager exporterManager;
 
     @Override
@@ -74,25 +75,25 @@ public class QueryResultObserver extends AbstractObserver {
      */
     @Override
     public void init(final Context context) {
-        
-        ExporterManager.Builder managerBuilder = ExporterManager.builder();
-        
-        for(final IncrementalResultExporterFactory builder : factories) {
-            try {
-                log.debug("QueryResultObserver.init(): for each 
exportersBuilder=" + builder);
 
+        final ExporterManager.Builder managerBuilder = 
ExporterManager.builder();
+
+        for(final IncrementalResultExporterFactory builder : FACTORIES) {
+            try {
+                log.debug("Attempting to build exporter from factory: {}", 
builder);
                 final Optional<IncrementalResultExporter> exporter = 
builder.build(context);
                 if(exporter.isPresent()) {
+                    log.info("Adding exporter: {}", exporter.get());
                     
managerBuilder.addIncrementalResultExporter(exporter.get());
                 }
             } catch (final IncrementalExporterFactoryException e) {
                 log.error("Could not initialize a result exporter.", e);
             }
         }
-        
+
         exporterManager = managerBuilder.build();
     }
-    
+
 
     @Override
     public void process(final TransactionBase tx, final Bytes brow, final 
Column col) throws Exception {
@@ -100,11 +101,11 @@ public class QueryResultObserver extends AbstractObserver 
{
 
         // Read the queryId from the row and get the QueryMetadata.
         final String queryId = row.split(NODEID_BS_DELIM)[0];
-        final QueryMetadata metadata = dao.readQueryMetadata(tx, queryId);
+        final QueryMetadata metadata = DAO.readQueryMetadata(tx, queryId);
 
         // Read the Child Binding Set that will be exported.
         final Bytes valueBytes = tx.get(brow, col);
-        
+
         exporterManager.export(metadata.getQueryType(), 
metadata.getExportStrategies(), queryId, valueBytes);
     }
 
@@ -112,7 +113,7 @@ public class QueryResultObserver extends AbstractObserver {
     public void close() {
         try {
             exporterManager.close();
-        } catch (Exception e) {
+        } catch (final Exception e) {
            log.warn("Encountered problems closing the ExporterManager.");
         }
     }

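For reference, a minimal sketch of the log4j-to-SLF4J migration pattern applied in this observer (and in TripleObserver below): the "{}" placeholders defer message formatting until the logger actually emits the line, so the string-building cost of the old concatenated messages disappears when the level is disabled. The factory name and statement text are stand-in values for the sketch.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParameterizedLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(ParameterizedLoggingSketch.class);

    public static void main(final String[] args) {
        final String factory = "RyaBindingSetExporterFactory";   // stand-in value

        // Before (log4j style): the message string is always concatenated, even if DEBUG is off.
        // log.debug("QueryResultObserver.init(): for each exportersBuilder=" + factory);

        // After (SLF4J style): the placeholder is only filled in when DEBUG is enabled.
        log.debug("Attempting to build exporter from factory: {}", factory);

        // Multiple placeholders work the same way, as in TripleObserver's trace message.
        log.trace("Transaction ID: {}\nRya Statement: {}\n", 42L, "<s> <p> <o>");
    }
}
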
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
index 6fc8e91..2d7f390 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/TripleObserver.java
@@ -31,7 +31,6 @@ import org.apache.fluo.api.data.Bytes;
 import org.apache.fluo.api.data.Column;
 import org.apache.fluo.api.data.Span;
 import org.apache.fluo.api.observer.AbstractObserver;
-import org.apache.log4j.Logger;
 import org.apache.rya.api.domain.RyaStatement;
 import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO;
 import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
@@ -41,6 +40,8 @@ import 
org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
 import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe;
 import 
org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetStringConverter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.Maps;
@@ -51,7 +52,7 @@ import com.google.common.collect.Maps;
  * the new result is stored as a binding set for the pattern.
  */
 public class TripleObserver extends AbstractObserver {
-    private static final Logger log = Logger.getLogger(TripleObserver.class);
+    private static final Logger log = 
LoggerFactory.getLogger(TripleObserver.class);
 
     private static final VisibilityBindingSetSerDe BS_SERDE = new 
VisibilityBindingSetSerDe();
     private static final FluoQueryMetadataDAO QUERY_METADATA_DAO = new 
FluoQueryMetadataDAO();
@@ -68,9 +69,7 @@ public class TripleObserver extends AbstractObserver {
     public void process(final TransactionBase tx, final Bytes brow, final 
Column column) {
         // Get string representation of triple.
         final RyaStatement ryaStatement = IncUpdateDAO.deserializeTriple(brow);
-        log.trace(
-                "Transaction ID: " + tx.getStartTimestamp() + "\n" +
-                "Rya Statement: " + ryaStatement + "\n");
+        log.trace("Transaction ID: {}\nRya Statement: {}\n", 
tx.getStartTimestamp(), ryaStatement);
 
         final String triple = IncUpdateDAO.getTripleString(ryaStatement);
 
@@ -114,10 +113,8 @@ public class TripleObserver extends AbstractObserver {
                     try {
                         final Bytes valueBytes = 
BS_SERDE.serialize(visBindingSet);
 
-                        log.trace(
-                                "Transaction ID: " + tx.getStartTimestamp() + 
"\n" +
-                                        "Matched Statement Pattern: " + spID + 
"\n" +
-                                        "Binding Set: " + visBindingSet + 
"\n");
+                        log.trace("Transaction ID: {}\nMatched Statement 
Pattern: {}\nBinding Set: {}\n",
+                                tx.getStartTimestamp(), spID, visBindingSet);
 
                         tx.set(rowBytes, 
FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, valueBytes);
                     } catch(final Exception e) {

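A related aside on the trace statements above, offered only as a hedged note: parameterized logging skips message formatting when TRACE is off, but the arguments themselves are still evaluated, so an expensive argument can still be guarded with SLF4J's level check. A minimal sketch:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class GuardedTraceSketch {
    private static final Logger log = LoggerFactory.getLogger(GuardedTraceSketch.class);

    public static void main(final String[] args) {
        // Only compute the (hypothetically expensive) argument when TRACE will actually log it.
        if (log.isTraceEnabled()) {
            log.trace("Binding Set: {}", expensiveToString());
        }
    }

    private static String expensiveToString() {
        // Placeholder for an argument that is costly to build.
        return "visibility binding set dump";
    }
}
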
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/8acd24b5/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
index a1c7c00..acf9b39 100644
--- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
+++ 
b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java
@@ -81,7 +81,7 @@ public class FluoQuery {
             final Optional<PeriodicQueryMetadata> periodicQueryMetadata,
             final ImmutableMap<String, StatementPatternMetadata> 
statementPatternMetadata,
             final ImmutableMap<String, FilterMetadata> filterMetadata,
-            final ImmutableMap<String, JoinMetadata> joinMetadata, 
+            final ImmutableMap<String, JoinMetadata> joinMetadata,
             final ImmutableMap<String, AggregationMetadata> 
aggregationMetadata) {
                 this.aggregationMetadata = requireNonNull(aggregationMetadata);
         this.queryMetadata = requireNonNull(queryMetadata);
@@ -94,7 +94,7 @@ public class FluoQuery {
         this.joinMetadata = requireNonNull(joinMetadata);
         this.type = queryMetadata.getQueryType();
     }
-    
+
     /**
      * Returns the {@link QueryType} of this query
      * @return the QueryType of this query (either Construct or Projection}
@@ -102,7 +102,7 @@ public class FluoQuery {
     public QueryType getQueryType() {
         return type;
     }
-    
+
     /**
      * @return the unique id of this query
      */
@@ -116,66 +116,66 @@ public class FluoQuery {
     public QueryMetadata getQueryMetadata() {
         return queryMetadata;
     }
-    
+
     /**
      * @param nodeId - node id of the query metadata
      * @return Optional containing the queryMetadata if it matches the 
specified nodeId
      */
-    public Optional<QueryMetadata> getQueryMetadata(String nodeId) {
+    public Optional<QueryMetadata> getQueryMetadata(final String nodeId) {
         if(queryMetadata.getNodeId().equals(nodeId)) {
             return Optional.of(queryMetadata);
         } else {
             return Optional.absent();
         }
     }
-    
+
     /**
      * @return construct query metadata for generating subgraphs
      */
     public Optional<ConstructQueryMetadata> getConstructQueryMetadata() {
         return constructMetadata;
     }
-    
+
     /**
      * @param nodeId - node id of the ConstructMetadata
      * @return Optional containing the ConstructMetadata if it is present and 
has the given nodeId
      */
-    public Optional<ConstructQueryMetadata> getConstructQueryMetadata(String 
nodeId) {
+    public Optional<ConstructQueryMetadata> getConstructQueryMetadata(final 
String nodeId) {
         if(constructMetadata.isPresent() && 
constructMetadata.get().getNodeId().equals(nodeId)) {
             return constructMetadata;
         } else {
             return Optional.absent();
         }
     }
-    
+
     /**
      * @param nodeId - id of the Projection metadata you want (not null)
      * @return projection metadata corresponding to give nodeId
      */
-    public Optional<ProjectionMetadata> getProjectionMetadata(String nodeId) {
+    public Optional<ProjectionMetadata> getProjectionMetadata(final String 
nodeId) {
         return Optional.fromNullable(projectionMetadata.get(nodeId));
     }
-    
+
     /**
      * @return All of the projection metadata that is stored for the query
      */
     public Collection<ProjectionMetadata> getProjectionMetadata() {
         return projectionMetadata.values();
     }
-    
+
     /**
      * @return All of the Periodic Query metadata that is stored for the query.
      */
     public Optional<PeriodicQueryMetadata> getPeriodicQueryMetadata() {
         return periodicQueryMetadata;
     }
-    
+
     /**
      * @param nodeId - id of the PeriodicQueryMetadata
      * @return Optional containing the PeriodicQueryMetadata if it is present 
and has the given nodeId
      */
-    public Optional<PeriodicQueryMetadata> getPeriodicQueryMetadata(String 
nodeId) {
-        
+    public Optional<PeriodicQueryMetadata> getPeriodicQueryMetadata(final 
String nodeId) {
+
         if(periodicQueryMetadata.isPresent() && 
periodicQueryMetadata.get().getNodeId().equals(nodeId)) {
             return periodicQueryMetadata;
         } else {
@@ -294,17 +294,17 @@ public class FluoQuery {
 
         builder.append(queryMetadata.toString());
         builder.append("\n");
-        
+
         for(final ProjectionMetadata metadata : projectionMetadata.values()) {
             builder.append(metadata);
             builder.append("\n");
         }
-        
+
         if(constructMetadata.isPresent()) {
             builder.append( constructMetadata.get().toString() );
             builder.append("\n");
         }
-        
+
         if(periodicQueryMetadata.isPresent()) {
             builder.append(periodicQueryMetadata.get());
             builder.append("\n");
@@ -372,20 +372,20 @@ public class FluoQuery {
         public QueryMetadata.Builder getQueryBuilder() {
             return queryBuilder;
         }
-        
+
         /**
          * @param nodeId - id of the QueryMetadata.Builder
          * @return Optional containing the QueryMetadata.Builder if it has the 
specified nodeId
          */
-        public Optional<QueryMetadata.Builder> getQueryBuilder(String nodeId) {
+        public Optional<QueryMetadata.Builder> getQueryBuilder(final String 
nodeId) {
             if(queryBuilder.getNodeId().equals(nodeId)) {
                 return Optional.of(queryBuilder);
             } else {
                 return Optional.absent();
             }
-            
+
         }
-        
+
         /**
          * Sets the {@link ProjectionMetadata.Builder} that is used by this 
builder.
          *
@@ -401,11 +401,11 @@ public class FluoQuery {
         /**
          * @return The ProjectionMetadata builder if one has been set.
          */
-        public Optional<ProjectionMetadata.Builder> 
getProjectionBuilder(String nodeId) {
+        public Optional<ProjectionMetadata.Builder> getProjectionBuilder(final 
String nodeId) {
             requireNonNull(nodeId);
             return Optional.fromNullable( projectionBuilders.get(nodeId) );
         }
-        
+
         /**
          * Sets the {@link ConstructQueryMetadata.Builder} that is used by 
this builder.
          *
@@ -421,21 +421,21 @@ public class FluoQuery {
          * @param id of the ConstructQueryMetadata.Builder
          * @return Optional containing the ConstructQueryMetadata.Builder if 
it has been set and has the given nodeId.
          */
-        public Optional<ConstructQueryMetadata.Builder> 
getConstructQueryBuilder(String nodeId) {
+        public Optional<ConstructQueryMetadata.Builder> 
getConstructQueryBuilder(final String nodeId) {
             if(constructBuilder != null && 
constructBuilder.getNodeId().equals(nodeId)) {
                 return Optional.of(constructBuilder);
             } else {
                 return Optional.absent();
             }
         }
-        
+
         /**
          * @return The Construct Query metadata builder if one has been set.
          */
         public Optional<ConstructQueryMetadata.Builder> 
getConstructQueryBuilder() {
             return Optional.fromNullable( constructBuilder );
         }
-        
+
 
         /**
          * Adds a new {@link StatementPatternMetadata.Builder} to this builder.
@@ -505,7 +505,7 @@ public class FluoQuery {
             requireNonNull(nodeId);
             return Optional.fromNullable( joinBuilders.get(nodeId) );
         }
-        
+
         /**
          * Get an Aggregate builder from this builder.
          *
@@ -528,7 +528,7 @@ public class FluoQuery {
             this.aggregationBuilders.put(aggregationBuilder.getNodeId(), 
aggregationBuilder);
             return this;
         }
-        
+
         /**
          * Adds a new {@link PeriodicQueryMetadata.Builder} to this builder.
          *
@@ -549,33 +549,33 @@ public class FluoQuery {
         public Optional<PeriodicQueryMetadata.Builder> 
getPeriodicQueryBuilder() {
             return Optional.fromNullable( periodicQueryBuilder);
         }
-        
+
         /**
          * @param - id of the PeriodicQueryMetadata.Builder
          * @return - Optional containing the PeriodicQueryMetadata.Builder if 
one has been set and it has the given nodeId
          */
-        public Optional<PeriodicQueryMetadata.Builder> 
getPeriodicQueryBuilder(String nodeId) {
-            
+        public Optional<PeriodicQueryMetadata.Builder> 
getPeriodicQueryBuilder(final String nodeId) {
+
             if(periodicQueryBuilder != null && 
periodicQueryBuilder.getNodeId().equals(nodeId)) {
                 return Optional.of(periodicQueryBuilder);
             } else {
                 return Optional.absent();
             }
         }
-        
+
         /**
          * @return Creates a {@link FluoQuery} using the values that have been 
supplied to this builder.
-         * @throws UnsupportedQueryException 
+         * @throws UnsupportedQueryException
          */
         public FluoQuery build() throws UnsupportedQueryException {
             checkArgument((projectionBuilders.size() > 0 || constructBuilder 
!= null));
-            
-            Optional<PeriodicQueryMetadata.Builder> 
optionalPeriodicQueryBuilder = getPeriodicQueryBuilder();
+
+            final Optional<PeriodicQueryMetadata.Builder> 
optionalPeriodicQueryBuilder = getPeriodicQueryBuilder();
             PeriodicQueryMetadata periodicQueryMetadata = null;
             if(optionalPeriodicQueryBuilder.isPresent()) {
                 periodicQueryMetadata = 
optionalPeriodicQueryBuilder.get().build();
             }
-            
+
             final ImmutableMap.Builder<String, ProjectionMetadata> 
projectionMetadata = ImmutableMap.builder();
             for(final Entry<String, ProjectionMetadata.Builder> entry : 
projectionBuilders.entrySet()) {
                 projectionMetadata.put(entry.getKey(), 
entry.getValue().build());
@@ -601,8 +601,8 @@ public class FluoQuery {
                 aggregateMetadata.put(entry.getKey(), 
entry.getValue().build());
             }
 
-            QueryMetadata qMetadata = queryBuilder.build();
-            
+            final QueryMetadata qMetadata = queryBuilder.build();
+
             if(constructBuilder != null) {
                 if(periodicQueryMetadata != null) {
                     throw new UnsupportedQueryException("Queries containing 
sliding window filters and construct query patterns are not supported.");
@@ -612,10 +612,10 @@ public class FluoQuery {
                 if(aggregationBuilders.size() > 0 && qMetadata.getQueryType() 
== QueryType.PROJECTION && 
qMetadata.getExportStrategies().contains(ExportStrategy.RYA)) {
                     throw new UnsupportedQueryException("Exporting to Rya PCJ 
tables is currently not supported for queries containing aggregations.");
                 }
-                
-                return new FluoQuery(queryBuilder.build(), 
projectionMetadata.build(), Optional.absent(), 
Optional.fromNullable(periodicQueryMetadata), spMetadata.build(), 
filterMetadata.build(), joinMetadata.build(), aggregateMetadata.build());
+
+                return new FluoQuery(qMetadata, projectionMetadata.build(), 
Optional.absent(), Optional.fromNullable(periodicQueryMetadata), 
spMetadata.build(), filterMetadata.build(), joinMetadata.build(), 
aggregateMetadata.build());
             }
-            
+
         }
     }
 }
\ No newline at end of file

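Beyond the whitespace and final-keyword cleanup, the FluoQuery change above reuses the already-built qMetadata in Builder#build() instead of calling queryBuilder.build() a second time, so the metadata that is validated and the metadata handed to the FluoQuery constructor are the same instance. A minimal, self-contained sketch of why that matters, using hypothetical types rather than the Rya classes:

public class BuildOnceSketch {

    /** Hypothetical immutable metadata, standing in for QueryMetadata. */
    static final class Metadata {
        private final String queryType;
        Metadata(final String queryType) { this.queryType = queryType; }
        String getQueryType() { return queryType; }
    }

    /** Hypothetical builder: every build() call creates a fresh instance. */
    static final class MetadataBuilder {
        Metadata build() { return new Metadata("PROJECTION"); }
    }

    public static void main(final String[] args) {
        final MetadataBuilder queryBuilder = new MetadataBuilder();

        // Two build() calls produce two distinct objects: the instance that was
        // checked is not the instance that was returned.
        final Metadata checked = queryBuilder.build();
        final Metadata returned = queryBuilder.build();
        System.out.println(checked == returned);            // false

        // Build once and reuse the same reference for both validation and construction.
        final Metadata qMetadata = queryBuilder.build();
        System.out.println(qMetadata.getQueryType());       // PROJECTION
    }
}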