[jira] [Commented] (RYA-443) Implement a single node query manager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364768#comment-16364768
 ] 

ASF GitHub Bot commented on RYA-443:


Github user asfgit commented on the issue:

https://github.com/apache/incubator-rya/pull/272
  

Refer to this link for build results (access rights to CI server needed): 

https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/691/Build
 result: FAILURE[...truncated 3.03 MB...][INFO] Apache Rya Spark 
Support ... SKIPPED[INFO] Apache Rya Web Projects 
 SKIPPED[INFO] Apache Rya Web Implementation 
.. SKIPPED[INFO] 
[INFO] 
BUILD FAILURE[INFO] 
[INFO] 
Total time: 37:36 min[INFO] Finished at: 2018-02-14T20:56:07+00:00[INFO] Final 
Memory: 535M/2904M[INFO] 
[ERROR] 
Failed to execute goal org.codehaus.mojo:rpm-maven-plugin:2.1.5:attached-rpm 
(create-rpm-distribution) on project rya.streams.query-manager: Unable to query 
for default vendor from RPM: Error while executing process. Cannot run program 
"rpm": error=2, No such file or directory -> [Help 1][ERROR] [ERROR] To see the 
full stack trace of the errors, re-run Maven with the -e switch.[ERROR] Re-run 
Maven using the -X switch to enable full debug logging.[ERROR] [ERROR] For more 
information about the errors and possible solutions, please read the following 
articles:[ERROR] [Help 1] 
http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException[ERROR] 
[ERROR] After correcting the problems, you can resume the build with the 
command[ERROR]   mvn  -rf :rya.streams.query-managerchannel 
stoppedSetting status of 6428ca81a5cb0ef5bd2e762452f1a6e46dff88fc to FAILURE 
with url 
https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/691/
 and message: 'FAILURE 'Using context: Jenkins: clean package -Pgeoindexing



> Implement a single node query manager
> -
>
> Key: RYA-443
> URL: https://issues.apache.org/jira/browse/RYA-443
> Project: Rya
>  Issue Type: Task
>Reporter: Andrew Smith
>Assignee: Kevin Chilton
>Priority: Major
>
> We need an application that watches the QueryChangeLog to see when the 
> isActive state of queries changes and then reacts to the state change. If 
> isActive goes to true, the system must start processing the query. If it is 
> false, then it must stop processing the query. This application needs to 
> start when the host machine starts. We plan to support CentOS 7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] incubator-rya issue #272: RYA-443 Rya Streams Query Manager daemon program.

2018-02-14 Thread asfgit
Github user asfgit commented on the issue:

https://github.com/apache/incubator-rya/pull/272
  

Refer to this link for build results (access rights to CI server needed): 

https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/691/Build
 result: FAILURE[...truncated 3.03 MB...][INFO] Apache Rya Spark 
Support ... SKIPPED[INFO] Apache Rya Web Projects 
 SKIPPED[INFO] Apache Rya Web Implementation 
.. SKIPPED[INFO] 
[INFO] 
BUILD FAILURE[INFO] 
[INFO] 
Total time: 37:36 min[INFO] Finished at: 2018-02-14T20:56:07+00:00[INFO] Final 
Memory: 535M/2904M[INFO] 
[ERROR] 
Failed to execute goal org.codehaus.mojo:rpm-maven-plugin:2.1.5:attached-rpm 
(create-rpm-distribution) on project rya.streams.query-manager: Unable to query 
for default vendor from RPM: Error while executing process. Cannot run program 
"rpm": error=2, No such file or directory 
 -> [Help 1][ERROR] [ERROR] To see the full stack trace of the errors, re-run 
Maven with the -e switch.[ERROR] Re-run Maven using the -X switch to enable 
full debug logging.[ERROR] [ERROR] For more information about the errors and 
possible solutions, please read the following articles:[ERROR] [Help 1] 
http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException[ERROR] 
[ERROR] After correcting the problems, you can resume the build with the 
command[ERROR]   mvn  -rf :rya.streams.query-managerchannel 
stoppedSetting status of 6428ca81a5cb0ef5bd2e762452f1a6e46dff88fc to FAILURE 
with url 
https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/691/
 and message: 'FAILURE 'Using context: Jenkins: clean package -Pgeoindexing



---


[jira] [Commented] (RYA-443) Implement a single node query manager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364619#comment-16364619
 ] 

ASF GitHub Bot commented on RYA-443:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168276645
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChangeLogListener;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import 
org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import 
org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A service for managing {@link StreamsQuery} running on a Rya Streams 
system.
+ * 
+ * Only one QueryManager needs to be running to manage any number of rya
+ * instances/rya streams instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManager extends AbstractService {
+private static final Logger log = 
LoggerFactory.getLogger(QueryManager.class);
+
+/**
+ * The source of {@link QueryChangeLog}s. Each log discovered is bound 
to a specific
+ * Rya instnace.
+ */
+private final QueryChangeLogSource changeLogSource;
+
+/**
+ * The engine that is responsible for executing {@link StreamsQuery}s.
+ */
+private final QueryExecutor queryExecutor;
+
+/**
+ * How long blocking operations will be attempted before potentially 
trying again.
+ */
+private final long blockingValue;
+
+/**
+ * The units for {@link #blockingValue}.
+ */
+private final TimeUnit blockingUnits;
+
+/**
+ * Used to inform threads that the application is shutting down, so 
they must stop work.
+ */
+private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+/**
+ * This thread pool manages the two thread used to work the {@link 
LogEvent}s
+ * and the {@link QueryEvent}s.
+ */
+private final ExecutorService executor = 
Executors.newFixedThreadPool(2);
+
+/**
+ * Creates a new {@link QueryManager}.
+ *
+ * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not 
null)
+ * @param source - The {@link QueryChangeLogSource} of 

[jira] [Commented] (RYA-443) Implement a single node query manager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364607#comment-16364607
 ] 

ASF GitHub Bot commented on RYA-443:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168274700
  
--- Diff: 
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
 ---
@@ -243,8 +274,68 @@ private void updateCache() {
 it.close();
 }
 } catch (final QueryChangeLogException e) {
-LOG.error("Could not close the " + 
CloseableIteration.class.getName(), e);
+log.error("Could not close the " + 
CloseableIteration.class.getName(), e);
 }
+
+log.trace("updateCache() - Exit");
+}
+}
+
+@Override
+protected void runOneIteration() throws Exception {
+log.trace("runOneIteration() - Enter");
+lock.lock();
+try {
+updateCache();
+} finally {
+lock.unlock();
+}
+log.trace("runOneIteration() - Exit");
--- End diff --

Yep, done.


> Implement a single node query manager
> -
>
> Key: RYA-443
> URL: https://issues.apache.org/jira/browse/RYA-443
> Project: Rya
>  Issue Type: Task
>Reporter: Andrew Smith
>Assignee: Kevin Chilton
>Priority: Major
>
> We need an application that watches the QueryChangeLog to see when the 
> isActive state of queries changes and then reacts to the state change. If 
> isActive goes to true, the system must start processing the query. If it is 
> false, then it must stop processing the query. This application needs to 
> start when the host machine starts. We plan to support CentOS 7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (RYA-443) Implement a single node query manager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364633#comment-16364633
 ] 

ASF GitHub Bot commented on RYA-443:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168278465
  
--- Diff: 
extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
 ---
@@ -107,12 +112,11 @@ private String formatQueries(final Set 
queries) {
 sb.append("Queries in Rya Streams:\n");
 
sb.append("-\n");
 queries.forEach(query -> {
-sb.append("ID: ");
-sb.append(query.getQueryId());
-sb.append("\t\t");
-sb.append("Query: ");
-sb.append(query.getSparql());
-sb.append("\n");
+sb.append("ID: ").append(query.getQueryId())
+.append("")
+.append("Is Active: ").append(query.isActive())
+.append( query.isActive() ? " " : "" )
+.append("Query: ").append(query.getSparql()).append("\n");
--- End diff --

Should b e sanitized before it gets here.


> Implement a single node query manager
> -
>
> Key: RYA-443
> URL: https://issues.apache.org/jira/browse/RYA-443
> Project: Rya
>  Issue Type: Task
>Reporter: Andrew Smith
>Assignee: Kevin Chilton
>Priority: Major
>
> We need an application that watches the QueryChangeLog to see when the 
> isActive state of queries changes and then reacts to the state change. If 
> isActive goes to true, the system must start processing the query. If it is 
> false, then it must stop processing the query. This application needs to 
> start when the host machine starts. We plan to support CentOS 7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (RYA-443) Implement a single node query manager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364623#comment-16364623
 ] 

ASF GitHub Bot commented on RYA-443:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168277168
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
 ---
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.bind.JAXBException;
+
+import org.apache.commons.daemon.Daemon;
+import org.apache.commons.daemon.DaemonContext;
+import org.apache.commons.daemon.DaemonInitException;
+import org.apache.rya.streams.kafka.KafkaStreamsFactory;
+import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
+import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource;
+import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor;
+import org.apache.rya.streams.querymanager.xml.Kafka;
+import org.apache.rya.streams.querymanager.xml.QueryManagerConfig;
+import 
org.apache.rya.streams.querymanager.xml.QueryManagerConfig.PerformanceTunning.QueryChanngeLogDiscoveryPeriod;
+import 
org.apache.rya.streams.querymanager.xml.QueryManagerConfigUnmarshaller;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * JSVC integration code for a {@link QueryManager} to be used as a 
non-Windows daemon.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManagerDaemon implements Daemon {
+
+private static final Logger log = 
LoggerFactory.getLogger(QueryManagerDaemon.class);
+
+/**
+ * The default configuration file's path for the application.
+ */
+private static final Path DEFAULT_CONFIGURATION_PATH = 
Paths.get("config/configuration.xml");
+
+/**
+ * Command line parameters that are used by all commands that interact 
with Kafka.
+ */
+class DaemonParameters {
+@Parameter(names = {"--config", "-c"}, required = false, 
description = "The path to the application's configuration file.")
+public String config;
+}
+
+private QueryManager manager = null;
+
+@Override
+public void init(final DaemonContext context) throws 
DaemonInitException, Exception {
+requireNonNull(context);
+
+// Parse the command line arguments for the configuration file to 
use.
+final String[] args = context.getArguments();
+final DaemonParameters params = new DaemonParameters();
+try {
+new JCommander(params).parse(args);
+} catch(final ParameterException e) {
+throw new DaemonInitException("Unable to parse the command 
line arguments.", e);
+}
+final Path configFile = params.config != null ? 
Paths.get(params.config) : DEFAULT_CONFIGURATION_PATH;
+log.info("Loading the following configuration file: " + 
configFile);
+
+// Unmarshall the configuration file into an object.
+final QueryManagerConfig config;
+try(InputStream stream = 

[jira] [Commented] (RYA-443) Implement a single node query manager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364616#comment-16364616
 ] 

ASF GitHub Bot commented on RYA-443:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168276451
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChangeLogListener;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import 
org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import 
org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A service for managing {@link StreamsQuery} running on a Rya Streams 
system.
+ * 
+ * Only one QueryManager needs to be running to manage any number of rya
+ * instances/rya streams instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManager extends AbstractService {
+private static final Logger log = 
LoggerFactory.getLogger(QueryManager.class);
+
+/**
+ * The source of {@link QueryChangeLog}s. Each log discovered is bound 
to a specific
+ * Rya instnace.
+ */
+private final QueryChangeLogSource changeLogSource;
+
+/**
+ * The engine that is responsible for executing {@link StreamsQuery}s.
+ */
+private final QueryExecutor queryExecutor;
+
+/**
+ * How long blocking operations will be attempted before potentially 
trying again.
+ */
+private final long blockingValue;
+
+/**
+ * The units for {@link #blockingValue}.
+ */
+private final TimeUnit blockingUnits;
+
+/**
+ * Used to inform threads that the application is shutting down, so 
they must stop work.
+ */
+private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+/**
+ * This thread pool manages the two thread used to work the {@link 
LogEvent}s
+ * and the {@link QueryEvent}s.
+ */
+private final ExecutorService executor = 
Executors.newFixedThreadPool(2);
+
+/**
+ * Creates a new {@link QueryManager}.
+ *
+ * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not 
null)
+ * @param source - The {@link QueryChangeLogSource} of 

[jira] [Commented] (RYA-443) Implement a single node query manager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364613#comment-16364613
 ] 

ASF GitHub Bot commented on RYA-443:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168276101
  
--- Diff: 
extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
 ---
@@ -48,7 +51,28 @@
  */
 public static String queryChangeLogTopic(final String ryaInstance) {
 requireNonNull(ryaInstance);
-return ryaInstance + "-QueryChangeLog";
+return ryaInstance + QUERY_CHANGE_LOG_TOPIC_SUFFIX;
+}
+
+/**
+ * Get the Rya instance name from a Kafka topic name that has been 
used for a {@link QueryChnageLog}.
--- End diff --

Done.


> Implement a single node query manager
> -
>
> Key: RYA-443
> URL: https://issues.apache.org/jira/browse/RYA-443
> Project: Rya
>  Issue Type: Task
>Reporter: Andrew Smith
>Assignee: Kevin Chilton
>Priority: Major
>
> We need an application that watches the QueryChangeLog to see when the 
> isActive state of queries changes and then reacts to the state change. If 
> isActive goes to true, the system must start processing the query. If it is 
> false, then it must stop processing the query. This application needs to 
> start when the host machine starts. We plan to support CentOS 7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (RYA-443) Implement a single node query manager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364629#comment-16364629
 ] 

ASF GitHub Bot commented on RYA-443:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168278274
  
--- Diff: 
extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
 ---
@@ -107,12 +112,11 @@ private String formatQueries(final Set 
queries) {
 sb.append("Queries in Rya Streams:\n");
 
sb.append("-\n");
 queries.forEach(query -> {
-sb.append("ID: ");
-sb.append(query.getQueryId());
-sb.append("\t\t");
-sb.append("Query: ");
-sb.append(query.getSparql());
-sb.append("\n");
+sb.append("ID: ").append(query.getQueryId())
+.append("")
+.append("Is Active: ").append(query.isActive())
+.append( query.isActive() ? " " : "" )
--- End diff --

Done.


> Implement a single node query manager
> -
>
> Key: RYA-443
> URL: https://issues.apache.org/jira/browse/RYA-443
> Project: Rya
>  Issue Type: Task
>Reporter: Andrew Smith
>Assignee: Kevin Chilton
>Priority: Major
>
> We need an application that watches the QueryChangeLog to see when the 
> isActive state of queries changes and then reacts to the state change. If 
> isActive goes to true, the system must start processing the query. If it is 
> false, then it must stop processing the query. This application needs to 
> start when the host machine starts. We plan to support CentOS 7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168278465
  
--- Diff: 
extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
 ---
@@ -107,12 +112,11 @@ private String formatQueries(final Set 
queries) {
 sb.append("Queries in Rya Streams:\n");
 
sb.append("-\n");
 queries.forEach(query -> {
-sb.append("ID: ");
-sb.append(query.getQueryId());
-sb.append("\t\t");
-sb.append("Query: ");
-sb.append(query.getSparql());
-sb.append("\n");
+sb.append("ID: ").append(query.getQueryId())
+.append("")
+.append("Is Active: ").append(query.isActive())
+.append( query.isActive() ? " " : "" )
+.append("Query: ").append(query.getSparql()).append("\n");
--- End diff --

Should b e sanitized before it gets here.


---


[jira] [Commented] (RYA-443) Implement a single node query manager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364630#comment-16364630
 ] 

ASF GitHub Bot commented on RYA-443:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168278306
  
--- Diff: 
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
 ---
@@ -85,4 +85,19 @@ public boolean equals(final Object o) {
 }
 return false;
 }
+
+@Override
+public String toString() {
+final StringBuilder sb = new StringBuilder();
+sb.append("ID: ");
+sb.append(getQueryId().toString() + "\n");
+sb.append("Query: ");
+sb.append(getSparql() + "\n");
--- End diff --

Nope.


> Implement a single node query manager
> -
>
> Key: RYA-443
> URL: https://issues.apache.org/jira/browse/RYA-443
> Project: Rya
>  Issue Type: Task
>Reporter: Andrew Smith
>Assignee: Kevin Chilton
>Priority: Major
>
> We need an application that watches the QueryChangeLog to see when the 
> isActive state of queries changes and then reacts to the state change. If 
> isActive goes to true, the system must start processing the query. If it is 
> false, then it must stop processing the query. This application needs to 
> start when the host machine starts. We plan to support CentOS 7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168278306
  
--- Diff: 
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
 ---
@@ -85,4 +85,19 @@ public boolean equals(final Object o) {
 }
 return false;
 }
+
+@Override
+public String toString() {
+final StringBuilder sb = new StringBuilder();
+sb.append("ID: ");
+sb.append(getQueryId().toString() + "\n");
+sb.append("Query: ");
+sb.append(getSparql() + "\n");
--- End diff --

Nope.


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168278274
  
--- Diff: 
extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java
 ---
@@ -107,12 +112,11 @@ private String formatQueries(final Set 
queries) {
 sb.append("Queries in Rya Streams:\n");
 
sb.append("-\n");
 queries.forEach(query -> {
-sb.append("ID: ");
-sb.append(query.getQueryId());
-sb.append("\t\t");
-sb.append("Query: ");
-sb.append(query.getSparql());
-sb.append("\n");
+sb.append("ID: ").append(query.getQueryId())
+.append("")
+.append("Is Active: ").append(query.isActive())
+.append( query.isActive() ? " " : "" )
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168274700
  
--- Diff: 
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java
 ---
@@ -243,8 +274,68 @@ private void updateCache() {
 it.close();
 }
 } catch (final QueryChangeLogException e) {
-LOG.error("Could not close the " + 
CloseableIteration.class.getName(), e);
+log.error("Could not close the " + 
CloseableIteration.class.getName(), e);
 }
+
+log.trace("updateCache() - Exit");
+}
+}
+
+@Override
+protected void runOneIteration() throws Exception {
+log.trace("runOneIteration() - Enter");
+lock.lock();
+try {
+updateCache();
+} finally {
+lock.unlock();
+}
+log.trace("runOneIteration() - Exit");
--- End diff --

Yep, done.


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168277599
  
--- Diff: 
extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java
 ---
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Utilities that are useful for interacting with {@link Thread}s while 
testing.
+ */
+public class ThreadUtil {
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168277168
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java
 ---
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.bind.JAXBException;
+
+import org.apache.commons.daemon.Daemon;
+import org.apache.commons.daemon.DaemonContext;
+import org.apache.commons.daemon.DaemonInitException;
+import org.apache.rya.streams.kafka.KafkaStreamsFactory;
+import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory;
+import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic;
+import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource;
+import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor;
+import org.apache.rya.streams.querymanager.xml.Kafka;
+import org.apache.rya.streams.querymanager.xml.QueryManagerConfig;
+import 
org.apache.rya.streams.querymanager.xml.QueryManagerConfig.PerformanceTunning.QueryChanngeLogDiscoveryPeriod;
+import 
org.apache.rya.streams.querymanager.xml.QueryManagerConfigUnmarshaller;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * JSVC integration code for a {@link QueryManager} to be used as a 
non-Windows daemon.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManagerDaemon implements Daemon {
+
+private static final Logger log = 
LoggerFactory.getLogger(QueryManagerDaemon.class);
+
+/**
+ * The default configuration file's path for the application.
+ */
+private static final Path DEFAULT_CONFIGURATION_PATH = 
Paths.get("config/configuration.xml");
+
+/**
+ * Command line parameters that are used by all commands that interact 
with Kafka.
+ */
+class DaemonParameters {
+@Parameter(names = {"--config", "-c"}, required = false, 
description = "The path to the application's configuration file.")
+public String config;
+}
+
+private QueryManager manager = null;
+
+@Override
+public void init(final DaemonContext context) throws 
DaemonInitException, Exception {
+requireNonNull(context);
+
+// Parse the command line arguments for the configuration file to 
use.
+final String[] args = context.getArguments();
+final DaemonParameters params = new DaemonParameters();
+try {
+new JCommander(params).parse(args);
+} catch(final ParameterException e) {
+throw new DaemonInitException("Unable to parse the command 
line arguments.", e);
+}
+final Path configFile = params.config != null ? 
Paths.get(params.config) : DEFAULT_CONFIGURATION_PATH;
+log.info("Loading the following configuration file: " + 
configFile);
+
+// Unmarshall the configuration file into an object.
+final QueryManagerConfig config;
+try(InputStream stream = Files.newInputStream(configFile)) {
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168277066
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChangeLogListener;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import 
org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import 
org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A service for managing {@link StreamsQuery} running on a Rya Streams 
system.
+ * 
+ * Only one QueryManager needs to be running to manage any number of rya
+ * instances/rya streams instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManager extends AbstractService {
+private static final Logger log = 
LoggerFactory.getLogger(QueryManager.class);
+
+/**
+ * The source of {@link QueryChangeLog}s. Each log discovered is bound 
to a specific
+ * Rya instnace.
+ */
+private final QueryChangeLogSource changeLogSource;
+
+/**
+ * The engine that is responsible for executing {@link StreamsQuery}s.
+ */
+private final QueryExecutor queryExecutor;
+
+/**
+ * How long blocking operations will be attempted before potentially 
trying again.
+ */
+private final long blockingValue;
+
+/**
+ * The units for {@link #blockingValue}.
+ */
+private final TimeUnit blockingUnits;
+
+/**
+ * Used to inform threads that the application is shutting down, so 
they must stop work.
+ */
+private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+/**
+ * This thread pool manages the two thread used to work the {@link 
LogEvent}s
+ * and the {@link QueryEvent}s.
+ */
+private final ExecutorService executor = 
Executors.newFixedThreadPool(2);
+
+/**
+ * Creates a new {@link QueryManager}.
+ *
+ * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not 
null)
+ * @param source - The {@link QueryChangeLogSource} of 
QueryChangeLogs. (not null)
+ * @param blockingValue - How long blocking operations will try before 
looping. (> 0)
+ * @param blockingUnits - The units of the {@code blockingValue}. (not 
null)
+ */
+

[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168276972
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChangeLogListener;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import 
org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import 
org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A service for managing {@link StreamsQuery} running on a Rya Streams 
system.
+ * 
+ * Only one QueryManager needs to be running to manage any number of rya
+ * instances/rya streams instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManager extends AbstractService {
+private static final Logger log = 
LoggerFactory.getLogger(QueryManager.class);
+
+/**
+ * The source of {@link QueryChangeLog}s. Each log discovered is bound 
to a specific
+ * Rya instnace.
+ */
+private final QueryChangeLogSource changeLogSource;
+
+/**
+ * The engine that is responsible for executing {@link StreamsQuery}s.
+ */
+private final QueryExecutor queryExecutor;
+
+/**
+ * How long blocking operations will be attempted before potentially 
trying again.
+ */
+private final long blockingValue;
+
+/**
+ * The units for {@link #blockingValue}.
+ */
+private final TimeUnit blockingUnits;
+
+/**
+ * Used to inform threads that the application is shutting down, so 
they must stop work.
+ */
+private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+/**
+ * This thread pool manages the two thread used to work the {@link 
LogEvent}s
+ * and the {@link QueryEvent}s.
+ */
+private final ExecutorService executor = 
Executors.newFixedThreadPool(2);
+
+/**
+ * Creates a new {@link QueryManager}.
+ *
+ * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not 
null)
+ * @param source - The {@link QueryChangeLogSource} of 
QueryChangeLogs. (not null)
+ * @param blockingValue - How long blocking operations will try before 
looping. (> 0)
+ * @param blockingUnits - The units of the {@code blockingValue}. (not 
null)
+ */
+

[jira] [Commented] (RYA-443) Implement a single node query manager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364615#comment-16364615
 ] 

ASF GitHub Bot commented on RYA-443:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168276251
  
--- Diff: 
extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory;
+import 
org.apache.rya.streams.kafka.topology.TopologyBuilderFactory.TopologyBuilderException;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.openrdf.query.MalformedQueryException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Creates {@link KafkaStreams} objects that are able to process {@link 
StreamsQuery}s
+ * using a single thread of execution starting from the earliest point in 
within the
+ * input topic. The Application ID used by the client is based on the 
Query ID of the
+ * query that is being executed so that this job may resume where it left 
off if it
+ * is stopped.
+ */
+@DefaultAnnotation(NonNull.class)
+public class SingleThreadKafkaStreamsFactory implements 
KafkaStreamsFactory {
+
+private final TopologyBuilderFactory topologyFactory = new 
TopologyFactory();
+
+private final String bootstrapServersConfig;
+
+/**
+ * Constructs an instance of {@link SingleThreadKafkaStreamsFactory}.
+ *
+ * @param bootstrapServersConfig - Configures which Kafka cluster the 
jobs will interact with. (not null)
+ */
+public SingleThreadKafkaStreamsFactory(final String 
bootstrapServersConfig) {
+this.bootstrapServersConfig = 
requireNonNull(bootstrapServersConfig);
+}
+
+@Override
+public KafkaStreams make(final String ryaInstance, final StreamsQuery 
query) throws KafkaStreamsFactoryException {
+requireNonNull(ryaInstance);
+requireNonNull(query);
+
+// Setup the Kafka Stream program.
+final Properties streamsProps = new Properties();
+
+// Configure the Kafka servers that will be talked to.
+streamsProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServersConfig);
+
+// Use the Query ID as the Application ID to ensure we resume 
where we left off the last time this command was run.
+streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"RyaStreams-Query-" + query.getQueryId());
+
+// Always start at the beginning of the input topic.
+streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+
+// Setup the topology that processes the Query.
+final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
+final String resultsTopic = 
KafkaTopics.queryResultsTopic(query.getQueryId());
+
+try {
+final TopologyBuilder topologyBuilder = 
topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new 
RandomUUIDFactory());
+return new KafkaStreams(topologyBuilder, new 
StreamsConfig(streamsProps));
+} catch (MalformedQueryException | TopologyBuilderException e) {
--- End diff --
 

[jira] [Commented] (RYA-443) Implement a single node query manager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364617#comment-16364617
 ] 

ASF GitHub Bot commented on RYA-443:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168276546
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChangeLogListener;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import 
org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import 
org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A service for managing {@link StreamsQuery} running on a Rya Streams 
system.
+ * 
+ * Only one QueryManager needs to be running to manage any number of rya
+ * instances/rya streams instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManager extends AbstractService {
+private static final Logger log = 
LoggerFactory.getLogger(QueryManager.class);
+
+/**
+ * The source of {@link QueryChangeLog}s. Each log discovered is bound 
to a specific
+ * Rya instnace.
+ */
+private final QueryChangeLogSource changeLogSource;
+
+/**
+ * The engine that is responsible for executing {@link StreamsQuery}s.
+ */
+private final QueryExecutor queryExecutor;
+
+/**
+ * How long blocking operations will be attempted before potentially 
trying again.
+ */
+private final long blockingValue;
+
+/**
+ * The units for {@link #blockingValue}.
+ */
+private final TimeUnit blockingUnits;
+
+/**
+ * Used to inform threads that the application is shutting down, so 
they must stop work.
+ */
+private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+/**
+ * This thread pool manages the two thread used to work the {@link 
LogEvent}s
+ * and the {@link QueryEvent}s.
+ */
+private final ExecutorService executor = 
Executors.newFixedThreadPool(2);
+
+/**
+ * Creates a new {@link QueryManager}.
+ *
+ * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not 
null)
+ * @param source - The {@link QueryChangeLogSource} of 

[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168276451
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChangeLogListener;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import 
org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import 
org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A service for managing {@link StreamsQuery} running on a Rya Streams 
system.
+ * 
+ * Only one QueryManager needs to be running to manage any number of rya
+ * instances/rya streams instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManager extends AbstractService {
+private static final Logger log = 
LoggerFactory.getLogger(QueryManager.class);
+
+/**
+ * The source of {@link QueryChangeLog}s. Each log discovered is bound 
to a specific
+ * Rya instnace.
+ */
+private final QueryChangeLogSource changeLogSource;
+
+/**
+ * The engine that is responsible for executing {@link StreamsQuery}s.
+ */
+private final QueryExecutor queryExecutor;
+
+/**
+ * How long blocking operations will be attempted before potentially 
trying again.
+ */
+private final long blockingValue;
+
+/**
+ * The units for {@link #blockingValue}.
+ */
+private final TimeUnit blockingUnits;
+
+/**
+ * Used to inform threads that the application is shutting down, so 
they must stop work.
+ */
+private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+/**
+ * This thread pool manages the two thread used to work the {@link 
LogEvent}s
+ * and the {@link QueryEvent}s.
+ */
+private final ExecutorService executor = 
Executors.newFixedThreadPool(2);
+
+/**
+ * Creates a new {@link QueryManager}.
+ *
+ * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not 
null)
+ * @param source - The {@link QueryChangeLogSource} of 
QueryChangeLogs. (not null)
+ * @param blockingValue - How long blocking operations will try before 
looping. (> 0)
+ * @param blockingUnits - The units of the {@code blockingValue}. (not 
null)
+ */
+

[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168276645
  
--- Diff: 
extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java
 ---
@@ -0,0 +1,884 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.querymanager;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+import org.apache.rya.streams.api.queries.QueryChangeLogListener;
+import org.apache.rya.streams.api.queries.QueryRepository;
+import 
org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener;
+import 
org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import 
com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
+import com.google.common.util.concurrent.AbstractService;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A service for managing {@link StreamsQuery} running on a Rya Streams 
system.
+ * 
+ * Only one QueryManager needs to be running to manage any number of rya
+ * instances/rya streams instances.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryManager extends AbstractService {
+private static final Logger log = 
LoggerFactory.getLogger(QueryManager.class);
+
+/**
+ * The source of {@link QueryChangeLog}s. Each log discovered is bound 
to a specific
+ * Rya instnace.
+ */
+private final QueryChangeLogSource changeLogSource;
+
+/**
+ * The engine that is responsible for executing {@link StreamsQuery}s.
+ */
+private final QueryExecutor queryExecutor;
+
+/**
+ * How long blocking operations will be attempted before potentially 
trying again.
+ */
+private final long blockingValue;
+
+/**
+ * The units for {@link #blockingValue}.
+ */
+private final TimeUnit blockingUnits;
+
+/**
+ * Used to inform threads that the application is shutting down, so 
they must stop work.
+ */
+private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
+
+/**
+ * This thread pool manages the two thread used to work the {@link 
LogEvent}s
+ * and the {@link QueryEvent}s.
+ */
+private final ExecutorService executor = 
Executors.newFixedThreadPool(2);
+
+/**
+ * Creates a new {@link QueryManager}.
+ *
+ * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not 
null)
+ * @param source - The {@link QueryChangeLogSource} of 
QueryChangeLogs. (not null)
+ * @param blockingValue - How long blocking operations will try before 
looping. (> 0)
+ * @param blockingUnits - The units of the {@code blockingValue}. (not 
null)
+ */
+

[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168276251
  
--- Diff: 
extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java
 ---
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Properties;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TopologyBuilder;
+import org.apache.rya.api.function.projection.RandomUUIDFactory;
+import org.apache.rya.streams.api.entity.StreamsQuery;
+import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory;
+import 
org.apache.rya.streams.kafka.topology.TopologyBuilderFactory.TopologyBuilderException;
+import org.apache.rya.streams.kafka.topology.TopologyFactory;
+import org.openrdf.query.MalformedQueryException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Creates {@link KafkaStreams} objects that are able to process {@link 
StreamsQuery}s
+ * using a single thread of execution starting from the earliest point in 
within the
+ * input topic. The Application ID used by the client is based on the 
Query ID of the
+ * query that is being executed so that this job may resume where it left 
off if it
+ * is stopped.
+ */
+@DefaultAnnotation(NonNull.class)
+public class SingleThreadKafkaStreamsFactory implements 
KafkaStreamsFactory {
+
+private final TopologyBuilderFactory topologyFactory = new 
TopologyFactory();
+
+private final String bootstrapServersConfig;
+
+/**
+ * Constructs an instance of {@link SingleThreadKafkaStreamsFactory}.
+ *
+ * @param bootstrapServersConfig - Configures which Kafka cluster the 
jobs will interact with. (not null)
+ */
+public SingleThreadKafkaStreamsFactory(final String 
bootstrapServersConfig) {
+this.bootstrapServersConfig = 
requireNonNull(bootstrapServersConfig);
+}
+
+@Override
+public KafkaStreams make(final String ryaInstance, final StreamsQuery 
query) throws KafkaStreamsFactoryException {
+requireNonNull(ryaInstance);
+requireNonNull(query);
+
+// Setup the Kafka Stream program.
+final Properties streamsProps = new Properties();
+
+// Configure the Kafka servers that will be talked to.
+streamsProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServersConfig);
+
+// Use the Query ID as the Application ID to ensure we resume 
where we left off the last time this command was run.
+streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"RyaStreams-Query-" + query.getQueryId());
+
+// Always start at the beginning of the input topic.
+streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+
+// Setup the topology that processes the Query.
+final String statementsTopic = 
KafkaTopics.statementsTopic(ryaInstance);
+final String resultsTopic = 
KafkaTopics.queryResultsTopic(query.getQueryId());
+
+try {
+final TopologyBuilder topologyBuilder = 
topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new 
RandomUUIDFactory());
+return new KafkaStreams(topologyBuilder, new 
StreamsConfig(streamsProps));
+} catch (MalformedQueryException | TopologyBuilderException e) {
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168276101
  
--- Diff: 
extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java
 ---
@@ -48,7 +51,28 @@
  */
 public static String queryChangeLogTopic(final String ryaInstance) {
 requireNonNull(ryaInstance);
-return ryaInstance + "-QueryChangeLog";
+return ryaInstance + QUERY_CHANGE_LOG_TOPIC_SUFFIX;
+}
+
+/**
+ * Get the Rya instance name from a Kafka topic name that has been 
used for a {@link QueryChnageLog}.
--- End diff --

Done.


---


[jira] [Commented] (RYA-443) Implement a single node query manager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364612#comment-16364612
 ] 

ASF GitHub Bot commented on RYA-443:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168275881
  
--- Diff: 
extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
 ---
@@ -85,11 +92,16 @@ public void initializedWithPopulatedChangeLog() throws 
Exception {
 queries.delete( deletedMeId );
 
 // Create a new totally in memory QueryRepository.
-try(final QueryRepository initializedQueries = new 
InMemoryQueryRepository( changeLog )) {
+final QueryRepository initializedQueries = new 
InMemoryQueryRepository( changeLog, SCHEDULE );
+try {
 // Listing the queries should work using an initialized 
change log.
 final Set stored = initializedQueries.list();
 assertEquals(expected, stored);
+} finally {
+queries.stop();
--- End diff --

Removed.


> Implement a single node query manager
> -
>
> Key: RYA-443
> URL: https://issues.apache.org/jira/browse/RYA-443
> Project: Rya
>  Issue Type: Task
>Reporter: Andrew Smith
>Assignee: Kevin Chilton
>Priority: Major
>
> We need an application that watches the QueryChangeLog to see when the 
> isActive state of queries changes and then reacts to the state change. If 
> isActive goes to true, the system must start processing the query. If it is 
> false, then it must stop processing the query. This application needs to 
> start when the host machine starts. We plan to support CentOS 7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (RYA-443) Implement a single node query manager

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364610#comment-16364610
 ] 

ASF GitHub Bot commented on RYA-443:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168275659
  
--- Diff: 
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
 ---
@@ -110,6 +110,16 @@ public boolean equals(final Object o) {
 return false;
 }
 
+@Override
+public String toString() {
+return "QueryChange: {" +
+   "Query ID: " + queryId + ",\n" +
+   "Change Type: " + changeType + ",\n" +
+   "Is Active: " + isActive + ",\n" +
+   "SPARQL: " + sparql + "\n" +
--- End diff --

I'm going to assume it is sanitized by the time it reaches this line of 
code.


> Implement a single node query manager
> -
>
> Key: RYA-443
> URL: https://issues.apache.org/jira/browse/RYA-443
> Project: Rya
>  Issue Type: Task
>Reporter: Andrew Smith
>Assignee: Kevin Chilton
>Priority: Major
>
> We need an application that watches the QueryChangeLog to see when the 
> isActive state of queries changes and then reacts to the state change. If 
> isActive goes to true, the system must start processing the query. If it is 
> false, then it must stop processing the query. This application needs to 
> start when the host machine starts. We plan to support CentOS 7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168275881
  
--- Diff: 
extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java
 ---
@@ -85,11 +92,16 @@ public void initializedWithPopulatedChangeLog() throws 
Exception {
 queries.delete( deletedMeId );
 
 // Create a new totally in memory QueryRepository.
-try(final QueryRepository initializedQueries = new 
InMemoryQueryRepository( changeLog )) {
+final QueryRepository initializedQueries = new 
InMemoryQueryRepository( changeLog, SCHEDULE );
+try {
 // Listing the queries should work using an initialized 
change log.
 final Set stored = initializedQueries.list();
 assertEquals(expected, stored);
+} finally {
+queries.stop();
--- End diff --

Removed.


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168275659
  
--- Diff: 
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
 ---
@@ -110,6 +110,16 @@ public boolean equals(final Object o) {
 return false;
 }
 
+@Override
+public String toString() {
+return "QueryChange: {" +
+   "Query ID: " + queryId + ",\n" +
+   "Change Type: " + changeType + ",\n" +
+   "Is Active: " + isActive + ",\n" +
+   "SPARQL: " + sparql + "\n" +
--- End diff --

I'm going to assume it is sanitized by the time it reaches this line of 
code.


---


[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...

2018-02-14 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/272#discussion_r168274631
  
--- Diff: 
extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java
 ---
@@ -85,4 +85,19 @@ public boolean equals(final Object o) {
 }
 return false;
 }
+
+@Override
+public String toString() {
+final StringBuilder sb = new StringBuilder();
+sb.append("ID: ");
+sb.append(getQueryId().toString() + "\n");
+sb.append("Query: ");
+sb.append(getSparql() + "\n");
+sb.append("Is ");
+if (!isActive) {
+sb.append(" Not ");
--- End diff --

Done.


---


[GitHub] incubator-rya issue #274: Added giraph profile to resolve an incompatible li...

2018-02-14 Thread asfgit
Github user asfgit commented on the issue:

https://github.com/apache/incubator-rya/pull/274
  

Refer to this link for build results (access rights to CI server needed): 

https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/690/



---


[GitHub] incubator-rya pull request #274: Added giraph profile to resolve an incompat...

2018-02-14 Thread jdasch
GitHub user jdasch opened a pull request:

https://github.com/apache/incubator-rya/pull/274

Added giraph profile to resolve an incompatible license issue.


## Description
Added a profile to make rya.giraph an optional module in the maven build.

### Tests
N/A

### Checklist
- [ ] Code Review
- [ ] Squash Commits

 People To Review


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jdasch/incubator-rya candidate/giraph-profile

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-rya/pull/274.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #274


commit 5cdfae209d176585b40b7cedede4808ee5de9a22
Author: jdasch 
Date:   2018-02-13T21:49:10Z

Added giraph profile to resolve an incompatible license issue.




---