Github user kchilton2 commented on a diff in the pull request:
https://github.com/apache/incubator-rya/pull/272#discussion_r168276972
--- Diff:
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
---
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChangeLogListener;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import
org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import
org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A service for managing {@link StreamsQuery} running on a Rya Streams
system.
+ * <p>
+ * Only one QueryManager needs to be running to manage any number of rya
+ * instances/rya streams instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManager extends AbstractService {
+ /** Logger shared by this service and its nested listener/worker classes. */
+ private static final Logger log =
LoggerFactory.getLogger(QueryManager.class);
+
+ /**
+ * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific
+ * Rya instance.
+ */
+ private final QueryChangeLogSource changeLogSource;
+
+ /**
+ * The engine that is responsible for executing {@link StreamsQuery}s.
+ */
+ private final QueryExecutor queryExecutor;
+
+ /**
+ * How long blocking operations will be attempted before potentially trying again.
+ * The constructor requires this value to be greater than zero.
+ */
+ private final long blockingValue;
+
+ /**
+ * The units for {@link #blockingValue}.
+ */
+ private final TimeUnit blockingUnits;
+
+ /**
+ * Used to inform threads that the application is shutting down, so they must stop work.
+ * It starts out {@code false} and is set to {@code true} when the service is stopped.
+ */
+ private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+ /**
+ * This thread pool manages the two threads used to work the {@link LogEvent}s
+ * and the {@link QueryEvent}s.
+ */
+ private final ExecutorService executor =
Executors.newFixedThreadPool(2);
+
+ /**
+ * Creates a new {@link QueryManager}.
+ *
+ * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not
null)
+ * @param source - The {@link QueryChangeLogSource} of
QueryChangeLogs. (not null)
+ * @param blockingValue - How long blocking operations will try before
looping. (> 0)
+ * @param blockingUnits - The units of the {@code blockingValue}. (not
null)
+ */
+ public QueryManager(
+ final QueryExecutor queryExecutor,
+ final QueryChangeLogSource source,
+ final long blockingValue,
+ final TimeUnit blockingUnits) {
+ this.changeLogSource = requireNonNull(source);
+ this.queryExecutor = requireNonNull(queryExecutor);
+ Preconditions.checkArgument(blockingValue > 0, "The blocking value
must be > 0. Was: " + blockingValue);
+ this.blockingValue = blockingValue;
+ this.blockingUnits = requireNonNull(blockingUnits);
+ }
+
+    /**
+     * Starts the two event workers, the {@link QueryExecutor}, and the {@link QueryChangeLogSource}, and
+     * then subscribes a listener that turns discovered {@link QueryChangeLog}s into {@link LogEvent}s.
+     * <p>
+     * The service is only reported as started when every one of those steps succeeds. If a step fails,
+     * the workers are signaled to stop and the service is reported as failed instead.
+     */
+    @Override
+    protected void doStart() {
+        log.info("Starting a QueryManager.");
+
+        // A work queue of discovered Query Change Logs that need to be handled.
+        // This queue exists so that the source notifying thread may be released
+        // immediately instead of calling into blocking functions.
+        final BlockingQueue<LogEvent> logEvents = new ArrayBlockingQueue<>(1024);
+
+        // A work queue of discovered Query Changes from the monitored Query Change Logs
+        // that need to be handled. This queue exists so that the Query Repository notifying
+        // thread may be released immediately instead of calling into blocking functions.
+        final BlockingQueue<QueryEvent> queryEvents = new ArrayBlockingQueue<>(1024);
+
+        try {
+            // Start up a LogEventWorker using the executor service.
+            executor.submit(new LogEventWorker(logEvents, queryEvents, blockingValue, blockingUnits, shutdownSignal));
+
+            // Start up a QueryEventWorker using the executor service.
+            executor.submit(new QueryEventWorker(queryEvents, queryExecutor, blockingValue, blockingUnits, shutdownSignal));
+
+            // Start up the query execution framework.
+            queryExecutor.startAndWait();
+
+            // Start up the source that discovers new Query Change Logs.
+            changeLogSource.startAndWait();
+
+            // Subscribe a listener to the source that writes to the LogEventWorker's work queue.
+            changeLogSource.subscribe(new LogEventWorkGenerator(logEvents, blockingValue, blockingUnits, shutdownSignal));
+        } catch(final RejectedExecutionException | UncheckedExecutionException e) {
+            log.error("Could not start up a QueryManager.", e);
+
+            // Stop any workers that were already submitted so that a failed start does not leave them polling.
+            shutdownSignal.set(true);
+            executor.shutdownNow();
+
+            // Report the failure and return so that the service is not also reported as started.
+            notifyFailed(e);
+            return;
+        }
+
+        // Notify the service was successfully started.
+        notifyStarted();
+
+        log.info("QueryManager has finished starting.");
+    }
+
+    /**
+     * Signals every component to stop, waits up to 10 seconds for the worker threads to finish, and then
+     * stops the {@link QueryChangeLogSource} and the {@link QueryExecutor}.
+     * <p>
+     * Failures while stopping a component are logged and do not prevent the remaining components from
+     * being stopped. If the calling thread is interrupted while waiting for the workers, the shutdown
+     * still runs to completion and the thread's interrupt status is restored before returning.
+     */
+    @Override
+    protected void doStop() {
+        log.info("Stopping a QueryManager.");
+
+        // Set the shutdown flag so that all components that rely on that signal will stop processing.
+        shutdownSignal.set(true);
+
+        // Stop the workers and wait for them to die.
+        boolean interrupted = false;
+        executor.shutdownNow();
+        try {
+            if(!executor.awaitTermination(10, TimeUnit.SECONDS)) {
+                log.warn("Waited 10 seconds for the worker threads to die, but they are still running.");
+            }
+        } catch (final InterruptedException e) {
+            // The wait was cut short, so it is unknown whether the workers have finished.
+            interrupted = true;
+            log.warn("Interrupted while waiting for the worker threads to die. They may still be running.", e);
+        }
+
+        // Stop the source of new Change Logs.
+        try {
+            changeLogSource.stopAndWait();
+        } catch(final UncheckedExecutionException e) {
+            log.warn("Could not stop the Change Log Source.", e);
+        }
+
+        // Stop the query execution framework.
+        try {
+            queryExecutor.stopAndWait();
+        } catch(final UncheckedExecutionException e) {
+            log.warn("Could not stop the Query Executor", e);
+        }
+
+        // Notify the service was successfully stopped.
+        notifyStopped();
+
+        log.info("QueryManager has finished stopping.");
+
+        // Restore the interrupt status last so that it does not disturb the shutdown steps above.
+        if(interrupted) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Offer a unit of work to a blocking queue until it is either accepted, or the
+     * shutdown signal is set.
+     * <p>
+     * An interruption does not end the attempt; the offer is retried and the calling thread's
+     * interrupt status is restored before this method returns.
+     *
+     * @param workQueue - The blocking work queue to write to. (not null)
+     * @param event - The event that will be offered to the work queue. (not null)
+     * @param offerValue - How long to wait when offering new work.
+     * @param offerUnits - The unit for the {@code offerValue}. (not null)
+     * @param shutdownSignal - Used to signal application shutdown has started, so
+     *   this method may terminate without ever placing the event on the queue. (not null)
+     * @return {@code true} if the event was added to the queue, otherwise false.
+     */
+    private static <T> boolean offerUntilAcceptedOrShutdown(
+            final BlockingQueue<T> workQueue,
+            final T event,
+            final long offerValue,
+            final TimeUnit offerUnits,
+            final AtomicBoolean shutdownSignal) {
+        requireNonNull(workQueue);
+        requireNonNull(event);
+        requireNonNull(offerUnits);
+        requireNonNull(shutdownSignal);
+
+        boolean submitted = false;
+        boolean interrupted = false;
+        try {
+            while(!submitted && !shutdownSignal.get()) {
+                try {
+                    submitted = workQueue.offer(event, offerValue, offerUnits);
+                    if(!submitted) {
+                        // Report the timeout that was actually used rather than a fixed duration.
+                        log.debug("An event could not be added to a work queue after waiting {} {}. Trying again...",
+                                offerValue, offerUnits);
+                    }
+                } catch (final InterruptedException e) {
+                    // Keep trying until the shutdown signal is seen, but remember the interruption.
+                    interrupted = true;
+                    log.debug("Interrupted while adding an event to a work queue. Trying again...");
+                }
+            }
+        } finally {
+            // Restoring the flag inside of the loop would make every following offer fail immediately.
+            if(interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+        return submitted;
+    }
+
+ /**
+ * An observation that a {@link QueryChangeLog} was created within or
+ * removed from a {@link QueryChangeLogSource}.
+ */
+ @DefaultAnnotation(NonNull.class)
+ static class LogEvent {
+
+ /**
+ * The types of events that may be observed.
+ */
+ static enum LogEventType {
+ /**
+ * A {@link QueryChangeLog} was created within a {@link
QueryChangeLogSource}.
+ */
+ CREATE,
+
+ /**
+ * A {@link QueryChangeLog} was deleted from a {@link
QueryChangeLogSource}.
+ */
+ DELETE;
+ }
+
+ private final String ryaInstance;
+ private final LogEventType eventType;
+ private final Optional<QueryChangeLog> log;
+
+ /**
+ * Constructs an instance of {@link LogEvent}.
+ *
+ * @param ryaInstance - The Rya Instance the log is/was for. (not
null)
+ * @param eventType - The type of event that was observed. (not
null)
+ * @param log - The log if this is a create event. (not null)
+ */
+ private LogEvent(final String ryaInstance, final LogEventType
eventType, final Optional<QueryChangeLog> log) {
+ this.ryaInstance = requireNonNull(ryaInstance);
+ this.eventType = requireNonNull(eventType);
+ this.log = requireNonNull(log);
+ }
+
+ /**
+ * @return The Rya Instance whose log was either created or
deleted.
+ */
+ public String getRyaInstanceName() {
+ return ryaInstance;
+ }
+
+ /**
+ * @return The type of event that was observed.
+ */
+ public LogEventType getEventType() {
+ return eventType;
+ }
+
+ /**
+ * @return The {@link QueryChangeLog} if this is a CREATE event.
+ */
+ public Optional<QueryChangeLog> getQueryChangeLog() {
+ return log;
+ }
+
+ @Override
+ public String toString() {
+ return "LogEvent {\n" +
+ " Rya Instance: " + ryaInstance + ",\n" +
+ " Event Type: " + eventType + "\n" +
+ "}";
+ }
+
+ /**
+ * Make a {@link LogEvent} that indicates a {@link QueryChangeLog}
was created within a
+ * {@link QueryChangeLogSource}.
+ *
+ * @param ryaInstance - The Rya Instance the created log is for.
(not null)
+ * @param log - The created {@link QueryChangeLog. (not null)
+ * @return A {@link LogEvent} built using the provided values.
+ */
+ public static LogEvent create(final String ryaInstance, final
QueryChangeLog log) {
+ return new LogEvent(ryaInstance, LogEventType.CREATE
,Optional.of(log));
+ }
+
+ /**
+ * Make a {@link LogEvent} that indicates a {@link QueryChangeLog}
was deleted from
+ * a {@link QueryChangeLogSource}.
+ *
+ * @param ryaInstance - The Rya Instance whose log was deleted.
(not null)
+ * @return A {@link LogEvent} built using the provided values.
+ */
+ public static LogEvent delete(final String ryaInstance) {
+ return new LogEvent(ryaInstance, LogEventType.DELETE,
Optional.empty());
+ }
+ }
+
+    /**
+     * An observation that a {@link StreamsQuery} needs to be executing or not
+     * via the provided {@link QueryExecutor}.
+     */
+    @DefaultAnnotation(NonNull.class)
+    static class QueryEvent {
+
+        /**
+         * The type of events that may be observed.
+         */
+        public static enum QueryEventType {
+            /**
+             * Indicates a {@link StreamsQuery} needs to be executing.
+             */
+            EXECUTING,
+
+            /**
+             * Indicates a {@link StreamsQuery} needs to be stopped.
+             */
+            STOPPED,
+
+            /**
+             * Indicates all {@link StreamsQuery}s for a Rya instance need to be stopped.
+             */
+            STOP_ALL;
+        }
+
+        private final String ryaInstance;
+        private final QueryEventType type;
+        private final Optional<UUID> queryId;
+        private final Optional<StreamsQuery> query;
+
+        /**
+         * Constructs an instance of {@link QueryEvent}. Use {@link #executing(String, StreamsQuery)},
+         * {@link #stopped(String, UUID)}, or {@link #stopALL(String)} to build one.
+         *
+         * @param ryaInstance - The Rya instance that generated the event. (not null)
+         * @param type - Indicates whether the query needs to be executing or not. (not null)
+         * @param queryId - If stopped, the ID of the query that must not be running. (not null)
+         * @param query - If executing, the StreamsQuery that defines what should be executing. (not null)
+         */
+        private QueryEvent(
+                final String ryaInstance,
+                final QueryEventType type,
+                final Optional<UUID> queryId,
+                final Optional<StreamsQuery> query) {
+            this.ryaInstance = requireNonNull(ryaInstance);
+            this.type = requireNonNull(type);
+            this.queryId = requireNonNull(queryId);
+            this.query = requireNonNull(query);
+        }
+
+        /**
+         * @return The Rya instance that generated the event.
+         */
+        public String getRyaInstance() {
+            return ryaInstance;
+        }
+
+        /**
+         * @return Indicates whether the query needs to be executing or not.
+         */
+        public QueryEventType getType() {
+            return type;
+        }
+
+        /**
+         * @return If stopped, the ID of the query that must not be running. Otherwise absent.
+         */
+        public Optional<UUID> getQueryId() {
+            return queryId;
+        }
+
+        /**
+         * @return If executing, the StreamsQuery that defines what should be executing. Otherwise absent.
+         */
+        public Optional<StreamsQuery> getStreamsQuery() {
+            return query;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(ryaInstance, type, queryId, query);
+        }
+
+        @Override
+        public boolean equals(final Object o) {
+            if(o instanceof QueryEvent) {
+                final QueryEvent other = (QueryEvent) o;
+                return Objects.equals(ryaInstance, other.ryaInstance) &&
+                        Objects.equals(type, other.type) &&
+                        Objects.equals(queryId, other.queryId) &&
+                        Objects.equals(query, other.query);
+            }
+            return false;
+        }
+
+        /**
+         * Renders the event along with whichever of the query ID and the {@link StreamsQuery} it carries.
+         * Values the event does not carry are simply left out, so this never fails on an absent value.
+         */
+        @Override
+        public String toString() {
+            final StringBuilder string = new StringBuilder();
+            string.append("Query Event {\n")
+                .append(" Rya Instance: ").append(ryaInstance).append(",\n")
+                .append(" Type: ").append(type).append(",\n");
+
+            // Show what is present instead of assuming what each event type carries.
+            queryId.ifPresent(id -> string.append(" Query ID: ").append(id).append("\n"));
+            query.ifPresent(streamsQuery -> append(string, streamsQuery));
+
+            string.append("}");
+            return string.toString();
+        }
+
+        /**
+         * Appends a rendering of a {@link StreamsQuery}, terminated by a new line, to a builder.
+         *
+         * @param string - The builder that will be appended to. (not null)
+         * @param query - The query to render. (not null)
+         */
+        private void append(final StringBuilder string, final StreamsQuery query) {
+            requireNonNull(string);
+            requireNonNull(query);
+            string.append(" Streams Query {\n")
+                .append(" Query ID: ").append(query.getQueryId()).append(",\n")
+                .append(" Is Active: ").append(query.isActive()).append(",\n")
+                .append(" SPARQL: ").append(query.getSparql()).append("\n")
+                .append(" }\n");
+        }
+
+        /**
+         * Create a {@link QueryEvent} that indicates a query needs to be executing.
+         *
+         * @param ryaInstance - The Rya instance that generated the event. (not null)
+         * @param query - The StreamsQuery that defines what should be executing. (not null)
+         * @return A {@link QueryEvent} built using the provided values.
+         */
+        public static QueryEvent executing(final String ryaInstance, final StreamsQuery query) {
+            return new QueryEvent(ryaInstance, QueryEventType.EXECUTING, Optional.empty(), Optional.of(query));
+        }
+
+        /**
+         * Create a {@link QueryEvent} that indicates a query needs to be stopped.
+         *
+         * @param ryaInstance - The Rya instance that generated the event. (not null)
+         * @param queryId - The ID of the query that must not be running. (not null)
+         * @return A {@link QueryEvent} built using the provided values.
+         */
+        public static QueryEvent stopped(final String ryaInstance, final UUID queryId) {
+            return new QueryEvent(ryaInstance, QueryEventType.STOPPED, Optional.of(queryId), Optional.empty());
+        }
+
+        /**
+         * Create a {@link QueryEvent} that indicates all queries for a Rya instance need to be stopped.
+         *
+         * @param ryaInstance - The Rya instance that generated the event. (not null)
+         * @return A {@link QueryEvent} built using the provided values.
+         */
+        public static QueryEvent stopALL(final String ryaInstance) {
+            return new QueryEvent(ryaInstance, QueryEventType.STOP_ALL, Optional.empty(), Optional.empty());
+        }
+    }
+
+    /**
+     * A {@link SourceListener} that converts the notifications of a {@link QueryChangeLogSource}
+     * into {@link LogEvent}s and places them on a work queue. It stops trying to add work once
+     * the provided shutdown signal is set.
+     */
+    @DefaultAnnotation(NonNull.class)
+    static class LogEventWorkGenerator implements SourceListener {
+
+        private final BlockingQueue<LogEvent> workQueue;
+        private final AtomicBoolean shutdownSignal;
+        private final long offerValue;
+        private final TimeUnit offerUnits;
+
+        /**
+         * Constructs an instance of {@link LogEventWorkGenerator}.
+         *
+         * @param workQueue - A blocking queue that will have {@link LogEvent}s offered to it. (not null)
+         * @param offerValue - How long to wait when offering new work.
+         * @param offerUnits - The unit for the {@code offerValue}. (not null)
+         * @param shutdownSignal - Indicates to this listener that it needs to stop adding events
+         *   to the work queue because the application is shutting down. (not null)
+         */
+        public LogEventWorkGenerator(
+                final BlockingQueue<LogEvent> workQueue,
+                final long offerValue,
+                final TimeUnit offerUnits,
+                final AtomicBoolean shutdownSignal) {
+            this.workQueue = requireNonNull(workQueue);
+            this.shutdownSignal = requireNonNull(shutdownSignal);
+            this.offerValue = offerValue;
+            this.offerUnits = requireNonNull(offerUnits);
+        }
+
+        @Override
+        public void notifyCreate(final String ryaInstanceName, final QueryChangeLog changeLog) {
+            final String message = "A new Query Change Log has been discovered for Rya Instance " + ryaInstanceName +
+                    ". All " + "queries that are set to active within it will be started.";
+            log.info(message);
+
+            // Summarize the notification as an event and hand it to the worker.
+            enqueue(LogEvent.create(ryaInstanceName, changeLog));
+        }
+
+        @Override
+        public void notifyDelete(final String ryaInstanceName) {
+            final String message = "The Query Change Log for Rya Instance " + ryaInstanceName +
+                    " has been deleted. All of the " + "queries related to that instance will be stopped.";
+            log.info(message);
+
+            // Summarize the notification as an event and hand it to the worker.
+            enqueue(LogEvent.delete(ryaInstanceName));
+        }
+
+        /**
+         * Offers an event to the work queue until there is room for it, or the application is shutting down.
+         *
+         * @param event - The event to place on the work queue. (not null)
+         */
+        private void enqueue(final LogEvent event) {
+            offerUntilAcceptedOrShutdown(workQueue, event, offerValue, offerUnits, shutdownSignal);
+        }
+    }
+
+ /**
+ * Processes a work queue of {@link LogEvent}s.
+ * <p/>
+ * Whenever a new log has been created, then it registers a {@link
QueryEventWorkGenerator}
+ * that generates {@link QueryEvent}s based on the content and updates
to the discovered
+ * {@link QueryChagneLog}.
+ * <p/>
+ * Whenever a log is deleted, then the generator is stopped and a stop
all {@link QueryEvent}
+ * is written to the work queue.
+ */
+ @DefaultAnnotation(NonNull.class)
+ static class LogEventWorker implements Runnable {
+
+ /**
+ * A map of Rya Instance name to he Query Repository for that
instance.
+ */
+ private final Map<String, QueryRepository> repos = new HashMap<>();
+
+ private final BlockingQueue<LogEvent> logWorkQueue;
+ private final BlockingQueue<QueryEvent> queryWorkQueue;
+ private final long blockingValue;
+ private final TimeUnit blockingUnits;
+ private final AtomicBoolean shutdownSignal;
+
+ /**
+ * Constructs an instance of {@link LogEventWorker}.
+ *
+ * @param logWorkQueue - A queue of {@link LogEvent}s that will be
worked by this object. (not null)
+ * @param queryWorkQueue - A queue where {@link QueryEvent}s will
be placed by this object. (not null)
+ * @param blockingValue - How long to wait when polling/offering
new work.
+ * @param blockingUnits - The unit for the {@code blockingValue}.
(not null)
+ * @param shutdownSignal - Indicates when the application has been
shutdown, so the executing thread
+ * may exit the {@link #run()} method. (not null)
+ */
+ public LogEventWorker(
+ final BlockingQueue<LogEvent> logWorkQueue,
+ final BlockingQueue<QueryEvent> queryWorkQueue,
+ final long blockingValue,
+ final TimeUnit blockingUnits,
+ final AtomicBoolean shutdownSignal) {
+ this.logWorkQueue = requireNonNull(logWorkQueue);
+ this.queryWorkQueue = requireNonNull(queryWorkQueue);
+ this.blockingValue = blockingValue;
+ this.blockingUnits = requireNonNull(blockingUnits);
+ this.shutdownSignal = requireNonNull(shutdownSignal);
+ }
+
+ @Override
+ public void run() {
+ // Run until the shutdown signal is set.
+ while(!shutdownSignal.get()) {
+ try {
+ // Pull a unit of work from the queue.
+ log.debug("LogEventWorker - Polling the work queue for
a new LogEvent.");
+ final LogEvent logEvent =
logWorkQueue.poll(blockingValue, blockingUnits);
+ if(logEvent == null) {
+ // Poll again if nothing was found.
+ continue;
+ }
+
+ log.info("LogEventWorker - handling: \n" + logEvent);
+ final String ryaInstance =
logEvent.getRyaInstanceName();
+
+ switch(logEvent.getEventType()) {
+ case CREATE:
+ // If we see a create message for a Rya
Instance we are already maintaining,
+ // then don't do anything.
+ if(repos.containsKey(ryaInstance)) {
+ log.warn("LogEventWorker - A repository is
already being managed for the Rya Instance " +
+ ryaInstance + ". This message will
be ignored.");
+ continue;
+ }
+
+ // Create and start a QueryRepository for the
discovered log. Hold onto the repository
+ // so that it may be shutdown later.
+ final Scheduler scheduler =
Scheduler.newFixedRateSchedule(0, blockingValue, blockingUnits);
+ final QueryRepository repo = new
InMemoryQueryRepository(logEvent.getQueryChangeLog().get(), scheduler);
+ repo.startAndWait();
+ repos.put(ryaInstance, repo);
+
+ // Subscribe a worker that adds the Query
Events to the queryWorkQueue queue.
+ // A count down latch is used to ensure the
returned set of queries are handled
+ // prior to any notifications from the
repository.
+ final CountDownLatch subscriptionWorkFinished
= new CountDownLatch(1);
+ final QueryEventWorkGenerator
queryWorkGenerator =
+ new
QueryEventWorkGenerator(ryaInstance, subscriptionWorkFinished, queryWorkQueue,
+ blockingValue, blockingUnits,
shutdownSignal);
+
+ log.debug("LogEventWorker - Setting up a
QueryWorkGenerator...");
+ final Set<StreamsQuery> queries =
repo.subscribe(queryWorkGenerator);
+ log.debug("LogEventWorker - Finished setting
up a QueryWorkGenerator.");
+
+ // Handle the view of the queries within the
repository as it existed when
+ // the subscription was registered.
+ queries.stream()
+ .forEach(query -> {
+ // Create a QueryEvent that represents the
active state of the existing query.
+ final QueryEvent queryEvent =
query.isActive() ?
+ QueryEvent.executing(ryaInstance,
query) : QueryEvent.stopped(ryaInstance, query.getQueryId());
+ log.debug("LogEventWorker - offering: " +
queryEvent);
+
+ // Offer it to the worker until there is
room for it in the work queue, or we are shutting down.
+
offerUntilAcceptedOrShutdown(queryWorkQueue, queryEvent, blockingValue,
blockingUnits, shutdownSignal);
+ });
+
+ // Indicate the subscription work is finished
so that the registered listener may start
+ // adding work to the queue.
+ log.info("LogEventWorker - Counting down the
subscription work latch.");
+ subscriptionWorkFinished.countDown();
+ break;
+
+ case DELETE:
+ if(repos.containsKey(ryaInstance)) {
+ // Shut down the query repository for the
Rya instance. This ensures the listener will
+ // not receive any more work that needs to
be done.
+ final QueryRepository deletedRepo =
repos.remove(ryaInstance);
+ deletedRepo.stopAndWait();
+
+ // Add work that stops all of the queries
related to the instance.
+ final QueryEvent stopAllEvent =
QueryEvent.stopALL(ryaInstance);
+
offerUntilAcceptedOrShutdown(queryWorkQueue, stopAllEvent, blockingValue,
blockingUnits, shutdownSignal);
+ }
+ break;
+ }
+ } catch (final InterruptedException e) {
+ log.debug("LogEventWorker did not see any new events
over the past 5 seconds. Polling again...");
+ }
+ }
+
+ log.info("LogEventWorker shutting down...");
+
+ // Shutdown all of the QueryRepositories that were started.
+ repos.values().forEach(repo -> repo.stopAndWait());
+
+ log.info("LogEventWorker shut down.");
+ }
+ }
+
+ /**
+ * Listens to a {@link QueryRepository} and adds observations to the
provided work queue.
+ * It does so until the provided shutdown signal is set.
+ */
+ @DefaultAnnotation(NonNull.class)
+ static class QueryEventWorkGenerator implements QueryChangeLogListener
{
+
+ private final String ryaInstance;
+ private final CountDownLatch subscriptionWorkFinished;
+ private final BlockingQueue<QueryEvent> queryWorkQueue;
+ private final long blockingValue;
+ private final TimeUnit blockingUnits;
+ private final AtomicBoolean shutdownSignal;
+
+ /**
+ * Constructs an instance of {@link QueryEventWorkGenerator}.
+ *
+ * @param ryaInstance - The rya instance whose log this objects is
watching. (not null)
+ * @param subscriptionWorkFinished - Indicates when work that
needs to be completed before this
+ * listener handles notifications is completed. (not null)
+ * @param queryWorkQueue - A queue where {@link QueryEvent}s will
be placed by this object. (not null)
+ * @param blockingValue - How long to wait when polling/offering
new work.
+ * @param blockingUnits - The unit for the {@code blockingValue}.
(not null)
+ * @param shutdownSignal - Indicates to this listener that it
needs to stop adding events
+ * to the work queue because the application is shutting down.
(not null)
+ */
+ public QueryEventWorkGenerator(
+ final String ryaInstance,
+ final CountDownLatch subscriptionWorkFinished,
+ final BlockingQueue<QueryEvent> queryWorkQueue,
+ final long blockingValue,
+ final TimeUnit blockingUnits,
+ final AtomicBoolean shutdownSignal) {
+ this.ryaInstance = requireNonNull(ryaInstance);
+ this.subscriptionWorkFinished =
requireNonNull(subscriptionWorkFinished);
+ this.queryWorkQueue = requireNonNull(queryWorkQueue);
+ this.blockingValue = blockingValue;
+ this.blockingUnits = requireNonNull(blockingUnits);
+ this.shutdownSignal = requireNonNull(shutdownSignal);
+ }
+
+ @Override
+ public void notify(final ChangeLogEntry<QueryChange>
queryChangeEvent, final Optional<StreamsQuery> newQueryState) {
+ requireNonNull(queryChangeEvent);
+ requireNonNull(newQueryState);
+
+ // Wait for the subscription work to be finished.
+ try {
+ log.debug("Waiting for Subscription Work Finished latch to
release...");
+ while(!shutdownSignal.get() &&
!subscriptionWorkFinished.await(blockingValue, blockingUnits)) {
+ log.debug("Still waiting...");
+ }
+ log.debug("Subscription Work Finished latch to released.");
+ } catch (final InterruptedException e) {
+ log.warn("Interrupted while waiting for the Subscription
Work Finished latch to be " +
+ "released. Shutting down?", e);
+ }
+
+ // If we left the loop because of a shutdown, return
immediately.
+ if(shutdownSignal.get()) {
+ log.debug("Not processing notification. Shutting down.");
+ return;
+ }
+
+ // Generate work from the notification.
+ final QueryChange change = queryChangeEvent.getEntry();
+ switch(change.getChangeType()) {
+ case CREATE:
+ if(newQueryState.isPresent()) {
+ log.info("Rya Instance " + ryaInstance + " created
Rya Streams query " + newQueryState + ".");
+ final StreamsQuery newQuery = newQueryState.get();
+ if(newQuery.isActive()) {
+ final QueryEvent executeNewQuery =
QueryEvent.executing(ryaInstance, newQuery);
+ offerUntilAcceptedOrShutdown(queryWorkQueue,
executeNewQuery, blockingValue, blockingUnits, shutdownSignal);
+ }
+ } else {
+ log.error("Received a CREATE QueryChange for Rya
Instance: " + ryaInstance +
+ ", Query ID: " + change.getQueryId() + ",
but the QueryRepository did not supply a " +
+ "StreamsQuery representing the created
query. The query will not be processed.");
+ }
+ break;
+
+ case DELETE:
+ final UUID deletedQueryId = change.getQueryId();
+ log.info("Rya Instance " + ryaInstance + " deleted Rya
Streams query with ID " + deletedQueryId);
+ final QueryEvent stopDeletedQuery =
QueryEvent.stopped(ryaInstance, deletedQueryId);
+ offerUntilAcceptedOrShutdown(queryWorkQueue,
stopDeletedQuery, blockingValue, blockingUnits, shutdownSignal);
+ break;
+
+ case UPDATE:
+ if(newQueryState.isPresent()) {
+ final StreamsQuery updatedQuery =
newQueryState.get();
+ if(updatedQuery.isActive()) {
+ log.info("Rya Instance " + ryaInstance + "
updated Rya Streams query with ID " +
+ updatedQuery.getQueryId() + " to be
active.");
+ final QueryEvent executeUpdatedQuery =
QueryEvent.executing(ryaInstance, updatedQuery);
+ offerUntilAcceptedOrShutdown(queryWorkQueue,
executeUpdatedQuery, blockingValue, blockingUnits, shutdownSignal);
+ } else {
+ log.info("Rya Instance " + ryaInstance + "
updated Rya Streams query with ID " +
+ updatedQuery.getQueryId() + " to be
inactive.");
+ final QueryEvent stopUpdatedQuery =
QueryEvent.stopped(ryaInstance, updatedQuery.getQueryId());
+ offerUntilAcceptedOrShutdown(queryWorkQueue,
stopUpdatedQuery, blockingValue, blockingUnits, shutdownSignal);
+ }
+ } else {
+ log.error("Received an UPDATE QueryChange for Rya
Instance: " + ryaInstance +
+ ", Query ID: " + change.getQueryId() + ",
but the QueryRepository did not supply a " +
+ "StreamsQuery representing the created
query. The query will not be processed.");
+ }
+ break;
+ }
+ }
+ }
+
+ /**
+ * Processes a work queue of {@link QueryEvent}s.
+ * <p/>
+ * Each type of event maps to the corresponding method on {@link
QueryExecutor} that is called into.
+ */
+ @DefaultAnnotation(NonNull.class)
+ static class QueryEventWorker implements Runnable {
+
+ private final BlockingQueue<QueryEvent> workQueue;
+ private final QueryExecutor queryExecutor;
+ private final long pollingValue;
+ private final TimeUnit pollingUnits;
+ private final AtomicBoolean shutdownSignal;
+
+ /**
+ * Constructs an instance of {@link QueryEventWorker}.
+ *
+ * @param logWorkQueue - A queue of {@link QueryEvent}s that will
be worked by this object. (not null)
--- End diff --
Done.
---