[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-03-02 Thread kchilton2
Github user kchilton2 closed the pull request at:

https://github.com/apache/incubator-rya/pull/272


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-22 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r170073123
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
 ---
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
+import org.apache.rya.streams.querymanager.QueryChangeLogSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractScheduledService;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Represents a Kafka Server that manages {@link KafkaQueryChangeLog}s.
+ * 
+ * Thread safe.
--- End diff --

net.jcip.annotations.ThreadSafe

Looks like Rya is using this one:
```
<dependency>
    <groupId>com.github.stephenc.jcip</groupId>
    <artifactId>jcip-annotations</artifactId>
    <version>1.0-1</version>
</dependency>
```


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-22 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r170069054
  
--- Diff: extras/rya.streams/query-manager/pom.xml ---
@@ -0,0 +1,253 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+
+
+org.apache.rya
+rya.streams.parent
+3.2.12-incubating-SNAPSHOT
+
+
+4.0.0
+rya.streams.query-manager
+
+Apache Rya Streams Query Manager
+
+This module contains the Rya Streams Query Manager.
+
+
+
+
${project.build.directory}/${project.artifactId}-${project.version}-rpm-staging
+
+
+
+
+
+org.apache.rya
+rya.streams.kafka
+
+
+
+
+commons-daemon
+commons-daemon
+1.1.0
+
+
+org.slf4j
+slf4j-log4j12
+
+
+com.beust
+jcommander
+
+
+
+
+junit
+junit
+test
+
+
+org.apache.rya
+rya.test.kafka
+test
+
+
+org.mockito
+mockito-all
+test
+
+
+
+
+
+
+src/main/xsd
+
+
+
+
+
+
+org.codehaus.mojo
+jaxb2-maven-plugin
+
+
+xjc
+
+xjc
+
+
+
+
+
org.apache.rya.streams.querymanager.xml
+
+
+ 
+   
+
+com.mycila
+license-maven-plugin
+
+
${project.basedir}/src/license/header.txt
+
+
+
+update-generated-source-headers
+
+
${project.build.directory}/generated-sources/jaxb
+
+
XML_STYLE
+
+
+process-sources
+
+format
+
+
+
+
+
+
+
+org.apache.maven.plugins
+maven-shade-plugin
+
+
+package
+
+  shade
+
+
+
+
+
org.apache.rya.streams.querymanager.QueryManagerDaemon
+
+
+
+
+
+
+
+
+
+
+org.apache.maven.plugins
+maven-assembly-plugin
+
+
+
+stage-content-for-rpms
+
+single
+
+package
+
+
+false
+
+
src/assembly/rpm-staging.xml
+
+
+
+
+
+
+
+org.codehaus.mojo
+rpm-maven-plugin
+
+
+set-rpm-properties
+
+version
+
+
+
+create-rpm-distribution
--- End diff --


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168278465
  
--- Diff: 
extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
 ---
@@ -107,12 +112,11 @@ private String formatQueries(final Set<StreamsQuery> queries) {
 sb.append("Queries in Rya Streams:\n");
 
sb.append("-\n");
 queries.forEach(query -> {
-sb.append("ID: ");
-sb.append(query.getQueryId());
-sb.append("\t\t");
-sb.append("Query: ");
-sb.append(query.getSparql());
-sb.append("\n");
+sb.append("ID: ").append(query.getQueryId())
+.append("")
+.append("Is Active: ").append(query.isActive())
+.append( query.isActive() ? " " : "" )
+.append("Query: ").append(query.getSparql()).append("\n");
--- End diff --

Should be sanitized before it gets here.
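
For reference, the chained-append formatting in the hunk above can be sketched on its own. This is only an illustration under assumptions: `QueryView` is a hypothetical stand-in for `StreamsQuery`, the tab separators are a guess (the separator strings in the quoted diff did not survive the archive), and the SPARQL text is assumed to have been sanitized upstream, as the comment suggests.

```java
import java.util.List;
import java.util.UUID;

/** Hypothetical, minimal stand-in for StreamsQuery (not the real Rya class). */
final class QueryView {
    final UUID queryId;
    final String sparql;
    final boolean active;

    QueryView(final UUID queryId, final String sparql, final boolean active) {
        this.queryId = queryId;
        this.sparql = sparql;
        this.active = active;
    }
}

final class QueryFormatter {
    /**
     * Builds one line per query using chained StringBuilder appends.
     * Assumes the SPARQL was sanitized before reaching this method.
     */
    static String formatQueries(final List<QueryView> queries) {
        final StringBuilder sb = new StringBuilder();
        sb.append("Queries in Rya Streams:\n");
        for (final QueryView query : queries) {
            sb.append("ID: ").append(query.queryId)
              .append('\t') // assumed separator
              .append("Is Active: ").append(query.active)
              .append('\t') // assumed separator
              .append("Query: ").append(query.sparql).append('\n');
        }
        return sb.toString();
    }
}
```

Chaining works because every `StringBuilder.append` overload returns the same builder, so the output is identical to the statement-per-append version it replaces.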


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168278306
  
--- Diff: 
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
 ---
@@ -85,4 +85,19 @@ public boolean equals(final Object o) {
 }
 return false;
 }
+
+@Override
+public String toString() {
+final StringBuilder sb = new StringBuilder();
+sb.append("ID: ");
+sb.append(getQueryId().toString() + "\n");
+sb.append("Query: ");
+sb.append(getSparql() + "\n");
--- End diff --

Nope.


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168278274
  
--- Diff: 
extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
 ---
@@ -107,12 +112,11 @@ private String formatQueries(final Set<StreamsQuery> queries) {
 sb.append("Queries in Rya Streams:\n");
 
sb.append("-\n");
 queries.forEach(query -> {
-sb.append("ID: ");
-sb.append(query.getQueryId());
-sb.append("\t\t");
-sb.append("Query: ");
-sb.append(query.getSparql());
-sb.append("\n");
+sb.append("ID: ").append(query.getQueryId())
+.append("")
+.append("Is Active: ").append(query.isActive())
+.append( query.isActive() ? " " : "" )
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168274700
  
--- Diff: 
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
 ---
@@ -243,8 +274,68 @@ private void updateCache() {
 it.close();
 }
 } catch (final QueryChangeLogException e) {
-LOG.error("Could not close the " + 
CloseableIteration.class.getName(), e);
+log.error("Could not close the " + 
CloseableIteration.class.getName(), e);
 }
+
+log.trace("updateCache() - Exit");
+}
+}
+
+@Override
+protected void runOneIteration() throws Exception {
+log.trace("runOneIteration() - Enter");
+lock.lock();
+try {
+updateCache();
+} finally {
+lock.unlock();
+}
+log.trace("runOneIteration() - Exit");
--- End diff --

Yep, done.
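
The lock-then-try/finally shape used by `runOneIteration()` in the hunk above can be shown in isolation. A minimal sketch, assuming a hypothetical `GuardedCache` class in place of `InMemoryQueryRepository`; the cache contents and the `snapshot()` and `isLocked()` helpers are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a repository whose cache is refreshed periodically. */
final class GuardedCache {
    private final ReentrantLock lock = new ReentrantLock();
    private final List<String> cache = new ArrayList<>();
    private int refreshCount = 0;

    /** Refreshes the cache while holding the lock and always releases it. */
    void runOneIteration() {
        lock.lock();
        try {
            updateCache();
        } finally {
            // Runs even if updateCache() throws, so the lock is never leaked.
            lock.unlock();
        }
    }

    /** Pretend refresh: appends one entry per call. */
    private void updateCache() {
        refreshCount++;
        cache.add("entry-" + refreshCount);
    }

    /** Returns a copy of the cache, taken under the same lock. */
    List<String> snapshot() {
        lock.lock();
        try {
            return new ArrayList<>(cache);
        } finally {
            lock.unlock();
        }
    }

    /** True while any thread holds the lock. */
    boolean isLocked() {
        return lock.isLocked();
    }
}
```

Acquiring the lock before the `try` keeps `unlock()` from running when `lock()` itself did not succeed.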


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168277599
  
--- Diff: 
extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java
 ---
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Utilities that are useful for interacting with {@link Thread}s while 
testing.
+ */
+public class ThreadUtil {
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168277168
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
 ---
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.bind.JAXBException;
+
+import org.apache.commons.daemon.Daemon;
+import org.apache.commons.daemon.DaemonContext;
+import org.apache.commons.daemon.DaemonInitException;
+import org.apache.rya.streams.kafka.KafkaStreamsFactory;
+import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
+import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource;
+import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor;
+import org.apache.rya.streams.querymanager.xml.Kafka;
+import org.apache.rya.streams.querymanager.xml.QueryManagerConfig;
+import 
org.apache.rya.streams.querymanager.xml.QueryManagerConfig.PerformanceTunning.QueryChanngeLogDiscoveryPeriod;
+import 
org.apache.rya.streams.querymanager.xml.QueryManagerConfigUnmarshaller;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * JSVC integration code for a {@link QueryManager} to be used as a 
non-Windows daemon.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManagerDaemon implements Daemon {
+
+private static final Logger log = 
LoggerFactory.getLogger(QueryManagerDaemon.class);
+
+/**
+ * The default configuration file's path for the application.
+ */
+private static final Path DEFAULT_CONFIGURATION_PATH = 
Paths.get("config/configuration.xml");
+
+/**
+ * Command line parameters that are used by all commands that interact 
with Kafka.
+ */
+class DaemonParameters {
+@Parameter(names = {"--config", "-c"}, required = false, 
description = "The path to the application's configuration file.")
+public String config;
+}
+
+private QueryManager manager = null;
+
+@Override
+public void init(final DaemonContext context) throws 
DaemonInitException, Exception {
+requireNonNull(context);
+
+// Parse the command line arguments for the configuration file to 
use.
+final String[] args = context.getArguments();
+final DaemonParameters params = new DaemonParameters();
+try {
+new JCommander(params).parse(args);
+} catch(final ParameterException e) {
+throw new DaemonInitException("Unable to parse the command 
line arguments.", e);
+}
+final Path configFile = params.config != null ? 
Paths.get(params.config) : DEFAULT_CONFIGURATION_PATH;
+log.info("Loading the following configuration file: " + 
configFile);
+
+// Unmarshall the configuration file into an object.
+final QueryManagerConfig config;
+try(InputStream stream = Files.newInputStream(configFile)) {
--- End diff --

Done.
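
The configuration handling in `init()` above (use `--config` when supplied, otherwise the default path, then open the file with try-with-resources) can be sketched with the JDK alone. `ConfigLoader`, `resolve` and `read` are hypothetical names for illustration; the real daemon unmarshals the stream with `QueryManagerConfigUnmarshaller` rather than reading raw bytes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper mirroring the path selection done in init(). */
final class ConfigLoader {
    static final Path DEFAULT_CONFIGURATION_PATH = Paths.get("config/configuration.xml");

    /** Uses the supplied path when present, otherwise the default. */
    static Path resolve(final String configArg) {
        return configArg != null ? Paths.get(configArg) : DEFAULT_CONFIGURATION_PATH;
    }

    /** Reads the whole file; try-with-resources closes the stream on every path. */
    static byte[] read(final Path configFile) {
        try (InputStream stream = Files.newInputStream(configFile)) {
            final ByteArrayOutputStream out = new ByteArrayOutputStream();
            final byte[] buffer = new byte[4096];
            int n;
            while ((n = stream.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return out.toByteArray();
        } catch (final IOException e) {
            // Surface the failure with the offending path attached.
            throw new UncheckedIOException("Could not read " + configFile, e);
        }
    }
}
```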


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168277066
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChangeLogListener;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import 
org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import 
org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A service for managing {@link StreamsQuery} running on a Rya Streams 
system.
+ * 
+ * Only one QueryManager needs to be running to manage any number of Rya
+ * instances/Rya Streams instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManager extends AbstractService {
+private static final Logger log = 
LoggerFactory.getLogger(QueryManager.class);
+
+/**
+ * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific
+ * Rya instance.
+ */
+private final QueryChangeLogSource changeLogSource;
+
+/**
+ * The engine that is responsible for executing {@link StreamsQuery}s.
+ */
+private final QueryExecutor queryExecutor;
+
+/**
+ * How long blocking operations will be attempted before potentially 
trying again.
+ */
+private final long blockingValue;
+
+/**
+ * The units for {@link #blockingValue}.
+ */
+private final TimeUnit blockingUnits;
+
+/**
+ * Used to inform threads that the application is shutting down, so 
they must stop work.
+ */
+private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+/**
+ * This thread pool manages the two threads used to work the {@link LogEvent}s
+ * and the {@link QueryEvent}s.
+ */
+private final ExecutorService executor = 
Executors.newFixedThreadPool(2);
+
+/**
+ * Creates a new {@link QueryManager}.
+ *
+ * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not 
null)
+ * @param source - The {@link QueryChangeLogSource} of 
QueryChangeLogs. (not null)
+ * @param blockingValue - How long blocking operations will try before 
looping. (> 0)
+ * @param blockingUnits - The units of the {@code blockingValue}. (not 
null)
+ */
+
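
The fields documented above (`shutdownSignal`, `blockingValue`, `blockingUnits`) suggest a worker that blocks on a queue for a bounded time and rechecks the shutdown flag between attempts. A minimal sketch of that pattern follows; `PollingWorker` and its `String` events are hypothetical and are not the `LogEvent`/`QueryEvent` workers from the pull request.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical worker that drains a queue until told to shut down. */
final class PollingWorker implements Runnable {
    private final BlockingQueue<String> events;
    private final AtomicBoolean shutdownSignal;
    private final long blockingValue;
    private final TimeUnit blockingUnits;
    private final List<String> handled = Collections.synchronizedList(new ArrayList<String>());

    PollingWorker(final BlockingQueue<String> events, final AtomicBoolean shutdownSignal,
            final long blockingValue, final TimeUnit blockingUnits) {
        this.events = events;
        this.shutdownSignal = shutdownSignal;
        this.blockingValue = blockingValue;
        this.blockingUnits = blockingUnits;
    }

    @Override
    public void run() {
        while (!shutdownSignal.get()) {
            try {
                // Block for a bounded time so the shutdown flag is rechecked regularly.
                final String event = events.poll(blockingValue, blockingUnits);
                if (event != null) {
                    handled.add(event);
                }
            } catch (final InterruptedException e) {
                // Restore the interrupt flag and stop working.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Returns a copy of the events handled so far. */
    List<String> handled() {
        return new ArrayList<>(handled);
    }
}
```

The bounded `poll` is what lets a thread notice the shutdown signal even when no events arrive; an unbounded `take()` would need an interrupt to stop.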

[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168276972
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---

[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168276451
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---

[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168276645
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---

[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168276251
  
--- Diff: 
extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory;
+import 
org.apache.rya.streams.kafka.topology.TopologyBuilderFactory.TopologyBuilderException;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.openrdf.query.MalformedQueryException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Creates {@link KafkaStreams} objects that are able to process {@link 
StreamsQuery}s
+ * using a single thread of execution starting from the earliest point in 
within the
+ * input topic. The Application ID used by the client is based on the 
Query ID of the
+ * query that is being executed so that this job may resume where it left 
off if it
+ * is stopped.
+ */
+@DefaultAnnotation(NonNull.class)
+public class SingleThreadKafkaStreamsFactory implements 
KafkaStreamsFactory {
+
+private final TopologyBuilderFactory topologyFactory = new 
TopologyFactory();
+
+private final String bootstrapServersConfig;
+
+/**
+ * Constructs an instance of {@link SingleThreadKafkaStreamsFactory}.
+ *
+ * @param bootstrapServersConfig - Configures which Kafka cluster the 
jobs will interact with. (not null)
+ */
+public SingleThreadKafkaStreamsFactory(final String 
bootstrapServersConfig) {
+this.bootstrapServersConfig = 
requireNonNull(bootstrapServersConfig);
+}
+
+@Override
+public KafkaStreams make(final String ryaInstance, final StreamsQuery 
query) throws KafkaStreamsFactoryException {
+requireNonNull(ryaInstance);
+requireNonNull(query);
+
+// Setup the Kafka Stream program.
+final Properties streamsProps = new Properties();
+
+// Configure the Kafka servers that will be talked to.
+streamsProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServersConfig);
+
+// Use the Query ID as the Application ID to ensure we resume 
where we left off the last time this command was run.
+streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"RyaStreams-Query-" + query.getQueryId());
+
+// Always start at the beginning of the input topic.
+streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+
+// Setup the topology that processes the Query.
+final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
+final String resultsTopic = 
KafkaTopics.queryResultsTopic(query.getQueryId());
+
+try {
+final TopologyBuilder topologyBuilder = 
topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new 
RandomUUIDFactory());
+return new KafkaStreams(topologyBuilder, new 
StreamsConfig(streamsProps));
+} catch (MalformedQueryException | TopologyBuilderException e) {
--- End diff --

Done.
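
For readers following the thread, the configuration built in the quoted make() method can be sketched with plain string keys. This is only an illustration, not the project's code: the class and method names are hypothetical, the literal keys are the standard Kafka names behind the StreamsConfig and ConsumerConfig constants used in the diff, and the Query ID is assumed to be a UUID.

```java
import java.util.Properties;
import java.util.UUID;

public class StreamsPropsSketch {

    // Hypothetical helper mirroring the three properties set in the diff.
    static Properties streamsProps(final String bootstrapServers, final UUID queryId) {
        final Properties props = new Properties();
        // Which Kafka cluster the job talks to.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A stable Application ID per query lets a restarted job resume
        // from its committed offsets instead of starting over.
        props.setProperty("application.id", "RyaStreams-Query-" + queryId);
        // Only applies when no committed offset exists for the Application ID.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(final String[] args) {
        final UUID queryId = UUID.fromString("00000000-0000-0000-0000-000000000001");
        final Properties props = streamsProps("localhost:9092", queryId);
        System.out.println(props.getProperty("application.id"));
    }
}
```

Because the Application ID is derived only from the Query ID, two runs for the same query share consumer state, which is what the class Javadoc describes as resuming where the job left off.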


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168276101
  
--- Diff: 
extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
 ---
@@ -48,7 +51,28 @@
  */
 public static String queryChangeLogTopic(final String ryaInstance) {
 requireNonNull(ryaInstance);
-return ryaInstance + "-QueryChangeLog";
+return ryaInstance + QUERY_CHANGE_LOG_TOPIC_SUFFIX;
+}
+
+/**
+ * Get the Rya instance name from a Kafka topic name that has been 
used for a {@link QueryChnageLog}.
--- End diff --

Done.
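
As a rough sketch of what the Javadoc in this hunk describes, the instance name can be recovered by stripping the suffix from the topic name. The method name ryaInstanceOf and the Optional return type are assumptions made for illustration, and the suffix value is taken from the removed line in the diff; the actual method in KafkaTopics may differ.

```java
import java.util.Optional;

public class TopicNameSketch {

    // Value assumed from the removed line: ryaInstance + "-QueryChangeLog".
    static final String QUERY_CHANGE_LOG_TOPIC_SUFFIX = "-QueryChangeLog";

    static String queryChangeLogTopic(final String ryaInstance) {
        return ryaInstance + QUERY_CHANGE_LOG_TOPIC_SUFFIX;
    }

    // Hypothetical inverse: empty if the topic is not a change log topic.
    static Optional<String> ryaInstanceOf(final String topic) {
        final int suffixLength = QUERY_CHANGE_LOG_TOPIC_SUFFIX.length();
        if (topic.endsWith(QUERY_CHANGE_LOG_TOPIC_SUFFIX) && topic.length() > suffixLength) {
            return Optional.of(topic.substring(0, topic.length() - suffixLength));
        }
        return Optional.empty();
    }

    public static void main(final String[] args) {
        final String topic = queryChangeLogTopic("rya_");
        System.out.println(topic + " -> " + ryaInstanceOf(topic));
        System.out.println("other-topic -> " + ryaInstanceOf("other-topic"));
    }
}
```

Sharing one suffix constant between the two directions keeps the topic naming and the topic parsing from drifting apart.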


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168275881
  
--- Diff: 
extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
 ---
@@ -85,11 +92,16 @@ public void initializedWithPopulatedChangeLog() throws 
Exception {
 queries.delete( deletedMeId );
 
 // Create a new totally in memory QueryRepository.
-try(final QueryRepository initializedQueries = new 
InMemoryQueryRepository( changeLog )) {
+final QueryRepository initializedQueries = new 
InMemoryQueryRepository( changeLog, SCHEDULE );
+try {
 // Listing the queries should work using an initialized 
change log.
 final Set<StreamsQuery> stored = initializedQueries.list();
 assertEquals(expected, stored);
+} finally {
+queries.stop();
--- End diff --

Removed.


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168275659
  
--- Diff: 
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
 ---
@@ -110,6 +110,16 @@ public boolean equals(final Object o) {
 return false;
 }
 
+@Override
+public String toString() {
+return "QueryChange: {" +
+   "Query ID: " + queryId + ",\n" +
+   "Change Type: " + changeType + ",\n" +
+   "Is Active: " + isActive + ",\n" +
+   "SPARQL: " + sparql + "\n" +
--- End diff --

I'm going to assume it is sanitized by the time it reaches this line of 
code.


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168274631
  
--- Diff: 
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
 ---
@@ -85,4 +85,19 @@ public boolean equals(final Object o) {
 }
 return false;
 }
+
+@Override
+public String toString() {
+final StringBuilder sb = new StringBuilder();
+sb.append("ID: ");
+sb.append(getQueryId().toString() + "\n");
+sb.append("Query: ");
+sb.append(getSparql() + "\n");
+sb.append("Is ");
+if (!isActive) {
+sb.append(" Not ");
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167284717
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
 ---
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
+import org.apache.rya.streams.querymanager.QueryChangeLogSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractScheduledService;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Represents a Kafka Server that manages {@link KafkaQueryChangeLog}s.
+ * 
+ * Thread safe.
--- End diff --

What's the dependency for that annotation?


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167284418
  
--- Diff: extras/rya.streams/query-manager/pom.xml ---
@@ -0,0 +1,253 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+
+org.apache.rya
+rya.streams.parent
+3.2.12-incubating-SNAPSHOT
+
+
+4.0.0
+rya.streams.query-manager
+
+Apache Rya Streams Query Manager
+
+This module contains the Rya Streams Query Manager.
+
+
+
+
${project.build.directory}/${project.artifactId}-${project.version}-rpm-staging
+
+
+
+
+
+org.apache.rya
+rya.streams.kafka
+
+
+
+
+commons-daemon
+commons-daemon
+1.1.0
+
+
+org.slf4j
+slf4j-log4j12
+
+
+com.beust
+jcommander
+
+
+
+
+junit
+junit
+test
+
+
+org.apache.rya
+rya.test.kafka
+test
+
+
+org.mockito
+mockito-all
+test
+
+
+
+
+
+
+src/main/xsd
+
+
+
+
+
+
+org.codehaus.mojo
+jaxb2-maven-plugin
+
+
+xjc
+
+xjc
+
+
+
+
+
org.apache.rya.streams.querymanager.xml
+
+
+ 
+   
+
+com.mycila
+license-maven-plugin
+
+
${project.basedir}/src/license/header.txt
+
+
+
+update-generated-source-headers
+
+
${project.build.directory}/generated-sources/jaxb
+
+
XML_STYLE
+
+
+process-sources
+
+format
+
+
+
+
+
+
+
+org.apache.maven.plugins
+maven-shade-plugin
+
+
+package
+
+  shade
+
+
+
+
+
org.apache.rya.streams.querymanager.QueryManagerDaemon
+
+
+
+
+
+
+
+
+
+
+org.apache.maven.plugins
+maven-assembly-plugin
+
+
+
+stage-content-for-rpms
+
+single
+
+package
+
+
+false
+
+
src/assembly/rpm-staging.xml
+
+
+
+
+
+
+
+org.codehaus.mojo
+rpm-maven-plugin
+
+
+set-rpm-properties
+
+version
+
+
+
+create-rpm-distribution
--- End diff --

Do you 

[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167283976
  
--- Diff: 
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
 ---
@@ -110,6 +110,16 @@ public boolean equals(final Object o) {
 return false;
 }
 
+@Override
+public String toString() {
+return "QueryChange: {" +
+   "Query ID: " + queryId + ",\n" +
+   "Change Type: " + changeType + ",\n" +
+   "Is Active: " + isActive + ",\n" +
+   "SPARQL: " + sparql + "\n" +
--- End diff --

What does that function do?


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167241732
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChangeLogListener;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import 
org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import 
org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A service for managing {@link StreamsQuery} running on a Rya Streams 
system.
+ * 
+ * Only one QueryManager needs to be running to manage any number of rya
+ * instances/rya streams instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManager extends AbstractService {
+private static final Logger log = 
LoggerFactory.getLogger(QueryManager.class);
+
+/**
+ * The source of {@link QueryChangeLog}s. Each log discovered is bound 
to a specific
+ * Rya instnace.
+ */
+private final QueryChangeLogSource changeLogSource;
+
+/**
+ * The engine that is responsible for executing {@link StreamsQuery}s.
+ */
+private final QueryExecutor queryExecutor;
+
+/**
+ * How long blocking operations will be attempted before potentially 
trying again.
+ */
+private final long blockingValue;
+
+/**
+ * The units for {@link #blockingValue}.
+ */
+private final TimeUnit blockingUnits;
+
+/**
+ * Used to inform threads that the application is shutting down, so 
they must stop work.
+ */
+private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+/**
+ * This thread pool manages the two thread used to work the {@link 
LogEvent}s
+ * and the {@link QueryEvent}s.
+ */
+private final ExecutorService executor = 
Executors.newFixedThreadPool(2);
+
+/**
+ * Creates a new {@link QueryManager}.
+ *
+ * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not 
null)
+ * @param source - The {@link QueryChangeLogSource} of 
QueryChangeLogs. (not null)
+ * @param blockingValue - How long blocking operations will try before 
looping. (> 0)
+ * @param blockingUnits - The units of the {@code blockingValue}. (not 
null)
+ */
+

[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167242987
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
 ---
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.bind.JAXBException;
+
+import org.apache.commons.daemon.Daemon;
+import org.apache.commons.daemon.DaemonContext;
+import org.apache.commons.daemon.DaemonInitException;
+import org.apache.rya.streams.kafka.KafkaStreamsFactory;
+import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
+import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource;
+import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor;
+import org.apache.rya.streams.querymanager.xml.Kafka;
+import org.apache.rya.streams.querymanager.xml.QueryManagerConfig;
+import 
org.apache.rya.streams.querymanager.xml.QueryManagerConfig.PerformanceTunning.QueryChanngeLogDiscoveryPeriod;
+import 
org.apache.rya.streams.querymanager.xml.QueryManagerConfigUnmarshaller;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * JSVC integration code for a {@link QueryManager} to be used as a 
non-Windows daemon.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManagerDaemon implements Daemon {
+
+private static final Logger log = 
LoggerFactory.getLogger(QueryManagerDaemon.class);
+
+/**
+ * The default configuration file's path for the application.
+ */
+private static final Path DEFAULT_CONFIGURATION_PATH = 
Paths.get("config/configuration.xml");
+
+/**
+ * Command line parameters that are used by all commands that interact 
with Kafka.
+ */
+class DaemonParameters {
+@Parameter(names = {"--config", "-c"}, required = false, 
description = "The path to the application's configuration file.")
+public String config;
+}
+
+private QueryManager manager = null;
+
+@Override
+public void init(final DaemonContext context) throws 
DaemonInitException, Exception {
+requireNonNull(context);
+
+// Parse the command line arguments for the configuration file to 
use.
+final String[] args = context.getArguments();
+final DaemonParameters params = new DaemonParameters();
+try {
+new JCommander(params).parse(args);
+} catch(final ParameterException e) {
+throw new DaemonInitException("Unable to parse the command 
line arguments.", e);
+}
+final Path configFile = params.config != null ? 
Paths.get(params.config) : DEFAULT_CONFIGURATION_PATH;
+log.info("Loading the following configuration file: " + 
configFile);
+
+// Unmarshall the configuration file into an object.
+final QueryManagerConfig config;
+try(InputStream stream = Files.newInputStream(configFile)) {
--- End diff --

final


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167252337
  
--- Diff: 
extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
 ---
@@ -107,12 +112,11 @@ private String formatQueries(final Set<StreamsQuery> 
queries) {
 sb.append("Queries in Rya Streams:\n");
 
sb.append("-\n");
 queries.forEach(query -> {
-sb.append("ID: ");
-sb.append(query.getQueryId());
-sb.append("\t\t");
-sb.append("Query: ");
-sb.append(query.getSparql());
-sb.append("\n");
+sb.append("ID: ").append(query.getQueryId())
+.append("")
+.append("Is Active: ").append(query.isActive())
+.append( query.isActive() ? " " : "" )
--- End diff --

Maybe change to:

    .append("Is Active: ")
    .append(StringUtils.rightPad(String.valueOf(query.isActive()), 9))
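
The padding idea can be sketched without Commons Lang. This is a minimal illustration, assuming isActive() returns a boolean, which therefore has to be converted to a String before it can be padded. The class, the rightPad helper and formatRow are hypothetical stand-ins, not the code in ListQueriesCommand.

```java
public class PadSketch {

    // Hand-rolled stand-in for a right-pad utility: appends spaces up to width.
    static String rightPad(final String s, final int width) {
        final StringBuilder sb = new StringBuilder(s);
        while (sb.length() < width) {
            sb.append(' ');
        }
        return sb.toString();
    }

    // Hypothetical row formatter; the column spacing is illustrative only.
    static String formatRow(final String id, final boolean active, final String sparql) {
        return "ID: " + id + "    "
                + "Is Active: " + rightPad(String.valueOf(active), 9)
                + "Query: " + sparql + "\n";
    }

    public static void main(final String[] args) {
        System.out.print(formatRow("q1", true, "SELECT * WHERE { ?s ?p ?o }"));
        System.out.print(formatRow("q2", false, "SELECT * WHERE { ?s ?p ?o }"));
    }
}
```

Padding to a fixed width keeps the "Query:" column aligned whether the flag prints as "true" or "false", which is what the conditional append in the diff was approximating.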



---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167240617
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChangeLogListener;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import 
org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import 
org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A service for managing {@link StreamsQuery} running on a Rya Streams 
system.
+ * 
+ * Only one QueryManager needs to be running to manage any number of rya
+ * instances/rya streams instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManager extends AbstractService {
+private static final Logger log = 
LoggerFactory.getLogger(QueryManager.class);
+
+/**
+ * The source of {@link QueryChangeLog}s. Each log discovered is bound 
to a specific
+ * Rya instnace.
+ */
+private final QueryChangeLogSource changeLogSource;
+
+/**
+ * The engine that is responsible for executing {@link StreamsQuery}s.
+ */
+private final QueryExecutor queryExecutor;
+
+/**
+ * How long blocking operations will be attempted before potentially 
trying again.
+ */
+private final long blockingValue;
+
+/**
+ * The units for {@link #blockingValue}.
+ */
+private final TimeUnit blockingUnits;
+
+/**
+ * Used to inform threads that the application is shutting down, so 
they must stop work.
+ */
+private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+/**
+ * This thread pool manages the two thread used to work the {@link 
LogEvent}s
+ * and the {@link QueryEvent}s.
+ */
+private final ExecutorService executor = 
Executors.newFixedThreadPool(2);
+
+/**
+ * Creates a new {@link QueryManager}.
+ *
+ * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not 
null)
+ * @param source - The {@link QueryChangeLogSource} of 
QueryChangeLogs. (not null)
+ * @param blockingValue - How long blocking operations will try before 
looping. (> 0)
+ * @param blockingUnits - The units of the {@code blockingValue}. (not 
null)
+ */
+

[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167253121
  
--- Diff: 
extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
 ---
@@ -107,12 +112,11 @@ private String formatQueries(final Set<StreamsQuery> 
queries) {
 sb.append("Queries in Rya Streams:\n");
 
sb.append("-\n");
 queries.forEach(query -> {
-sb.append("ID: ");
-sb.append(query.getQueryId());
-sb.append("\t\t");
-sb.append("Query: ");
-sb.append(query.getSparql());
-sb.append("\n");
+sb.append("ID: ").append(query.getQueryId())
+.append("")
+.append("Is Active: ").append(query.isActive())
+.append( query.isActive() ? " " : "" )
+.append("Query: ").append(query.getSparql()).append("\n");
--- End diff --

LogUtils.clean(query.getSparql())


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167241001
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChangeLogListener;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import 
org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import 
org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A service for managing {@link StreamsQuery}s running on a Rya Streams system.
+ *
+ * Only one QueryManager needs to be running to manage any number of Rya
+ * instances/Rya Streams instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManager extends AbstractService {
+    private static final Logger log = LoggerFactory.getLogger(QueryManager.class);
+
+    /**
+     * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific
+     * Rya instance.
+     */
+    private final QueryChangeLogSource changeLogSource;
+
+    /**
+     * The engine that is responsible for executing {@link StreamsQuery}s.
+     */
+    private final QueryExecutor queryExecutor;
+
+    /**
+     * How long blocking operations will be attempted before potentially trying again.
+     */
+    private final long blockingValue;
+
+    /**
+     * The units for {@link #blockingValue}.
+     */
+    private final TimeUnit blockingUnits;
+
+    /**
+     * Used to inform threads that the application is shutting down, so they must stop work.
+     */
+    private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+    /**
+     * This thread pool manages the two threads used to work the {@link LogEvent}s
+     * and the {@link QueryEvent}s.
+     */
+    private final ExecutorService executor = Executors.newFixedThreadPool(2);
+
+    /**
+     * Creates a new {@link QueryManager}.
+     *
+     * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null)
+     * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null)
+     * @param blockingValue - How long blocking operations will try before looping. (> 0)
+     * @param blockingUnits - The units of the {@code blockingValue}. (not null)
+     */
+

[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167252837
  
--- Diff: 
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
 ---
@@ -85,4 +85,19 @@ public boolean equals(final Object o) {
 }
 return false;
 }
+
+@Override
+public String toString() {
+final StringBuilder sb = new StringBuilder();
+sb.append("ID: ");
+sb.append(getQueryId().toString() + "\n");
+sb.append("Query: ");
+sb.append(getSparql() + "\n");
--- End diff --

Is "getSparql()" coming from user input? If so, consider wrapping it in 
org.apache.rya.api.log.LogUtils.clean(getSparql()) before it is logged.
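
To illustrate the concern: if the SPARQL text contains line breaks, an unsanitized toString() can make one value look like several log entries. The following is only a minimal sketch of the idea. `LogSanitizer` and its `clean` method are hypothetical stand-ins written for this example; they are not the real `LogUtils.clean`, whose behavior is not shown in this thread.

```java
/** Hypothetical stand-in for a log-cleaning helper; NOT the real LogUtils. */
final class LogSanitizer {
    private LogSanitizer() { }

    /** Replaces CR and LF with '_' so one value cannot span several log lines. */
    static String clean(final String value) {
        return value == null ? null : value.replace('\r', '_').replace('\n', '_');
    }
}

final class LogSanitizerDemo {
    public static void main(final String[] args) {
        // A query string that tries to smuggle a second "log line" into the output.
        final String sparql = "SELECT * WHERE { ?s ?p ?o }\nINFO forged entry";
        System.out.println("Query: " + LogSanitizer.clean(sparql));
    }
}
```

With this kind of cleaning the forged text stays on the same line as the query it came from, which keeps the log unambiguous.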


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167235009
  
--- Diff: 
extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
 ---
@@ -48,7 +51,28 @@
  */
 public static String queryChangeLogTopic(final String ryaInstance) {
 requireNonNull(ryaInstance);
-return ryaInstance + "-QueryChangeLog";
+return ryaInstance + QUERY_CHANGE_LOG_TOPIC_SUFFIX;
+}
+
+/**
+ * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChnageLog}.
--- End diff --

Typo: this should be QueryChangeLog.
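
For context, the topic name and the Rya instance name are meant to round-trip through the suffix. A minimal sketch of that relationship is below. The suffix value is taken from the removed line in the diff; the class name `KafkaTopicsSketch`, the method name `ryaInstanceFromTopic`, and the `Optional` return type are assumptions for illustration, since the real method signature is not visible in this diff.

```java
import static java.util.Objects.requireNonNull;

import java.util.Optional;

final class KafkaTopicsSketch {
    // Value taken from the line the diff removes; the constant replaces the literal.
    static final String QUERY_CHANGE_LOG_TOPIC_SUFFIX = "-QueryChangeLog";

    private KafkaTopicsSketch() { }

    static String queryChangeLogTopic(final String ryaInstance) {
        requireNonNull(ryaInstance);
        return ryaInstance + QUERY_CHANGE_LOG_TOPIC_SUFFIX;
    }

    /** Hypothetical inverse: empty if the topic is not a change log topic. */
    static Optional<String> ryaInstanceFromTopic(final String topic) {
        requireNonNull(topic);
        final int nameLength = topic.length() - QUERY_CHANGE_LOG_TOPIC_SUFFIX.length();
        if (nameLength <= 0 || !topic.endsWith(QUERY_CHANGE_LOG_TOPIC_SUFFIX)) {
            return Optional.empty();
        }
        return Optional.of(topic.substring(0, nameLength));
    }

    public static void main(final String[] args) {
        final String topic = queryChangeLogTopic("demo");
        System.out.println(topic + " -> " + ryaInstanceFromTopic(topic));
    }
}
```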


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167231162
  
--- Diff: 
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
 ---
@@ -243,8 +274,68 @@ private void updateCache() {
 it.close();
 }
 } catch (final QueryChangeLogException e) {
-LOG.error("Could not close the " + CloseableIteration.class.getName(), e);
+log.error("Could not close the " + CloseableIteration.class.getName(), e);
 }
+
+log.trace("updateCache() - Exit");
+}
+}
+
+@Override
+protected void runOneIteration() throws Exception {
+log.trace("runOneIteration() - Enter");
+lock.lock();
+try {
+updateCache();
+} finally {
+lock.unlock();
+}
+log.trace("runOneIteration() - Exit");
--- End diff --

Should this log message be moved into the finally block so that it is still 
logged if an Exception is thrown?
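
A small standalone sketch of what that would look like, assuming the goal is for every "Enter" trace to have a matching "Exit" trace. `IterationLoggingDemo`, the `TRACE` list (standing in for the logger), and the `Runnable` parameter are all hypothetical and only serve to make the example runnable on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

final class IterationLoggingDemo {
    // Stand-in for log.trace(...) so the behavior can be observed.
    static final List<String> TRACE = new ArrayList<>();
    private static final ReentrantLock LOCK = new ReentrantLock();

    private IterationLoggingDemo() { }

    static void runOneIteration(final Runnable work) {
        TRACE.add("runOneIteration() - Enter");
        LOCK.lock();
        try {
            work.run();
        } finally {
            LOCK.unlock();
            // Recorded on both the normal path and the exceptional path.
            TRACE.add("runOneIteration() - Exit");
        }
    }

    public static void main(final String[] args) {
        try {
            runOneIteration(() -> { throw new IllegalStateException("boom"); });
        } catch (final IllegalStateException e) {
            // Expected; the point is that the Exit trace was still recorded.
        }
        System.out.println(TRACE);
    }
}
```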


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167249191
  
--- Diff: 
extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java
 ---
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Utilities that are useful for interacting with {@link Thread}s while testing.
+ */
+public class ThreadUtil {
--- End diff --

Add a private constructor to prevent instantiation, since this class only holds static utilities.
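
Something along these lines would do. This is only a sketch: the class name `ThreadUtilSketch` and the `isAlive` helper are hypothetical, because the actual methods of ThreadUtil are not shown in this diff.

```java
/** Sketch of a static-only utility class that cannot be instantiated. */
final class ThreadUtilSketch {

    /** Private so that callers cannot create instances of the utility class. */
    private ThreadUtilSketch() {
        throw new AssertionError("No instances.");
    }

    /** Hypothetical helper: true if the thread is non-null and currently alive. */
    static boolean isAlive(final Thread thread) {
        return thread != null && thread.isAlive();
    }

    public static void main(final String[] args) {
        // A thread that was never started is not alive.
        System.out.println(isAlive(new Thread(() -> { })));
    }
}
```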


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167230725
  
--- Diff: 
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
 ---
@@ -85,4 +85,19 @@ public boolean equals(final Object o) {
 }
 return false;
 }
+
+@Override
+public String toString() {
+final StringBuilder sb = new StringBuilder();
+sb.append("ID: ");
+sb.append(getQueryId().toString() + "\n");
+sb.append("Query: ");
+sb.append(getSparql() + "\n");
+sb.append("Is ");
+if (!isActive) {
+sb.append(" Not ");
--- End diff --

No space is needed before "Not".  The previous append has one.
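
The spacing can be checked with a small standalone sketch. The trailing word "Active" and the class name `ActiveLabelDemo` are assumptions for illustration, since the rest of the original toString() is not shown in this diff.

```java
final class ActiveLabelDemo {
    private ActiveLabelDemo() { }

    static String label(final boolean isActive) {
        final StringBuilder sb = new StringBuilder();
        sb.append("Is ");
        if (!isActive) {
            // No leading space here: "Is " already ends with one.
            sb.append("Not ");
        }
        sb.append("Active"); // Assumed final word; not visible in the diff.
        return sb.toString();
    }

    public static void main(final String[] args) {
        System.out.println(label(true));
        System.out.println(label(false));
    }
}
```

With the extra leading space, the inactive case would come out with two spaces between "Is" and "Not".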


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167235195
  
--- Diff: 
extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory;
+import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory.TopologyBuilderException;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.openrdf.query.MalformedQueryException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Creates {@link KafkaStreams} objects that are able to process {@link StreamsQuery}s
+ * using a single thread of execution starting from the earliest point within the
+ * input topic. The Application ID used by the client is based on the Query ID of the
+ * query that is being executed so that this job may resume where it left off if it
+ * is stopped.
+ */
+@DefaultAnnotation(NonNull.class)
+public class SingleThreadKafkaStreamsFactory implements KafkaStreamsFactory {
+
+    private final TopologyBuilderFactory topologyFactory = new TopologyFactory();
+
+    private final String bootstrapServersConfig;
+
+    /**
+     * Constructs an instance of {@link SingleThreadKafkaStreamsFactory}.
+     *
+     * @param bootstrapServersConfig - Configures which Kafka cluster the jobs will interact with. (not null)
+     */
+    public SingleThreadKafkaStreamsFactory(final String bootstrapServersConfig) {
+        this.bootstrapServersConfig = requireNonNull(bootstrapServersConfig);
+    }
+
+    @Override
+    public KafkaStreams make(final String ryaInstance, final StreamsQuery query) throws KafkaStreamsFactoryException {
+        requireNonNull(ryaInstance);
+        requireNonNull(query);
+
+        // Setup the Kafka Stream program.
+        final Properties streamsProps = new Properties();
+
+        // Configure the Kafka servers that will be talked to.
+        streamsProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig);
+
+        // Use the Query ID as the Application ID to ensure we resume where we left off the last time this command was run.
+        streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "RyaStreams-Query-" + query.getQueryId());
+
+        // Always start at the beginning of the input topic.
+        streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        // Setup the topology that processes the Query.
+        final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance);
+        final String resultsTopic = KafkaTopics.queryResultsTopic(query.getQueryId());
+
+        try {
+            final TopologyBuilder topologyBuilder = topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new RandomUUIDFactory());
+            return new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps));
+        } catch (MalformedQueryException | TopologyBuilderException e) {
--- End diff --

Add final to the exception parameter in this catch clause, to match the rest of the code.
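
For reference, this is purely a style point: the parameter of a multi-catch clause is already implicitly final in Java, so writing the modifier only makes that explicit and keeps it consistent with the single-exception catch blocks. A minimal sketch, using standard JDK exceptions in place of the project's own (the class `MultiCatchDemo` is hypothetical):

```java
final class MultiCatchDemo {
    private MultiCatchDemo() { }

    static String parse(final String text) {
        try {
            return String.valueOf(Integer.parseInt(text.trim()));
        } catch (final NumberFormatException | NullPointerException e) {
            // 'e' could not be reassigned here even without the modifier.
            return "invalid: " + e.getClass().getSimpleName();
        }
    }

    public static void main(final String[] args) {
        System.out.println(parse(" 7 "));
        System.out.println(parse("seven"));
        System.out.println(parse(null));
    }
}
```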


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167242070
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChangeLogListener;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A service for managing {@link StreamsQuery}s running on a Rya Streams system.
+ *
+ * Only one QueryManager needs to be running to manage any number of Rya
+ * instances/Rya Streams instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManager extends AbstractService {
+    private static final Logger log = LoggerFactory.getLogger(QueryManager.class);
+
+    /**
+     * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific
+     * Rya instance.
+     */
+    private final QueryChangeLogSource changeLogSource;
+
+    /**
+     * The engine that is responsible for executing {@link StreamsQuery}s.
+     */
+    private final QueryExecutor queryExecutor;
+
+    /**
+     * How long blocking operations will be attempted before potentially trying again.
+     */
+    private final long blockingValue;
+
+    /**
+     * The units for {@link #blockingValue}.
+     */
+    private final TimeUnit blockingUnits;
+
+    /**
+     * Used to inform threads that the application is shutting down, so they must stop work.
+     */
+    private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+    /**
+     * This thread pool manages the two threads used to work the {@link LogEvent}s
+     * and the {@link QueryEvent}s.
+     */
+    private final ExecutorService executor = Executors.newFixedThreadPool(2);
+
+    /**
+     * Creates a new {@link QueryManager}.
+     *
+     * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null)
+     * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null)
+     * @param blockingValue - How long blocking operations will try before looping. (> 0)
+     * @param blockingUnits - The units of the {@code blockingValue}. (not null)
+     */
+

[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167243543
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java
 ---
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.rya.streams.kafka.KafkaTopics;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
+import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory;
+import org.apache.rya.streams.querymanager.QueryChangeLogSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractScheduledService;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Represents a Kafka Server that manages {@link KafkaQueryChangeLog}s.
+ * 
+ * Thread safe.
--- End diff --

Change this to the @ThreadSafe annotation instead of stating it in the Javadoc text.
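
Roughly like the sketch below. To keep the example self-contained it declares its own `@ThreadSafe` annotation; that local declaration is an assumption standing in for whichever annotation library the project already depends on. `TopicRegistry` is likewise hypothetical and only gives the annotation something to describe.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

/** Local stand-in for a library-provided @ThreadSafe annotation (assumption). */
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface ThreadSafe { }

/** Tracks known topic names; all access to the set is guarded by the lock. */
@ThreadSafe
final class TopicRegistry {
    private final ReentrantLock lock = new ReentrantLock();
    private final Set<String> topics = new HashSet<>();

    /** Returns true if the topic was not already registered. */
    boolean add(final String topic) {
        lock.lock();
        try {
            return topics.add(topic);
        } finally {
            lock.unlock();
        }
    }

    public static void main(final String[] args) {
        final TopicRegistry registry = new TopicRegistry();
        System.out.println(registry.add("demo-QueryChangeLog"));
    }
}
```

The annotation documents the guarantee in a form that tools and readers can find, while the lock is what actually provides it.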


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167240341
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---
@@ -0,0 +1,884 @@

[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167236812
  
--- Diff: extras/rya.streams/query-manager/pom.xml ---
@@ -0,0 +1,253 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+
+
+org.apache.rya
+rya.streams.parent
+3.2.12-incubating-SNAPSHOT
+
+
+4.0.0
+rya.streams.query-manager
+
+Apache Rya Streams Query Manager
+
+This module contains the Rya Streams Query Manager.
+
+
+
+
${project.build.directory}/${project.artifactId}-${project.version}-rpm-staging
+
+
+
+
+
+org.apache.rya
+rya.streams.kafka
+
+
+
+
+commons-daemon
+commons-daemon
+1.1.0
+
+
+org.slf4j
+slf4j-log4j12
+
+
+com.beust
+jcommander
+
+
+
+
+junit
+junit
+test
+
+
+org.apache.rya
+rya.test.kafka
+test
+
+
+org.mockito
+mockito-all
+test
+
+
+
+
+
+
+src/main/xsd
+
+
+
+
+
+
+org.codehaus.mojo
+jaxb2-maven-plugin
+
+
+xjc
+
+xjc
+
+
+
+
+
org.apache.rya.streams.querymanager.xml
+
+
+ 
+   
+
+com.mycila
+license-maven-plugin
+
+
${project.basedir}/src/license/header.txt
+
+
+
+update-generated-source-headers
+
+
${project.build.directory}/generated-sources/jaxb
+
+
XML_STYLE
+
+
+process-sources
+
+format
+
+
+
+
+
+
+
+org.apache.maven.plugins
+maven-shade-plugin
+
+
+package
+
+  shade
+
+
+
+
+
org.apache.rya.streams.querymanager.QueryManagerDaemon
+
+
+
+
+
+
+
+
+
+
+org.apache.maven.plugins
+maven-assembly-plugin
+
+
+
+stage-content-for-rpms
+
+single
+
+package
+
+
+false
+
+
src/assembly/rpm-staging.xml
+
+
+
+
+
+
+
+org.codehaus.mojo
+rpm-maven-plugin
+
+
+set-rpm-properties
+
+version
+
+
+
+create-rpm-distribution
--- End diff --

We'll 

[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167242868
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---
@@ -0,0 +1,884 @@

[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167232266
  
--- Diff: 
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
 ---
@@ -110,6 +110,16 @@ public boolean equals(final Object o) {
 return false;
 }
 
+@Override
+public String toString() {
+return "QueryChange: {" +
+   "Query ID: " + queryId + ",\n" +
+   "Change Type: " + changeType + ",\n" +
+   "Is Active: " + isActive + ",\n" +
+   "SPARQL: " + sparql + "\n" +
--- End diff --

Is "sparql" coming from user input? If so, consider wrapping it in 
org.apache.rya.api.log.LogUtils.clean(sparql) before it is logged.


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-09 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r167234267
  
--- Diff: 
extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
 ---
@@ -85,11 +92,16 @@ public void initializedWithPopulatedChangeLog() throws Exception {
 queries.delete( deletedMeId );
 
 // Create a new totally in memory QueryRepository.
-try(final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog )) {
+final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog, SCHEDULE );
+try {
 // Listing the queries should work using an initialized change log.
 final Set stored = initializedQueries.list();
 assertEquals(expected, stored);
+} finally {
+queries.stop();
--- End diff --

This seems redundant with the outer try-finally block, which already calls 
stop(), unless the intent is to test that two consecutive calls to stop() are 
handled. The inner try-finally can probably be removed.
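
If the inner block was instead meant to stop the second repository, one possible shape is sketched below, where each repository is stopped exactly once by its own finally. This is an assumption about the intent, not something the diff confirms. `Repo` is a hypothetical stand-in for the repository type that just counts stop() calls.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class StopOnceDemo {
    /** Hypothetical stand-in for a repository; records how often stop() is called. */
    static final class Repo {
        final AtomicInteger stopCalls = new AtomicInteger();

        void stop() {
            stopCalls.incrementAndGet();
        }
    }

    private StopOnceDemo() { }

    /** Returns the stop() counts for the outer and the inner repository. */
    static int[] run() {
        final Repo queries = new Repo();
        final Repo initializedQueries = new Repo();
        try {
            try {
                // Assertions against initializedQueries would go here.
            } finally {
                initializedQueries.stop(); // Inner block stops only what it created.
            }
        } finally {
            queries.stop(); // Outer block stops the original repository once.
        }
        return new int[] { queries.stopCalls.get(), initializedQueries.stopCalls.get() };
    }

    public static void main(final String[] args) {
        final int[] counts = run();
        System.out.println(counts[0] + " " + counts[1]);
    }
}
```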


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-08 Thread kchilton2
GitHub user kchilton2 opened a pull request:

https://github.com/apache/incubator-rya/pull/272

RYA-443 Rya Streams Query Manager daemon program.

## Description
Implemented a daemon service for CentOS 7, installed from an RPM, that executes 
Kafka Streams jobs on the local machine. This is part of Rya Streams.

### Tests
Wrote automated tests and executed the application on a CentOS 7 machine 
using the RPM.

### Links
https://issues.apache.org/jira/browse/RYA-443

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kchilton2/incubator-rya RYA-443_rya-streams-querymanager

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-rya/pull/272.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #272


commit 6688c2ece024132d3dc6903af9d1003079d330eb
Author: Andrew Smith 
Date:   2018-01-16T17:39:34Z

RYA-446 Daemonifying Query Manager

commit 8803d7d99f8c7189991be181dd11bd0fb9f1e704
Author: Andrew Smith 
Date:   2018-01-18T22:51:40Z

RYA-449 Create QueryChangeLogSource

commit c048db73a5211a18ab649a2647dfa7f7fb929166
Author: Andrew Smith 
Date:   2018-01-19T20:43:55Z

Rya 454 added QueryExecutor interface

commit 0dad1290d593065561fae079b837fcbc479795ed
Author: Andrew Smith 
Date:   2018-01-23T20:20:50Z

Rya 452 Updated QueryRepository

Updated QueryRepository to be a Service
Updated InMemoryQueryRepository to be an AbstractScheduledService
Added listeners to InMemoryQueryRepository

commit 3810fb709a4a97ce729e93733edd9af10d6e5445
Author: Andrew Smith 
Date:   2018-01-23T20:44:32Z

RYA-455 stopAll queries for a rya instance

commit e774ef02161686d9c2f0c6e99ea3b8a438fd240a
Author: kchilton2 
Date:   2018-01-23T20:50:17Z

RYA-448 Implement JAXB marshalling code for the Query Manager's XML 
configuration file.

commit 13cfd43ba3609c0690aeebe27eb7b76a88d2ddbd
Author: kchilton2 
Date:   2018-01-23T21:17:06Z

RYA-450 Implemented a Kafka backed QueryChangeLogSource.

commit eee8d830abc95e4082d906fe7e0dad5944d96b87
Author: kchilton2 
Date:   2018-01-26T17:06:07Z

RYA-458 Updating the configuration XML so that you may only specify a 
single QueryChangeLogSource.

commit 815d995c359ae15b5669c1e7c7e375d7c67f8dec
Author: kchilton2 
Date:   2018-01-26T20:55:59Z

RYA-456 Implement a Single Node implementation of QueryExecutor.

commit ba2898a8c4ba58ad3e38323f28389f75ab262c83
Author: Andrew Smith 
Date:   2018-01-30T19:01:54Z

Rya 451 Query manager

QueryManager with tests
updated InMemoryQueryRepository and its tests

commit 34e616ce63ac4b93a64ca11a10cde148760afc52
Author: kchilton2 
Date:   2018-01-30T19:44:58Z

RYA-453 Implement the Query Manager's Daemon that controls the application.

commit 8d35b4541bd39734cc68c7e870027760e537d1e7
Author: kchilton2 
Date:   2018-01-25T22:19:58Z

RYA-446 Create a bin and rpm distribution for the Rya Streams Query Manager 
application.

commit 4f4e0f4ce0d2eb2d8651b8f569a8f59c8eaaf571
Author: Andrew Smith 
Date:   2018-02-01T21:04:48Z

RYA-446 Service unit file for systemd

commit 4a4456e7cfd18fdd9044b67152252b3623ebe466
Author: kchilton2 
Date:   2018-02-01T23:31:41Z

RYA-446 Making the Rya Streams Query Manager run as a service on CentOS 7.

commit b889bc0d60e3884ea31edbcb12fcc21331f77963
Author: kchilton2 
Date:   2018-02-03T03:47:59Z

RYA-451 Fixing threading issues with the QueryManager class.




---