[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 closed the pull request at: https://github.com/apache/incubator-rya/pull/272 ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r170073123 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java --- @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager.kafka; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory; +import org.apache.rya.streams.querymanager.QueryChangeLogSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.AbstractScheduledService; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Represents a Kafka Server that manages {@link KafkaQueryChangeLog}s. + * + * Thread safe. --- End diff -- net.jcip.annotations.ThreadSafe Looks like Rya is using this one: ``` <dependency> <groupId>com.github.stephenc.jcip</groupId> <artifactId>jcip-annotations</artifactId> <version>1.0-1</version> </dependency> ``` ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r170069054 --- Diff: extras/rya.streams/query-manager/pom.xml --- @@ -0,0 +1,253 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + + +org.apache.rya +rya.streams.parent +3.2.12-incubating-SNAPSHOT + + +4.0.0 +rya.streams.query-manager + +Apache Rya Streams Query Manager + +This module contains the Rya Streams Query Manager. + + + + ${project.build.directory}/${project.artifactId}-${project.version}-rpm-staging + + + + + +org.apache.rya +rya.streams.kafka + + + + +commons-daemon +commons-daemon +1.1.0 + + +org.slf4j +slf4j-log4j12 + + +com.beust +jcommander + + + + +junit +junit +test + + +org.apache.rya +rya.test.kafka +test + + +org.mockito +mockito-all +test + + + + + + +src/main/xsd + + + + + + +org.codehaus.mojo +jaxb2-maven-plugin + + +xjc + +xjc + + + + + org.apache.rya.streams.querymanager.xml + + + + + +com.mycila +license-maven-plugin + + ${project.basedir}/src/license/header.txt + + + +update-generated-source-headers + + ${project.build.directory}/generated-sources/jaxb + + XML_STYLE + + +process-sources + +format + + + + + + + +org.apache.maven.plugins +maven-shade-plugin + + +package + + shade + + + + + org.apache.rya.streams.querymanager.QueryManagerDaemon + + + + + + + + + + +org.apache.maven.plugins +maven-assembly-plugin + + + +stage-content-for-rpms + +single + +package + + +false + + src/assembly/rpm-staging.xml + + + + + + + +org.codehaus.mojo +rpm-maven-plugin + + +set-rpm-properties + +version + + + +create-rpm-distribution --- End diff --
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168278465 --- Diff: extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java --- @@ -107,12 +112,11 @@ private String formatQueries(final Set queries) { sb.append("Queries in Rya Streams:\n"); sb.append("-\n"); queries.forEach(query -> { -sb.append("ID: "); -sb.append(query.getQueryId()); -sb.append("\t\t"); -sb.append("Query: "); -sb.append(query.getSparql()); -sb.append("\n"); +sb.append("ID: ").append(query.getQueryId()) +.append("") +.append("Is Active: ").append(query.isActive()) +.append( query.isActive() ? " " : "" ) +.append("Query: ").append(query.getSparql()).append("\n"); --- End diff -- Should be sanitized before it gets here. ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168278306 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java --- @@ -85,4 +85,19 @@ public boolean equals(final Object o) { } return false; } + +@Override +public String toString() { +final StringBuilder sb = new StringBuilder(); +sb.append("ID: "); +sb.append(getQueryId().toString() + "\n"); +sb.append("Query: "); +sb.append(getSparql() + "\n"); --- End diff -- Nope. ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168278274 --- Diff: extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java --- @@ -107,12 +112,11 @@ private String formatQueries(final Set queries) { sb.append("Queries in Rya Streams:\n"); sb.append("-\n"); queries.forEach(query -> { -sb.append("ID: "); -sb.append(query.getQueryId()); -sb.append("\t\t"); -sb.append("Query: "); -sb.append(query.getSparql()); -sb.append("\n"); +sb.append("ID: ").append(query.getQueryId()) +.append("") +.append("Is Active: ").append(query.isActive()) +.append( query.isActive() ? " " : "" ) --- End diff -- Done. ---
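For readers following the ListQueriesCommand thread, here is a minimal sketch of the chained-append style the diff moves to. It is not the PR's code: `Query` is a hypothetical stand-in for `StreamsQuery` exposing only the three accessors used in the diff, and the `"\t\t"` separators are an assumption, since the separator literals in the quoted `+` lines did not survive the archive. The SPARQL text is appended as-is, on the assumption that it was sanitized upstream as the review comment expects.

```java
import java.util.List;
import java.util.UUID;

/** Sketch only; mirrors the shape of formatQueries() from the quoted diff. */
final class QueryListingSketch {

    /** Hypothetical stand-in for StreamsQuery. */
    static final class Query {
        private final UUID queryId;
        private final String sparql;
        private final boolean active;

        Query(final UUID queryId, final String sparql, final boolean active) {
            this.queryId = queryId;
            this.sparql = sparql;
            this.active = active;
        }

        UUID getQueryId() { return queryId; }
        String getSparql() { return sparql; }
        boolean isActive() { return active; }
    }

    /** Builds one line per query using chained StringBuilder appends. */
    static String formatQueries(final List<Query> queries) {
        final StringBuilder sb = new StringBuilder();
        sb.append("Queries in Rya Streams:\n");
        for (final Query query : queries) {
            sb.append("ID: ").append(query.getQueryId())
              .append("\t\t") // assumed separator
              .append("Is Active: ").append(query.isActive())
              .append("\t\t") // assumed separator
              .append("Query: ").append(query.getSparql()).append("\n");
        }
        return sb.toString();
    }
}
```

Chaining keeps each output row in one expression, which makes the column order easy to read off the code.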
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168274700 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java --- @@ -243,8 +274,68 @@ private void updateCache() { it.close(); } } catch (final QueryChangeLogException e) { -LOG.error("Could not close the " + CloseableIteration.class.getName(), e); +log.error("Could not close the " + CloseableIteration.class.getName(), e); } + +log.trace("updateCache() - Exit"); +} +} + +@Override +protected void runOneIteration() throws Exception { +log.trace("runOneIteration() - Enter"); +lock.lock(); +try { +updateCache(); +} finally { +lock.unlock(); +} +log.trace("runOneIteration() - Exit"); --- End diff -- Yep, done. ---
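The lock/try/finally shape in `runOneIteration()` above can be illustrated in isolation. The sketch below uses only `java.util.concurrent.locks.ReentrantLock`; the class name, the counter, and the `failNextUpdate()` hook are hypothetical and exist only to show that the lock is released even when the guarded update throws. It does not reproduce `InMemoryQueryRepository`.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Sketch only; a counter stands in for the repository's cache. */
final class GuardedCacheSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private int updates = 0;
    private boolean failNext = false;

    /** Hypothetical test hook: makes the next update throw. */
    void failNextUpdate() {
        lock.lock();
        try {
            failNext = true;
        } finally {
            lock.unlock();
        }
    }

    /** Must only be called while holding the lock. */
    private void updateCache() {
        if (failNext) {
            failNext = false;
            throw new IllegalStateException("simulated change log failure");
        }
        updates++;
    }

    /** Same shape as the quoted runOneIteration(): lock, work, unlock in finally. */
    void runOneIteration() {
        lock.lock();
        try {
            updateCache();
        } finally {
            lock.unlock();
        }
    }

    boolean isLocked() {
        return lock.isLocked();
    }

    int getUpdates() {
        lock.lock();
        try {
            return updates;
        } finally {
            lock.unlock();
        }
    }
}
```

Placing `unlock()` in `finally` is what keeps a failed iteration from leaving the lock held for every later reader.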
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168277599 --- Diff: extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java --- @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +/** + * Utilities that are useful for interacting with {@link Thread}s while testing. + */ +public class ThreadUtil { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168277168 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java --- @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.JAXBException; + +import org.apache.commons.daemon.Daemon; +import org.apache.commons.daemon.DaemonContext; +import org.apache.commons.daemon.DaemonInitException; +import org.apache.rya.streams.kafka.KafkaStreamsFactory; +import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory; +import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic; +import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource; +import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor; +import org.apache.rya.streams.querymanager.xml.Kafka; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfig; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfig.PerformanceTunning.QueryChanngeLogDiscoveryPeriod; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfigUnmarshaller; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xml.sax.SAXException; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * JSVC integration code for a {@link QueryManager} to be used as a non-Windows daemon. + */ +@DefaultAnnotation(NonNull.class) +public class QueryManagerDaemon implements Daemon { + +private static final Logger log = LoggerFactory.getLogger(QueryManagerDaemon.class); + +/** + * The default configuration file's path for the application. 
+ */ +private static final Path DEFAULT_CONFIGURATION_PATH = Paths.get("config/configuration.xml"); + +/** + * Command line parameters that are used by all commands that interact with Kafka. + */ +class DaemonParameters { +@Parameter(names = {"--config", "-c"}, required = false, description = "The path to the application's configuration file.") +public String config; +} + +private QueryManager manager = null; + +@Override +public void init(final DaemonContext context) throws DaemonInitException, Exception { +requireNonNull(context); + +// Parse the command line arguments for the configuration file to use. +final String[] args = context.getArguments(); +final DaemonParameters params = new DaemonParameters(); +try { +new JCommander(params).parse(args); +} catch(final ParameterException e) { +throw new DaemonInitException("Unable to parse the command line arguments.", e); +} +final Path configFile = params.config != null ? Paths.get(params.config) : DEFAULT_CONFIGURATION_PATH; +log.info("Loading the following configuration file: " + configFile); + +// Unmarshall the configuration file into an object. +final QueryManagerConfig config; +try(InputStream stream = Files.newInputStream(configFile)) { --- End diff -- Done. ---
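The two steps visible at the end of the quoted `init()` method, falling back to a default configuration path and opening the file with try-with-resources, can be sketched with the standard library alone. This is an illustration under assumptions: `ConfigPathSketch` and `countBytes` are hypothetical names, JCommander parsing and JAXB unmarshalling are left out, and reading bytes merely stands in for whatever the daemon does with the stream.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Sketch only; not the QueryManagerDaemon from the PR. */
final class ConfigPathSketch {
    /** Same default as in the quoted diff. */
    static final Path DEFAULT_CONFIGURATION_PATH = Paths.get("config/configuration.xml");

    /** Uses the --config value when present, otherwise the default path. */
    static Path resolveConfigPath(final String configArg) {
        return configArg != null ? Paths.get(configArg) : DEFAULT_CONFIGURATION_PATH;
    }

    /** Hypothetical consumer: the stream is closed even if reading fails. */
    static long countBytes(final Path configFile) throws IOException {
        try (InputStream stream = Files.newInputStream(configFile)) {
            final byte[] buffer = new byte[4096];
            long total = 0;
            int read;
            while ((read = stream.read(buffer)) != -1) {
                total += read;
            }
            return total;
        }
    }
}
```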
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168277066 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. 
+ */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { +private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + +/** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ +private final QueryChangeLogSource changeLogSource; + +/** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ +private final QueryExecutor queryExecutor; + +/** + * How long blocking operations will be attempted before potentially trying again. + */ +private final long blockingValue; + +/** + * The units for {@link #blockingValue}. + */ +private final TimeUnit blockingUnits; + +/** + * Used to inform threads that the application is shutting down, so they must stop work. + */ +private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + +/** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ +private final ExecutorService executor = Executors.newFixedThreadPool(2); + +/** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) + */ +
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168276972 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. 
+ */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { +private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + +/** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ +private final QueryChangeLogSource changeLogSource; + +/** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ +private final QueryExecutor queryExecutor; + +/** + * How long blocking operations will be attempted before potentially trying again. + */ +private final long blockingValue; + +/** + * The units for {@link #blockingValue}. + */ +private final TimeUnit blockingUnits; + +/** + * Used to inform threads that the application is shutting down, so they must stop work. + */ +private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + +/** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ +private final ExecutorService executor = Executors.newFixedThreadPool(2); + +/** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) + */ +
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168276451 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. 
+ */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { +private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + +/** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ +private final QueryChangeLogSource changeLogSource; + +/** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ +private final QueryExecutor queryExecutor; + +/** + * How long blocking operations will be attempted before potentially trying again. + */ +private final long blockingValue; + +/** + * The units for {@link #blockingValue}. + */ +private final TimeUnit blockingUnits; + +/** + * Used to inform threads that the application is shutting down, so they must stop work. + */ +private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + +/** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ +private final ExecutorService executor = Executors.newFixedThreadPool(2); + +/** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) + */ +
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168276645 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. 
+ */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { +private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + +/** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ +private final QueryChangeLogSource changeLogSource; + +/** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ +private final QueryExecutor queryExecutor; + +/** + * How long blocking operations will be attempted before potentially trying again. + */ +private final long blockingValue; + +/** + * The units for {@link #blockingValue}. + */ +private final TimeUnit blockingUnits; + +/** + * Used to inform threads that the application is shutting down, so they must stop work. + */ +private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + +/** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ +private final ExecutorService executor = Executors.newFixedThreadPool(2); + +/** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) + */ +
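The archived diff is cut off before any worker code, so the following is only a sketch of how fields like `blockingValue`, `blockingUnits`, `shutdownSignal`, and a fixed thread pool are commonly combined; it is not the PR's implementation. `PollingWorkerSketch`, its `String` events, and the queue capacity of 16 are hypothetical. The worker polls a `BlockingQueue` with a timeout so that it regularly wakes up and rechecks the shutdown flag.

```java
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical worker; not the QueryManager from the PR. */
final class PollingWorkerSketch {
    private final BlockingQueue<String> events = new ArrayBlockingQueue<>(16);
    private final AtomicBoolean shutdownSignal = new AtomicBoolean(false);
    private final ExecutorService executor = Executors.newFixedThreadPool(1);
    private final AtomicInteger handled = new AtomicInteger();
    private final long blockingValue;
    private final TimeUnit blockingUnits;

    PollingWorkerSketch(final long blockingValue, final TimeUnit blockingUnits) {
        if (blockingValue <= 0) {
            throw new IllegalArgumentException("blockingValue must be > 0");
        }
        this.blockingValue = blockingValue;
        this.blockingUnits = Objects.requireNonNull(blockingUnits);
    }

    /** Starts a loop that blocks for at most blockingValue before rechecking the flag. */
    void start() {
        executor.execute(() -> {
            while (!shutdownSignal.get()) {
                try {
                    final String event = events.poll(blockingValue, blockingUnits);
                    if (event != null) {
                        handled.incrementAndGet();
                    }
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    /** Non-blocking enqueue; returns false when the queue is full. */
    boolean offer(final String event) {
        return events.offer(event);
    }

    /** Signals shutdown and waits up to ten polling periods for the worker to exit. */
    boolean stop() {
        shutdownSignal.set(true);
        executor.shutdown();
        try {
            return executor.awaitTermination(10 * blockingValue, blockingUnits);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int handledCount() {
        return handled.get();
    }
}
```

With a bounded poll the worker never blocks indefinitely on an empty queue, so setting the flag is enough to stop it within roughly one polling period.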
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168276251 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka; + +import static java.util.Objects.requireNonNull; + +import java.util.Properties; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.RandomUUIDFactory; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory; +import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory.TopologyBuilderException; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.openrdf.query.MalformedQueryException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Creates {@link KafkaStreams} objects that are able to process {@link StreamsQuery}s + * using a single thread of execution starting from the earliest point in within the + * input topic. The Application ID used by the client is based on the Query ID of the + * query that is being executed so that this job may resume where it left off if it + * is stopped. + */ +@DefaultAnnotation(NonNull.class) +public class SingleThreadKafkaStreamsFactory implements KafkaStreamsFactory { + +private final TopologyBuilderFactory topologyFactory = new TopologyFactory(); + +private final String bootstrapServersConfig; + +/** + * Constructs an instance of {@link SingleThreadKafkaStreamsFactory}. + * + * @param bootstrapServersConfig - Configures which Kafka cluster the jobs will interact with. 
(not null) + */ +public SingleThreadKafkaStreamsFactory(final String bootstrapServersConfig) { +this.bootstrapServersConfig = requireNonNull(bootstrapServersConfig); +} + +@Override +public KafkaStreams make(final String ryaInstance, final StreamsQuery query) throws KafkaStreamsFactoryException { +requireNonNull(ryaInstance); +requireNonNull(query); + +// Setup the Kafka Stream program. +final Properties streamsProps = new Properties(); + +// Configure the Kafka servers that will be talked to. +streamsProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); + +// Use the Query ID as the Application ID to ensure we resume where we left off the last time this command was run. +streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "RyaStreams-Query-" + query.getQueryId()); + +// Always start at the beginning of the input topic. +streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + +// Setup the topology that processes the Query. +final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); +final String resultsTopic = KafkaTopics.queryResultsTopic(query.getQueryId()); + +try { +final TopologyBuilder topologyBuilder = topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new RandomUUIDFactory()); +return new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps)); +} catch (MalformedQueryException | TopologyBuilderException e) { --- End diff -- Done. ---
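For readers following the thread, the configuration assembled in make(...) can be sketched with plain java.util.Properties. This is only an illustration: it uses the literal Kafka property keys (bootstrap.servers, application.id, auto.offset.reset) in place of the StreamsConfig and ConsumerConfig constants so that it compiles without the Kafka client on the classpath, it assumes the Query ID is a UUID, and the names StreamsPropsSketch and propsFor are hypothetical rather than part of the PR.

```java
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;

/** Hypothetical helper that mirrors the properties set in the diff above. */
final class StreamsPropsSketch {

    private StreamsPropsSketch() { }

    static Properties propsFor(final String bootstrapServers, final UUID queryId) {
        Objects.requireNonNull(bootstrapServers);
        Objects.requireNonNull(queryId);

        final Properties props = new Properties();
        // Literal value of StreamsConfig.BOOTSTRAP_SERVERS_CONFIG.
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Literal value of StreamsConfig.APPLICATION_ID_CONFIG. Deriving it from the
        // Query ID keeps it stable across restarts, which is what lets a stopped job
        // resume from its committed offsets.
        props.setProperty("application.id", "RyaStreams-Query-" + queryId);
        // Literal value of ConsumerConfig.AUTO_OFFSET_RESET_CONFIG. It only applies
        // when no committed offset exists yet for the application ID.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }
}
```

The point of the sketch is the interaction between the last two properties: "earliest" is a fallback for a query that has never run, while the per-query application ID is what makes a restart pick up where it left off.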
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168276101 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java --- @@ -48,7 +51,28 @@ */ public static String queryChangeLogTopic(final String ryaInstance) { requireNonNull(ryaInstance); -return ryaInstance + "-QueryChangeLog"; +return ryaInstance + QUERY_CHANGE_LOG_TOPIC_SUFFIX; +} + +/** + * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChnageLog}. --- End diff -- Done. ---
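A minimal sketch of the round trip this change enables, assuming the suffix keeps the value "-QueryChangeLog" shown on the removed line. The constant name comes from the diff; the class name TopicNamesSketch and the method ryaInstanceOf are hypothetical, since the actual method signature is not visible in this thread.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in for the topic naming helpers discussed above. */
final class TopicNamesSketch {

    static final String QUERY_CHANGE_LOG_TOPIC_SUFFIX = "-QueryChangeLog";

    private TopicNamesSketch() { }

    /** Builds the change log topic name for a Rya instance. */
    static String queryChangeLogTopic(final String ryaInstance) {
        return Objects.requireNonNull(ryaInstance) + QUERY_CHANGE_LOG_TOPIC_SUFFIX;
    }

    /** Recovers the Rya instance name, or empty if the topic is not a change log topic. */
    static Optional<String> ryaInstanceOf(final String topic) {
        Objects.requireNonNull(topic);
        final int suffixLength = QUERY_CHANGE_LOG_TOPIC_SUFFIX.length();
        // Reject topics without the suffix and topics that are nothing but the suffix.
        if (!topic.endsWith(QUERY_CHANGE_LOG_TOPIC_SUFFIX) || topic.length() == suffixLength) {
            return Optional.empty();
        }
        return Optional.of(topic.substring(0, topic.length() - suffixLength));
    }
}
```

Sharing one constant between both directions is the reason for pulling the suffix out of the string literal: the parser cannot drift from the builder.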
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168275881 --- Diff: extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java --- @@ -85,11 +92,16 @@ public void initializedWithPopulatedChangeLog() throws Exception { queries.delete( deletedMeId ); // Create a new totally in memory QueryRepository. -try(final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog )) { +final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog, SCHEDULE ); +try { // Listing the queries should work using an initialized change log. final Set stored = initializedQueries.list(); assertEquals(expected, stored); +} finally { +queries.stop(); --- End diff -- Removed. ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168275659 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java --- @@ -110,6 +110,16 @@ public boolean equals(final Object o) { return false; } +@Override +public String toString() { +return "QueryChange: {" + + "Query ID: " + queryId + ",\n" + + "Change Type: " + changeType + ",\n" + + "Is Active: " + isActive + ",\n" + + "SPARQL: " + sparql + "\n" + --- End diff -- I'm going to assume it is sanitized by the time it reaches this line of code. ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r168274631 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java --- @@ -85,4 +85,19 @@ public boolean equals(final Object o) { } return false; } + +@Override +public String toString() { +final StringBuilder sb = new StringBuilder(); +sb.append("ID: "); +sb.append(getQueryId().toString() + "\n"); +sb.append("Query: "); +sb.append(getSparql() + "\n"); +sb.append("Is "); +if (!isActive) { +sb.append(" Not "); --- End diff -- Done. ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167284717 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java --- @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager.kafka; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory; +import org.apache.rya.streams.querymanager.QueryChangeLogSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.AbstractScheduledService; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Represents a Kafka Server that manages {@link KafkaQueryChangeLog}s. + * + * Thread safe. --- End diff -- What's the dependency for that annotation? ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167284418 --- Diff: extras/rya.streams/query-manager/pom.xml --- @@ -0,0 +1,253 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + + +org.apache.rya +rya.streams.parent +3.2.12-incubating-SNAPSHOT + + +4.0.0 +rya.streams.query-manager + +Apache Rya Streams Query Manager + +This module contains the Rya Streams Query Manager. + + + + ${project.build.directory}/${project.artifactId}-${project.version}-rpm-staging + + + + + +org.apache.rya +rya.streams.kafka + + + + +commons-daemon +commons-daemon +1.1.0 + + +org.slf4j +slf4j-log4j12 + + +com.beust +jcommander + + + + +junit +junit +test + + +org.apache.rya +rya.test.kafka +test + + +org.mockito +mockito-all +test + + + + + + +src/main/xsd + + + + + + +org.codehaus.mojo +jaxb2-maven-plugin + + +xjc + +xjc + + + + + org.apache.rya.streams.querymanager.xml + + + + + +com.mycila +license-maven-plugin + + ${project.basedir}/src/license/header.txt + + + +update-generated-source-headers + + ${project.build.directory}/generated-sources/jaxb + + XML_STYLE + + +process-sources + +format + + + + + + + +org.apache.maven.plugins +maven-shade-plugin + + +package + + shade + + + + + org.apache.rya.streams.querymanager.QueryManagerDaemon + + + + + + + + + + +org.apache.maven.plugins +maven-assembly-plugin + + + +stage-content-for-rpms + +single + +package + + +false + + src/assembly/rpm-staging.xml + + + + + + + +org.codehaus.mojo +rpm-maven-plugin + + +set-rpm-properties + +version + + + +create-rpm-distribution --- End diff -- Do you
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167283976 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java --- @@ -110,6 +110,16 @@ public boolean equals(final Object o) { return false; } +@Override +public String toString() { +return "QueryChange: {" + + "Query ID: " + queryId + ",\n" + + "Change Type: " + changeType + ",\n" + + "Is Active: " + isActive + ",\n" + + "SPARQL: " + sparql + "\n" + --- End diff -- What does that function do? ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167241732 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. 
+ */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { +private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + +/** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ +private final QueryChangeLogSource changeLogSource; + +/** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ +private final QueryExecutor queryExecutor; + +/** + * How long blocking operations will be attempted before potentially trying again. + */ +private final long blockingValue; + +/** + * The units for {@link #blockingValue}. + */ +private final TimeUnit blockingUnits; + +/** + * Used to inform threads that the application is shutting down, so they must stop work. + */ +private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + +/** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ +private final ExecutorService executor = Executors.newFixedThreadPool(2); + +/** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) + */ +
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167242987 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManagerDaemon.java --- @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.JAXBException; + +import org.apache.commons.daemon.Daemon; +import org.apache.commons.daemon.DaemonContext; +import org.apache.commons.daemon.DaemonInitException; +import org.apache.rya.streams.kafka.KafkaStreamsFactory; +import org.apache.rya.streams.kafka.SingleThreadKafkaStreamsFactory; +import org.apache.rya.streams.kafka.interactor.CreateKafkaTopic; +import org.apache.rya.streams.querymanager.kafka.KafkaQueryChangeLogSource; +import org.apache.rya.streams.querymanager.kafka.LocalQueryExecutor; +import org.apache.rya.streams.querymanager.xml.Kafka; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfig; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfig.PerformanceTunning.QueryChanngeLogDiscoveryPeriod; +import org.apache.rya.streams.querymanager.xml.QueryManagerConfigUnmarshaller; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.xml.sax.SAXException; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * JSVC integration code for a {@link QueryManager} to be used as a non-Windows daemon. + */ +@DefaultAnnotation(NonNull.class) +public class QueryManagerDaemon implements Daemon { + +private static final Logger log = LoggerFactory.getLogger(QueryManagerDaemon.class); + +/** + * The default configuration file's path for the application. 
+ */ +private static final Path DEFAULT_CONFIGURATION_PATH = Paths.get("config/configuration.xml"); + +/** + * Command line parameters that are used by all commands that interact with Kafka. + */ +class DaemonParameters { +@Parameter(names = {"--config", "-c"}, required = false, description = "The path to the application's configuration file.") +public String config; +} + +private QueryManager manager = null; + +@Override +public void init(final DaemonContext context) throws DaemonInitException, Exception { +requireNonNull(context); + +// Parse the command line arguments for the configuration file to use. +final String[] args = context.getArguments(); +final DaemonParameters params = new DaemonParameters(); +try { +new JCommander(params).parse(args); +} catch(final ParameterException e) { +throw new DaemonInitException("Unable to parse the command line arguments.", e); +} +final Path configFile = params.config != null ? Paths.get(params.config) : DEFAULT_CONFIGURATION_PATH; +log.info("Loading the following configuration file: " + configFile); + +// Unmarshall the configuration file into an object. +final QueryManagerConfig config; +try(InputStream stream = Files.newInputStream(configFile)) { --- End diff -- final ---
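For context on the one-word suggestion: a resource declared in a try-with-resources statement is implicitly final in Java, so adding the keyword is a consistency choice rather than a behavior change. The sketch below shows the suggested form; ConfigReadSketch and countBytes are hypothetical, with the byte count standing in for the unmarshalling step that the diff cuts off.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical example of the reviewer's suggested declaration style. */
final class ConfigReadSketch {

    private ConfigReadSketch() { }

    /** Reads the file to the end and returns its size in bytes. */
    static int countBytes(final Path configFile) throws IOException {
        // The explicit "final" matches the style used for locals elsewhere in the file.
        try (final InputStream stream = Files.newInputStream(configFile)) {
            final byte[] buffer = new byte[4096];
            int total = 0;
            int read;
            while ((read = stream.read(buffer)) != -1) {
                total += read;
            }
            return total;
        } // The stream is closed here, whether or not an exception was thrown.
    }
}
```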
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167252337 --- Diff: extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java --- @@ -107,12 +112,11 @@ private String formatQueries(final Set queries) { sb.append("Queries in Rya Streams:\n"); sb.append("-\n"); queries.forEach(query -> { -sb.append("ID: "); -sb.append(query.getQueryId()); -sb.append("\t\t"); -sb.append("Query: "); -sb.append(query.getSparql()); -sb.append("\n"); +sb.append("ID: ").append(query.getQueryId()) +.append("") +.append("Is Active: ").append(query.isActive()) +.append( query.isActive() ? " " : "" ) --- End diff -- Maybe change to: .append("Is Active: ") .append(StringUtils.rightPad(String.valueOf(query.isActive()), 9)) ---
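The padding idea can be tried without any dependency. The sketch below is an assumption-laden stand-in: PadSketch, rightPad and activeColumn are hypothetical names, and the hand-written rightPad only imitates the space-padding behavior of the Commons Lang method. That method takes a String, so the boolean has to be converted first.

```java
/** Hypothetical, dependency-free illustration of fixed-width column padding. */
final class PadSketch {

    private PadSketch() { }

    /** Pads with trailing spaces up to width; longer values are returned unchanged. */
    static String rightPad(final String value, final int width) {
        final StringBuilder sb = new StringBuilder(value);
        while (sb.length() < width) {
            sb.append(' ');
        }
        return sb.toString();
    }

    /** Formats the column so that "true" and "false" occupy the same width. */
    static String activeColumn(final boolean isActive) {
        // String.valueOf converts the boolean before it is padded.
        return "Is Active: " + rightPad(String.valueOf(isActive), 9);
    }
}
```

Padding to a fixed width removes the need for the conditional that appends an extra space only when the query is active.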
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167240617 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. 
+ */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { +private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + +/** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ +private final QueryChangeLogSource changeLogSource; + +/** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ +private final QueryExecutor queryExecutor; + +/** + * How long blocking operations will be attempted before potentially trying again. + */ +private final long blockingValue; + +/** + * The units for {@link #blockingValue}. + */ +private final TimeUnit blockingUnits; + +/** + * Used to inform threads that the application is shutting down, so they must stop work. + */ +private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + +/** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ +private final ExecutorService executor = Executors.newFixedThreadPool(2); + +/** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) + */ +
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167253121 --- Diff: extras/rya.streams/client/src/main/java/org/apache/rya/streams/client/command/ListQueriesCommand.java --- @@ -107,12 +112,11 @@ private String formatQueries(final Set queries) { sb.append("Queries in Rya Streams:\n"); sb.append("-\n"); queries.forEach(query -> { -sb.append("ID: "); -sb.append(query.getQueryId()); -sb.append("\t\t"); -sb.append("Query: "); -sb.append(query.getSparql()); -sb.append("\n"); +sb.append("ID: ").append(query.getQueryId()) +.append("") +.append("Is Active: ").append(query.isActive()) +.append( query.isActive() ? " " : "" ) +.append("Query: ").append(query.getSparql()).append("\n"); --- End diff -- LogUtils.clean(query.getSparql()) ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167241001 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. 
+ */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { +private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + +/** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ +private final QueryChangeLogSource changeLogSource; + +/** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ +private final QueryExecutor queryExecutor; + +/** + * How long blocking operations will be attempted before potentially trying again. + */ +private final long blockingValue; + +/** + * The units for {@link #blockingValue}. + */ +private final TimeUnit blockingUnits; + +/** + * Used to inform threads that the application is shutting down, so they must stop work. + */ +private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + +/** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ +private final ExecutorService executor = Executors.newFixedThreadPool(2); + +/** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) + */ +
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167252837 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java --- @@ -85,4 +85,19 @@ public boolean equals(final Object o) { } return false; } + +@Override +public String toString() { +final StringBuilder sb = new StringBuilder(); +sb.append("ID: "); +sb.append(getQueryId().toString() + "\n"); +sb.append("Query: "); +sb.append(getSparql() + "\n"); --- End diff -- Is "getSparql()" coming from user input? Might want to wrap that in org.apache.rya.api.log.LogUtils.clean(getSparql()) ---
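To illustrate the concern, here is a rough sketch of what sanitizing a user-supplied string before logging can look like. It is not the implementation of LogUtils.clean, whose behavior is not shown in this thread; LogSanitizeSketch and its clean method are hypothetical and only demonstrate one common approach, which is to escape line breaks so that a SPARQL string cannot forge extra log lines.

```java
/** Hypothetical sanitizer; an assumption, not Rya's LogUtils. */
final class LogSanitizeSketch {

    private LogSanitizeSketch() { }

    /** Escapes line breaks and tabs and masks other control characters. */
    static String clean(final String input) {
        if (input == null) {
            return "null";
        }
        final StringBuilder sb = new StringBuilder(input.length());
        for (int i = 0; i < input.length(); i++) {
            final char c = input.charAt(i);
            switch (c) {
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    // Any remaining control character is replaced with a visible marker.
                    sb.append(Character.isISOControl(c) ? '?' : c);
            }
        }
        return sb.toString();
    }
}
```

Whether this belongs in toString() or at the logging call site is the open question in the review; sanitizing at the call site leaves toString() output untouched for other consumers.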
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167235009 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaTopics.java --- @@ -48,7 +51,28 @@ */ public static String queryChangeLogTopic(final String ryaInstance) { requireNonNull(ryaInstance); -return ryaInstance + "-QueryChangeLog"; +return ryaInstance + QUERY_CHANGE_LOG_TOPIC_SUFFIX; +} + +/** + * Get the Rya instance name from a Kafka topic name that has been used for a {@link QueryChnageLog}. --- End diff -- typo. QueryChangeLog ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167231162 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/InMemoryQueryRepository.java --- @@ -243,8 +274,68 @@ private void updateCache() { it.close(); } } catch (final QueryChangeLogException e) { -LOG.error("Could not close the " + CloseableIteration.class.getName(), e); +log.error("Could not close the " + CloseableIteration.class.getName(), e); } + +log.trace("updateCache() - Exit"); +} +} + +@Override +protected void runOneIteration() throws Exception { +log.trace("runOneIteration() - Enter"); +lock.lock(); +try { +updateCache(); +} finally { +lock.unlock(); +} +log.trace("runOneIteration() - Exit"); --- End diff -- Should this log message be moved up into the finally block so it's called if an Exception is thrown? ---
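A minimal sketch of the placement suggested above, assuming the goal is for the exit trace to be recorded even when `updateCache()` throws. The class name and the in-memory `trace` list are hypothetical; they stand in for the real class and its SLF4J logger so the behavior is easy to observe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; a list of strings stands in for log.trace(...).
final class ExitLoggingSketch {
    private final ReentrantLock lock = new ReentrantLock();
    final List<String> trace = new ArrayList<>();

    void runOneIteration(final Runnable updateCache) {
        trace.add("runOneIteration() - Enter");
        lock.lock();
        try {
            updateCache.run();
        } finally {
            lock.unlock();
            // Inside finally, so this runs whether or not updateCache threw.
            trace.add("runOneIteration() - Exit");
        }
    }
}
```

The exception still propagates to the caller; only the trace message moves.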
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167249191 --- Diff: extras/rya.streams/query-manager/src/test/java/org/apache/rya/streams/querymanager/ThreadUtil.java --- @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +/** + * Utilities that are useful for interacting with {@link Thread}s while testing. + */ +public class ThreadUtil { --- End diff -- Add a private constructor to prevent instantiation ---
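A minimal sketch of the private-constructor idiom requested above. The class name and the `sleepQuietly` helper are hypothetical, since the body of `ThreadUtil` is not quoted in this comment.

```java
// Hypothetical utility class; only the constructor idiom is the point here.
final class ThreadUtilSketch {

    /** Private so the class cannot be instantiated or subclassed. */
    private ThreadUtilSketch() {
        throw new AssertionError("No instances.");
    }

    /**
     * Sleeps for the given time.
     *
     * @return true if the sleep completed, false if the thread was interrupted
     */
    static boolean sleepQuietly(final long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (final InterruptedException e) {
            // Restore the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```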
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167230725 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/entity/StreamsQuery.java --- @@ -85,4 +85,19 @@ public boolean equals(final Object o) { } return false; } + +@Override +public String toString() { +final StringBuilder sb = new StringBuilder(); +sb.append("ID: "); +sb.append(getQueryId().toString() + "\n"); +sb.append("Query: "); +sb.append(getSparql() + "\n"); +sb.append("Is "); +if (!isActive) { +sb.append(" Not "); --- End diff -- No space is needed before "Not". The previous append has one. ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167235195 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/SingleThreadKafkaStreamsFactory.java --- @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka; + +import static java.util.Objects.requireNonNull; + +import java.util.Properties; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.rya.api.function.projection.RandomUUIDFactory; +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory; +import org.apache.rya.streams.kafka.topology.TopologyBuilderFactory.TopologyBuilderException; +import org.apache.rya.streams.kafka.topology.TopologyFactory; +import org.openrdf.query.MalformedQueryException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Creates {@link KafkaStreams} objects that are able to process {@link StreamsQuery}s + * using a single thread of execution starting from the earliest point in within the + * input topic. The Application ID used by the client is based on the Query ID of the + * query that is being executed so that this job may resume where it left off if it + * is stopped. + */ +@DefaultAnnotation(NonNull.class) +public class SingleThreadKafkaStreamsFactory implements KafkaStreamsFactory { + +private final TopologyBuilderFactory topologyFactory = new TopologyFactory(); + +private final String bootstrapServersConfig; + +/** + * Constructs an instance of {@link SingleThreadKafkaStreamsFactory}. + * + * @param bootstrapServersConfig - Configures which Kafka cluster the jobs will interact with. 
(not null) + */ +public SingleThreadKafkaStreamsFactory(final String bootstrapServersConfig) { +this.bootstrapServersConfig = requireNonNull(bootstrapServersConfig); +} + +@Override +public KafkaStreams make(final String ryaInstance, final StreamsQuery query) throws KafkaStreamsFactoryException { +requireNonNull(ryaInstance); +requireNonNull(query); + +// Setup the Kafka Stream program. +final Properties streamsProps = new Properties(); + +// Configure the Kafka servers that will be talked to. +streamsProps.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServersConfig); + +// Use the Query ID as the Application ID to ensure we resume where we left off the last time this command was run. +streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "RyaStreams-Query-" + query.getQueryId()); + +// Always start at the beginning of the input topic. +streamsProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + +// Setup the topology that processes the Query. +final String statementsTopic = KafkaTopics.statementsTopic(ryaInstance); +final String resultsTopic = KafkaTopics.queryResultsTopic(query.getQueryId()); + +try { +final TopologyBuilder topologyBuilder = topologyFactory.build(query.getSparql(), statementsTopic, resultsTopic, new RandomUUIDFactory()); +return new KafkaStreams(topologyBuilder, new StreamsConfig(streamsProps)); +} catch (MalformedQueryException | TopologyBuilderException e) { --- End diff -- add final to Exceptions ---
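A minimal sketch of the style requested above, using standard JDK exceptions in place of `MalformedQueryException` and `TopologyBuilderException`. Note that a multi-catch parameter is already implicitly final in Java, so the explicit keyword is presumably a consistency convention rather than a behavior change. The class and method names are hypothetical.

```java
// Hypothetical example; JDK exceptions stand in for the project's own.
final class FinalCatchSketch {
    private FinalCatchSketch() {
        // Utility class, no instances.
    }

    static String parse(final String text) {
        try {
            return String.valueOf(Integer.parseInt(text.trim()));
        } catch (final NumberFormatException | NullPointerException e) {
            // 'e' could not be reassigned even without the explicit 'final'.
            return "invalid: " + e.getClass().getSimpleName();
        }
    }
}
```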
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167242070 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. 
+ */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { +private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + +/** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ +private final QueryChangeLogSource changeLogSource; + +/** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ +private final QueryExecutor queryExecutor; + +/** + * How long blocking operations will be attempted before potentially trying again. + */ +private final long blockingValue; + +/** + * The units for {@link #blockingValue}. + */ +private final TimeUnit blockingUnits; + +/** + * Used to inform threads that the application is shutting down, so they must stop work. + */ +private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + +/** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ +private final ExecutorService executor = Executors.newFixedThreadPool(2); + +/** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) + */ +
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167243543 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/kafka/KafkaQueryChangeLogSource.java --- @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager.kafka; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.rya.streams.kafka.KafkaTopics; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLogFactory; +import org.apache.rya.streams.querymanager.QueryChangeLogSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.AbstractScheduledService; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Represents a Kafka Server that manages {@link KafkaQueryChangeLog}s. + * + * Thread safe. --- End diff -- Change to @ThreadSafe ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167240341 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. 
+ */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { +private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + +/** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ +private final QueryChangeLogSource changeLogSource; + +/** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ +private final QueryExecutor queryExecutor; + +/** + * How long blocking operations will be attempted before potentially trying again. + */ +private final long blockingValue; + +/** + * The units for {@link #blockingValue}. + */ +private final TimeUnit blockingUnits; + +/** + * Used to inform threads that the application is shutting down, so they must stop work. + */ +private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + +/** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ +private final ExecutorService executor = Executors.newFixedThreadPool(2); + +/** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) + */ +
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167236812 --- Diff: extras/rya.streams/query-manager/pom.xml --- @@ -0,0 +1,253 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + + +org.apache.rya +rya.streams.parent +3.2.12-incubating-SNAPSHOT + + +4.0.0 +rya.streams.query-manager + +Apache Rya Streams Query Manager + +This module contains the Rya Streams Query Manager. + + + + ${project.build.directory}/${project.artifactId}-${project.version}-rpm-staging + + + + + +org.apache.rya +rya.streams.kafka + + + + +commons-daemon +commons-daemon +1.1.0 + + +org.slf4j +slf4j-log4j12 + + +com.beust +jcommander + + + + +junit +junit +test + + +org.apache.rya +rya.test.kafka +test + + +org.mockito +mockito-all +test + + + + + + +src/main/xsd + + + + + + +org.codehaus.mojo +jaxb2-maven-plugin + + +xjc + +xjc + + + + + org.apache.rya.streams.querymanager.xml + + + + + +com.mycila +license-maven-plugin + + ${project.basedir}/src/license/header.txt + + + +update-generated-source-headers + + ${project.build.directory}/generated-sources/jaxb + + XML_STYLE + + +process-sources + +format + + + + + + + +org.apache.maven.plugins +maven-shade-plugin + + +package + + shade + + + + + org.apache.rya.streams.querymanager.QueryManagerDaemon + + + + + + + + + + +org.apache.maven.plugins +maven-assembly-plugin + + + +stage-content-for-rpms + +single + +package + + +false + + src/assembly/rpm-staging.xml + + + + + + + +org.codehaus.mojo +rpm-maven-plugin + + +set-rpm-properties + +version + + + +create-rpm-distribution --- End diff -- We'll
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167242868 --- Diff: extras/rya.streams/query-manager/src/main/java/org/apache/rya/streams/querymanager/QueryManager.java --- @@ -0,0 +1,884 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.querymanager; + +import static java.util.Objects.requireNonNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryChangeLogListener; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.querymanager.QueryChangeLogSource.SourceListener; +import org.apache.rya.streams.querymanager.QueryExecutor.QueryExecutorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; +import com.google.common.util.concurrent.AbstractService; +import com.google.common.util.concurrent.UncheckedExecutionException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A service for managing {@link StreamsQuery} running on a Rya Streams system. + * + * Only one QueryManager needs to be running to manage any number of rya + * instances/rya streams instances. 
+ */ +@DefaultAnnotation(NonNull.class) +public class QueryManager extends AbstractService { +private static final Logger log = LoggerFactory.getLogger(QueryManager.class); + +/** + * The source of {@link QueryChangeLog}s. Each log discovered is bound to a specific + * Rya instnace. + */ +private final QueryChangeLogSource changeLogSource; + +/** + * The engine that is responsible for executing {@link StreamsQuery}s. + */ +private final QueryExecutor queryExecutor; + +/** + * How long blocking operations will be attempted before potentially trying again. + */ +private final long blockingValue; + +/** + * The units for {@link #blockingValue}. + */ +private final TimeUnit blockingUnits; + +/** + * Used to inform threads that the application is shutting down, so they must stop work. + */ +private final AtomicBoolean shutdownSignal = new AtomicBoolean(false); + +/** + * This thread pool manages the two thread used to work the {@link LogEvent}s + * and the {@link QueryEvent}s. + */ +private final ExecutorService executor = Executors.newFixedThreadPool(2); + +/** + * Creates a new {@link QueryManager}. + * + * @param queryExecutor - Runs the active {@link StreamsQuery}s. (not null) + * @param source - The {@link QueryChangeLogSource} of QueryChangeLogs. (not null) + * @param blockingValue - How long blocking operations will try before looping. (> 0) + * @param blockingUnits - The units of the {@code blockingValue}. (not null) + */ +
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167232266 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java --- @@ -110,6 +110,16 @@ public boolean equals(final Object o) { return false; } +@Override +public String toString() { +return "QueryChange: {" + + "Query ID: " + queryId + ",\n" + + "Change Type: " + changeType + ",\n" + + "Is Active: " + isActive + ",\n" + + "SPARQL: " + sparql + "\n" + --- End diff -- Is "sparql" coming from user input? Might want to wrap that in org.apache.rya.api.log.LogUtils.clean(sparql) ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/272#discussion_r167234267 --- Diff: extras/rya.streams/api/src/test/java/org/apache/rya/streams/api/queries/InMemoryQueryRepositoryTest.java --- @@ -85,11 +92,16 @@ public void initializedWithPopulatedChangeLog() throws Exception { queries.delete( deletedMeId ); // Create a new totally in memory QueryRepository. -try(final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog )) { +final QueryRepository initializedQueries = new InMemoryQueryRepository( changeLog, SCHEDULE ); +try { // Listing the queries should work using an initialized change log. final Set stored = initializedQueries.list(); assertEquals(expected, stored); +} finally { +queries.stop(); --- End diff -- This seems redundant with the outer try-finally block calling stop(). Unless the intent is to test it handling 2 consecutive calls to stop(). The inner try-finally can probably be removed. ---
[GitHub] incubator-rya pull request #272: RYA-443 Rya Streams Query Manager daemon pr...
GitHub user kchilton2 opened a pull request:

https://github.com/apache/incubator-rya/pull/272

RYA-443 Rya Streams Query Manager daemon program.

## Description
Implemented an RPM installed CentOS 7 daemon service used to execute Kafka Streams jobs on the local machine. This is part of Rya Streams.

### Tests
Wrote automated tests and executed the application on a CentOS 7 machine using the RPM.

### Links
https://issues.apache.org/jira/browse/RYA-443

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kchilton2/incubator-rya RYA-443_rya-streams-querymanager

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-rya/pull/272.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #272

commit 6688c2ece024132d3dc6903af9d1003079d330eb
Author: Andrew Smith
Date: 2018-01-16T17:39:34Z
RYA-446 Daemonifying Query Manager

commit 8803d7d99f8c7189991be181dd11bd0fb9f1e704
Author: Andrew Smith
Date: 2018-01-18T22:51:40Z
RYA-449 Create QueryChangeLogSource

commit c048db73a5211a18ab649a2647dfa7f7fb929166
Author: Andrew Smith
Date: 2018-01-19T20:43:55Z
Rya 454 added QueryExecutor interface

commit 0dad1290d593065561fae079b837fcbc479795ed
Author: Andrew Smith
Date: 2018-01-23T20:20:50Z
Rya 452 Updated QueryRepository Updated QueryRepository to be a Service Updated InMemoryQueryRepository to be an AbstractScheduledService Added listeners to InMemoryQueryRepository

commit 3810fb709a4a97ce729e93733edd9af10d6e5445
Author: Andrew Smith
Date: 2018-01-23T20:44:32Z
RYA-455 stopAll queries for a rya instance

commit e774ef02161686d9c2f0c6e99ea3b8a438fd240a
Author: kchilton2
Date: 2018-01-23T20:50:17Z
RYA-448 Implement JAXB marshalling code for the Query Manager's XML configuration file.

commit 13cfd43ba3609c0690aeebe27eb7b76a88d2ddbd
Author: kchilton2
Date: 2018-01-23T21:17:06Z
RYA-450 Implemented a Kafka backed QueryChangeLogSource.

commit eee8d830abc95e4082d906fe7e0dad5944d96b87
Author: kchilton2
Date: 2018-01-26T17:06:07Z
RYA-458 Updating the configuration XML so that you may only specify a single QueryChangeLogSource.

commit 815d995c359ae15b5669c1e7c7e375d7c67f8dec
Author: kchilton2
Date: 2018-01-26T20:55:59Z
RYA-456 Implement a Single Node implementation of QueryExecutor.

commit ba2898a8c4ba58ad3e38323f28389f75ab262c83
Author: Andrew Smith
Date: 2018-01-30T19:01:54Z
Rya 451 Query manager QueryManager with tests updated InMemoryQueryRepository and its tests

commit 34e616ce63ac4b93a64ca11a10cde148760afc52
Author: kchilton2
Date: 2018-01-30T19:44:58Z
RYA-453 Implement the Query Manager's Daemon that controls the application.

commit 8d35b4541bd39734cc68c7e870027760e537d1e7
Author: kchilton2
Date: 2018-01-25T22:19:58Z
RYA-446 Create a bin and rpm distribution for the Rya Streams Query Manager application.

commit 4f4e0f4ce0d2eb2d8651b8f569a8f59c8eaaf571
Author: Andrew Smith
Date: 2018-02-01T21:04:48Z
RYA-446 Service unit file for systemd

commit 4a4456e7cfd18fdd9044b67152252b3623ebe466
Author: kchilton2
Date: 2018-02-01T23:31:41Z
RYA-446 Making the Rya Streams Query Manager run as a service on CentOS 7.

commit b889bc0d60e3884ea31edbcb12fcc21331f77963
Author: kchilton2
Date: 2018-02-03T03:47:59Z
RYA-451 Fixing threading issues with the QueryManager class.

---