http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java
new file mode 100644
index 0000000..f5cd13a
--- /dev/null
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.registration.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.rya.periodic.notification.api.LifeCycle;
+import org.apache.rya.periodic.notification.api.Notification;
+import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Consumer group that pulls all requests for adding and deleting {@link Notification}s
+ * from Kafka.  This object executes {@link PeriodicNotificationConsumer}s that retrieve
+ * the {@link CommandNotification}s and register them with the {@link NotificationCoordinatorExecutor}.
+ *
+ */
+public class KafkaNotificationProvider implements LifeCycle {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaNotificationProvider.class);
+    private String topic;
+    private ExecutorService executor;
+    private NotificationCoordinatorExecutor coord;
+    private Properties props;
+    private int numThreads;
+    private boolean running = false;
+    Deserializer<String> keyDe;
+    Deserializer<CommandNotification> valDe;
+    List<PeriodicNotificationConsumer> consumers;
+
+    /**
+     * Create a KafkaNotificationProvider for reading new notification requests from Kafka.
+     * @param topic - notification topic
+     * @param keyDe - Kafka message key deserializer
+     * @param valDe - Kafka message value deserializer
+     * @param props - properties used to create a {@link KafkaConsumer}
+     * @param coord - {@link NotificationCoordinatorExecutor} for managing and generating notifications
+     * @param numThreads - number of threads used by this notification provider
+     */
+    public KafkaNotificationProvider(String topic, Deserializer<String> keyDe, Deserializer<CommandNotification> valDe, Properties props,
+            NotificationCoordinatorExecutor coord, int numThreads) {
+        this.coord = coord;
+        this.numThreads = numThreads;
+        this.topic = topic;
+        this.props = props;
+        this.consumers = new ArrayList<>();
+        this.keyDe = keyDe;
+        this.valDe = valDe;
+    }
+
+    @Override
+    public void stop() {
+        if (consumers != null && consumers.size() > 0) {
+            for (PeriodicNotificationConsumer consumer : consumers) {
+                consumer.shutdown();
+            }
+        }
+        if (executor != null) {
+            executor.shutdown();
+            try {
+                if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+                    LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
+                    executor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                LOG.info("Interrupted during shutdown, exiting uncleanly");
+            }
+        }
+        running = false;
+    }
+
+    @Override
+    public void start() {
+        if (!running) {
+            if (!coord.currentlyRunning()) {
+                coord.start();
+            }
+            // now launch all the threads
+            executor = Executors.newFixedThreadPool(numThreads);
+
+            // now create consumers to consume the messages
+            int threadNumber = 0;
+            for (int i = 0; i < numThreads; i++) {
+                LOG.info("Creating consumer:" + threadNumber);
+                KafkaConsumer<String, CommandNotification> consumer = new KafkaConsumer<>(props, keyDe, valDe);
+                PeriodicNotificationConsumer periodicConsumer = new PeriodicNotificationConsumer(topic, consumer, threadNumber, coord);
+                consumers.add(periodicConsumer);
+                executor.submit(periodicConsumer);
+                threadNumber++;
+            }
+            running = true;
+        }
+    }
+
+    @Override
+    public boolean currentlyRunning() {
+        return running;
+    }
+
+}
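
A minimal wiring sketch for the provider above, with illustrative topic name, bootstrap servers, and group id; it assumes CommandNotificationSerializer also implements Kafka's Deserializer<CommandNotification>, as the serializer round-trip test further below suggests.

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
    import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationProvider;
    import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;

    public class ProviderWiringSketch {
        // Builds and starts a provider with two consumer threads; the coordinator is caller-supplied.
        public static KafkaNotificationProvider wire(NotificationCoordinatorExecutor coord) {
            Properties props = new Properties();
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "notification-group");      // illustrative
            props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

            KafkaNotificationProvider provider = new KafkaNotificationProvider(
                    "notifications",                     // illustrative topic name
                    new StringDeserializer(),            // key deserializer
                    new CommandNotificationSerializer(), // value deserializer (assumed to implement Deserializer)
                    props, coord, 2);
            provider.start();
            return provider;
        }
    }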

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java
new file mode 100644
index 0000000..6785ce8
--- /dev/null
+++ b/extras/periodic.notification/service/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.registration.kafka;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.log4j.Logger;
+import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+
+/**
+ * Consumer for the {@link KafkaNotificationProvider}.  This consumer pulls messages
+ * from Kafka and registers them with the {@link NotificationCoordinatorExecutor}.
+ *
+ */
+public class PeriodicNotificationConsumer implements Runnable {
+    private KafkaConsumer<String, CommandNotification> consumer;
+    private int m_threadNumber;
+    private String topic;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private NotificationCoordinatorExecutor coord;
+    private static final Logger LOG = Logger.getLogger(PeriodicNotificationConsumer.class);
+
+    /**
+     * Creates a new PeriodicNotificationConsumer for consuming new notification requests from
+     * Kafka.
+     * @param topic - new notification topic
+     * @param consumer - consumer for pulling new requests from Kafka
+     * @param a_threadNumber - the number assigned to this consumer's thread
+     * @param coord - notification coordinator for managing and generating notifications
+     */
+    public PeriodicNotificationConsumer(String topic, KafkaConsumer<String, CommandNotification> consumer, int a_threadNumber,
+            NotificationCoordinatorExecutor coord) {
+        this.topic = topic;
+        m_threadNumber = a_threadNumber;
+        this.consumer = consumer;
+        this.coord = coord;
+    }
+
+    public void run() {
+        
+        try {
+            LOG.info("Creating kafka stream for consumer:" + m_threadNumber);
+            consumer.subscribe(Arrays.asList(topic));
+            while (!closed.get()) {
+                ConsumerRecords<String, CommandNotification> records = consumer.poll(10000);
+                // Handle new records
+                for(ConsumerRecord<String, CommandNotification> record: records) {
+                    CommandNotification notification = record.value();
+                    LOG.info("Thread " + m_threadNumber + " is adding notification " + notification + " to queue.");
+                    LOG.info("Message: " + notification);
+                    coord.processNextCommandNotification(notification);
+                }
+            }
+        } catch (WakeupException e) {
+            // Ignore exception if closing
+            if (!closed.get()) throw e;
+        } finally {
+            consumer.close();
+        }
+    }
+    
+    public void shutdown() {
+        closed.set(true);
+        consumer.wakeup();
+    }
+}
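
The run()/shutdown() pair above is the standard KafkaConsumer wakeup idiom: poll() runs on the consumer's own thread, and shutdown() is called from another thread, where wakeup() forces a blocked poll() to throw WakeupException. The same idiom in a self-contained, generic form (topic name and String deserializers are placeholders):

    import java.util.Arrays;
    import java.util.Properties;
    import java.util.concurrent.atomic.AtomicBoolean;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class WakeupIdiomSketch implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<String, String> consumer;

        public WakeupIdiomSketch(Properties props) {
            this.consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
        }

        @Override
        public void run() {
            try {
                consumer.subscribe(Arrays.asList("placeholder-topic"));
                while (!closed.get()) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        // hand each record off to the application here
                    }
                }
            } catch (WakeupException e) {
                // wakeup() was called; rethrow only if we were not asked to close
                if (!closed.get()) throw e;
            } finally {
                consumer.close();
            }
        }

        // Called from another thread; wakeup() interrupts a blocked poll().
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }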

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/service/src/test/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializerTest.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/service/src/test/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializerTest.java b/extras/periodic.notification/service/src/test/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializerTest.java
new file mode 100644
index 0000000..4aad1c6
--- /dev/null
+++ b/extras/periodic.notification/service/src/test/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializerTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CommandNotificationSerializerTest {
+
+    private CommandNotificationSerializer serializer = new CommandNotificationSerializer();
+    private static final String topic = "topic";
+
+    @Test
+    public void basicSerializationTest() {
+        PeriodicNotification notification = PeriodicNotification.builder().id(UUID.randomUUID().toString()).period(24)
+                .timeUnit(TimeUnit.DAYS).initialDelay(1).build();
+        CommandNotification command = new CommandNotification(Command.ADD, notification);
+        Assert.assertEquals(command, serializer.deserialize(topic, serializer.serialize(topic, command)));
+
+        PeriodicNotification notification1 = PeriodicNotification.builder().id(UUID.randomUUID().toString()).period(32)
+                .timeUnit(TimeUnit.SECONDS).initialDelay(15).build();
+        CommandNotification command1 = new CommandNotification(Command.ADD, notification1);
+        Assert.assertEquals(command1, serializer.deserialize(topic, serializer.serialize(topic, command1)));
+
+        PeriodicNotification notification2 = PeriodicNotification.builder().id(UUID.randomUUID().toString()).period(32)
+                .timeUnit(TimeUnit.SECONDS).initialDelay(15).build();
+        CommandNotification command2 = new CommandNotification(Command.ADD, notification2);
+        Assert.assertEquals(command2, serializer.deserialize(topic, serializer.serialize(topic, command2)));
+
+        BasicNotification notification3 = new BasicNotification(UUID.randomUUID().toString());
+        CommandNotification command3 = new CommandNotification(Command.ADD, notification3);
+        Assert.assertEquals(command3, serializer.deserialize(topic, serializer.serialize(topic, command3)));
+
+    }
+
+}
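
The four assertions above all check the same serialize-then-deserialize invariant; a small helper makes that round-trip property explicit. A sketch, assuming CommandNotificationSerializer exposes Kafka's serialize(String, T) and deserialize(String, byte[]) methods as the test implies:

    import org.apache.rya.periodic.notification.notification.CommandNotification;
    import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
    import org.junit.Assert;

    final class RoundTripAssert {
        private RoundTripAssert() {}

        // Serialize and immediately deserialize, asserting the notification survives unchanged.
        static void assertRoundTrip(CommandNotificationSerializer serde, String topic, CommandNotification command) {
            byte[] bytes = serde.serialize(topic, command);
            Assert.assertEquals(command, serde.deserialize(topic, bytes));
        }
    }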

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/pom.xml
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/tests/pom.xml b/extras/periodic.notification/tests/pom.xml
new file mode 100644
index 0000000..31a6c0e
--- /dev/null
+++ b/extras/periodic.notification/tests/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
+    license agreements. See the NOTICE file distributed with this work for additional
+    information regarding copyright ownership. The ASF licenses this file to
+    you under the Apache License, Version 2.0 (the "License"); you may not use
+    this file except in compliance with the License. You may obtain a copy of
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
+    by applicable law or agreed to in writing, software distributed under the
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
+    OF ANY KIND, either express or implied. See the License for the specific
+    language governing permissions and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.periodic.notification.parent</artifactId>
+        <version>3.2.11-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>rya.periodic.notification.tests</artifactId>
+
+    <name>Apache Rya Periodic Notification Service Integration Tests</name>
+    <description>Integration Tests for Rya Periodic Notification Service</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.pcj.fluo.test.base</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>log4j-1.2-api</artifactId>
+                    <groupId>org.apache.logging.log4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-api</artifactId>
+                    <groupId>org.apache.logging.log4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j-core</artifactId>
+                    <groupId>org.apache.logging.log4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.periodic.notification.service</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>logback-classic</artifactId>
+                    <groupId>ch.qos.logback</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>logback-core</artifactId>
+                    <groupId>ch.qos.logback</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
new file mode 100644
index 0000000..9109775
--- /dev/null
+++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.application;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import javax.xml.datatype.DatatypeConfigurationException;
+import javax.xml.datatype.DatatypeFactory;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.config.FluoConfiguration;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.kafka.base.EmbeddedKafkaInstance;
+import org.apache.rya.kafka.base.EmbeddedKafkaSingleton;
+import org.apache.rya.kafka.base.KafkaTestInstanceRule;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
+import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
+import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.NOTIFICATION_TOPIC;
+import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.KAFKA_BOOTSTRAP_SERVERS;
+
+
+public class PeriodicNotificationApplicationIT extends RyaExportITBase {
+
+    private PeriodicNotificationApplication app;
+    private KafkaNotificationRegistrationClient registrar;
+    private KafkaProducer<String, CommandNotification> producer;
+    private Properties props;
+    private Properties kafkaProps;
+    private PeriodicNotificationApplicationConfiguration conf;
+    private static EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance();
+    private static String bootstrapServers;
+    
+    @Rule
+    public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false);
+    
+    @BeforeClass
+    public static void initClass() {
+        bootstrapServers = embeddedKafka.createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+    }
+    
+    @Before
+    public void init() throws Exception {
+        String topic = rule.getKafkaTopicName();
+        rule.createTopic(topic);
+        
+        //get user specified props and update with the embedded kafka bootstrap servers and rule generated topic
+        props = getProps();
+        props.setProperty(NOTIFICATION_TOPIC, topic);
+        props.setProperty(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers);
+        conf = new PeriodicNotificationApplicationConfiguration(props);
+        
+        //create Kafka Producer
+        kafkaProps = getKafkaProperties(conf);
+        producer = new KafkaProducer<>(kafkaProps, new StringSerializer(), new CommandNotificationSerializer());
+        
+        //extract kafka specific properties from application config
+        app = PeriodicNotificationApplicationFactory.getPeriodicApplication(props);
+        registrar = new KafkaNotificationRegistrationClient(conf.getNotificationTopic(), producer);
+    }
+    
+    @Test
+    public void periodicApplicationWithAggAndGroupByTest() throws Exception {
+
+        String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+                + "prefix time: <http://www.w3.org/2006/time#> " // n
+                + "select ?type (count(?obs) as ?total) where {" // n
+                + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
+                + "?obs <uri:hasTime> ?time. " // n
+                + "?obs <uri:hasObsType> ?type } group by ?type"; // n
+        
+        //make data
+        int periodMult = 15;
+        final ValueFactory vf = new ValueFactoryImpl();
+        final DatatypeFactory dtf = DatatypeFactory.newInstance();
+        //Sleep until current time aligns nicely with period to make
+        //results more predictable
+        while(System.currentTimeMillis() % (periodMult*1000) > 500);
+        ZonedDateTime time = ZonedDateTime.now();
+
+        ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
+        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
+        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
+        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasObsType"), vf.createLiteral("ship")),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasObsType"), vf.createLiteral("airplane")),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasObsType"), vf.createLiteral("ship")),
+                vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasObsType"), vf.createLiteral("airplane")),
+                vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_5"), vf.createURI("uri:hasObsType"), vf.createLiteral("automobile")));
+        
+        try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) {
+            Connector connector = ConfigUtils.getConnector(conf);
+            PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
+            CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
+            String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
+            addData(statements);
+            app.start();
+           
+            Multimap<Long, BindingSet> actual = HashMultimap.create();
+            try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
+                consumer.subscribe(Arrays.asList(id));
+                long end = System.currentTimeMillis() + 4*periodMult*1000;
+                long lastBinId = 0L;
+                long binId = 0L;
+                List<Long> ids = new ArrayList<>();
+                while (System.currentTimeMillis() < end) {
+                    ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
+                    for(ConsumerRecord<String, BindingSet> record: records){
+                        BindingSet result = record.value();
+                        binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
+                        if(lastBinId != binId) {
+                            lastBinId = binId;
+                            ids.add(binId);
+                        }
+                        actual.put(binId, result);
+                    }
+                }
+                
+                Map<Long, Set<BindingSet>> expected = new HashMap<>();
+
+                Set<BindingSet> expected1 = new HashSet<>();
+                QueryBindingSet bs1 = new QueryBindingSet();
+                bs1.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0)));
+                bs1.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
+                bs1.addBinding("type", vf.createLiteral("airplane"));
+
+                QueryBindingSet bs2 = new QueryBindingSet();
+                bs2.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0)));
+                bs2.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
+                bs2.addBinding("type", vf.createLiteral("ship"));
+
+                QueryBindingSet bs3 = new QueryBindingSet();
+                bs3.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(0)));
+                bs3.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER));
+                bs3.addBinding("type", vf.createLiteral("automobile"));
+
+                expected1.add(bs1);
+                expected1.add(bs2);
+                expected1.add(bs3);
+
+                Set<BindingSet> expected2 = new HashSet<>();
+                QueryBindingSet bs4 = new QueryBindingSet();
+                bs4.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1)));
+                bs4.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
+                bs4.addBinding("type", vf.createLiteral("airplane"));
+
+                QueryBindingSet bs5 = new QueryBindingSet();
+                bs5.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(1)));
+                bs5.addBinding("total", new LiteralImpl("2", XMLSchema.INTEGER));
+                bs5.addBinding("type", vf.createLiteral("ship"));
+
+                expected2.add(bs4);
+                expected2.add(bs5);
+
+                Set<BindingSet> expected3 = new HashSet<>();
+                QueryBindingSet bs6 = new QueryBindingSet();
+                bs6.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2)));
+                bs6.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER));
+                bs6.addBinding("type", vf.createLiteral("ship"));
+
+                QueryBindingSet bs7 = new QueryBindingSet();
+                bs7.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, vf.createLiteral(ids.get(2)));
+                bs7.addBinding("total", new LiteralImpl("1", XMLSchema.INTEGER));
+                bs7.addBinding("type", vf.createLiteral("airplane"));
+
+                expected3.add(bs6);
+                expected3.add(bs7);
+
+                expected.put(ids.get(0), expected1);
+                expected.put(ids.get(1), expected2);
+                expected.put(ids.get(2), expected3);
+
+                Assert.assertEquals(3, actual.asMap().size());
+                for(Long ident: ids) {
+                    Assert.assertEquals(expected.get(ident), actual.get(ident));
+                }
+            }
+            
+            Set<BindingSet> expectedResults = new HashSet<>();
+            try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) {
+                results.forEachRemaining(x -> expectedResults.add(x));
+                Assert.assertEquals(0, expectedResults.size());
+            }
+        }
+    }
+    
+    
+    @Test
+    public void periodicApplicationWithAggTest() throws Exception {
+
+        String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+                + "prefix time: <http://www.w3.org/2006/time#> " // n
+                + "select (count(?obs) as ?total) where {" // n
+                + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
+                + "?obs <uri:hasTime> ?time. " // n
+                + "?obs <uri:hasId> ?id } "; // n
+        
+        //make data
+        int periodMult = 15;
+        final ValueFactory vf = new ValueFactoryImpl();
+        final DatatypeFactory dtf = DatatypeFactory.newInstance();
+        //Sleep until current time aligns nicely with period to make
+        //results more predictable
+        while(System.currentTimeMillis() % (periodMult*1000) > 500);
+        ZonedDateTime time = ZonedDateTime.now();
+
+        ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
+        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
+        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
+        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")));
+        
+        try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) {
+            Connector connector = ConfigUtils.getConnector(conf);
+            PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
+            CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
+            String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
+            addData(statements);
+            app.start();
+            
+            Multimap<Long, BindingSet> expected = HashMultimap.create();
+            try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
+                consumer.subscribe(Arrays.asList(id));
+                long end = System.currentTimeMillis() + 4*periodMult*1000;
+                long lastBinId = 0L;
+                long binId = 0L;
+                List<Long> ids = new ArrayList<>();
+                while (System.currentTimeMillis() < end) {
+                    ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
+                    for(ConsumerRecord<String, BindingSet> record: records){
+                        BindingSet result = record.value();
+                        binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
+                        if(lastBinId != binId) {
+                            lastBinId = binId;
+                            ids.add(binId);
+                        }
+                        expected.put(binId, result);
+                    }
+                }
+                
+                Assert.assertEquals(3, expected.asMap().size());
+                int i = 0;
+                for(Long ident: ids) {
+                    Assert.assertEquals(1, expected.get(ident).size());
+                    BindingSet bs = expected.get(ident).iterator().next();
+                    Value val = bs.getValue("total");
+                    int total = Integer.parseInt(val.stringValue());
+                    Assert.assertEquals(3-i, total);
+                    i++;
+                }
+            }
+            
+            
+            Set<BindingSet> expectedResults = new HashSet<>();
+            try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) {
+                results.forEachRemaining(x -> expectedResults.add(x));
+                Assert.assertEquals(0, expectedResults.size());
+            }
+        }
+
+    }
+    
+    
+    @Test
+    public void periodicApplicationTest() throws Exception {
+
+        String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+                + "prefix time: <http://www.w3.org/2006/time#> " // n
+                + "select ?obs ?id where {" // n
+                + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
+                + "?obs <uri:hasTime> ?time. " // n
+                + "?obs <uri:hasId> ?id } "; // n
+        
+        //make data
+        int periodMult = 15;
+        final ValueFactory vf = new ValueFactoryImpl();
+        final DatatypeFactory dtf = DatatypeFactory.newInstance();
+        //Sleep until current time aligns nicely with period to make
+        //results more predictable
+        while(System.currentTimeMillis() % (periodMult*1000) > 500);
+        ZonedDateTime time = ZonedDateTime.now();
+
+        ZonedDateTime zTime1 = time.minusSeconds(2*periodMult);
+        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime2 = zTime1.minusSeconds(periodMult);
+        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime3 = zTime2.minusSeconds(periodMult);
+        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"),
+                        vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")));
+        
+        try (FluoClient fluo = FluoClientFactory.getFluoClient(conf.getFluoAppName(), Optional.of(conf.getFluoTableName()), conf)) {
+            Connector connector = ConfigUtils.getConnector(conf);
+            PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix());
+            CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage);
+            String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId());
+            addData(statements);
+            app.start();
+           
+            Multimap<Long, BindingSet> expected = HashMultimap.create();
+            try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) {
+                consumer.subscribe(Arrays.asList(id));
+                long end = System.currentTimeMillis() + 4*periodMult*1000;
+                long lastBinId = 0L;
+                long binId = 0L;
+                List<Long> ids = new ArrayList<>();
+                while (System.currentTimeMillis() < end) {
+                    ConsumerRecords<String, BindingSet> records = consumer.poll(periodMult*1000);
+                    for(ConsumerRecord<String, BindingSet> record: records){
+                        BindingSet result = record.value();
+                        binId = Long.parseLong(result.getBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID).getValue().stringValue());
+                        if(lastBinId != binId) {
+                            lastBinId = binId;
+                            ids.add(binId);
+                        }
+                        expected.put(binId, result);
+                    }
+                }
+                
+                Assert.assertEquals(3, expected.asMap().size());
+                int i = 0;
+                for(Long ident: ids) {
+                    Assert.assertEquals(3-i, expected.get(ident).size());
+                    i++;
+                }
+            }
+            
+            
+            Set<BindingSet> expectedResults = new HashSet<>();
+            try (CloseableIterator<BindingSet> results = storage.listResults(id, Optional.empty())) {
+                results.forEachRemaining(x -> expectedResults.add(x));
+                Assert.assertEquals(0, expectedResults.size());
+            }
+        }
+
+    }
+    
+    
+    @After
+    public void shutdown() {
+        registrar.close();
+        app.stop();
+    }
+    
+    private void addData(Collection<Statement> statements) throws DatatypeConfigurationException {
+        // add statements to Fluo
+        try (FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) {
+            InsertTriples inserter = new InsertTriples();
+            statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x)));
+            getMiniFluo().waitForObservers();
+        }
+    }
+
+    private static Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) {
+        Properties kafkaProps = new Properties();
+        kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
+        kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId());
+        kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return kafkaProps;
+    }
+    
+    private Properties getProps() throws IOException {
+
+        Properties props = new Properties();
+        try(InputStream in = new FileInputStream("src/test/resources/notification.properties")) {
+            props.load(in);
+        }
+
+        FluoConfiguration fluoConf = getFluoConfiguration();
+        props.setProperty("accumulo.user", getUsername());
+        props.setProperty("accumulo.password", getPassword());
+        props.setProperty("accumulo.instance", getMiniAccumuloCluster().getInstanceName());
+        props.setProperty("accumulo.zookeepers", getMiniAccumuloCluster().getZooKeepers());
+        props.setProperty("accumulo.rya.prefix", getRyaInstanceName());
+        props.setProperty(PeriodicNotificationApplicationConfiguration.FLUO_APP_NAME, fluoConf.getApplicationName());
+        props.setProperty(PeriodicNotificationApplicationConfiguration.FLUO_TABLE_NAME, fluoConf.getAccumuloTable());
+        return props;
+    }
+
+}
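
These tests group results by the PERIODIC_BIN_ID binding, where a bin id labels a period window by its start time in epoch milliseconds (the same floor arithmetic appears in the processor test below). As a standalone sketch, with the 15-second period implied by function:periodic(?time, 1, .25, time:minutes):

    public class BinIdSketch {
        public static void main(String[] args) {
            long periodMs = 15_000L; // 0.25 minutes
            long now = System.currentTimeMillis();
            long binId = (now / periodMs) * periodMs; // floor to the start of the current period window
            System.out.println("timestamp " + now + " falls in bin " + binId);
        }
    }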

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
new file mode 100644
index 0000000..e05ca6f
--- /dev/null
+++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.application;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.fluo.recipes.test.AccumuloExportITBase;
+import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj;
+import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.query.MalformedQueryException;
+
+import com.google.common.collect.Sets;
+
+public class PeriodicNotificationProviderIT extends AccumuloExportITBase {
+
+    @Test
+    public void testProvider() throws MalformedQueryException, InterruptedException, UnsupportedQueryException {
+        
+        String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+                + "prefix time: <http://www.w3.org/2006/time#> " // n
+                + "select ?id (count(?obs) as ?total) where {" // n
+                + "Filter(function:periodic(?time, 1, .25, time:minutes)) " // n
+                + "?obs <uri:hasTime> ?time. " // n
+                + "?obs <uri:hasId> ?id } group by ?id"; // n
+        
+        BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+        PeriodicNotificationCoordinatorExecutor coord = new PeriodicNotificationCoordinatorExecutor(2, notifications);
+        PeriodicNotificationProvider provider = new PeriodicNotificationProvider();
+        CreateFluoPcj pcj = new CreateFluoPcj();
+        CreateFluoPcj pcj = new CreateFluoPcj();
+        
+        String id = null;
+        try(FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) {
+            id = pcj.createPcj(FluoQueryUtils.createNewPcjId(), sparql, Sets.newHashSet(), fluo).getQueryId();
+            provider.processRegisteredNotifications(coord, fluo.newSnapshot());
+        }
+        
+        TimestampedNotification notification = notifications.take();
+        Assert.assertEquals(5000, notification.getInitialDelay());
+        Assert.assertEquals(15000, notification.getPeriod());
+        Assert.assertEquals(TimeUnit.MILLISECONDS, notification.getTimeUnit());
+        Assert.assertEquals(FluoQueryUtils.convertFluoQueryIdToPcjId(id), notification.getId());
+        
+    }
+    
+}
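
The assertions above imply that recovery converts the filter's 0.25 time:minutes period into 15000 milliseconds; the derivation of the asserted 5000 ms initial delay is not visible in this diff. The period conversion, as a sketch of the implied arithmetic rather than the provider's actual code:

    import java.util.concurrent.TimeUnit;

    public class PeriodConversionSketch {
        public static void main(String[] args) {
            double periodMinutes = 0.25; // from function:periodic(?time, 1, .25, time:minutes)
            long periodMs = (long) (periodMinutes * TimeUnit.MINUTES.toMillis(1));
            System.out.println(periodMs); // 15000, matching the test's expectation
        }
    }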

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
new file mode 100644
index 0000000..874e7e2
--- /dev/null
+++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.exporter;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.kafka.base.KafkaITBase;
+import org.apache.rya.kafka.base.KafkaTestInstanceRule;
+import org.apache.rya.periodic.notification.api.BindingSetRecord;
+import org.apache.rya.periodic.notification.serialization.BindingSetSerDe;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+public class PeriodicNotificationExporterIT extends KafkaITBase {
+
+
+    @Rule
+    public KafkaTestInstanceRule kafkaTestInstanceRule = new KafkaTestInstanceRule(false);
+
+
+    private static final ValueFactory vf = new ValueFactoryImpl();
+
+    @Test
+    public void testExporter() throws InterruptedException {
+
+        final String topic1 = kafkaTestInstanceRule.getKafkaTopicName() + "1";
+        final String topic2 = kafkaTestInstanceRule.getKafkaTopicName() + "2";
+
+        kafkaTestInstanceRule.createTopic(topic1);
+        kafkaTestInstanceRule.createTopic(topic2);
+
+        final BlockingQueue<BindingSetRecord> records = new LinkedBlockingQueue<>();
+
+        final KafkaExporterExecutor exporter = new KafkaExporterExecutor(new KafkaProducer<String, BindingSet>(createKafkaProducerConfig()), 1, records);
+        exporter.start();
+        final QueryBindingSet bs1 = new QueryBindingSet();
+        bs1.addBinding(PeriodicQueryResultStorage.PeriodicBinId, vf.createLiteral(1L));
+        bs1.addBinding("name", vf.createURI("uri:Bob"));
+        final BindingSetRecord record1 = new BindingSetRecord(bs1, topic1);
+
+        final QueryBindingSet bs2 = new QueryBindingSet();
+        bs2.addBinding(PeriodicQueryResultStorage.PeriodicBinId, vf.createLiteral(2L));
+        bs2.addBinding("name", vf.createURI("uri:Joe"));
+        final BindingSetRecord record2 = new BindingSetRecord(bs2, topic2);
+
+        records.add(record1);
+        records.add(record2);
+
+        final Set<BindingSet> expected1 = new HashSet<>();
+        expected1.add(bs1);
+        final Set<BindingSet> expected2 = new HashSet<>();
+        expected2.add(bs2);
+
+        final Set<BindingSet> actual1 = getBindingSetsFromKafka(topic1);
+        final Set<BindingSet> actual2 = getBindingSetsFromKafka(topic2);
+
+        Assert.assertEquals(expected1, actual1);
+        Assert.assertEquals(expected2, actual2);
+
+        exporter.stop();
+    }
+
+
+    private Properties createKafkaProducerConfig() {
+        final Properties props = createBootstrapServerConfig();
+        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, BindingSetSerDe.class.getName());
+        return props;
+    }
+    private Properties createKafkaConsumerConfig() {
+        final Properties props = createBootstrapServerConfig();
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
+        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BindingSetSerDe.class.getName());
+        return props;
+    }
+
+
+    private KafkaConsumer<String, BindingSet> makeBindingSetConsumer(final String topicName) {
+        // setup consumer
+        final KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(createKafkaConsumerConfig());
+        consumer.subscribe(Arrays.asList(topicName));
+        return consumer;
+    }
+
+    private Set<BindingSet> getBindingSetsFromKafka(final String topicName) {
+        KafkaConsumer<String, BindingSet> consumer = null;
+
+        try {
+            consumer = makeBindingSetConsumer(topicName);
+            final ConsumerRecords<String, BindingSet> records = consumer.poll(20000);  // Wait up to 20 seconds for a result to be published.
+
+            final Set<BindingSet> bindingSets = new HashSet<>();
+            records.forEach(x -> bindingSets.add(x.value()));
+
+            return bindingSets;
+
+        } catch (final Exception e) {
+            throw new RuntimeException(e);
+        } finally {
+            if (consumer != null) {
+                consumer.close();
+            }
+        }
+    }
+}
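
Since KafkaConsumer implements Closeable, the getBindingSetsFromKafka helper above can also be written with try-with-resources instead of the explicit null check and finally block. A behavior-preserving sketch that still relies on the makeBindingSetConsumer helper defined in the test class:

    private Set<BindingSet> getBindingSetsFromKafka(final String topicName) {
        // try-with-resources closes the consumer on every path, including exceptions.
        try (KafkaConsumer<String, BindingSet> consumer = makeBindingSetConsumer(topicName)) {
            final ConsumerRecords<String, BindingSet> records = consumer.poll(20000); // wait up to 20 seconds
            final Set<BindingSet> bindingSets = new HashSet<>();
            records.forEach(x -> bindingSets.add(x.value()));
            return bindingSets;
        }
    }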

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
----------------------------------------------------------------------
diff --git a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
new file mode 100644
index 0000000..21109ae
--- /dev/null
+++ b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.processor;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.fluo.recipes.test.AccumuloExportITBase;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet;
+import org.apache.rya.periodic.notification.api.BindingSetRecord;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+public class PeriodicNotificationProcessorIT extends AccumuloExportITBase {
+
+    private static final ValueFactory vf = new ValueFactoryImpl();
+    private static final String RYA_INSTANCE_NAME = "rya_";
+    
+    @Test
+    public void periodicProcessorTest() throws Exception {
+        
+        String id = UUID.randomUUID().toString().replace("-", "");
+        BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+        BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
+        BlockingQueue<BindingSetRecord> bindingSets = new LinkedBlockingQueue<>();
+        
+        TimestampedNotification ts1 = new TimestampedNotification(
+                PeriodicNotification.builder().id(id).initialDelay(0).period(2000).timeUnit(TimeUnit.SECONDS).build());
+        long binId1 = (ts1.getTimestamp().getTime() / ts1.getPeriod()) * ts1.getPeriod();
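+        // Integer division floors the notification timestamp to the start of its
+        // period-aligned bin, which is the binId the processor is expected to emit.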
+        
+        Thread.sleep(2000);
+        
+        TimestampedNotification ts2 = new TimestampedNotification(
+                PeriodicNotification.builder().id(id).initialDelay(0).period(2000).timeUnit(TimeUnit.SECONDS).build());
+        long binId2 = (ts2.getTimestamp().getTime() / ts2.getPeriod()) * ts2.getPeriod();
+        
+        Set<NodeBin> expectedBins = new HashSet<>();
+        expectedBins.add(new NodeBin(id, binId1));
+        expectedBins.add(new NodeBin(id, binId2));
+        
+        Set<BindingSet> expected = new HashSet<>();
+        Set<VisibilityBindingSet> storageResults = new HashSet<>();
+        
+        QueryBindingSet bs1 = new QueryBindingSet();
+        bs1.addBinding("periodicBinId", vf.createLiteral(binId1));
+        bs1.addBinding("id", vf.createLiteral(1));
+        expected.add(bs1);
+        storageResults.add(new VisibilityBindingSet(bs1));
+        
+        QueryBindingSet bs2 = new QueryBindingSet();
+        bs2.addBinding("periodicBinId", vf.createLiteral(binId1));
+        bs2.addBinding("id", vf.createLiteral(2));
+        expected.add(bs2);
+        storageResults.add(new VisibilityBindingSet(bs2));
+        
+        QueryBindingSet bs3 = new QueryBindingSet();
+        bs3.addBinding("periodicBinId", vf.createLiteral(binId2));
+        bs3.addBinding("id", vf.createLiteral(3));
+        expected.add(bs3);
+        storageResults.add(new VisibilityBindingSet(bs3));
+        
+        QueryBindingSet bs4 = new QueryBindingSet();
+        bs4.addBinding("periodicBinId", vf.createLiteral(binId2));
+        bs4.addBinding("id", vf.createLiteral(4));
+        expected.add(bs4);
+        storageResults.add(new VisibilityBindingSet(bs4));
+        
+        PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(), RYA_INSTANCE_NAME);
+        periodicStorage.createPeriodicQuery(id, "select ?id where {?obs <urn:hasId> ?id.}", new VariableOrder("periodicBinId", "id"));
+        periodicStorage.addPeriodicQueryResults(id, storageResults);
+
+        NotificationProcessorExecutor processor = new NotificationProcessorExecutor(periodicStorage, notifications, bins, bindingSets, 1);
+        processor.start();
+        
+        notifications.add(ts1);
+        notifications.add(ts2);
+
+        Thread.sleep(5000);
+        
+        Assert.assertEquals(expectedBins.size(), bins.size());
+        Assert.assertTrue(bins.containsAll(expectedBins));
+        
+        Set<BindingSet> actual = new HashSet<>();
+        bindingSets.forEach(x -> actual.add(x.getBindingSet()));
+        Assert.assertEquals(expected, actual);
+        
+        processor.stop();
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
 
b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
new file mode 100644
index 0000000..830fa46
--- /dev/null
+++ 
b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.pruner;
+
+import java.time.ZonedDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.xml.datatype.DatatypeFactory;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.fluo.core.client.FluoClientImpl;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
+import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.Statement;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.LiteralImpl;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.XMLSchema;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.impl.MapBindingSet;
+
+import com.google.common.collect.Sets;
+
+public class PeriodicNotificationBinPrunerIT extends RyaExportITBase {
+
+    
+    @Test
+    public void periodicPrunerTest() throws Exception {
+
+        String sparql = "prefix function: <http://org.apache.rya/function#> " // n
+                + "prefix time: <http://www.w3.org/2006/time#> " // n
+                + "select ?id (count(?obs) as ?total) where {" // n
+                + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n
+                + "?obs <uri:hasTime> ?time. " // n
+                + "?obs <uri:hasId> ?id } group by ?id"; // n
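+        // Going by the argument order (temporal variable, window, period, unit), the
+        // filter function:periodic(?time, 2, .5, time:hours) presumably declares a
+        // 2 hour window that advances every half hour, binning each ?obs by ?time.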
+
+        FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration());
+
+        // initialize resources and create pcj
+        PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(), getRyaInstanceName());
+        CreatePeriodicQuery createPeriodicQuery = new CreatePeriodicQuery(fluo, periodicStorage);
+        String queryId = FluoQueryUtils.convertFluoQueryIdToPcjId(createPeriodicQuery.createPeriodicQuery(sparql).getQueryId());
+
+        // create statements to ingest into Fluo
+        final ValueFactory vf = new ValueFactoryImpl();
+        final DatatypeFactory dtf = DatatypeFactory.newInstance();
+        ZonedDateTime time = ZonedDateTime.now();
+        long currentTime = time.toInstant().toEpochMilli();
+
+        ZonedDateTime zTime1 = time.minusMinutes(30);
+        String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime2 = zTime1.minusMinutes(30);
+        String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime3 = zTime2.minusMinutes(30);
+        String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
+
+        ZonedDateTime zTime4 = zTime3.minusMinutes(30);
+        String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT);
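+        // The timestamps step back in 30 minute increments so the observations fall
+        // into consecutive half-hour bins.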
+
+        final Collection<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time1))),
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time2))),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_3"), vf.createURI("uri:hasId"), vf.createLiteral("id_3")),
+                vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+                vf.createStatement(vf.createURI("urn:obs_4"), vf.createURI("uri:hasId"), vf.createLiteral("id_4")),
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time4))),
+                vf.createStatement(vf.createURI("urn:obs_1"), vf.createURI("uri:hasId"), vf.createLiteral("id_1")),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasTime"), vf.createLiteral(dtf.newXMLGregorianCalendar(time3))),
+                vf.createStatement(vf.createURI("urn:obs_2"), vf.createURI("uri:hasId"), vf.createLiteral("id_2")));
+
+        // add statements to Fluo
+        InsertTriples inserter = new InsertTriples();
+        statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x)));
+
+        super.getMiniFluo().waitForObservers();
+
+        // FluoITHelper.printFluoTable(fluo);
+
+        // Create the expected results of the SPARQL query once the PCJ has been computed.
+        final Set<BindingSet> expected1 = new HashSet<>();
+        final Set<BindingSet> expected2 = new HashSet<>();
+        final Set<BindingSet> expected3 = new HashSet<>();
+        final Set<BindingSet> expected4 = new HashSet<>();
+
+        long period = 1800000;
+        long binId = (currentTime / period) * period;
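+        // period = 1800000 ms (30 minutes, matching the SPARQL filter); binId is the
+        // current time floored to a bin boundary, and bin1..bin4 are consecutive bins.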
+
+        long bin1 = binId;
+        long bin2 = binId + period;
+        long bin3 = binId + 2 * period;
+        long bin4 = binId + 3 * period;
+
+        MapBindingSet bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(bin1));
+        expected1.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(bin1));
+        expected1.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(bin1));
+        expected1.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_4", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(bin1));
+        expected1.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(bin2));
+        expected2.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(bin2));
+        expected2.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_3", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(bin2));
+        expected2.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(bin3));
+        expected3.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_2", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(bin3));
+        expected3.add(bs);
+
+        bs = new MapBindingSet();
+        bs.addBinding("total", vf.createLiteral("1", XMLSchema.INTEGER));
+        bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING));
+        bs.addBinding("periodicBinId", vf.createLiteral(bin4));
+        expected4.add(bs);
+
+        // make sure that expected and actual results align after ingest
+        compareResults(periodicStorage, queryId, bin1, expected1);
+        compareResults(periodicStorage, queryId, bin2, expected2);
+        compareResults(periodicStorage, queryId, bin3, expected3);
+        compareResults(periodicStorage, queryId, bin4, expected4);
+
+        BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
+        PeriodicQueryPrunerExecutor pruner = new PeriodicQueryPrunerExecutor(periodicStorage, fluo, 1, bins);
+        pruner.start();
+
+        bins.add(new NodeBin(queryId, bin1));
+        bins.add(new NodeBin(queryId, bin2));
+        bins.add(new NodeBin(queryId, bin3));
+        bins.add(new NodeBin(queryId, bin4));
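+        // Give the pruner thread time to drain the queue and delete each bin from
+        // both the periodic storage table and the Fluo table.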
+
+        Thread.sleep(10000);
+
+        compareResults(periodicStorage, queryId, bin1, new HashSet<>());
+        compareResults(periodicStorage, queryId, bin2, new HashSet<>());
+        compareResults(periodicStorage, queryId, bin3, new HashSet<>());
+        compareResults(periodicStorage, queryId, bin4, new HashSet<>());
+
+        compareFluoCounts(fluo, queryId, bin1);
+        compareFluoCounts(fluo, queryId, bin2);
+        compareFluoCounts(fluo, queryId, bin3);
+        compareFluoCounts(fluo, queryId, bin4);
+
+        pruner.stop();
+
+    }
+    
+    private void compareResults(PeriodicQueryResultStorage periodicStorage, String queryId, long bin, Set<BindingSet> expected) throws Exception {
+        try (CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(bin))) {
+            Set<BindingSet> actual = new HashSet<>();
+            while(iter.hasNext()) {
+                actual.add(iter.next());
+            }
+            Assert.assertEquals(expected, actual);
+        }
+    }
+    
+    private void compareFluoCounts(FluoClient client, String pcjId, long bin) {
+        QueryBindingSet bs = new QueryBindingSet();
+        bs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID, new LiteralImpl(Long.toString(bin), XMLSchema.LONG));
+
+        VariableOrder varOrder = new VariableOrder(IncrementalUpdateConstants.PERIODIC_BIN_ID);
+        
+        try(Snapshot sx = client.newSnapshot()) {
+            String fluoQueryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
+            Set<String> ids = new HashSet<>();
+            PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, fluoQueryId, ids);
+            for (String id : ids) {
+                NodeType optNode = NodeType.fromNodeId(id).orNull();
+                if (optNode == null) {
+                    throw new RuntimeException("Invalid NodeType.");
+                }
+                Bytes prefix = RowKeyUtil.makeRowKey(id, varOrder, bs);
+                RowScanner scanner = sx.scanner().fetch(optNode.getResultColumn()).over(Span.prefix(prefix)).byRow().build();
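+                // Count every column value remaining under this bin's row prefix;
+                // after pruning, nothing should remain.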
+                int count = 0;
+                Iterator<ColumnScanner> colScannerIter = scanner.iterator();
+                while(colScannerIter.hasNext()) {
+                    ColumnScanner colScanner = colScannerIter.next();
+                    Iterator<ColumnValue> values = colScanner.iterator();
+                    while(values.hasNext()) {
+                        values.next();
+                        count++;
+                    }
+                }
+                Assert.assertEquals(0, count);
+            }
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
 
b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
new file mode 100644
index 0000000..522e69d
--- /dev/null
+++ 
b/extras/periodic.notification/tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.registration.kafka;
+
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.log4j.BasicConfigurator;
+import org.apache.rya.kafka.base.KafkaITBase;
+import org.apache.rya.kafka.base.KafkaTestInstanceRule;
+import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient;
+import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+public class PeriodicCommandNotificationConsumerIT extends KafkaITBase {
+
+    private KafkaNotificationRegistrationClient registration;
+    private PeriodicNotificationCoordinatorExecutor coord;
+    private KafkaNotificationProvider provider;
+    private String bootstrapServer;
+    
+    @Rule
+    public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false);
+    
+    @Before
+    public void init() throws Exception {
+        bootstrapServer = createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+    }
+
+    @Test
+    public void kafkaNotificationProviderTest() throws InterruptedException {
+
+        BasicConfigurator.configure();
+
+        BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+        Properties props = createKafkaConfig();
+        KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props);
+        String topic = rule.getKafkaTopicName();
+        rule.createTopic(topic);
+
+        registration = new KafkaNotificationRegistrationClient(topic, producer);
+        coord = new PeriodicNotificationCoordinatorExecutor(1, notifications);
+        provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1);
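+        // Wiring under test: the registration client produces CommandNotifications to
+        // the topic, the provider consumes them and hands each to the coordinator,
+        // which feeds TimestampedNotifications into the blocking queue.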
+        provider.start();
+
+        registration.addNotification("1", 1, 0, TimeUnit.SECONDS);
+        Thread.sleep(4000);
+        // check that notifications are being added to the blocking queue
+        Assert.assertTrue(notifications.size() > 0);
+
+        registration.deleteNotification("1");
+        Thread.sleep(2000);
+        int size = notifications.size();
+        // sleep for 2 more seconds to verify no further notifications are produced
+        Thread.sleep(2000);
+        Assert.assertEquals(size, notifications.size());
+        
+        tearDown();
+    }
+
+    @Test
+    public void kafkaNotificationMillisProviderTest() throws 
InterruptedException {
+
+        BasicConfigurator.configure();
+
+        BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>();
+        Properties props = createKafkaConfig();
+        KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props);
+        String topic = rule.getKafkaTopicName();
+        rule.createTopic(topic);
+
+        registration = new KafkaNotificationRegistrationClient(topic, producer);
+        coord = new PeriodicNotificationCoordinatorExecutor(1, notifications);
+        provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1);
+        provider.start();
+
+        registration.addNotification("1", 1000, 0, TimeUnit.MILLISECONDS);
+        Thread.sleep(4000);
+        // check that notifications are being added to the blocking queue
+        Assert.assertTrue(notifications.size() > 0);
+
+        registration.deleteNotification("1");
+        Thread.sleep(2000);
+        int size = notifications.size();
+        // sleep for 2 more seconds to verify no further notifications are produced
+        Thread.sleep(2000);
+        Assert.assertEquals(size, notifications.size());
+        
+        tearDown();
+    }
+
+    private void tearDown() {
+        registration.close();
+        provider.stop();
+        coord.stop();
+    }
+
+    private Properties createKafkaConfig() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0");
+        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName());
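+        // This one Properties object backs both the test's KafkaProducer and the
+        // consumers inside KafkaNotificationProvider, hence the mixed config keys.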
+
+        return props;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/tests/src/test/resources/log4j.properties 
b/extras/periodic.notification/tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000..19cc13c
--- /dev/null
+++ b/extras/periodic.notification/tests/src/test/resources/log4j.properties
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Valid levels:
+# TRACE, DEBUG, INFO, WARN, ERROR and FATAL
+log4j.rootLogger=INFO, CONSOLE
+
+# Set independent logging levels
+log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.kafka=WARN
+log4j.logger.org.apache.kafka=WARN
+
+# CONSOLE is set to be a ConsoleAppender using a PatternLayout.
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+#log4j.appender.CONSOLE.Threshold=DEBUG
+
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+
+#log4j.appender.CONSOLE.layout=org.apache.log4j.EnhancedPatternLayout
+#log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p %c{1.} - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/periodic.notification/tests/src/test/resources/notification.properties
----------------------------------------------------------------------
diff --git 
a/extras/periodic.notification/tests/src/test/resources/notification.properties 
b/extras/periodic.notification/tests/src/test/resources/notification.properties
new file mode 100644
index 0000000..4b25b93
--- /dev/null
+++ 
b/extras/periodic.notification/tests/src/test/resources/notification.properties
@@ -0,0 +1,35 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+accumulo.auths=
+accumulo.instance="instance"
+accumulo.user="root"
+accumulo.password="secret"
+accumulo.rya.prefix="rya_"
+accumulo.zookeepers=
+fluo.app.name="fluo_app"
+fluo.table.name="fluo_table"
+kafka.bootstrap.servers=127.0.0.1:9092
+kafka.notification.topic=notifications
+kafka.notification.client.id=consumer0
+kafka.notification.group.id=group0
+cep.coordinator.threads=1
+cep.producer.threads=1
+cep.exporter.threads=1
+cep.processor.threads=1
+cep.pruner.threads=1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/de365c17/extras/pom.xml
----------------------------------------------------------------------
diff --git a/extras/pom.xml b/extras/pom.xml
index 82930bc..53c7b4f 100644
--- a/extras/pom.xml
+++ b/extras/pom.xml
@@ -33,7 +33,7 @@ under the License.
     <modules>
         <module>rya.prospector</module>
         <module>rya.manual</module>
-        <module>rya.periodic.service</module>
+        <module>periodic.notification</module>
         <module>shell</module>
         <module>indexing</module>
         <module>rya.indexing.pcj</module>

