[jira] [Commented] (RYA-443) Implement a single node query manager
[ https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364768#comment-16364768 ] ASF GitHub Bot commented on RYA-443: Github user asfgit commented on the issue: https://github.com/apache/incubator-rya/pull/272 Refer to this link for build results (access rights to CI server needed): https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/691/Build result: FAILURE[...truncated 3.03 MB...][INFO] Apache Rya Spark Support ... SKIPPED[INFO] Apache Rya Web Projects SKIPPED[INFO] Apache Rya Web Implementation .. SKIPPED[INFO] [INFO] BUILD FAILURE[INFO] [INFO] Total time: 37:36 min[INFO] Finished at: 2018-02-14T20:56:07+00:00[INFO] Final Memory: 535M/2904M[INFO] [ERROR] Failed to execute goal org.codehaus.mojo:rpm-maven-plugin:2.1.5:attached-rpm (create-rpm-distribution) on project rya.streams.query-manager: Unable to query for default vendor from RPM: Error while executing process. Cannot run program "rpm": error=2, No such file or directory -> [Help 1][ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.[ERROR] Re-run Maven using the -X switch to enable full debug logging.[ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles:[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException[ERROR] [ERROR] After correcting the problems, you can resume the build with the command[ERROR] mvn -rf :rya.streams.query-managerchannel stoppedSetting status of 6428ca81a5cb0ef5bd2e762452f1a6e46dff88fc to FAILURE with url https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/691/ and message: 'FAILURE 'Using context: Jenkins: clean package -Pgeoindexing > Implement a single node query manager > - > > Key: RYA-443 > URL: https://issues.apache.org/jira/browse/RYA-443 > Project: Rya > Issue Type: Task >Reporter: Andrew Smith >Assignee: Kevin Chilton >Priority: Major > > We need an application that watches the QueryChangeLog to see when the > isActive state of queries changes and then reacts to the state change. If > isActive goes to true, the system must start processing the query. If it is > false, then it must stop processing the query. This application needs to > start when the host machine starts. We plan to support CentOS 7. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya issue #272: RYA-443 Rya Streams Query Manager daemon program.
Github user asfgit commented on the issue: https://github.com/apache/incubator-rya/pull/272 Refer to this link for build results (access rights to CI server needed): https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/691/Build result: FAILURE[...truncated 3.03 MB...][INFO] Apache Rya Spark Support ... SKIPPED[INFO] Apache Rya Web Projects SKIPPED[INFO] Apache Rya Web Implementation .. SKIPPED[INFO] [INFO] BUILD FAILURE[INFO] [INFO] Total time: 37:36 min[INFO] Finished at: 2018-02-14T20:56:07+00:00[INFO] Final Memory: 535M/2904M[INFO] [ERROR] Failed to execute goal org.codehaus.mojo:rpm-maven-plugin:2.1.5:attached-rpm (create-rpm-distribution) on project rya.streams.query-manager: Unable to query for default vendor from RPM: Error while executing process. Cannot run program "rpm": error=2, No such file or directory -> [Help 1][ERROR] [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.[ERROR] Re-run Maven using the -X switch to enable full debug logging.[ERROR] [ERROR] For more information about the errors and possible solutions, please read the following articles:[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException[ERROR] [ERROR] After correcting the problems, you can resume the build with the command[ERROR] mvn -rf :rya.streams.query-managerchannel stoppedSetting status of 6428ca81a5cb0ef5bd2e762452f1a6e46dff88fc to FAILURE with url https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/691/ and message: 'FAILURE 'Using context: Jenkins: clean package -Pgeoindexing ---
[jira] [Commented] (RYA-443) Implement a single node query manager
[ https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364619#comment-16364619 ] ASF GitHub Bot commented on RYA-443: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168276645 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. + */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { +private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + +/** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ +private final QueryChangeLogSource changeLogSource; + +/** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ +private final QueryExecutor queryExecutor; + +/** + * How long blocking operations will be attempted before potentially trying again. + */ +private final long blockingValue; + +/** + * The units for {@link #blockingValue}. + */ +private final TimeUnit blockingUnits; + +/** + * Used to inform threads that the application is shutting down, so they must stop work. + */ +private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + +/** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ +private final ExecutorService executor = Executors.newFixedThreadPool(2); + +/** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of
[jira] [Commented] (RYA-443) Implement a single node query manager
[ https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364607#comment-16364607 ] ASF GitHub Bot commented on RYA-443: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168274700 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java --- @@ -243,8 +274,68 @@ private void updateCache() { it.close(); } } catch (final QueryChangeLogException e) { -LOG.error("Could not close the " + CloseableIteration.class.getName(), e); +log.error("Could not close the " + CloseableIteration.class.getName(), e); } + +log.trace("updateCache() - Exit"); +} +} + +@Override +protected void runOneIteration() throws Exception { +log.trace("runOneIteration() - Enter"); +lock.lock(); +try { +updateCache(); +} finally { +lock.unlock(); +} +log.trace("runOneIteration() - Exit"); --- End diff -- Yep, done. > Implement a single node query manager > - > > Key: RYA-443 > URL: https://issues.apache.org/jira/browse/RYA-443 > Project: Rya > Issue Type: Task >Reporter: Andrew Smith >Assignee: Kevin Chilton >Priority: Major > > We need an application that watches the QueryChangeLog to see when the > isActive state of queries changes and then reacts to the state change. If > isActive goes to true, the system must start processing the query. If it is > false, then it must stop processing the query. This application needs to > start when the host machine starts. We plan to support CentOS 7. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-443) Implement a single node query manager
[ https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364633#comment-16364633 ] ASF GitHub Bot commented on RYA-443: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168278465 --- Diff: extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java --- @@ -107,12 +112,11 @@ private String formatQueries(final Set queries) { sb.append("Queries in Rya Streams:\n"); sb.append("-\n"); queries.forEach(query -> { -sb.append("ID: "); -sb.append(query.getQueryId()); -sb.append("\t\t"); -sb.append("Query: "); -sb.append(query.getSparql()); -sb.append("\n"); +sb.append("ID: ").append(query.getQueryId()) +.append("") +.append("Is Active: ").append(query.isActive()) +.append( query.isActive() ? " " : "" ) +.append("Query: ").append(query.getSparql()).append("\n"); --- End diff -- Should b e sanitized before it gets here. > Implement a single node query manager > - > > Key: RYA-443 > URL: https://issues.apache.org/jira/browse/RYA-443 > Project: Rya > Issue Type: Task >Reporter: Andrew Smith >Assignee: Kevin Chilton >Priority: Major > > We need an application that watches the QueryChangeLog to see when the > isActive state of queries changes and then reacts to the state change. If > isActive goes to true, the system must start processing the query. If it is > false, then it must stop processing the query. This application needs to > start when the host machine starts. We plan to support CentOS 7. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-443) Implement a single node query manager
[ https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364623#comment-16364623 ] ASF GitHub Bot commented on RYA-443: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168277168 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java --- @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.JAXBException; + +import org.apache.commons.daemon.Daemon; +import org.apache.commons.daemon.DaemonContext; +import org.apache.commons.daemon.DaemonInitException; +import org.apache.rya.streams.kafka.KafkaStreamsFactory; +import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory; +import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic; +import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource; +import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor; +import org.apache.rya.streams.querymanager.xml.Kafka; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfig; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfig.PerformanceTunning.QueryChanngeLogDiscoveryPeriod; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfigUnmarshaller; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xml.sax.SAXException; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * JSVC integration code for a {@link QueryManager} to be used as a non-Windows daemon. + */ +@DefaultAnnotation(NonNull.class) +public class QueryManagerDaemon implements Daemon { + +private static final Logger log = LoggerFactory.getLogger(QueryManagerDaemon.class); + +/** + * The default configuration file's path for the application. + */ +private static final Path DEFAULT_CONFIGURATION_PATH = Paths.get("config/configuration.xml"); + +/** + * Command line parameters that are used by all commands that interact with Kafka. + */ +class DaemonParameters { +@Parameter(names = {"--config", "-c"}, required = false, description = "The path to the application's configuration file.") +public String config; +} + +private QueryManager manager = null; + +@Override +public void init(final DaemonContext context) throws DaemonInitException, Exception { +requireNonNull(context); + +// Parse the command line arguments for the configuration file to use. +final String[] args = context.getArguments(); +final DaemonParameters params = new DaemonParameters(); +try { +new JCommander(params).parse(args); +} catch(final ParameterException e) { +throw new DaemonInitException("Unable to parse the command line arguments.", e); +} +final Path configFile = params.config != null ? Paths.get(params.config) : DEFAULT_CONFIGURATION_PATH; +log.info("Loading the following configuration file: " + configFile); + +// Unmarshall the configuration file into an object. +final QueryManagerConfig config; +try(InputStream stream =
[jira] [Commented] (RYA-443) Implement a single node query manager
[ https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364616#comment-16364616 ] ASF GitHub Bot commented on RYA-443: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168276451 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. + */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { +private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + +/** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ +private final QueryChangeLogSource changeLogSource; + +/** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ +private final QueryExecutor queryExecutor; + +/** + * How long blocking operations will be attempted before potentially trying again. + */ +private final long blockingValue; + +/** + * The units for {@link #blockingValue}. + */ +private final TimeUnit blockingUnits; + +/** + * Used to inform threads that the application is shutting down, so they must stop work. + */ +private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + +/** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ +private final ExecutorService executor = Executors.newFixedThreadPool(2); + +/** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of
[jira] [Commented] (RYA-443) Implement a single node query manager
[ https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364613#comment-16364613 ] ASF GitHub Bot commented on RYA-443: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168276101 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java --- @@ -48,7 +51,28 @@ */ public static String queryChangeLogTopic(final String ryaInstance) { requireNonNull(ryaInstance); -return ryaInstance + "-QueryChangeLog"; +return ryaInstance + QUERY_CHANGE_LOG_TOPIC_SUFFIX; +} + +/** + * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChnageLog}. --- End diff -- Done. > Implement a single node query manager > - > > Key: RYA-443 > URL: https://issues.apache.org/jira/browse/RYA-443 > Project: Rya > Issue Type: Task >Reporter: Andrew Smith >Assignee: Kevin Chilton >Priority: Major > > We need an application that watches the QueryChangeLog to see when the > isActive state of queries changes and then reacts to the state change. If > isActive goes to true, the system must start processing the query. If it is > false, then it must stop processing the query. This application needs to > start when the host machine starts. We plan to support CentOS 7. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-443) Implement a single node query manager
[ https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364629#comment-16364629 ] ASF GitHub Bot commented on RYA-443: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168278274 --- Diff: extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java --- @@ -107,12 +112,11 @@ private String formatQueries(final Set queries) { sb.append("Queries in Rya Streams:\n"); sb.append("-\n"); queries.forEach(query -> { -sb.append("ID: "); -sb.append(query.getQueryId()); -sb.append("\t\t"); -sb.append("Query: "); -sb.append(query.getSparql()); -sb.append("\n"); +sb.append("ID: ").append(query.getQueryId()) +.append("") +.append("Is Active: ").append(query.isActive()) +.append( query.isActive() ? " " : "" ) --- End diff -- Done. > Implement a single node query manager > - > > Key: RYA-443 > URL: https://issues.apache.org/jira/browse/RYA-443 > Project: Rya > Issue Type: Task >Reporter: Andrew Smith >Assignee: Kevin Chilton >Priority: Major > > We need an application that watches the QueryChangeLog to see when the > isActive state of queries changes and then reacts to the state change. If > isActive goes to true, the system must start processing the query. If it is > false, then it must stop processing the query. This application needs to > start when the host machine starts. We plan to support CentOS 7. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168278465 --- Diff: extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java --- @@ -107,12 +112,11 @@ private String formatQueries(final Set queries) { sb.append("Queries in Rya Streams:\n"); sb.append("-\n"); queries.forEach(query -> { -sb.append("ID: "); -sb.append(query.getQueryId()); -sb.append("\t\t"); -sb.append("Query: "); -sb.append(query.getSparql()); -sb.append("\n"); +sb.append("ID: ").append(query.getQueryId()) +.append("") +.append("Is Active: ").append(query.isActive()) +.append( query.isActive() ? " " : "" ) +.append("Query: ").append(query.getSparql()).append("\n"); --- End diff -- Should b e sanitized before it gets here. ---
[jira] [Commented] (RYA-443) Implement a single node query manager
[ https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364630#comment-16364630 ] ASF GitHub Bot commented on RYA-443: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168278306 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java --- @@ -85,4 +85,19 @@ public boolean equals(final Object o) { } return false; } + +@Override +public String toString() { +final StringBuilder sb = new StringBuilder(); +sb.append("ID: "); +sb.append(getQueryId().toString() + "\n"); +sb.append("Query: "); +sb.append(getSparql() + "\n"); --- End diff -- Nope. > Implement a single node query manager > - > > Key: RYA-443 > URL: https://issues.apache.org/jira/browse/RYA-443 > Project: Rya > Issue Type: Task >Reporter: Andrew Smith >Assignee: Kevin Chilton >Priority: Major > > We need an application that watches the QueryChangeLog to see when the > isActive state of queries changes and then reacts to the state change. If > isActive goes to true, the system must start processing the query. If it is > false, then it must stop processing the query. This application needs to > start when the host machine starts. We plan to support CentOS 7. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168278306 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java --- @@ -85,4 +85,19 @@ public boolean equals(final Object o) { } return false; } + +@Override +public String toString() { +final StringBuilder sb = new StringBuilder(); +sb.append("ID: "); +sb.append(getQueryId().toString() + "\n"); +sb.append("Query: "); +sb.append(getSparql() + "\n"); --- End diff -- Nope. ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168278274 --- Diff: extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java --- @@ -107,12 +112,11 @@ private String formatQueries(final Set queries) { sb.append("Queries in Rya Streams:\n"); sb.append("-\n"); queries.forEach(query -> { -sb.append("ID: "); -sb.append(query.getQueryId()); -sb.append("\t\t"); -sb.append("Query: "); -sb.append(query.getSparql()); -sb.append("\n"); +sb.append("ID: ").append(query.getQueryId()) +.append("") +.append("Is Active: ").append(query.isActive()) +.append( query.isActive() ? " " : "" ) --- End diff -- Done. ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168274700 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java --- @@ -243,8 +274,68 @@ private void updateCache() { it.close(); } } catch (final QueryChangeLogException e) { -LOG.error("Could not close the " + CloseableIteration.class.getName(), e); +log.error("Could not close the " + CloseableIteration.class.getName(), e); } + +log.trace("updateCache() - Exit"); +} +} + +@Override +protected void runOneIteration() throws Exception { +log.trace("runOneIteration() - Enter"); +lock.lock(); +try { +updateCache(); +} finally { +lock.unlock(); +} +log.trace("runOneIteration() - Exit"); --- End diff -- Yep, done. ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168277599 --- Diff: extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java --- @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +/** + * Utilities that are useful for interacting with {@link Thread}s while testing. + */ +public class ThreadUtil { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168277168 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java --- @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.JAXBException; + +import org.apache.commons.daemon.Daemon; +import org.apache.commons.daemon.DaemonContext; +import org.apache.commons.daemon.DaemonInitException; +import org.apache.rya.streams.kafka.KafkaStreamsFactory; +import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory; +import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic; +import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource; +import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor; +import org.apache.rya.streams.querymanager.xml.Kafka; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfig; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfig.PerformanceTunning.QueryChanngeLogDiscoveryPeriod; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfigUnmarshaller; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xml.sax.SAXException; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * JSVC integration code for a {@link QueryManager} to be used as a non-Windows daemon. + */ +@DefaultAnnotation(NonNull.class) +public class QueryManagerDaemon implements Daemon { + +private static final Logger log = LoggerFactory.getLogger(QueryManagerDaemon.class); + +/** + * The default configuration file's path for the application. + */ +private static final Path DEFAULT_CONFIGURATION_PATH = Paths.get("config/configuration.xml"); + +/** + * Command line parameters that are used by all commands that interact with Kafka. + */ +class DaemonParameters { +@Parameter(names = {"--config", "-c"}, required = false, description = "The path to the application's configuration file.") +public String config; +} + +private QueryManager manager = null; + +@Override +public void init(final DaemonContext context) throws DaemonInitException, Exception { +requireNonNull(context); + +// Parse the command line arguments for the configuration file to use. +final String[] args = context.getArguments(); +final DaemonParameters params = new DaemonParameters(); +try { +new JCommander(params).parse(args); +} catch(final ParameterException e) { +throw new DaemonInitException("Unable to parse the command line arguments.", e); +} +final Path configFile = params.config != null ? Paths.get(params.config) : DEFAULT_CONFIGURATION_PATH; +log.info("Loading the following configuration file: " + configFile); + +// Unmarshall the configuration file into an object. +final QueryManagerConfig config; +try(InputStream stream = Files.newInputStream(configFile)) { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168277066 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. + */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { +private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + +/** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ +private final QueryChangeLogSource changeLogSource; + +/** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ +private final QueryExecutor queryExecutor; + +/** + * How long blocking operations will be attempted before potentially trying again. + */ +private final long blockingValue; + +/** + * The units for {@link #blockingValue}. + */ +private final TimeUnit blockingUnits; + +/** + * Used to inform threads that the application is shutting down, so they must stop work. + */ +private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + +/** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ +private final ExecutorService executor = Executors.newFixedThreadPool(2); + +/** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) + */ +
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168276972 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. + */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { +private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + +/** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ +private final QueryChangeLogSource changeLogSource; + +/** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ +private final QueryExecutor queryExecutor; + +/** + * How long blocking operations will be attempted before potentially trying again. + */ +private final long blockingValue; + +/** + * The units for {@link #blockingValue}. + */ +private final TimeUnit blockingUnits; + +/** + * Used to inform threads that the application is shutting down, so they must stop work. + */ +private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + +/** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ +private final ExecutorService executor = Executors.newFixedThreadPool(2); + +/** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) + */ +
[jira] [Commented] (RYA-443) Implement a single node query manager
[ https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364615#comment-16364615 ] ASF GitHub Bot commented on RYA-443: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168276251 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka; + +import static java.util.Objects.requireNonNull; + +import java.util.Properties; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.RandomUUIDFactory; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory; +import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory.TopologyBuilderException; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.openrdf.query.MalformedQueryException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Creates {@link KafkaStreams} objects that are able to process {@link StreamsQuery}s + * using a single thread of execution starting from the earliest point in within the + * input topic. The Application ID used by the client is based on the Query ID of the + * query that is being executed so that this job may resume where it left off if it + * is stopped. + */ +@DefaultAnnotation(NonNull.class) +public class SingleThreadKafkaStreamsFactory implements KafkaStreamsFactory { + +private final TopologyBuilderFactory topologyFactory = new TopologyFactory(); + +private final String bootstrapServersConfig; + +/** + * Constructs an instance of {@link SingleThreadKafkaStreamsFactory}. + * + * @param bootstrapServersConfig - Configures which Kafka cluster the jobs will interact with. (not null) + */ +public SingleThreadKafkaStreamsFactory(final String bootstrapServersConfig) { +this.bootstrapServersConfig = requireNonNull(bootstrapServersConfig); +} + +@Override +public KafkaStreams make(final String ryaInstance, final StreamsQuery query) throws KafkaStreamsFactoryException { +requireNonNull(ryaInstance); +requireNonNull(query); + +// Setup the Kafka Stream program. +final Properties streamsProps = new Properties(); + +// Configure the Kafka servers that will be talked to. +streamsProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); + +// Use the Query ID as the Application ID to ensure we resume where we left off the last time this command was run. +streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "RyaStreams-Query-" + query.getQueryId()); + +// Always start at the beginning of the input topic. +streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + +// Setup the topology that processes the Query. +final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); +final String resultsTopic = KafkaTopics.queryResultsTopic(query.getQueryId()); + +try { +final TopologyBuilder topologyBuilder = topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new RandomUUIDFactory()); +return new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps)); +} catch (MalformedQueryException | TopologyBuilderException e) { --- End diff --
[jira] [Commented] (RYA-443) Implement a single node query manager
[ https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364617#comment-16364617 ] ASF GitHub Bot commented on RYA-443: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168276546 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. + */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { +private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + +/** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ +private final QueryChangeLogSource changeLogSource; + +/** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ +private final QueryExecutor queryExecutor; + +/** + * How long blocking operations will be attempted before potentially trying again. + */ +private final long blockingValue; + +/** + * The units for {@link #blockingValue}. + */ +private final TimeUnit blockingUnits; + +/** + * Used to inform threads that the application is shutting down, so they must stop work. + */ +private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + +/** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ +private final ExecutorService executor = Executors.newFixedThreadPool(2); + +/** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168276451 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. + */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { +private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + +/** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ +private final QueryChangeLogSource changeLogSource; + +/** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ +private final QueryExecutor queryExecutor; + +/** + * How long blocking operations will be attempted before potentially trying again. + */ +private final long blockingValue; + +/** + * The units for {@link #blockingValue}. + */ +private final TimeUnit blockingUnits; + +/** + * Used to inform threads that the application is shutting down, so they must stop work. + */ +private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + +/** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ +private final ExecutorService executor = Executors.newFixedThreadPool(2); + +/** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) + */ +
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168276645 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. + */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { +private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + +/** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ +private final QueryChangeLogSource changeLogSource; + +/** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ +private final QueryExecutor queryExecutor; + +/** + * How long blocking operations will be attempted before potentially trying again. + */ +private final long blockingValue; + +/** + * The units for {@link #blockingValue}. + */ +private final TimeUnit blockingUnits; + +/** + * Used to inform threads that the application is shutting down, so they must stop work. + */ +private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + +/** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ +private final ExecutorService executor = Executors.newFixedThreadPool(2); + +/** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) + */ +
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168276251 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka; + +import static java.util.Objects.requireNonNull; + +import java.util.Properties; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.RandomUUIDFactory; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory; +import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory.TopologyBuilderException; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.openrdf.query.MalformedQueryException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Creates {@link KafkaStreams} objects that are able to process {@link StreamsQuery}s + * using a single thread of execution starting from the earliest point in within the + * input topic. The Application ID used by the client is based on the Query ID of the + * query that is being executed so that this job may resume where it left off if it + * is stopped. + */ +@DefaultAnnotation(NonNull.class) +public class SingleThreadKafkaStreamsFactory implements KafkaStreamsFactory { + +private final TopologyBuilderFactory topologyFactory = new TopologyFactory(); + +private final String bootstrapServersConfig; + +/** + * Constructs an instance of {@link SingleThreadKafkaStreamsFactory}. + * + * @param bootstrapServersConfig - Configures which Kafka cluster the jobs will interact with. (not null) + */ +public SingleThreadKafkaStreamsFactory(final String bootstrapServersConfig) { +this.bootstrapServersConfig = requireNonNull(bootstrapServersConfig); +} + +@Override +public KafkaStreams make(final String ryaInstance, final StreamsQuery query) throws KafkaStreamsFactoryException { +requireNonNull(ryaInstance); +requireNonNull(query); + +// Setup the Kafka Stream program. +final Properties streamsProps = new Properties(); + +// Configure the Kafka servers that will be talked to. +streamsProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); + +// Use the Query ID as the Application ID to ensure we resume where we left off the last time this command was run. +streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "RyaStreams-Query-" + query.getQueryId()); + +// Always start at the beginning of the input topic. +streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + +// Setup the topology that processes the Query. +final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); +final String resultsTopic = KafkaTopics.queryResultsTopic(query.getQueryId()); + +try { +final TopologyBuilder topologyBuilder = topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new RandomUUIDFactory()); +return new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps)); +} catch (MalformedQueryException | TopologyBuilderException e) { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168276101 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java --- @@ -48,7 +51,28 @@ */ public static String queryChangeLogTopic(final String ryaInstance) { requireNonNull(ryaInstance); -return ryaInstance + "-QueryChangeLog"; +return ryaInstance + QUERY_CHANGE_LOG_TOPIC_SUFFIX; +} + +/** + * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChnageLog}. --- End diff -- Done. ---
[jira] [Commented] (RYA-443) Implement a single node query manager
[ https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364612#comment-16364612 ] ASF GitHub Bot commented on RYA-443: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168275881 --- Diff: extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java --- @@ -85,11 +92,16 @@ public void initializedWithPopulatedChangeLog() throws Exception { queries.delete( deletedMeId ); // Create a new totally in memory QueryRepository. -try(final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog )) { +final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog, SCHEDULE ); +try { // Listing the queries should work using an initialized change log. final Set stored = initializedQueries.list(); assertEquals(expected, stored); +} finally { +queries.stop(); --- End diff -- Removed. > Implement a single node query manager > - > > Key: RYA-443 > URL: https://issues.apache.org/jira/browse/RYA-443 > Project: Rya > Issue Type: Task >Reporter: Andrew Smith >Assignee: Kevin Chilton >Priority: Major > > We need an application that watches the QueryChangeLog to see when the > isActive state of queries changes and then reacts to the state change. If > isActive goes to true, the system must start processing the query. If it is > false, then it must stop processing the query. This application needs to > start when the host machine starts. We plan to support CentOS 7. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-443) Implement a single node query manager
[ https://issues.apache.org/jira/browse/RYA-443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364610#comment-16364610 ] ASF GitHub Bot commented on RYA-443: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168275659 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java --- @@ -110,6 +110,16 @@ public boolean equals(final Object o) { return false; } +@Override +public String toString() { +return "QueryChange: {" + + "Query ID: " + queryId + ",\n" + + "Change Type: " + changeType + ",\n" + + "Is Active: " + isActive + ",\n" + + "SPARQL: " + sparql + "\n" + --- End diff -- I'm going to assume it is sanitized by the time it reaches this line of code. > Implement a single node query manager > - > > Key: RYA-443 > URL: https://issues.apache.org/jira/browse/RYA-443 > Project: Rya > Issue Type: Task >Reporter: Andrew Smith >Assignee: Kevin Chilton >Priority: Major > > We need an application that watches the QueryChangeLog to see when the > isActive state of queries changes and then reacts to the state change. If > isActive goes to true, the system must start processing the query. If it is > false, then it must stop processing the query. This application needs to > start when the host machine starts. We plan to support CentOS 7. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168275881 --- Diff: extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java --- @@ -85,11 +92,16 @@ public void initializedWithPopulatedChangeLog() throws Exception { queries.delete( deletedMeId ); // Create a new totally in memory QueryRepository. -try(final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog )) { +final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog, SCHEDULE ); +try { // Listing the queries should work using an initialized change log. final Set stored = initializedQueries.list(); assertEquals(expected, stored); +} finally { +queries.stop(); --- End diff -- Removed. ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168275659 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java --- @@ -110,6 +110,16 @@ public boolean equals(final Object o) { return false; } +@Override +public String toString() { +return "QueryChange: {" + + "Query ID: " + queryId + ",\n" + + "Change Type: " + changeType + ",\n" + + "Is Active: " + isActive + ",\n" + + "SPARQL: " + sparql + "\n" + --- End diff -- I'm going to assume it is sanitized by the time it reaches this line of code. ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168274631 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java --- @@ -85,4 +85,19 @@ public boolean equals(final Object o) { } return false; } + +@Override +public String toString() { +final StringBuilder sb = new StringBuilder(); +sb.append("ID: "); +sb.append(getQueryId().toString() + "\n"); +sb.append("Query: "); +sb.append(getSparql() + "\n"); +sb.append("Is "); +if (!isActive) { +sb.append(" Not "); --- End diff -- Done. ---
[GitHub] incubator-rya issue #274: Added giraph profile to resolve an incompatible li...
Github user asfgit commented on the issue: https://github.com/apache/incubator-rya/pull/274 Refer to this link for build results (access rights to CI server needed): https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/690/ ---
[GitHub] incubator-rya pull request #274: Added giraph profile to resolve an incompat...
GitHub user jdasch opened a pull request: https://github.com/apache/incubator-rya/pull/274 Added giraph profile to resolve an incompatible license issue. ## Description Added a profile to make rya.giraph an optional module in the maven build. ### Tests N/A ### Checklist - [ ] Code Review - [ ] Squash Commits People To Review You can merge this pull request into a Git repository by running: $ git pull https://github.com/jdasch/incubator-rya candidate/giraph-profile Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-rya/pull/274.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #274 commit 5cdfae209d176585b40b7cedede4808ee5de9a22 Author: jdaschDate: 2018-02-13T21:49:10Z Added giraph profile to resolve an incompatible license issue. ---