Github user ejwhite922 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/272#discussion_r167242868
  
    --- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---
    @@ -0,0 +1,884 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.streams.querymanager;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.Objects;
    +import java.util.Optional;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.concurrent.ArrayBlockingQueue;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.Executors;
    +import java.util.concurrent.RejectedExecutionException;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import org.apache.rya.streams.api.entity.StreamsQuery;
    +import org.apache.rya.streams.api.queries.ChangeLogEntry;
    +import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
    +import org.apache.rya.streams.api.queries.QueryChange;
    +import org.apache.rya.streams.api.queries.QueryChangeLog;
    +import org.apache.rya.streams.api.queries.QueryChangeLogListener;
    +import org.apache.rya.streams.api.queries.QueryRepository;
    +import 
org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
    +import 
org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
    +import com.google.common.util.concurrent.AbstractService;
    +import com.google.common.util.concurrent.UncheckedExecutionException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A service for managing {@link StreamsQuery} running on a Rya Streams 
system.
    + * <p>
    + * Only one QueryManager needs to be running to manage any number of rya
    + * instances/rya streams instances.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public class QueryManager extends AbstractService {
    +    private static final Logger log = 
LoggerFactory.getLogger(QueryManager.class);
    +
    +    /**
    +     * The source of {@link QueryChangeLog}s. Each log discovered is bound 
to a specific
    +     * Rya instance.
    +     */
    +    private final QueryChangeLogSource changeLogSource;
    +
    +    /**
    +     * The engine that is responsible for executing {@link StreamsQuery}s.
    +     */
    +    private final QueryExecutor queryExecutor;
    +
    +    /**
    +     * How long blocking operations will be attempted before potentially 
trying again.
    +     */
    +    private final long blockingValue;
    +
    +    /**
    +     * The units for {@link #blockingValue}.
    +     */
    +    private final TimeUnit blockingUnits;
    +
    +    /**
    +     * Used to inform threads that the application is shutting down, so 
they must stop work.
    +     */
    +    private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
    +
    +    /**
    +     * This thread pool manages the two threads used to work the {@link 
LogEvent}s
    +     * and the {@link QueryEvent}s.
    +     */
    +    private final ExecutorService executor = 
Executors.newFixedThreadPool(2);
    +
    +    /**
    +     * Creates a new {@link QueryManager}.
    +     *
    +     * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not 
null)
    +     * @param source - The {@link QueryChangeLogSource} of 
QueryChangeLogs. (not null)
    +     * @param blockingValue - How long blocking operations will try before 
looping. (> 0)
    +     * @param blockingUnits - The units of the {@code blockingValue}. (not 
null)
    +     */
    +    public QueryManager(
    +            final QueryExecutor queryExecutor,
    +            final QueryChangeLogSource source,
    +            final long blockingValue,
    +            final TimeUnit blockingUnits) {
    +        this.changeLogSource = requireNonNull(source);
    +        this.queryExecutor = requireNonNull(queryExecutor);
    +        Preconditions.checkArgument(blockingValue > 0, "The blocking value 
must be > 0. Was: " + blockingValue);
    +        this.blockingValue = blockingValue;
    +        this.blockingUnits = requireNonNull(blockingUnits);
    +    }
    +
    +    @Override
    +    protected void doStart() {
    +        log.info("Starting a QueryManager.");
    +
    +        // A work queue of discovered Query Change Logs that need to be 
handled.
    +        // This queue exists so that the source notifying thread may be 
released
    +        // immediately instead of calling into blocking functions.
    +        final BlockingQueue<LogEvent> logEvents = new 
ArrayBlockingQueue<>(1024);
    +
    +        // A work queue of discovered Query Changes from the monitored 
Query Change Logs
    +        // that need to be handled. This queue exists so that the Query 
Repository notifying
    +        // thread may be released immediately instead of calling into 
blocking functions.
    +        final BlockingQueue<QueryEvent> queryEvents = new 
ArrayBlockingQueue<>(1024);
    +
    +        try {
    +            // Start up a LogEventWorker using the executor service.
    +            executor.submit(new LogEventWorker(logEvents, queryEvents, 
blockingValue, blockingUnits, shutdownSignal));
    +
    +            // Start up a QueryEvent Worker using the executor service.
    +            executor.submit(new QueryEventWorker(queryEvents, 
queryExecutor, blockingValue, blockingUnits, shutdownSignal));
    +
    +            // Start up the query execution framework.
    +            queryExecutor.startAndWait();
    +
    +            // Startup the source that discovers new Query Change Logs.
    +            changeLogSource.startAndWait();
    +
    +            // Subscribe a listener to the source that writes to the 
LogEventWorker's work queue.
    +            changeLogSource.subscribe(new LogEventWorkGenerator(logEvents, 
blockingValue, blockingUnits, shutdownSignal));
    +        } catch(final RejectedExecutionException | 
UncheckedExecutionException e) {
    +            log.error("Could not start up a QueryManager.", e);
    +            notifyFailed(e);
    +        }
    +
    +        // Notify the service was successfully started.
    +        notifyStarted();
    +
    +        log.info("QueryManager has finished starting.");
    +    }
    +
    +    @Override
    +    protected void doStop() {
    +        log.info("Stopping a QueryManager.");
    +
    +        // Set the shutdown flag so that all components that rely on that 
signal will stop processing.
    +        shutdownSignal.set(true);
    +
    +        // Stop the workers and wait for them to die.
    +        executor.shutdownNow();
    +        try {
    +            if(!executor.awaitTermination(10, TimeUnit.SECONDS)) {
    +                log.warn("Waited 10 seconds for the worker threads to die, 
but they are still running.");
    +            }
    +        } catch (final InterruptedException e) {
    +            log.warn("Waited 10 seconds for the worker threads to die, but 
they are still running.");
    +        }
    +
    +        // Stop the source of new Change Logs.
    +        try {
    +            changeLogSource.stopAndWait();
    +        } catch(final UncheckedExecutionException e) {
    +            log.warn("Could not stop the Change Log Source.", e);
    +        }
    +
    +        // Stop the query execution framework.
    +        try {
    +            queryExecutor.stopAndWait();
    +        } catch(final UncheckedExecutionException e) {
    +            log.warn("Could not stop the Query Executor", e);
    +        }
    +
    +        // Notify the service was successfully stopped.
    +        notifyStopped();
    +
    +        log.info("QueryManager has finished stopping.");
    +    }
    +
    +    /**
    +     * Offer a unit of work to a blocking queue until it is either 
accepted, or the
    +     * shutdown signal is set.
    +     *
    +     * @param workQueue - The blocking work queue to write to. (not null)
    +     * @param event - The event that will be offered to the work queue. 
(not null)
    +     * @param offerValue - How long to wait when offering new work.
    +     * @param offerUnits - The unit for the {@code offerValue}. (not null)
    +     * @param shutdownSignal - Used to signal application shutdown has 
started, so
    +     *   this method may terminate without ever placing the event on the 
queue. (not null)
    +     * @return {@code true} if the event was added to the queue, otherwise 
false.
    +     */
    +    private static <T> boolean offerUntilAcceptedOrShutdown(
    +            final BlockingQueue<T> workQueue,
    +            final T event, final
    +            long offerValue,
    +            final TimeUnit offerUnits,
    +            final AtomicBoolean shutdownSignal) {
    +        requireNonNull(workQueue);
    +        requireNonNull(event);
    +        requireNonNull(shutdownSignal);
    +
    +        boolean submitted = false;
    +        while(!submitted && !shutdownSignal.get()) {
    +            try {
    +                submitted = workQueue.offer(event, offerValue, offerUnits);
    +                if(!submitted) {
    +                    log.debug("An event could not be added to a work queue 
after waiting 5 seconds. Trying again...");
    +                }
    +            } catch (final InterruptedException e) {
    +                log.debug("An event could not be added to a work queue 
after waiting 5 seconds. Trying again...");
    +            }
    +        }
    +        return submitted;
    +    }
    +
    +    /**
    +     * An observation that a {@link QueryChangeLog} was created within or
    +     * removed from a {@link QueryChangeLogSource}.
    +     */
    +    @DefaultAnnotation(NonNull.class)
    +    static class LogEvent {
    +
    +        /**
    +         * The types of events that may be observed.
    +         */
    +        static enum LogEventType {
    +            /**
    +             * A {@link QueryChangeLog} was created within a {@link 
QueryChangeLogSource}.
    +             */
    +            CREATE,
    +
    +            /**
    +             * A {@link QueryChangeLog} was deleted from a {@link 
QueryChangeLogSource}.
    +             */
    +            DELETE;
    +        }
    +
    +        private final String ryaInstance;
    +        private final LogEventType eventType;
    +        private final Optional<QueryChangeLog> log;
    +
    +        /**
    +         * Constructs an instance of {@link LogEvent}.
    +         *
    +         * @param ryaInstance - The Rya Instance the log is/was for. (not 
null)
    +         * @param eventType - The type of event that was observed. (not 
null)
    +         * @param log - The log if this is a create event. (not null)
    +         */
    +        private LogEvent(final String ryaInstance, final LogEventType 
eventType, final Optional<QueryChangeLog> log) {
    +            this.ryaInstance = requireNonNull(ryaInstance);
    +            this.eventType = requireNonNull(eventType);
    +            this.log = requireNonNull(log);
    +        }
    +
    +        /**
    +         * @return The Rya Instance whose log was either created or 
deleted.
    +         */
    +        public String getRyaInstanceName() {
    +            return ryaInstance;
    +        }
    +
    +        /**
    +         * @return The type of event that was observed.
    +         */
    +        public LogEventType getEventType() {
    +            return eventType;
    +        }
    +
    +        /**
    +         * @return The {@link QueryChangeLog} if this is a CREATE event.
    +         */
    +        public Optional<QueryChangeLog> getQueryChangeLog() {
    +            return log;
    +        }
    +
    +        @Override
    +        public String toString() {
    +            return "LogEvent {\n" +
    +                   "    Rya Instance: " + ryaInstance + ",\n" +
    +                   "    Event Type: " + eventType + "\n" +
    +                   "}";
    +        }
    +
    +        /**
    +         * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} 
was created within a
    +         * {@link QueryChangeLogSource}.
    +         *
    +         * @param ryaInstance - The Rya Instance the created log is for. 
(not null)
    +         * @param log - The created {@link QueryChangeLog}. (not null)
    +         * @return A {@link LogEvent} built using the provided values.
    +         */
    +        public static LogEvent create(final String ryaInstance, final 
QueryChangeLog log) {
    +            return new LogEvent(ryaInstance, LogEventType.CREATE 
,Optional.of(log));
    +        }
    +
    +        /**
    +         * Make a {@link LogEvent} that indicates a {@link QueryChangeLog} 
was deleted from
    +         * a {@link QueryChangeLogSource}.
    +         *
    +         * @param ryaInstance - The Rya Instance whose log was deleted. 
(not null)
    +         * @return A {@link LogEvent} built using the provided values.
    +         */
    +        public static LogEvent delete(final String ryaInstance) {
    +            return new LogEvent(ryaInstance, LogEventType.DELETE, 
Optional.empty());
    +        }
    +    }
    +
    +    /**
    +     * An observation that a {@link StreamsQuery} needs to be executing or 
not
    +     * via the provided {@link QueryExecutor}.
    +     */
    +    @DefaultAnnotation(NonNull.class)
    +    static class QueryEvent {
    +
    +        /**
    +         * The type of events that may be observed.
    +         */
    +        public static enum QueryEventType {
    +            /**
    +             * Indicates a {@link StreamsQuery} needs to be executing.
    +             */
    +            EXECUTING,
    +
    +            /**
    +             * Indicates a {@link StreamsQuery} needs to be stopped.
    +             */
    +            STOPPED,
    +
    +            /**
    +             * Indicates all {@link StreamsQuery}s for a Rya instance need 
to be stopped.
    +             */
    +            STOP_ALL;
    +        }
    +
    +        private final String ryaInstance;
    +        private final QueryEventType type;
    +        private final Optional<UUID> queryId;
    +        private final Optional<StreamsQuery> query;
    +
    +        /**
    +         * Constructs an instance of {@link QueryEvent}.
    +         *
    +         * @param ryaInstance - The Rya instance that generated the event. 
(not null)
    +         * @param type - Indicates whether the query needs to be executing 
or not. (not null)
    +         * @param queryId - If stopped, the ID of the query that must not 
be running. (not null)
    +         * @param query - If executing, the StreamsQuery that defines what 
should be executing. (not null)
    +         */
    +        private QueryEvent(
    +                final String ryaInstance,
    +                final QueryEventType type,
    +                final Optional<UUID> queryId,
    +                final Optional<StreamsQuery> query) {
    +            this.ryaInstance = requireNonNull(ryaInstance);
    +            this.type = requireNonNull(type);
    +            this.queryId = requireNonNull(queryId);
    +            this.query = requireNonNull(query);
    +        }
    +
    +        /**
    +         * @return The Rya instance that generated the event.
    +         */
    +        public String getRyaInstance() {
    +            return ryaInstance;
    +        }
    +
    +        /**
    +         * @return Indicates whether the query needs to be executing or 
not.
    +         */
    +        public QueryEventType getType() {
    +            return type;
    +        }
    +
    +        /**
    +         * @return If stopped, the ID of the query that must not be 
running. Otherwise absent.
    +         */
    +        public Optional<UUID> getQueryId() {
    +            return queryId;
    +        }
    +
    +        /**
    +         * @return If executing, the StreamsQuery that defines what should 
be executing. Otherwise absent.
    +         */
    +        public Optional<StreamsQuery> getStreamsQuery() {
    +            return query;
    +        }
    +
    +        @Override
    +        public int hashCode() {
    +            return Objects.hash(ryaInstance, type, queryId, query);
    +        }
    +
    +        @Override
    +        public boolean equals(final Object o) {
    +            if(o instanceof QueryEvent) {
    +                final QueryEvent other = (QueryEvent) o;
    +                return Objects.equals(ryaInstance, other.ryaInstance) &&
    +                        Objects.equals(type, other.type) &&
    +                        Objects.equals(queryId, other.queryId) &&
    +                        Objects.equals(query, other.query);
    +            }
    +            return false;
    +        }
    +
    +        @Override
    +        public String toString() {
    +            final StringBuilder string = new StringBuilder();
    +            string.append("Query Event {\n")
    +                  .append("    Rya 
Instance:").append(ryaInstance).append(",\n")
    +                  .append("    Type: ").append(type).append(",\n");
    +            switch(type) {
    +                case EXECUTING:
    +                    append(string, query.get());
    +                    break;
    +                case STOPPED:
    +                    string.append("    Query ID: 
").append(queryId.get()).append("\n");
    +                    break;
    +                case STOP_ALL:
    +                    break;
    +                default:
    +                    // Default to showing everything that is in the object.
    +                    string.append("    Query ID: 
").append(queryId.get()).append("\n");
    +                    append(string, query.get());
    +                    break;
    +            }
    +            string.append("}");
    +            return string.toString();
    +        }
    +
    +        private void append(final StringBuilder string, final StreamsQuery 
query) {
    +            requireNonNull(string);
    +            requireNonNull(query);
    +            string.append("    Streams Query {\n")
    +                  .append("        Query ID: 
").append(query.getQueryId()).append(",\n")
    +                  .append("        Is Active: 
").append(query.isActive()).append(",\n")
    +                  .append("        SPARQL: 
").append(query.getSparql()).append("\n")
    +                  .append("    }");
    +        }
    +
    +        /**
    +         * Create a {@link QueryEvent} that indicates a query needs to be 
executing.
    +         *
    +         * @param ryaInstance - The Rya instance that generated the event. 
(not null)
    +         * @param query - The StreamsQuery that defines what should be 
executing. (not null)
    +         * @return A {@link QueryEvent} built using the provided values.
    +         */
    +        public static QueryEvent executing(final String ryaInstance, final 
StreamsQuery query) {
    +            return new QueryEvent(ryaInstance, QueryEventType.EXECUTING, 
Optional.empty(), Optional.of(query));
    +        }
    +
    +        /**
    +         * Create a {@link QueryEvent} that indicates a query needs to be 
stopped.
    +         *
    +         * @param ryaInstance - The Rya instance that generated the event. 
(not null)
    +         * @param queryId - The ID of the query that must not be running. 
(not null)
    +         * @return A {@link QueryEvent} built using the provided values.
    +         */
    +        public static QueryEvent stopped(final String ryaInstance, final 
UUID queryId) {
    +            return new QueryEvent(ryaInstance, QueryEventType.STOPPED, 
Optional.of(queryId), Optional.empty());
    +        }
    +
    +        /**
    +         * Create a {@link QueryEvent} that indicates all queries for a 
Rya instance need to be stopped.
    +         *
    +         * @param ryaInstance - The Rya instance that generated the event. 
(not null)
    +         * @return A {@link QueryEvent} built using the provided values.
    +         */
    +        public static QueryEvent stopALL(final String ryaInstance) {
    +            return new QueryEvent(ryaInstance, QueryEventType.STOP_ALL, 
Optional.empty(), Optional.empty());
    +        }
    +    }
    +
    +    /**
    +     * Listens to a {@link QueryChangeLogSource} and adds observations to 
the provided
    +     * work queue. It does so until the provided shutdown signal is set.
    +     */
    +    @DefaultAnnotation(NonNull.class)
    +    static class LogEventWorkGenerator implements SourceListener {
    +
    +        private final BlockingQueue<LogEvent> workQueue;
    +        private final AtomicBoolean shutdownSignal;
    +        private final long offerValue;
    +        private final TimeUnit offerUnits;
    +
    +        /**
    +         * Constructs an instance of {@link LogEventWorkGenerator}.
    +         *
    +         * @param workQueue - A blocking queue that will have {@link 
LogEvent}s offered to it. (not null)
    +         * @param offerValue - How long to wait when offering new work.
    +         * @param offerUnits - The unit for the {@code offerValue}. (not 
null)
    +         * @param shutdownSignal - Indicates to this listener that it 
needs to stop adding events
    +         *   to the work queue because the application is shutting down. 
(not null)
    +         */
    +        public LogEventWorkGenerator(
    +                final BlockingQueue<LogEvent> workQueue,
    +                final long offerValue,
    +                final TimeUnit offerUnits,
    +                final AtomicBoolean shutdownSignal) {
    +            this.workQueue = requireNonNull(workQueue);
    +            this.shutdownSignal = requireNonNull(shutdownSignal);
    +            this.offerValue = offerValue;
    +            this.offerUnits = requireNonNull(offerUnits);
    +        }
    +
    +        @Override
    +        public void notifyCreate(final String ryaInstanceName, final 
QueryChangeLog changeLog) {
    +            log.info("A new Query Change Log has been discovered for Rya 
Instance " + ryaInstanceName + ". All " +
    +                    "queries that are set to active within it will be 
started.");
    +
    +            // Create an event that summarizes this notification.
    +            final LogEvent event = LogEvent.create(ryaInstanceName, 
changeLog);
    +
    +            // Offer it to the worker until there is room for it in the 
work queue, or we are shutting down.
    +            offerUntilAcceptedOrShutdown(workQueue, event, offerValue, 
offerUnits, shutdownSignal);
    +        }
    +
    +        @Override
    +        public void notifyDelete(final String ryaInstanceName) {
    +            log.info("The Query Change Log for Rya Instance " + 
ryaInstanceName + " has been deleted. All of the " +
    +                    "queries related to that instance will be stopped.");
    +
    +            // Create an event that summarizes this notification.
    +            final LogEvent event = LogEvent.delete(ryaInstanceName);
    +
    +            // Offer it to the worker until there is room for it in the 
work queue, or we are shutting down.
    +            offerUntilAcceptedOrShutdown(workQueue, event, offerValue, 
offerUnits, shutdownSignal);
    +        }
    +    }
    +
    +    /**
    +     * Processes a work queue of {@link LogEvent}s.
    +     * <p>
    +     * Whenever a new log has been created, then it registers a {@link 
QueryEventWorkGenerator}
    +     * that generates {@link QueryEvent}s based on the content and updates 
to the discovered
    +     * {@link QueryChangeLog}.
    +     * <p>
    +     * Whenever a log is deleted, then the generator is stopped and a stop 
all {@link QueryEvent}
    +     * is written to the work queue.
    +     */
    +    @DefaultAnnotation(NonNull.class)
    +    static class LogEventWorker implements Runnable {
    +
    +        /**
    +         * A map of Rya Instance name to the Query Repository for that 
instance.
    +         */
    +        private final Map<String, QueryRepository> repos = new HashMap<>();
    +
    +        private final BlockingQueue<LogEvent> logWorkQueue;
    +        private final BlockingQueue<QueryEvent> queryWorkQueue;
    +        private final long blockingValue;
    +        private final TimeUnit blockingUnits;
    +        private final AtomicBoolean shutdownSignal;
    +
    +        /**
    +         * Constructs an instance of {@link LogEventWorker}.
    +         *
    +         * @param logWorkQueue - A queue of {@link LogEvent}s that will be 
worked by this object. (not null)
    +         * @param queryWorkQueue - A queue where {@link QueryEvent}s will 
be placed by this object. (not null)
    +         * @param blockingValue - How long to wait when polling/offering 
new work.
    +         * @param blockingUnits - The unit for the {@code blockingValue}. 
(not null)
    +         * @param shutdownSignal - Indicates when the application has been 
shutdown, so the executing thread
    +         *   may exit the {@link #run()} method. (not null)
    +         */
    +        public LogEventWorker(
    +                final BlockingQueue<LogEvent> logWorkQueue,
    +                final BlockingQueue<QueryEvent> queryWorkQueue,
    +                final long blockingValue,
    +                final TimeUnit blockingUnits,
    +                final AtomicBoolean shutdownSignal) {
    +            this.logWorkQueue = requireNonNull(logWorkQueue);
    +            this.queryWorkQueue = requireNonNull(queryWorkQueue);
    +            this.blockingValue = blockingValue;
    +            this.blockingUnits = requireNonNull(blockingUnits);
    +            this.shutdownSignal = requireNonNull(shutdownSignal);
    +        }
    +
    +        @Override
    +        public void run() {
    +            // Run until the shutdown signal is set.
    +            while(!shutdownSignal.get()) {
    +                try {
    +                    // Pull a unit of work from the queue.
    +                    log.debug("LogEventWorker - Polling the work queue for 
a new LogEvent.");
    +                    final LogEvent logEvent = 
logWorkQueue.poll(blockingValue, blockingUnits);
    +                    if(logEvent == null) {
    +                        // Poll again if nothing was found.
    +                        continue;
    +                    }
    +
    +                    log.info("LogEventWorker - handling: \n" + logEvent);
    +                    final String ryaInstance = 
logEvent.getRyaInstanceName();
    +
    +                    switch(logEvent.getEventType()) {
    +                        case CREATE:
    +                            // If we see a create message for a Rya 
Instance we are already maintaining,
    +                            // then don't do anything.
    +                            if(repos.containsKey(ryaInstance)) {
    +                                log.warn("LogEventWorker - A repository is 
already being managed for the Rya Instance " +
    +                                        ryaInstance + ". This message will 
be ignored.");
    +                                continue;
    +                            }
    +
    +                            // Create and start a QueryRepository for the 
discovered log. Hold onto the repository
    +                            // so that it may be shutdown later.
    +                            final Scheduler scheduler = 
Scheduler.newFixedRateSchedule(0, blockingValue, blockingUnits);
    +                            final QueryRepository repo = new 
InMemoryQueryRepository(logEvent.getQueryChangeLog().get(), scheduler);
    +                            repo.startAndWait();
    +                            repos.put(ryaInstance, repo);
    +
    +                            // Subscribe a worker that adds the Query 
Events to the queryWorkQueue queue.
    +                            // A count down latch is used to ensure the 
returned set of queries are handled
    +                            // prior to any notifications from the 
repository.
    +                            final CountDownLatch subscriptionWorkFinished 
= new CountDownLatch(1);
    +                            final QueryEventWorkGenerator 
queryWorkGenerator =
    +                                    new 
QueryEventWorkGenerator(ryaInstance, subscriptionWorkFinished, queryWorkQueue,
    +                                            blockingValue, blockingUnits, 
shutdownSignal);
    +
    +                            log.debug("LogEventWorker - Setting up a 
QueryWorkGenerator...");
    +                            final Set<StreamsQuery> queries = 
repo.subscribe(queryWorkGenerator);
    +                            log.debug("LogEventWorker - Finished setting 
up a QueryWorkGenerator.");
    +
    +                            // Handle the view of the queries within the 
repository as it existed when
    +                            // the subscription was registered.
    +                            queries.stream()
    +                            .forEach(query -> {
    +                                // Create a QueryEvent that represents the 
active state of the existing query.
    +                                final QueryEvent queryEvent = 
query.isActive() ?
    +                                        QueryEvent.executing(ryaInstance, 
query) : QueryEvent.stopped(ryaInstance, query.getQueryId());
    +                                log.debug("LogEventWorker - offering: " + 
queryEvent);
    +
    +                                // Offer it to the worker until there is 
room for it in the work queue, or we are shutting down.
    +                                
offerUntilAcceptedOrShutdown(queryWorkQueue, queryEvent, blockingValue, 
blockingUnits, shutdownSignal);
    +                            });
    +
    +                            // Indicate the subscription work is finished 
so that the registered listener may start
    +                            // adding work to the queue.
    +                            log.info("LogEventWorker - Counting down the 
subscription work latch.");
    +                            subscriptionWorkFinished.countDown();
    +                            break;
    +
    +                        case DELETE:
    +                            if(repos.containsKey(ryaInstance)) {
    +                                // Shut down the query repository for the 
Rya instance. This ensures the listener will
    +                                // not receive any more work that needs to 
be done.
    +                                final QueryRepository deletedRepo = 
repos.remove(ryaInstance);
    +                                deletedRepo.stopAndWait();
    +
    +                                // Add work that stops all of the queries 
related to the instance.
    +                                final QueryEvent stopAllEvent = 
QueryEvent.stopALL(ryaInstance);
    +                                
offerUntilAcceptedOrShutdown(queryWorkQueue, stopAllEvent, blockingValue, 
blockingUnits, shutdownSignal);
    +                            }
    +                            break;
    +                    }
    +                } catch (final InterruptedException e) {
    +                    log.debug("LogEventWorker did not see any new events 
over the past 5 seconds. Polling again...");
    +                }
    +            }
    +
    +            log.info("LogEventWorker shutting down...");
    +
    +            // Shutdown all of the QueryRepositories that were started.
    +            repos.values().forEach(repo -> repo.stopAndWait());
    +
    +            log.info("LogEventWorker shut down.");
    +        }
    +    }
    +
    +    /**
    +     * Listens to a {@link QueryRepository} and adds observations to the 
provided work queue.
    +     * It does so until the provided shutdown signal is set.
    +     */
    +    @DefaultAnnotation(NonNull.class)
    +    static class QueryEventWorkGenerator implements QueryChangeLogListener 
{
    +
    +        private final String ryaInstance;
    +        private final CountDownLatch subscriptionWorkFinished;
    +        private final BlockingQueue<QueryEvent> queryWorkQueue;
    +        private final long blockingValue;
    +        private final TimeUnit blockingUnits;
    +        private final AtomicBoolean shutdownSignal;
    +
    +        /**
    +         * Constructs an instance of {@link QueryEventWorkGenerator}.
    +         *
    +         * @param ryaInstance - The rya instance whose log this objects is 
watching. (not null)
    +         * @param subscriptionWorkFinished - Indicates when work that 
needs to be completed before this
    +         *   listener handles notifications is completed. (not null)
    +         * @param queryWorkQueue - A queue where {@link QueryEvent}s will 
be placed by this object. (not null)
    +         * @param blockingValue - How long to wait when polling/offering 
new work.
    +         * @param blockingUnits - The unit for the {@code blockingValue}. 
(not null)
    +         * @param shutdownSignal - Indicates to this listener that it 
needs to stop adding events
    +         *   to the work queue because the application is shutting down. 
(not null)
    +         */
    +        public QueryEventWorkGenerator(
    +                final String ryaInstance,
    +                final CountDownLatch subscriptionWorkFinished,
    +                final BlockingQueue<QueryEvent> queryWorkQueue,
    +                final long blockingValue,
    +                final TimeUnit blockingUnits,
    +                final AtomicBoolean shutdownSignal) {
    +            this.ryaInstance = requireNonNull(ryaInstance);
    +            this.subscriptionWorkFinished = 
requireNonNull(subscriptionWorkFinished);
    +            this.queryWorkQueue = requireNonNull(queryWorkQueue);
    +            this.blockingValue = blockingValue;
    +            this.blockingUnits = requireNonNull(blockingUnits);
    +            this.shutdownSignal = requireNonNull(shutdownSignal);
    +        }
    +
    +        @Override
    +        public void notify(final ChangeLogEntry<QueryChange> 
queryChangeEvent, final Optional<StreamsQuery> newQueryState) {
    +            requireNonNull(queryChangeEvent);
    +            requireNonNull(newQueryState);
    +
    +            // Wait for the subscription work to be finished.
    +            try {
    +                log.debug("Waiting for Subscription Work Finished latch to 
release...");
    +                while(!shutdownSignal.get() && 
!subscriptionWorkFinished.await(blockingValue, blockingUnits)) {
    +                    log.debug("Still waiting...");
    +                }
    +                log.debug("Subscription Work Finished latch to released.");
    +            } catch (final InterruptedException e) {
    +                log.warn("Interrupted while waiting for the Subscription 
Work Finished latch to be " +
    +                        "released. Shutting down?", e);
    +            }
    +
    +            // If we left the loop because of a shutdown, return 
immediately.
    +            if(shutdownSignal.get()) {
    +                log.debug("Not processing notification. Shutting down.");
    +                return;
    +            }
    +
    +            // Generate work from the notification.
    +            final QueryChange change = queryChangeEvent.getEntry();
    +            switch(change.getChangeType()) {
    +                case CREATE:
    +                    if(newQueryState.isPresent()) {
    +                        log.info("Rya Instance " + ryaInstance + " created 
Rya Streams query " + newQueryState + ".");
    +                        final StreamsQuery newQuery = newQueryState.get();
    +                        if(newQuery.isActive()) {
    +                            final QueryEvent executeNewQuery = 
QueryEvent.executing(ryaInstance, newQuery);
    +                            offerUntilAcceptedOrShutdown(queryWorkQueue, 
executeNewQuery, blockingValue, blockingUnits, shutdownSignal);
    +                        }
    +                    } else {
    +                        log.error("Received a CREATE QueryChange for Rya 
Instance: " + ryaInstance +
    +                                ", Query ID: " + change.getQueryId() + ", 
but the QueryRepository did not supply a " +
    +                                "StreamsQuery representing the created 
query. The query will not be processed.");
    +                    }
    +                    break;
    +
    +                case DELETE:
    +                    final UUID deletedQueryId = change.getQueryId();
    +                    log.info("Rya Instance " + ryaInstance + " deleted Rya 
Streams query with ID " + deletedQueryId);
    +                    final QueryEvent stopDeletedQuery = 
QueryEvent.stopped(ryaInstance, deletedQueryId);
    +                    offerUntilAcceptedOrShutdown(queryWorkQueue, 
stopDeletedQuery, blockingValue, blockingUnits, shutdownSignal);
    +                    break;
    +
    +                case UPDATE:
    +                    if(newQueryState.isPresent()) {
    +                        final StreamsQuery updatedQuery = 
newQueryState.get();
    +                        if(updatedQuery.isActive()) {
    +                            log.info("Rya Instance " + ryaInstance + " 
updated Rya Streams query with ID " +
    +                                    updatedQuery.getQueryId() + " to be 
active.");
    +                            final QueryEvent executeUpdatedQuery = 
QueryEvent.executing(ryaInstance, updatedQuery);
    +                            offerUntilAcceptedOrShutdown(queryWorkQueue, 
executeUpdatedQuery, blockingValue, blockingUnits, shutdownSignal);
    +                        } else {
    +                            log.info("Rya Instance " + ryaInstance + " 
updated Rya Streams query with ID " +
    +                                    updatedQuery.getQueryId() + " to be 
inactive.");
    +                            final QueryEvent stopUpdatedQuery = 
QueryEvent.stopped(ryaInstance, updatedQuery.getQueryId());
    +                            offerUntilAcceptedOrShutdown(queryWorkQueue, 
stopUpdatedQuery, blockingValue, blockingUnits, shutdownSignal);
    +                        }
    +                    } else {
    +                        log.error("Received an UPDATE QueryChange for Rya 
Instance: " + ryaInstance +
    +                                ", Query ID: " + change.getQueryId() + ", 
but the QueryRepository did not supply a " +
    +                                "StreamsQuery representing the created 
query. The query will not be processed.");
    +                    }
    +                    break;
    +            }
    +        }
    +    }
    +
    +    /**
    +     * Processes a work queue of {@link QueryEvent}s.
    +     * <p/>
    +     * Each type of event maps to the corresponding method on {@link 
QueryExecutor} that is called into.
    +     */
    +    @DefaultAnnotation(NonNull.class)
    +    static class QueryEventWorker implements Runnable {
    +
    +        private final BlockingQueue<QueryEvent> workQueue;
    +        private final QueryExecutor queryExecutor;
    +        private final long pollingValue;
    +        private final TimeUnit pollingUnits;
    +        private final AtomicBoolean shutdownSignal;
    +
    +        /**
    +         * Constructs an instance of {@link QueryEventWorker}.
    +         * <p/>
    +         * Every object argument is checked with {@code requireNonNull}; {@code pollingValue}
    +         * is a primitive and is stored as given.
    +         *
    +         * @param workQueue - A queue of {@link QueryEvent}s that will be worked by this object. (not null)
    +         * @param queryExecutor - Responsible for executing the {@link StreamsQuery}s. (not null)
    +         * @param pollingValue - How long to wait when polling for new work.
    +         * @param pollingUnits - The units for the {@code pollingValue}. (not null)
    +         * @param shutdownSignal - Indicates when the application has been shutdown, so the executing thread
    +         *   may exit the {@link #run()} method. (not null)
    +         */
    +        public QueryEventWorker(
    +                final BlockingQueue<QueryEvent> workQueue,
    +                final QueryExecutor queryExecutor,
    +                final long pollingValue,
    +                final TimeUnit pollingUnits,
    +                final AtomicBoolean shutdownSignal) {
    +            this.workQueue = requireNonNull(workQueue);
    +            this.queryExecutor = requireNonNull(queryExecutor);
    +            this.pollingValue = pollingValue;
    +            this.pollingUnits = requireNonNull(pollingUnits);
    +            this.shutdownSignal = requireNonNull(shutdownSignal);
    +        }
    +
    +        @Override
    +        public void run() {
    +            log.info("QueryEventWorker starting.");
    +
    +            // Run until the shutdown signal is set.
    +            while(!shutdownSignal.get()) {
    +                // Pull a unit of work from the queue.
    +                try {
    +                    log.debug("Polling the work queue for a new 
QueryEvent.");
    +                    final QueryEvent event = workQueue.poll(pollingValue, 
pollingUnits);
    +                    if(event == null) {
    +                        // Poll again if nothing was found.
    +                        continue;
    +                    }
    +
    +                    log.info("QueryEventWorker handling:\n" + event);
    +
    +                    // Ensure the state within the executor matches the 
query event's state.
    +                    switch(event.getType()) {
    +                        case EXECUTING:
    +                            try {
    +                                
queryExecutor.startQuery(event.getRyaInstance(), event.getStreamsQuery().get());
    +                            } catch (final IllegalStateException | 
QueryExecutorException e) {
    +                                log.error("Could not start a query 
represented by the following work: " + event, e);
    +                            }
    +                            break;
    +
    +                        case STOPPED:
    +                            try {
    +                                
queryExecutor.stopQuery(event.getQueryId().get());
    +                            } catch (final IllegalStateException | 
QueryExecutorException e) {
    +                                log.error("Could not stop a query 
represented by the following work: " + event, e);
    +                            }
    +                            break;
    +
    +                        case STOP_ALL:
    +                        try {
    --- End diff --
    
    Fix the spacing for this case.


---

Reply via email to