[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16343973#comment-16343973 ]

ASF GitHub Bot commented on RYA-440:

Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-rya/pull/267

> Rya Streams: Integration with the existing Rya Shell.
> -----------------------------------------------------
>
> Key: RYA-440
> URL: https://issues.apache.org/jira/browse/RYA-440
> Project: Rya
> Issue Type: Task
> Reporter: Kevin Chilton
> Assignee: Kevin Chilton
> Priority: Major
>
> We need a way to interact with Rya Streams. The shell should include the
> following commands:
> * *streams-configure*: Lets you change which Rya Streams system an instance
> of Rya is configured to use. For example, if you are using Kafka Streams,
> this command would let you configure where the Kafka services are found.
> * *streams-details*: Prints to the screen the currently configured values.
> * *streams-queries-add*: Tells Rya Streams that it needs to maintain a query
> * *streams-queries-delete*: Tells Rya Streams that it no longer needs to
> maintain a query.
> * *streams-queries-start*: Tells Rya Streams to start maintaining a query.
> * *streams-queries-stop*: Tells Rya Streams to stop maintaining a query.
> * *streams-queries-list*: Lists the queries that are being maintained by Rya
> Streams as well as their start/stop state.
> * *streams-queries-details*: Prints details about the query.
> For this iteration, no PCJ integration will be supported.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
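The query commands listed in the issue imply a small lifecycle: a query is added, can be started and stopped, shows up in a listing with its state, and is eventually deleted. The following is a minimal sketch of that lifecycle, assuming a hypothetical in-memory registry. `QueryRegistrySketch`, `QueryState`, and every method name below are illustrative and are not part of Rya Streams; the sketch also assumes that a newly added query starts out stopped, which the issue does not state.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical in-memory model of the streams-queries-* commands; not Rya's API. */
final class QueryRegistrySketch {

    /** A registered query and whether it is currently being maintained. */
    static final class QueryState {
        final String sparql;
        boolean running; // assumption: new queries start out stopped

        QueryState(final String sparql) {
            this.sparql = sparql;
        }
    }

    private final Map<UUID, QueryState> queries = new LinkedHashMap<>();

    /** Models streams-queries-add: registers a query and returns its generated ID. */
    UUID add(final String sparql) {
        final UUID id = UUID.randomUUID();
        queries.put(id, new QueryState(sparql));
        return id;
    }

    /** Models streams-queries-start. */
    void start(final UUID id) {
        require(id).running = true;
    }

    /** Models streams-queries-stop. */
    void stop(final UUID id) {
        require(id).running = false;
    }

    /** Models streams-queries-delete. */
    void delete(final UUID id) {
        require(id);
        queries.remove(id);
    }

    /** Models streams-queries-list: one line per query with its start/stop state. */
    String list() {
        final StringBuilder sb = new StringBuilder();
        for (final Map.Entry<UUID, QueryState> e : queries.entrySet()) {
            sb.append(e.getKey()).append(' ')
              .append(e.getValue().running ? "RUNNING" : "STOPPED").append(' ')
              .append(e.getValue().sparql).append('\n');
        }
        return sb.toString();
    }

    /** Looks up a query or fails if the ID is unknown. */
    private QueryState require(final UUID id) {
        final QueryState state = queries.get(id);
        if (state == null) {
            throw new IllegalArgumentException("No query with ID " + id);
        }
        return state;
    }
}
```

Keeping the state transitions in one place like this makes it easy for each shell command to stay a thin wrapper; whether Rya Streams structures it this way is not something the issue text confirms.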
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16343817#comment-16343817 ]

ASF GitHub Bot commented on RYA-440:

Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/267#discussion_r164528601

    --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java ---
    @@ -0,0 +1,82 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import org.apache.rya.api.instance.RyaDetails;
    +import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
    +import org.apache.rya.api.instance.RyaDetailsRepository;
    +import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
    +import org.apache.rya.api.instance.RyaDetailsUpdater;
    +import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A base class that implements the core functionality of the {@link SetRyaStreamsConfiguration} interactor.
    + * Subclasses just need to implement {@link #getRyaDetailsRepo(String)} so that the common code may update
    + * any implementation of that repository.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public abstract class SetRyaStreamsConfigurationBase implements SetRyaStreamsConfiguration {
    +
    +    private final InstanceExists instanceExists;
    +
    +    /**
    +     * Constructs an instance of {@link SetRyaStreamsConfigurationBase}.
    +     *
    +     * @param instanceExists - The interactor used to verify Rya instances exist. (not null)
    +     */
    +    public SetRyaStreamsConfigurationBase(final InstanceExists instanceExists) {
    +        this.instanceExists = requireNonNull(instanceExists);
    +    }
    +
    +    /**
    +     * Get a {@link RyaDetailsRepository} that is connected to a specific instance of Rya.
    +     *
    +     * @param ryaInstance - The Rya instance the repository must be connected to. (not null)
    +     * @return A {@link RyaDetailsRepository} connected to the specified Rya instance.
    +     */
    +    protected abstract RyaDetailsRepository getRyaDetailsRepo(String ryaInstance);
    +
    +    @Override
    +    public void setRyaStreamsConfiguration(final String ryaInstance, final RyaStreamsDetails streamsDetails) throws InstanceDoesNotExistException, RyaClientException{
    --- End diff --

    done
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16343818#comment-16343818 ]

ASF GitHub Bot commented on RYA-440:

Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/267#discussion_r164528626

    --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java ---
    @@ -0,0 +1,169 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.streams.kafka;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Properties;
    +import java.util.UUID;
    +
    +import org.apache.kafka.clients.CommonClientConfigs;
    +import org.apache.kafka.clients.consumer.Consumer;
    +import org.apache.kafka.clients.consumer.ConsumerConfig;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.apache.kafka.common.serialization.StringDeserializer;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.streams.api.RyaStreamsClient;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultAddQuery;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultDeleteQuery;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultGetQuery;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultStartQuery;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultStopQuery;
    +import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
    +import org.apache.rya.streams.api.queries.QueryChange;
    +import org.apache.rya.streams.api.queries.QueryChangeLog;
    +import org.apache.rya.streams.api.queries.QueryRepository;
    +import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
    +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
    +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
    +import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
    +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
    +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * Constructs instances of {@link RyaStreamsClient} that are connected to a Kafka cluster.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public final class KafkaRyaStreamsClientFactory {
    +    private static final Logger log = LoggerFactory.getLogger(KafkaRyaStreamsClientFactory.class);
    +
    +    /**
    +     * Initialize a {@link RyaStreamsClient} that will interact with an instance of Rya Streams
    +     * that is backed by Kafka.
    +     *
    +     * @param ryaInstance - The name of the Rya Instance the client is connected to. (not null)
    +     * @param kafkaHostname - The hostname of the Kafka Broker.
    +     * @param kafkaPort - The port of the Kafka Broker.
    +     * @return The initialized commands.
    +     */
    +    public static RyaStreamsClient make(
    +            final String ryaInstance,
    +            final String kafkaHostname,
    +            final int kafkaPort) {
    +        requireNonNull(kafkaHostname);
    --- End diff --

    done
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341591#comment-16341591 ]

ASF GitHub Bot commented on RYA-440:

Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/267#discussion_r164219745

    --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java ---
    @@ -0,0 +1,169 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.streams.kafka;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Properties;
    +import java.util.UUID;
    +
    +import org.apache.kafka.clients.CommonClientConfigs;
    +import org.apache.kafka.clients.consumer.Consumer;
    +import org.apache.kafka.clients.consumer.ConsumerConfig;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.apache.kafka.common.serialization.StringDeserializer;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.streams.api.RyaStreamsClient;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultAddQuery;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultDeleteQuery;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultGetQuery;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultStartQuery;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultStopQuery;
    +import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
    +import org.apache.rya.streams.api.queries.QueryChange;
    +import org.apache.rya.streams.api.queries.QueryChangeLog;
    +import org.apache.rya.streams.api.queries.QueryRepository;
    +import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
    +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
    +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
    +import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
    +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
    +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * Constructs instances of {@link RyaStreamsClient} that are connected to a Kafka cluster.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public final class KafkaRyaStreamsClientFactory {
    +    private static final Logger log = LoggerFactory.getLogger(KafkaRyaStreamsClientFactory.class);
    +
    +    /**
    +     * Initialize a {@link RyaStreamsClient} that will interact with an instance of Rya Streams
    +     * that is backed by Kafka.
    +     *
    +     * @param ryaInstance - The name of the Rya Instance the client is connected to. (not null)
    +     * @param kafkaHostname - The hostname of the Kafka Broker.
    +     * @param kafkaPort - The port of the Kafka Broker.
    +     * @return The initialized commands.
    +     */
    +    public static RyaStreamsClient make(
    +            final String ryaInstance,
    +            final String kafkaHostname,
    +            final int kafkaPort) {
    +        requireNonNull(kafkaHostname);
    --- End diff --

    Null check on ryaInstance
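The review comment asks for a null check on `ryaInstance` alongside the existing check on `kafkaHostname`. A minimal sketch of that change follows. It assumes a stand-in class, `ClientFactorySketch`, whose simplified `make` method returns a descriptive string; the factory in the pull request returns a `RyaStreamsClient`, and the error messages used here are illustrative rather than taken from the source.

```java
import static java.util.Objects.requireNonNull;

/** Hypothetical, simplified stand-in for the factory under review. */
final class ClientFactorySketch {

    private ClientFactorySketch() { }

    /**
     * Validates every reference argument up front so that a null fails fast
     * with a message naming the offending parameter.
     */
    static String make(final String ryaInstance, final String kafkaHostname, final int kafkaPort) {
        requireNonNull(ryaInstance, "ryaInstance must not be null");
        requireNonNull(kafkaHostname, "kafkaHostname must not be null");
        // kafkaPort is a primitive int, so it cannot be null.
        return ryaInstance + "@" + kafkaHostname + ":" + kafkaPort;
    }
}
```

Checking at the top of the method means a bad argument is reported where it enters the API instead of surfacing later as an unexplained `NullPointerException` deeper in the call chain.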
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16341592#comment-16341592 ]

ASF GitHub Bot commented on RYA-440:

Github user meiercaleb commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/267#discussion_r164218885

    --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/SetRyaStreamsConfigurationBase.java ---
    @@ -0,0 +1,82 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import org.apache.rya.api.instance.RyaDetails;
    +import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
    +import org.apache.rya.api.instance.RyaDetailsRepository;
    +import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
    +import org.apache.rya.api.instance.RyaDetailsUpdater;
    +import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * A base class that implements the core functionality of the {@link SetRyaStreamsConfiguration} interactor.
    + * Subclasses just need to implement {@link #getRyaDetailsRepo(String)} so that the common code may update
    + * any implementation of that repository.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public abstract class SetRyaStreamsConfigurationBase implements SetRyaStreamsConfiguration {
    +
    +    private final InstanceExists instanceExists;
    +
    +    /**
    +     * Constructs an instance of {@link SetRyaStreamsConfigurationBase}.
    +     *
    +     * @param instanceExists - The interactor used to verify Rya instances exist. (not null)
    +     */
    +    public SetRyaStreamsConfigurationBase(final InstanceExists instanceExists) {
    +        this.instanceExists = requireNonNull(instanceExists);
    +    }
    +
    +    /**
    +     * Get a {@link RyaDetailsRepository} that is connected to a specific instance of Rya.
    +     *
    +     * @param ryaInstance - The Rya instance the repository must be connected to. (not null)
    +     * @return A {@link RyaDetailsRepository} connected to the specified Rya instance.
    +     */
    +    protected abstract RyaDetailsRepository getRyaDetailsRepo(String ryaInstance);
    +
    +    @Override
    +    public void setRyaStreamsConfiguration(final String ryaInstance, final RyaStreamsDetails streamsDetails) throws InstanceDoesNotExistException, RyaClientException{
    --- End diff --

    Null check on ryaInstance
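The class under review follows a template-method shape: the base class owns the validation and the update, while subclasses only supply the repository for a given instance. Below is a minimal sketch of that shape with the requested null checks in place. Every type in it is a simplified, hypothetical stand-in: `DetailsRepoSketch` replaces `RyaDetailsRepository`, a `Predicate` replaces the `InstanceExists` interactor, a plain string replaces `RyaStreamsDetails`, and `IllegalStateException` replaces `InstanceDoesNotExistException`. None of these mirror Rya's real signatures.

```java
import static java.util.Objects.requireNonNull;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical stand-in for a details repository bound to one instance. */
interface DetailsRepoSketch {
    void setStreamsDetails(String hostAndPort);
}

/** Hypothetical base class: validation and the update live here, storage lookup in subclasses. */
abstract class SetConfigBaseSketch {
    private final Predicate<String> instanceExists;

    SetConfigBaseSketch(final Predicate<String> instanceExists) {
        this.instanceExists = requireNonNull(instanceExists);
    }

    /** Subclasses return a repository connected to the named instance. */
    protected abstract DetailsRepoSketch getRepo(String ryaInstance);

    final void setConfiguration(final String ryaInstance, final String hostAndPort) {
        // Null checks first, as requested in the review.
        requireNonNull(ryaInstance);
        requireNonNull(hostAndPort);

        // Refuse to update an instance that was never installed.
        if (!instanceExists.test(ryaInstance)) {
            throw new IllegalStateException("No Rya instance named '" + ryaInstance + "'.");
        }
        getRepo(ryaInstance).setStreamsDetails(hostAndPort);
    }
}

/** In-memory subclass used only to exercise the sketch. */
final class InMemorySetConfigSketch extends SetConfigBaseSketch {
    final Map<String, String> stored = new HashMap<>();

    InMemorySetConfigSketch(final Predicate<String> instanceExists) {
        super(instanceExists);
    }

    @Override
    protected DetailsRepoSketch getRepo(final String ryaInstance) {
        return details -> stored.put(ryaInstance, details);
    }
}
```

With this split, a storage-specific subclass stays a few lines long, and the argument and existence checks cannot be forgotten in any one implementation.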
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335180#comment-16335180 ]

ASF GitHub Bot commented on RYA-440:

Github user asfgit commented on the issue:

    https://github.com/apache/incubator-rya/pull/267

    Refer to this link for build results (access rights to CI server needed):
    https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/677/
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335151#comment-16335151 ]

ASF GitHub Bot commented on RYA-440:

Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/267#discussion_r163101913

    --- Diff: extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java ---
    @@ -0,0 +1,81 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.api.client.accumulo;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertFalse;
    +
    +import org.apache.rya.accumulo.AccumuloITBase;
    +import org.apache.rya.api.client.Install;
    +import org.apache.rya.api.client.Install.InstallConfiguration;
    +import org.apache.rya.api.client.InstanceDoesNotExistException;
    +import org.apache.rya.api.client.RyaClient;
    +import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails;
    +import org.junit.Test;
    +
    +/**
    + * Integration tests the methods of {@link AccumuloSetRyaStreamsConfiguration}.
    + */
    +public class AccumuloSetRyaStreamsConfigurationIT extends AccumuloITBase {
    +
    +    @Test(expected = InstanceDoesNotExistException.class)
    +    public void instanceDoesNotExist() throws Exception {
    +        final String ryaInstance = getRyaInstanceName();
    +        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
    +                getUsername(),
    +                getPassword().toCharArray(),
    +                getInstanceName(),
    +                getZookeepers());
    +        final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
    +
    +        // Skip the install step to create error causing situation.
    +        final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6);
    +        ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details);
    +    }
    +
    +    @Test
    +    public void updatesRyaDetails() throws Exception {
    +        final String ryaInstance = getRyaInstanceName();
    +        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
    +                getUsername(),
    +                getPassword().toCharArray(),
    +                getInstanceName(),
    +                getZookeepers());
    +        final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector());
    +
    +        // Install an instance of Rya.
    +        final Install installRya = ryaClient.getInstall();
    +        final InstallConfiguration installConf = InstallConfiguration.builder()
    +                .build();
    +        installRya.install(ryaInstance, installConf);
    +
    +        // Fetch its details and show they do not have any RyaStreamsDetails.
    +        com.google.common.base.Optional streamsDetails =
    --- End diff --

    You can open a ticket to change that if you have an improvement in mind.
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335154#comment-16335154 ]

ASF GitHub Bot commented on RYA-440:

Github user kchilton2 commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/267#discussion_r163101966

    --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java ---
    @@ -0,0 +1,169 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.streams.kafka;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Properties;
    +import java.util.UUID;
    +
    +import org.apache.kafka.clients.CommonClientConfigs;
    +import org.apache.kafka.clients.consumer.Consumer;
    +import org.apache.kafka.clients.consumer.ConsumerConfig;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.apache.kafka.common.serialization.StringDeserializer;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.streams.api.RyaStreamsClient;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultAddQuery;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultDeleteQuery;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultGetQuery;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultStartQuery;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultStopQuery;
    +import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
    +import org.apache.rya.streams.api.queries.QueryChange;
    +import org.apache.rya.streams.api.queries.QueryChangeLog;
    +import org.apache.rya.streams.api.queries.QueryRepository;
    +import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
    +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
    +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
    +import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
    +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
    +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * Constructs instances of {@link RyaStreamsClient} that are connected to a Kafka cluster.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public final class KafkaRyaStreamsClientFactory {
    +    private static final Logger log = LoggerFactory.getLogger(KafkaRyaStreamsClientFactory.class);
    +
    +    /**
    +     * Initialize a {@link RyaStreamsClient} that will interact with an instance of Rya Streams
    +     * that is backed by Kafka.
    +     *
    +     * @param ryaInstance - The name of the Rya Instance the client is connected to. (not null)
    +     * @param kafkaHostname - The hostname of the Kafka Broker.
    +     * @param kafkaPort - The port of the Kafka Broker.
    +     * @return The initialized commands.
    +     */
    +    public static RyaStreamsClient make(
    +            final String ryaInstance,
    +            final String kafkaHostname,
    +            final int kafkaPort) {
    +        requireNonNull(kafkaHostname);
    +
    +        // Setup Query Repository used by the Kafka Rya Streams subsystem.
    +        final Producer queryProducer =
    +                makeProducer(kafkaHostname, kafkaPort, StringSerializer.class, QueryChangeSerializer.class);
    +
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335141#comment-16335141 ]

ASF GitHub Bot commented on RYA-440:

Github user isper3at commented on a diff in the pull request:

    https://github.com/apache/incubator-rya/pull/267#discussion_r163100423

    --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java ---
    @@ -0,0 +1,169 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.rya.streams.kafka;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +import java.util.Properties;
    +import java.util.UUID;
    +
    +import org.apache.kafka.clients.CommonClientConfigs;
    +import org.apache.kafka.clients.consumer.Consumer;
    +import org.apache.kafka.clients.consumer.ConsumerConfig;
    +import org.apache.kafka.clients.consumer.KafkaConsumer;
    +import org.apache.kafka.clients.producer.KafkaProducer;
    +import org.apache.kafka.clients.producer.Producer;
    +import org.apache.kafka.clients.producer.ProducerConfig;
    +import org.apache.kafka.common.serialization.Deserializer;
    +import org.apache.kafka.common.serialization.Serializer;
    +import org.apache.kafka.common.serialization.StringDeserializer;
    +import org.apache.kafka.common.serialization.StringSerializer;
    +import org.apache.rya.streams.api.RyaStreamsClient;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultAddQuery;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultDeleteQuery;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultGetQuery;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultStartQuery;
    +import org.apache.rya.streams.api.interactor.defaults.DefaultStopQuery;
    +import org.apache.rya.streams.api.queries.InMemoryQueryRepository;
    +import org.apache.rya.streams.api.queries.QueryChange;
    +import org.apache.rya.streams.api.queries.QueryChangeLog;
    +import org.apache.rya.streams.api.queries.QueryRepository;
    +import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream;
    +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog;
    +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer;
    +import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer;
    +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
    +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
    +import edu.umd.cs.findbugs.annotations.NonNull;
    +
    +/**
    + * Constructs instances of {@link RyaStreamsClient} that are connected to a Kafka cluster.
    + */
    +@DefaultAnnotation(NonNull.class)
    +public final class KafkaRyaStreamsClientFactory {
    +    private static final Logger log = LoggerFactory.getLogger(KafkaRyaStreamsClientFactory.class);
    +
    +    /**
    +     * Initialize a {@link RyaStreamsClient} that will interact with an instance of Rya Streams
    +     * that is backed by Kafka.
    +     *
    +     * @param ryaInstance - The name of the Rya Instance the client is connected to. (not null)
    +     * @param kafkaHostname - The hostname of the Kafka Broker.
    +     * @param kafkaPort - The port of the Kafka Broker.
    +     * @return The initialized commands.
    +     */
    +    public static RyaStreamsClient make(
    +            final String ryaInstance,
    +            final String kafkaHostname,
    +            final int kafkaPort) {
    +        requireNonNull(kafkaHostname);
    +
    +        // Setup Query Repository used by the Kafka Rya Streams subsystem.
    +        final Producer queryProducer =
    +                makeProducer(kafkaHostname, kafkaPort, StringSerializer.class, QueryChangeSerializer.class);
    +
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335140#comment-16335140 ] ASF GitHub Bot commented on RYA-440: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/267#discussion_r163100365 --- Diff: extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java --- @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import org.apache.rya.accumulo.AccumuloITBase; +import org.apache.rya.api.client.Install; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails; +import org.junit.Test; + +/** + * Integration tests the methods of {@link AccumuloSetRyaStreamsConfiguration}. 
+ */ +public class AccumuloSetRyaStreamsConfigurationIT extends AccumuloITBase { + +@Test(expected = InstanceDoesNotExistException.class) +public void instanceDoesNotExist() throws Exception { +final String ryaInstance = getRyaInstanceName(); +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +getUsername(), +getPassword().toCharArray(), +getInstanceName(), +getZookeepers()); +final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector()); + +// Skip the install step to create error causing situation. +final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6); + ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details); +} + +@Test +public void updatesRyaDetails() throws Exception { +final String ryaInstance = getRyaInstanceName(); +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +getUsername(), +getPassword().toCharArray(), +getInstanceName(), +getZookeepers()); +final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector()); + +// Install an instance of Rya. +final Install installRya = ryaClient.getInstall(); +final InstallConfiguration installConf = InstallConfiguration.builder() +.build(); +installRya.install(ryaInstance, installConf); + +// Fetch its details and show they do not have any RyaStreamsDetails. +com.google.common.base.Optional streamsDetails = --- End diff -- yeah, still annoying we use 2
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335139#comment-16335139 ] ASF GitHub Bot commented on RYA-440: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/267#discussion_r163100285 --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java --- @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.mongo; + +import static java.util.Objects.requireNonNull; + +import org.apache.rya.api.client.InstanceExists; +import org.apache.rya.api.client.SetRyaStreamsConfiguration; +import org.apache.rya.api.client.SetRyaStreamsConfigurationBase; +import org.apache.rya.api.instance.RyaDetailsRepository; +import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository; + +import com.mongodb.MongoClient; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A MongoDB implementation of {@link SetRyaStreamsConfiguration}. 
+ */ +@DefaultAnnotation(NonNull.class) +public class MongoSetRyaStreamsConfiguration extends SetRyaStreamsConfigurationBase { + +private final MongoClient client; + +/** + * Constructs an instance of {@link MongoSetRyaStreamsConfiguration}. + * + * @param instanceExists - The interactor used to verify Rya instances exist. (not null) + * @param client - The MongoDB client used to connect to the Rya storage. (not null) + */ +public MongoSetRyaStreamsConfiguration( +final InstanceExists instanceExists, +final MongoClient client) { +super(instanceExists); +this.client = requireNonNull(client); +} + +@Override +protected RyaDetailsRepository getRyaDetailsRepo(final String ryaInstance) { +requireNonNull(ryaInstance); +return new MongoRyaInstanceDetailsRepository(client, ryaInstance); --- End diff -- fair enough
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335136#comment-16335136 ] ASF GitHub Bot commented on RYA-440: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/267#discussion_r163099916 --- Diff: extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java --- @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.shell.util; + +import static org.junit.Assert.assertEquals; + +import java.util.Set; +import java.util.UUID; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Unit tests the methods of {@link StreamsQueryFormatter}. + */ +public class StreamsQueryFormatterTest { + +@Test +public void formatQuery() throws Exception { +// Format the query. +final StreamsQuery query = new StreamsQuery( +UUID.fromString("da55cea5-c21c-46a5-ab79-5433eef4efaa"), +"SELECT * WHERE { ?a ?b ?c . }", +true); +final String formatted = StreamsQueryFormatter.format(query); + +// Ensure it has the expected format. 
+final String expected = +" Query ID: da55cea5-c21c-46a5-ab79-5433eef4efaa\n" + +"Is Active: true\n" + +" SPARQL: select ?a ?b ?c\n" + +" where {\n" + +" ?a ?b ?c.\n" + +" }\n"; + +assertEquals(expected, formatted); +} + +@Test +public void formatQueries() throws Exception { +// Format the queries. +final Set queries = Sets.newHashSet( +new StreamsQuery( + UUID.fromString("----"), +"SELECT * WHERE { ?person ?business . }", +true), +new StreamsQuery( + UUID.fromString("----"), +"SELECT * WHERE { ?a ?b ?c . }", +true), +new StreamsQuery( + UUID.fromString("----"), +"SELECT * WHERE { ?d ?e ?f . }", +false)); + +final String formatted = StreamsQueryFormatter.format(queries); +System.out.println(formatted); --- End diff -- done
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335135#comment-16335135 ] ASF GitHub Bot commented on RYA-440: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/267#discussion_r163099833 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java --- @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.api.interactor.defaults; + +import static java.util.Objects.requireNonNull; + +import java.util.Optional; +import java.util.UUID; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.exception.RyaStreamsException; +import org.apache.rya.streams.api.interactor.GetQuery; +import org.apache.rya.streams.api.queries.QueryRepository; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Get a {@link StreamsQuery} from Rya Streams. 
+ */ +@DefaultAnnotation(NonNull.class) +public class DefaultGetQuery implements GetQuery { +private final QueryRepository repository; + +public DefaultGetQuery(final QueryRepository repository) { --- End diff -- done
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335133#comment-16335133 ] ASF GitHub Bot commented on RYA-440: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/267#discussion_r163099651 --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java --- @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import org.apache.accumulo.core.client.Connector; +import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import org.apache.rya.api.client.InstanceExists; +import org.apache.rya.api.client.SetRyaStreamsConfiguration; +import org.apache.rya.api.client.SetRyaStreamsConfigurationBase; +import org.apache.rya.api.instance.RyaDetailsRepository; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Accumulo implementation of {@link SetRyaStreamsConfiguration}. 
--- End diff -- done
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335129#comment-16335129 ] ASF GitHub Bot commented on RYA-440: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/267#discussion_r163099293 --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java --- @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.mongo; + +import static java.util.Objects.requireNonNull; + +import org.apache.rya.api.client.InstanceExists; +import org.apache.rya.api.client.SetRyaStreamsConfiguration; +import org.apache.rya.api.client.SetRyaStreamsConfigurationBase; +import org.apache.rya.api.instance.RyaDetailsRepository; +import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository; + +import com.mongodb.MongoClient; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A MongoDB implementation of {@link SetRyaStreamsConfiguration}. 
+ */ +@DefaultAnnotation(NonNull.class) +public class MongoSetRyaStreamsConfiguration extends SetRyaStreamsConfigurationBase { + +private final MongoClient client; + +/** + * Constructs an instance of {@link MongoSetRyaStreamsConfiguration}. + * + * @param instanceExists - The interactor used to verify Rya instances exist. (not null) + * @param client - The MongoDB client used to connect to the Rya storage. (not null) + */ +public MongoSetRyaStreamsConfiguration( +final InstanceExists instanceExists, +final MongoClient client) { +super(instanceExists); +this.client = requireNonNull(client); +} + +@Override +protected RyaDetailsRepository getRyaDetailsRepo(final String ryaInstance) { +requireNonNull(ryaInstance); +return new MongoRyaInstanceDetailsRepository(client, ryaInstance); --- End diff -- I think that makes the line look more congested when it isn't an assignment line.
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335127#comment-16335127 ] ASF GitHub Bot commented on RYA-440: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/267#discussion_r163099099 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java --- @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka; + +import static java.util.Objects.requireNonNull; + +import java.util.Properties; +import java.util.UUID; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.streams.api.RyaStreamsClient; +import org.apache.rya.streams.api.interactor.defaults.DefaultAddQuery; +import org.apache.rya.streams.api.interactor.defaults.DefaultDeleteQuery; +import org.apache.rya.streams.api.interactor.defaults.DefaultGetQuery; +import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries; +import org.apache.rya.streams.api.interactor.defaults.DefaultStartQuery; +import org.apache.rya.streams.api.interactor.defaults.DefaultStopQuery; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; +import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Constructs instances of {@link RyaStreamsClient} that are connected to a Kafka cluster. + */ +@DefaultAnnotation(NonNull.class) +public final class KafkaRyaStreamsClientFactory { +private static final Logger log = LoggerFactory.getLogger(KafkaRyaStreamsClientFactory.class); + +/** + * Initialize a {@link RyaStreamsClient} that will interact with an instance of Rya Streams + * that is backed by Kafka. + * + * @param ryaInstance - The name of the Rya Instance the client is connected to. (not null) + * @param kafkaHostname - The hostname of the Kafka Broker. + * @param kafkaPort - The port of the Kafka Broker. + * @return The initialized commands. + */ +public static RyaStreamsClient make( +final String ryaInstance, +final String kafkaHostname, +final int kafkaPort) { +requireNonNull(kafkaHostname); + +// Setup Query Repository used by the Kafka Rya Streams subsystem. +final Producer queryProducer = +makeProducer(kafkaHostname, kafkaPort, StringSerializer.class, QueryChangeSerializer.class); +
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335126#comment-16335126 ] ASF GitHub Bot commented on RYA-440: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/267#discussion_r163099029 --- Diff: extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java --- @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import org.apache.rya.accumulo.AccumuloITBase; +import org.apache.rya.api.client.Install; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails; +import org.junit.Test; + +/** + * Integration tests the methods of {@link AccumuloSetRyaStreamsConfiguration}. 
+ */ +public class AccumuloSetRyaStreamsConfigurationIT extends AccumuloITBase { + +@Test(expected = InstanceDoesNotExistException.class) +public void instanceDoesNotExist() throws Exception { +final String ryaInstance = getRyaInstanceName(); +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +getUsername(), +getPassword().toCharArray(), +getInstanceName(), +getZookeepers()); +final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector()); + +// Skip the install step to create error causing situation. +final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6); + ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details); +} + +@Test +public void updatesRyaDetails() throws Exception { +final String ryaInstance = getRyaInstanceName(); +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +getUsername(), +getPassword().toCharArray(), +getInstanceName(), +getZookeepers()); +final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector()); + +// Install an instance of Rya. +final Install installRya = ryaClient.getInstall(); +final InstallConfiguration installConf = InstallConfiguration.builder() +.build(); +installRya.install(ryaInstance, installConf); + +// Fetch its details and show they do not have any RyaStreamsDetails. +com.google.common.base.Optional streamsDetails = --- End diff -- Google Optional objects are Java serializable. I'm pretty sure that's why they're being used. > Rya Streams: Integration with the existing Rya Shell. > - > > Key: RYA-440 > URL: https://issues.apache.org/jira/browse/RYA-440 > Project: Rya > Issue Type: Task >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > > We need a way to interact with Rya Streams. 
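The serializability point in the comment above can be checked with the standard library alone. Guava's com.google.common.base.Optional implements java.io.Serializable, while java.util.Optional does not. The sketch below only exercises the java.util side so it runs without Guava on the classpath; the class name OptionalSerializationCheck and the helper canSerialize are made up for illustration and are not part of Rya.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Optional;

// Hypothetical helper class, not part of the Rya code base.
public class OptionalSerializationCheck {

    /** Returns true if the value can be written with Java serialization. */
    public static boolean canSerialize(final Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (final NotSerializableException e) {
            // Thrown when the object's class does not implement Serializable.
            return false;
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(final String[] args) {
        // java.util.Optional does not implement Serializable.
        System.out.println(Serializable.class.isAssignableFrom(Optional.class));
        // Wrapping a serializable String in java.util.Optional makes it unwritable.
        System.out.println(canSerialize(Optional.of("localhost")));
        // The bare String serializes without trouble.
        System.out.println(canSerialize("localhost"));
    }
}
```

If a details object is persisted with Java serialization, which is what the comment suggests but this thread does not confirm, an Optional field would have to be the Guava type for the write to succeed.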
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335068#comment-16335068 ] ASF GitHub Bot commented on RYA-440: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/267#discussion_r163090091 --- Diff: extras/shell/src/test/java/org/apache/rya/shell/util/StreamsQueryFormatterTest.java --- @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.shell.util; + +import static org.junit.Assert.assertEquals; + +import java.util.Set; +import java.util.UUID; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Unit tests the methods of {@link StreamsQueryFormatter}. + */ +public class StreamsQueryFormatterTest { + +@Test +public void formatQuery() throws Exception { +// Format the query. +final StreamsQuery query = new StreamsQuery( +UUID.fromString("da55cea5-c21c-46a5-ab79-5433eef4efaa"), +"SELECT * WHERE { ?a ?b ?c . }", +true); +final String formatted = StreamsQueryFormatter.format(query); + +// Ensure it has the expected format. 
+final String expected = +" Query ID: da55cea5-c21c-46a5-ab79-5433eef4efaa\n" + +"Is Active: true\n" + +" SPARQL: select ?a ?b ?c\n" + +" where {\n" + +" ?a ?b ?c.\n" + +" }\n"; + +assertEquals(expected, formatted); +} + +@Test +public void formatQueries() throws Exception { +// Format the queries. +final Set<StreamsQuery> queries = Sets.newHashSet( +new StreamsQuery( + UUID.fromString("----"), +"SELECT * WHERE { ?person ?business . }", +true), +new StreamsQuery( + UUID.fromString("----"), +"SELECT * WHERE { ?a ?b ?c . }", +true), +new StreamsQuery( + UUID.fromString("----"), +"SELECT * WHERE { ?d ?e ?f . }", +false)); + +final String formatted = StreamsQueryFormatter.format(queries); +System.out.println(formatted); --- End diff -- SOP?
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335065#comment-16335065 ] ASF GitHub Bot commented on RYA-440: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/267#discussion_r163082048 --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/mongo/MongoSetRyaStreamsConfiguration.java --- @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.mongo; + +import static java.util.Objects.requireNonNull; + +import org.apache.rya.api.client.InstanceExists; +import org.apache.rya.api.client.SetRyaStreamsConfiguration; +import org.apache.rya.api.client.SetRyaStreamsConfigurationBase; +import org.apache.rya.api.instance.RyaDetailsRepository; +import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository; + +import com.mongodb.MongoClient; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A MongoDB implementation of {@link SetRyaStreamsConfiguration}. 
+ */ +@DefaultAnnotation(NonNull.class) +public class MongoSetRyaStreamsConfiguration extends SetRyaStreamsConfigurationBase { + +private final MongoClient client; + +/** + * Constructs an instance of {@link MongoSetRyaStreamsConfiguration}. + * + * @param instanceExists - The interactor used to verify Rya instances exist. (not null) + * @param client - The MongoDB client used to connect to the Rya storage. (not null) + */ +public MongoSetRyaStreamsConfiguration( +final InstanceExists instanceExists, +final MongoClient client) { +super(instanceExists); +this.client = requireNonNull(client); +} + +@Override +protected RyaDetailsRepository getRyaDetailsRepo(final String ryaInstance) { +requireNonNull(ryaInstance); +return new MongoRyaInstanceDetailsRepository(client, ryaInstance); --- End diff -- you could one line this?
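One reading of the "one line this" suggestion above is that java.util.Objects.requireNonNull returns its argument, so the null check and the constructor call can share a single statement. The sketch below shows both forms side by side. Repo is a made-up stand-in for MongoRyaInstanceDetailsRepository, and the class and method names are illustrative, not taken from the pull request.

```java
import static java.util.Objects.requireNonNull;

// Hypothetical illustration class, not part of the Rya code base.
public class RequireNonNullInline {

    /** Made-up stand-in for a repository that is constructed from an instance name. */
    public static final class Repo {
        public final String ryaInstance;

        Repo(final String ryaInstance) {
            this.ryaInstance = ryaInstance;
        }
    }

    /** The two-statement form that appears in the diff under review. */
    public static Repo twoStatements(final String ryaInstance) {
        requireNonNull(ryaInstance);
        return new Repo(ryaInstance);
    }

    /** The one-line form: requireNonNull hands back the value it just checked. */
    public static Repo oneLine(final String ryaInstance) {
        return new Repo(requireNonNull(ryaInstance));
    }

    public static void main(final String[] args) {
        System.out.println(oneLine("rya_").ryaInstance);
        System.out.println(twoStatements("rya_").ryaInstance);
    }
}
```

Both forms throw NullPointerException for a null instance name before the repository is constructed, so the choice is a matter of style.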
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335064#comment-16335064 ] ASF GitHub Bot commented on RYA-440: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/267#discussion_r163081707 --- Diff: extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfiguration.java --- @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import org.apache.accumulo.core.client.Connector; +import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import org.apache.rya.api.client.InstanceExists; +import org.apache.rya.api.client.SetRyaStreamsConfiguration; +import org.apache.rya.api.client.SetRyaStreamsConfigurationBase; +import org.apache.rya.api.instance.RyaDetailsRepository; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Accumulo implementation of {@link SetRyaStreamsConfiguration}. 
--- End diff -- nit: An*
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335066#comment-16335066 ] ASF GitHub Bot commented on RYA-440: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/267#discussion_r163084595 --- Diff: extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/interactor/defaults/DefaultGetQuery.java --- @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.api.interactor.defaults; + +import static java.util.Objects.requireNonNull; + +import java.util.Optional; +import java.util.UUID; + +import org.apache.rya.streams.api.entity.StreamsQuery; +import org.apache.rya.streams.api.exception.RyaStreamsException; +import org.apache.rya.streams.api.interactor.GetQuery; +import org.apache.rya.streams.api.queries.QueryRepository; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Get a {@link StreamsQuery} from Rya Streams. 
+ */ +@DefaultAnnotation(NonNull.class) +public class DefaultGetQuery implements GetQuery { +private final QueryRepository repository; + +public DefaultGetQuery(final QueryRepository repository) { --- End diff -- doc
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335067#comment-16335067 ] ASF GitHub Bot commented on RYA-440: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/267#discussion_r163082381 --- Diff: extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloSetRyaStreamsConfigurationIT.java --- @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import org.apache.rya.accumulo.AccumuloITBase; +import org.apache.rya.api.client.Install; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.instance.RyaDetails.RyaStreamsDetails; +import org.junit.Test; + +/** + * Integration tests the methods of {@link AccumuloSetRyaStreamsConfiguration}. 
+ */ +public class AccumuloSetRyaStreamsConfigurationIT extends AccumuloITBase { + +@Test(expected = InstanceDoesNotExistException.class) +public void instanceDoesNotExist() throws Exception { +final String ryaInstance = getRyaInstanceName(); +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +getUsername(), +getPassword().toCharArray(), +getInstanceName(), +getZookeepers()); +final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector()); + +// Skip the install step to create error causing situation. +final RyaStreamsDetails details = new RyaStreamsDetails("localhost", 6); + ryaClient.getSetRyaStreamsConfiguration().setRyaStreamsConfiguration(ryaInstance, details); +} + +@Test +public void updatesRyaDetails() throws Exception { +final String ryaInstance = getRyaInstanceName(); +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +getUsername(), +getPassword().toCharArray(), +getInstanceName(), +getZookeepers()); +final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector()); + +// Install an instance of Rya. +final Install installRya = ryaClient.getInstall(); +final InstallConfiguration installConf = InstallConfiguration.builder() +.build(); +installRya.install(ryaInstance, installConf); + +// Fetch its details and show they do not have any RyaStreamsDetails. +com.google.common.base.Optional streamsDetails = --- End diff -- we should really clean up the Optional stuff one day > Rya Streams: Integration with the existing Rya Shell. > - > > Key: RYA-440 > URL: https://issues.apache.org/jira/browse/RYA-440 > Project: Rya > Issue Type: Task >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > > We need a way to interact with Rya Streams. The shell should include the > following commands: > * *streams-configure*: Lets you change which Rya Streams system an instance > of Rya is configured to use. 
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16335069#comment-16335069 ] ASF GitHub Bot commented on RYA-440: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/267#discussion_r163085139 --- Diff: extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/KafkaRyaStreamsClientFactory.java --- @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka; + +import static java.util.Objects.requireNonNull; + +import java.util.Properties; +import java.util.UUID; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.streams.api.RyaStreamsClient; +import org.apache.rya.streams.api.interactor.defaults.DefaultAddQuery; +import org.apache.rya.streams.api.interactor.defaults.DefaultDeleteQuery; +import org.apache.rya.streams.api.interactor.defaults.DefaultGetQuery; +import org.apache.rya.streams.api.interactor.defaults.DefaultListQueries; +import org.apache.rya.streams.api.interactor.defaults.DefaultStartQuery; +import org.apache.rya.streams.api.interactor.defaults.DefaultStopQuery; +import org.apache.rya.streams.api.queries.InMemoryQueryRepository; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; +import org.apache.rya.streams.api.queries.QueryRepository; +import org.apache.rya.streams.kafka.interactor.KafkaGetQueryResultStream; +import org.apache.rya.streams.kafka.queries.KafkaQueryChangeLog; +import org.apache.rya.streams.kafka.serialization.VisibilityBindingSetDeserializer; +import org.apache.rya.streams.kafka.serialization.VisibilityStatementDeserializer; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; +import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Constructs instances of {@link RyaStreamsClient} that are connected to a Kafka cluster. + */ +@DefaultAnnotation(NonNull.class) +public final class KafkaRyaStreamsClientFactory { +private static final Logger log = LoggerFactory.getLogger(KafkaRyaStreamsClientFactory.class); + +/** + * Initialize a {@link RyaStreamsClient} that will interact with an instance of Rya Streams + * that is backed by Kafka. + * + * @param ryaInstance - The name of the Rya Instance the client is connected to. (not null) + * @param kafkaHostname - The hostname of the Kafka Broker. + * @param kafkaPort - The port of the Kafka Broker. + * @return The initialized commands. + */ +public static RyaStreamsClient make( +final String ryaInstance, +final String kafkaHostname, +final int kafkaPort) { +requireNonNull(kafkaHostname); + +// Setup Query Repository used by the Kafka Rya Streams subsystem. +final Producer queryProducer = +makeProducer(kafkaHostname, kafkaPort, StringSerializer.class, QueryChangeSerializer.class); +
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16333157#comment-16333157 ] ASF GitHub Bot commented on RYA-440: Github user asfgit commented on the issue: https://github.com/apache/incubator-rya/pull/267 Refer to this link for build results (access rights to CI server needed): https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/673/
[jira] [Commented] (RYA-440) Rya Streams: Integration with the existing Rya Shell.
[ https://issues.apache.org/jira/browse/RYA-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16333147#comment-16333147 ] ASF GitHub Bot commented on RYA-440: GitHub user kchilton2 opened a pull request: https://github.com/apache/incubator-rya/pull/267 RYA-440 Added commands to Rya Shell used to interact with Rya Streams. ## Description Integrated the Rya Shell with Rya Streams. ### Tests Added tests. ### Links https://issues.apache.org/jira/browse/RYA-440 You can merge this pull request into a Git repository by running: $ git pull https://github.com/kchilton2/incubator-rya RYA-440 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-rya/pull/267.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #267 commit f4e132cf8aac2467af3e0758f6eab4aeee806e83 Author: kchilton2 Date: 2018-01-16T22:40:08Z RYA-440 Added commands to Rya Shell used to interact with Rya Streams.