[GitHub] incubator-rya issue #296: RYA-487 Kafka Connect Sinks
Github user asfgit commented on the issue: https://github.com/apache/incubator-rya/pull/296 Refer to this link for build results (access rights to CI server needed): https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/761/ ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472721#comment-16472721 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187742000 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayOutputStream; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Serializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFWriter; +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s + * using the RDF4J Rio Binary format. --- End diff -- I mean that's true, but I don't think it's worth documenting. 
> Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187742000 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayOutputStream; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Serializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFWriter; +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s + * using the RDF4J Rio Binary format. --- End diff -- I mean that's true, but I don't think it's worth documenting. ---
[GitHub] incubator-rya issue #296: RYA-487 Kafka Connect Sinks
Github user asfgit commented on the issue: https://github.com/apache/incubator-rya/pull/296 Refer to this link for build results (access rights to CI server needed): https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/760/ ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472707#comment-16472707 ] ASF GitHub Bot commented on RYA-487: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187739885 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) --- End diff -- gotcha, thanks for the doc paste dump > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187739974 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) +public abstract class RyaSinkTask extends SinkTask { +private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class); + +@Nullable +private SailRepository sailRepo = null; + +@Nullable +private SailRepositoryConnection conn = null; + +/** + * Throws an exception if the configured Rya Instance is not already installed + * within the configured database. + * + * @param taskConfig - The configuration values that were provided to the task. (not null) + * @throws ConnectException The configured Rya Instance is not installed to the configured database + * or we were unable to figure out if it is installed. 
+ */ +protected abstract void checkRyaInstanceExists(final Map taskConfig) throws ConnectException; + +/** + * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured + * Rya Instance. + * + * @param taskConfig - Configures how the Sail object will be created. (not null) + * @return The created Sail object. + * @throws ConnectException The Sail object could not be made. + */ +protected abstract Sail makeSail(final Map taskConfig) throws ConnectException; + +@Override +public String version() { +return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN"; +} + +@Override +public void start(final Map props) throws ConnectException { +requireNonNull(props); + +// Ensure the configured Rya Instance is installed within the configured database. +checkRyaInstanceExists(props); + +// Create the Sail object that is connected to the Rya Instance. +final Sail sail = makeSail(props); +sailRepo = new SailRepository( sail ); +conn = sailRepo.getConnection(); +} + +@Override +public void put(final Collection records) { +requireNonNull(records); + +// Return immediately if there are no records to handle. +if(records.isEmpty()) { +return; +} + +// If a transaction has not been started yet, then start one. +if(!conn.isActive()) { +conn.begin(); +} + +
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187739885 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) --- End diff -- gotcha, thanks for the doc paste dump ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472705#comment-16472705 ] ASF GitHub Bot commented on RYA-487: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187739762 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayOutputStream; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Serializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFWriter; +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s + * using the RDF4J Rio Binary format. 
--- End diff -- fair enough, it just seemed like you are delegating the actual serialization to something else. > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-494) Shell insert and query bug.
[ https://issues.apache.org/jira/browse/RYA-494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472706#comment-16472706 ] ASF GitHub Bot commented on RYA-494: Github user asfgit commented on the issue: https://github.com/apache/incubator-rya/pull/295 Refer to this link for build results (access rights to CI server needed): https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/759/ > Shell insert and query bug. > --- > > Key: RYA-494 > URL: https://issues.apache.org/jira/browse/RYA-494 > Project: Rya > Issue Type: Bug >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > > Using the Rya Shell: > # Connect to a cluster of Accumulo. > # Install an instance of Rya that has all secondary indexers and the > sharding turned off. > # Connect to that instance of rya. > # Load the following N-Triples file into it: > ## > {code:java} > . > . > . > . > . > .{code} > # Query for all statements using the following query: > ## select * where \{ ?s ?p ?o .} > You will see the following results: > {code:java} > p,s,o > urn:talksTo,urn:Alice,urn:Bob > urn:talksTo,urn:Bob,urn:Charlie > urn:talksTo,urn:Charlie,urn:Alice > urn:talksTo,urn:David,urn:Eve > urn:listensTo,urn:Eve,urn:Bob > urn:org.apache.rya/2012/05#version,urn:org.apache.rya/2012/05#rts,"3.0.0"{code} > The following statement is missing: > {code:java} > .{code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya issue #295: RYA-494 Fixed a bug where the shell was not loadin...
Github user asfgit commented on the issue: https://github.com/apache/incubator-rya/pull/295 Refer to this link for build results (access rights to CI server needed): https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/759/ ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187739762 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayOutputStream; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Serializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFWriter; +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s + * using the RDF4J Rio Binary format. --- End diff -- fair enough, it just seemed like you are delegating the actual serialization to something else. ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472704#comment-16472704 ] ASF GitHub Bot commented on RYA-487: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187739445 --- Diff: extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java --- @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig; +import org.junit.Test; + +/** + * Unit tests the methods of {@link AccumuloRyaSinkConfig}. + */ +public class AccumuloRyaSinkConfigTest { + +@Test +public void parses() { --- End diff -- gotcha, the test just seemed lacking an error case > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. 
> This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187739297 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java --- @@ -438,6 +438,9 @@ public static boolean getUsePcjUpdaterIndex(final Configuration conf) { return Optional.fromNullable(conf.get(FLUO_APP_NAME)); } +public static void setUseMongo(final Configuration conf, final boolean useMongo) { --- End diff -- sure makes sense ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472703#comment-16472703 ] ASF GitHub Bot commented on RYA-487: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187739297 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java --- @@ -438,6 +438,9 @@ public static boolean getUsePcjUpdaterIndex(final Configuration conf) { return Optional.fromNullable(conf.get(FLUO_APP_NAME)); } +public static void setUseMongo(final Configuration conf, final boolean useMongo) { --- End diff -- sure makes sense > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187739254 --- Diff: dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java --- @@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() { super(); } -public AccumuloRdfConfiguration(Configuration other) { +public AccumuloRdfConfiguration(final Configuration other) { super(other); } -public AccumuloRdfConfigurationBuilder getBuilder() { +public static AccumuloRdfConfigurationBuilder getBuilder() { --- End diff -- 👍 ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472702#comment-16472702 ] ASF GitHub Bot commented on RYA-487: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187739254 --- Diff: dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java --- @@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() { super(); } -public AccumuloRdfConfiguration(Configuration other) { +public AccumuloRdfConfiguration(final Configuration other) { super(other); } -public AccumuloRdfConfigurationBuilder getBuilder() { +public static AccumuloRdfConfigurationBuilder getBuilder() { --- End diff -- > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472685#comment-16472685 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187736288 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java --- @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.mongo; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkConnector} that uses a Mongo DB Rya backend when creating tasks. 
+ */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkConnector extends RyaSinkConnector { + +@Nullable +private MongoRyaSinkConfig config = null; + +@Override +public void start(final Map props) { +this.config = new MongoRyaSinkConfig( props ); +} + +@Override +protected AbstractConfig getConfig() { +if(config == null) { +throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first."); --- End diff -- Done. > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472680#comment-16472680 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187735910 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) +public abstract class RyaSinkTask extends SinkTask { +private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class); + +@Nullable +private SailRepository sailRepo = null; + +@Nullable +private SailRepositoryConnection conn = null; + +/** + * Throws an exception if the configured Rya Instance is not already installed + * within the configured database. + * + * @param taskConfig - The configuration values that were provided to the task. (not null) + * @throws ConnectException The configured Rya Instance is not installed to the configured database + * or we were unable to figure out if it is installed. 
+ */ +protected abstract void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException; + +/** + * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured + * Rya Instance. + * + * @param taskConfig - Configures how the Sail object will be created. (not null) + * @return The created Sail object. + * @throws ConnectException The Sail object could not be made. + */ +protected abstract Sail makeSail(final Map taskConfig) throws ConnectException; + +@Override +public String version() { +return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN"; +} + +@Override +public void start(final Map props) throws ConnectException { +requireNonNull(props); + +// Ensure the configured Rya Instance is installed within the configured database. +checkRyaInstanceExists(props); + +// Create the Sail object that is connected to the Rya Instance. +final Sail sail = makeSail(props); +sailRepo = new SailRepository( sail ); +conn = sailRepo.getConnection(); +} + +@Override +public void put(final Collection records) { +requireNonNull(records); + +// Return immediately if there are no records to handle. +
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187736164 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java --- @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.mongo; + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka Connect configuration that is used to configure {@link MongoRyaSinkConnector}s and {@link MongoRyaSinkTask}s. + */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkConfig extends RyaSinkConfig { + +public static final String HOSTNAME = "mongo.hostname"; +private static final String HOSTNAME_DOC = "The Mongo DB hostname the Sail connections wlll use."; --- End diff -- Done. ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472672#comment-16472672 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187735067 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) --- End diff -- "/** * Indicates that all members of the class or package should be annotated with the default value of the supplied * annotation class. This would be used for behavior annotations such as @NonNull, @CheckForNull, * or @CheckReturnValue. In particular, you can use @DefaultAnnotation(NonNull.class) on a class or package, * and then use @Nullable only on those parameters, methods or fields that you want to allow to be null. */" > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187735067 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) --- End diff -- "/** * Indicates that all members of the class or package should be annotated with the default value of the supplied * annotation class. This would be used for behavior annotations such as @NonNull, @CheckForNull, * or @CheckReturnValue. In particular, you can use @DefaultAnnotation(NonNull.class) on a class or package, * and then use @Nullable only on those parameters, methods or fields that you want to allow to be null. */" ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187734887 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) --- End diff -- It indicates by default the parameters are not null. ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472669#comment-16472669 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187734692 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java --- @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks. 
+ */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkConnector extends RyaSinkConnector { + +@Nullable +private AccumuloRyaSinkConfig config = null; + +@Override +public void start(final Mapprops) { +this.config = new AccumuloRyaSinkConfig( props ); +} + +@Override +protected AbstractConfig getConfig() { +if(config == null) { +throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first."); --- End diff -- Done. > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187734692 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java --- @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks. 
+ */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkConnector extends RyaSinkConnector { + +@Nullable +private AccumuloRyaSinkConfig config = null; + +@Override +public void start(final Mapprops) { +this.config = new AccumuloRyaSinkConfig( props ); +} + +@Override +protected AbstractConfig getConfig() { +if(config == null) { +throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first."); --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187734448 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayOutputStream; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Serializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFWriter; +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s + * using the RDF4J Rio Binary format. --- End diff -- I think saying I'm using the RDF4J Rio Binary format is more useful than indicating how I went about doing that since that's what the code is. ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472665#comment-16472665 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187734176 --- Diff: extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java --- @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig; +import org.junit.Test; + +/** + * Unit tests the methods of {@link AccumuloRyaSinkConfig}. + */ +public class AccumuloRyaSinkConfigTest { + +@Test +public void parses() { --- End diff -- Could you give an example of a malformed field? Do you just mean fields that are not part of the schema? That's not illegal. They just get ignored. 
> Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187733815 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( --- End diff -- That's true. Feel free to write an improvement ticket for that. ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472658#comment-16472658 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187733573 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java --- @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks. + */ +@DefaultAnnotation(NonNull.class) --- End diff -- That field is nullable because this is a stateful object, but the parameters into the start(...) 
function may not be null. I'll add a null check there. > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187733573 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java --- @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks. + */ +@DefaultAnnotation(NonNull.class) --- End diff -- That field is nullable because this is a stateful object, but the parameters into the start(...) function may not be null. I'll add a null check there. ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472656#comment-16472656 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187733216 --- Diff: extras/kafka.connect/README.md --- @@ -0,0 +1,22 @@ + + +The parent project for all Rya Kafka Connect work. All projects thare are part +of that system must use this project's pom as their parent pom. --- End diff -- Done. > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472654#comment-16472654 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187732940 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java --- @@ -438,6 +438,9 @@ public static boolean getUsePcjUpdaterIndex(final Configuration conf) { return Optional.fromNullable(conf.get(FLUO_APP_NAME)); } +public static void setUseMongo(final Configuration conf, final boolean useMongo) { --- End diff -- I don't want to mess with how our configuration objects are initialized for the scope of this ticket. > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187732940 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java --- @@ -438,6 +438,9 @@ public static boolean getUsePcjUpdaterIndex(final Configuration conf) { return Optional.fromNullable(conf.get(FLUO_APP_NAME)); } +public static void setUseMongo(final Configuration conf, final boolean useMongo) { --- End diff -- I don't want to mess with how our configuration objects are initialized for the scope of this ticket. ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472649#comment-16472649 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187732007 --- Diff: dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java --- @@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() { super(); } -public AccumuloRdfConfiguration(Configuration other) { +public AccumuloRdfConfiguration(final Configuration other) { super(other); } -public AccumuloRdfConfigurationBuilder getBuilder() { +public static AccumuloRdfConfigurationBuilder getBuilder() { --- End diff -- For me it is. I don't really want to refactor the entire method name in this review, though. I just needed it to be static so that I could use it. > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187732007 --- Diff: dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java --- @@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() { super(); } -public AccumuloRdfConfiguration(Configuration other) { +public AccumuloRdfConfiguration(final Configuration other) { super(other); } -public AccumuloRdfConfigurationBuilder getBuilder() { +public static AccumuloRdfConfigurationBuilder getBuilder() { --- End diff -- For me it is. I don't really want to refactor the entire method name in this review, though. I just needed it to be static so that I could use it. ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472648#comment-16472648 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187731624 --- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java --- @@ -274,16 +274,17 @@ public boolean getUseAggregationPipeline() { * on their child subtrees. * @param value whether to use aggregation pipeline optimization. */ -public void setUseAggregationPipeline(boolean value) { +public void setUseAggregationPipeline(final boolean value) { setBoolean(USE_AGGREGATION_PIPELINE, value); } @Override public ListgetOptimizers() { -List optimizers = super.getOptimizers(); +final List optimizers = super.getOptimizers(); if (getUseAggregationPipeline()) { -Class cl = AggregationPipelineQueryOptimizer.class; +final Class cl = AggregationPipelineQueryOptimizer.class; @SuppressWarnings("unchecked") +final --- End diff -- Done. > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187731624 --- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java --- @@ -274,16 +274,17 @@ public boolean getUseAggregationPipeline() { * on their child subtrees. * @param value whether to use aggregation pipeline optimization. */ -public void setUseAggregationPipeline(boolean value) { +public void setUseAggregationPipeline(final boolean value) { setBoolean(USE_AGGREGATION_PIPELINE, value); } @Override public ListgetOptimizers() { -List optimizers = super.getOptimizers(); +final List optimizers = super.getOptimizers(); if (getUseAggregationPipeline()) { -Class cl = AggregationPipelineQueryOptimizer.class; +final Class cl = AggregationPipelineQueryOptimizer.class; @SuppressWarnings("unchecked") +final --- End diff -- Done. ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472644#comment-16472644 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187731377 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import com.google.common.base.Strings; +import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws IllegalStateException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig); +@Nullable +final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername(); +@Nullable +final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? 
null : config.getPassword().toCharArray(); + +// Connect a Mongo Client to the configured Mongo DB instance. +final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort()); +final boolean hasCredentials = username != null && password != null; + +try(MongoClient mongoClient = hasCredentials ? +new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) : +new MongoClient(serverAddr)) { +// Use a RyaClient to see if the configured instance exists. +// Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with. +final MongoConnectionDetails connectionDetails = new MongoConnectionDetails( +config.getHostname(), +config.getPort(), +Optional.ofNullable(username), +Optional.ofNullable(password)); + +final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient); +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472645#comment-16472645 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187731454 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import com.google.common.base.Strings; +import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws IllegalStateException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig); +@Nullable +final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername(); +@Nullable +final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? 
null : config.getPassword().toCharArray(); + +// Connect a Mongo Client to the configured Mongo DB instance. +final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort()); +final boolean hasCredentials = username != null && password != null; + +try(MongoClient mongoClient = hasCredentials ? +new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) : +new MongoClient(serverAddr)) { +// Use a RyaClient to see if the configured instance exists. +// Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with. +final MongoConnectionDetails connectionDetails = new MongoConnectionDetails( +config.getHostname(), +config.getPort(), +Optional.ofNullable(username), +Optional.ofNullable(password)); + +final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient); +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187731454 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import com.google.common.base.Strings; +import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws IllegalStateException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig); +@Nullable +final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername(); +@Nullable +final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? 
null : config.getPassword().toCharArray(); + +// Connect a Mongo Client to the configured Mongo DB instance. +final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort()); +final boolean hasCredentials = username != null && password != null; + +try(MongoClient mongoClient = hasCredentials ? +new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) : +new MongoClient(serverAddr)) { +// Use a RyaClient to see if the configured instance exists. +// Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with. +final MongoConnectionDetails connectionDetails = new MongoConnectionDetails( +config.getHostname(), +config.getPort(), +Optional.ofNullable(username), +Optional.ofNullable(password)); + +final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient); +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} +} catch(final RyaClientException e) { +throw new ConnectException("Unable to determine if the Rya
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187731377 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import com.google.common.base.Strings; +import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws IllegalStateException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig); +@Nullable +final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername(); +@Nullable +final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? 
null : config.getPassword().toCharArray(); + +// Connect a Mongo Client to the configured Mongo DB instance. +final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort()); +final boolean hasCredentials = username != null && password != null; + +try(MongoClient mongoClient = hasCredentials ? +new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) : +new MongoClient(serverAddr)) { +// Use a RyaClient to see if the configured instance exists. +// Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with. +final MongoConnectionDetails connectionDetails = new MongoConnectionDetails( +config.getHostname(), +config.getPort(), +Optional.ofNullable(username), +Optional.ofNullable(password)); + +final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient); +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187731146 --- Diff: extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java --- @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.client.command; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.kafka.connect.api.StatementsSerializer; +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand; +import org.apache.rya.rdftriplestore.utils.RdfFormatUtils; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.eclipse.rdf4j.rio.RDFHandlerException; +import org.eclipse.rdf4j.rio.RDFParseException; +import org.eclipse.rdf4j.rio.RDFParser; +import org.eclipse.rdf4j.rio.Rio; +import org.eclipse.rdf4j.rio.UnsupportedRDFormatException; +import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Writes {@link Statement}s to a Kafka topic using the Rya Kafka Connect Sink format. + */ +@DefaultAnnotation(NonNull.class) +public class WriteStatementsCommand implements RyaKafkaClientCommand { +private static final Logger log = LoggerFactory.getLogger(WriteStatementsCommand.class); + +/** + * Command line parameters that are used by this command to configure itself. 
+ */ +public static class WriteParameters extends KafkaParameters { +@Parameter(names = {"--statementsFile", "-f"}, required = true, description = "The file of RDF statements to load into Rya Streams.") +public String statementsFile; +} + +@Override +public String getCommand() { +return "write"; +} + +@Override +public String getDescription() { +return "Writes Statements to the specified Kafka topic."; +} + +@Override +public boolean validArguments(final String[] args) { +boolean valid = true; +try { +new JCommander(new WriteParameters(), args); +} catch(final ParameterException e) { +valid = false; +} +return valid; +} + +/** + * @return Describes what arguments may be provided to the command. + */ +@Override +public String getUsage() { +final JCommander parser = new JCommander(new WriteParameters()); + +final StringBuilder usage = new StringBuilder(); +parser.usage(usage); +return usage.toString(); +} + +@Override +public void execute(final String[] args) throws ArgumentsException, ExecutionException { +requireNonNull(args); + +// Parse the command line arguments. +final WriteParameters params = new WriteParameters(); +try { +new JCommander(params, args); +} catch(final ParameterException e) { +throw new ArgumentsException("Could not stream the query's results
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472640#comment-16472640 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187731146 --- Diff: extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java --- @@ -0,0 +1,187 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.client.command; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.Properties; +import java.util.Set; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.kafka.connect.api.StatementsSerializer; +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand; +import org.apache.rya.rdftriplestore.utils.RdfFormatUtils; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.eclipse.rdf4j.rio.RDFHandlerException; +import org.eclipse.rdf4j.rio.RDFParseException; +import org.eclipse.rdf4j.rio.RDFParser; +import org.eclipse.rdf4j.rio.Rio; +import org.eclipse.rdf4j.rio.UnsupportedRDFormatException; +import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Writes {@link Statement}s to a Kafka topic using the Rya Kafka Connect Sink format. + */ +@DefaultAnnotation(NonNull.class) +public class WriteStatementsCommand implements RyaKafkaClientCommand { +private static final Logger log = LoggerFactory.getLogger(WriteStatementsCommand.class); + +/** + * Command line parameters that are used by this command to configure itself. 
+ */ +public static class WriteParameters extends KafkaParameters { +@Parameter(names = {"--statementsFile", "-f"}, required = true, description = "The file of RDF statements to load into Rya Streams.") +public String statementsFile; +} + +@Override +public String getCommand() { +return "write"; +} + +@Override +public String getDescription() { +return "Writes Statements to the specified Kafka topic."; +} + +@Override +public boolean validArguments(final String[] args) { +boolean valid = true; +try { +new JCommander(new WriteParameters(), args); +} catch(final ParameterException e) { +valid = false; +} +return valid; +} + +/** + * @return Describes what arguments may be provided to the command. + */ +@Override +public String getUsage() { +final JCommander parser = new JCommander(new WriteParameters()); + +final StringBuilder usage = new StringBuilder(); +parser.usage(usage); +return usage.toString(); +} + +@Override +public void execute(final String[] args) throws ArgumentsException, ExecutionException { +requireNonNull(args); + +// Parse the command line arguments. +final WriteParameters
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472637#comment-16472637 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187730865 --- Diff: extras/kafka.connect/client/pom.xml --- @@ -0,0 +1,135 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.rya +rya.kafka.connect.parent +4.0.0-incubating-SNAPSHOT + + +rya.kafka.connect.client + +Apache Rya Kafka Connect - Client +Contains a client that may be used to load Statements into + a Kafka topic to be read by Kafka Connect. + + + + +org.apache.rya +rya.sail + + +org.apache.rya +rya.kafka.connect.api + + + + +org.eclipse.rdf4j +rdf4j-model + + +com.google.guava +guava + + +com.beust +jcommander + + +com.github.stephenc.findbugs +findbugs-annotations + + +org.apache.kafka +kafka-clients + + + --- End diff -- It does, good call. Done. > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472639#comment-16472639 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187731028 --- Diff: extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java --- @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.client; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ArgumentsException; +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ExecutionException; +import org.apache.rya.kafka.connect.client.command.ReadStatementsCommand; +import org.apache.rya.kafka.connect.client.command.WriteStatementsCommand; +import org.eclipse.rdf4j.model.Statement; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A CLI tool used to read/write {@link Statement}s to/from a Kafka topic using the format + * the Rya Kafka Connect Sinks expect. + */ +@DefaultAnnotation(NonNull.class) +public class CLIDriver { + +/** + * Maps from command strings to the object that performs the command. + */ +private static final ImmutableMapCOMMANDS; +static { +final Set commandClasses = new HashSet<>(); +commandClasses.add(ReadStatementsCommand.class); +commandClasses.add(WriteStatementsCommand.class); +final ImmutableMap.Builder builder = ImmutableMap.builder(); +for(final Class commandClass : commandClasses) { +try { +final RyaKafkaClientCommand command = commandClass.newInstance(); +builder.put(command.getCommand(), command); +} catch (InstantiationException | IllegalAccessException e) { +System.err.println("Could not run the application because a RyaKafkaClientCommand is missing its empty constructor."); +e.printStackTrace(); +} +} +COMMANDS = builder.build(); +} + +private static final String USAGE = makeUsage(COMMANDS); + +public static void main(final String[] args) { +// If no command provided or the command isn't recognized, then print the usage. 
+if (args.length == 0 || !COMMANDS.containsKey(args[0])) { +System.out.println(USAGE); +System.exit(1); +} + +// Fetch the command that will be executed. +final String command = args[0]; +final String[] commandArgs = Arrays.copyOfRange(args, 1, args.length); +final RyaKafkaClientCommand clientCommand = COMMANDS.get(command); + +// Print usage if the arguments are invalid for the command. +if(!clientCommand.validArguments(commandArgs)) { +System.out.println(clientCommand.getUsage()); +System.exit(1); +} + +// Execute the command. +try { --- End diff -- Done. > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187731028 --- Diff: extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java --- @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.client; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ArgumentsException; +import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ExecutionException; +import org.apache.rya.kafka.connect.client.command.ReadStatementsCommand; +import org.apache.rya.kafka.connect.client.command.WriteStatementsCommand; +import org.eclipse.rdf4j.model.Statement; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A CLI tool used to read/write {@link Statement}s to/from a Kafka topic using the format + * the Rya Kafka Connect Sinks expect. 
+ */ +@DefaultAnnotation(NonNull.class) +public class CLIDriver { + +/** + * Maps from command strings to the object that performs the command. + */ +private static final ImmutableMapCOMMANDS; +static { +final Set commandClasses = new HashSet<>(); +commandClasses.add(ReadStatementsCommand.class); +commandClasses.add(WriteStatementsCommand.class); +final ImmutableMap.Builder builder = ImmutableMap.builder(); +for(final Class commandClass : commandClasses) { +try { +final RyaKafkaClientCommand command = commandClass.newInstance(); +builder.put(command.getCommand(), command); +} catch (InstantiationException | IllegalAccessException e) { +System.err.println("Could not run the application because a RyaKafkaClientCommand is missing its empty constructor."); +e.printStackTrace(); +} +} +COMMANDS = builder.build(); +} + +private static final String USAGE = makeUsage(COMMANDS); + +public static void main(final String[] args) { +// If no command provided or the command isn't recognized, then print the usage. +if (args.length == 0 || !COMMANDS.containsKey(args[0])) { +System.out.println(USAGE); +System.exit(1); +} + +// Fetch the command that will be executed. +final String command = args[0]; +final String[] commandArgs = Arrays.copyOfRange(args, 1, args.length); +final RyaKafkaClientCommand clientCommand = COMMANDS.get(command); + +// Print usage if the arguments are invalid for the command. +if(!clientCommand.validArguments(commandArgs)) { +System.out.println(clientCommand.getUsage()); +System.exit(1); +} + +// Execute the command. +try { --- End diff -- Done. ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472635#comment-16472635 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187730515 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) +public abstract class RyaSinkTask extends SinkTask { +private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class); + +@Nullable +private SailRepository sailRepo = null; + +@Nullable +private SailRepositoryConnection conn = null; + +/** + * Throws an exception if the configured Rya Instance is not already installed + * within the configured database. + * + * @param taskConfig - The configuration values that were provided to the task. (not null) + * @throws ConnectException The configured Rya Instance is not installed to the configured database + * or we were unable to figure out if it is installed. 
+ */ +protected abstract void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException; + +/** + * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured + * Rya Instance. + * + * @param taskConfig - Configures how the Sail object will be created. (not null) + * @return The created Sail object. + * @throws ConnectException The Sail object could not be made. + */ +protected abstract Sail makeSail(final Map taskConfig) throws ConnectException; + +@Override +public String version() { +return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN"; +} + +@Override +public void start(final Map props) throws ConnectException { +requireNonNull(props); + +// Ensure the configured Rya Instance is installed within the configured database. +checkRyaInstanceExists(props); + +// Create the Sail object that is connected to the Rya Instance. +final Sail sail = makeSail(props); +sailRepo = new SailRepository( sail ); +conn = sailRepo.getConnection(); +} + +@Override +public void put(final Collection records) { +requireNonNull(records); + +// Return immediately if there are no records to handle. +
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187730515 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) +public abstract class RyaSinkTask extends SinkTask { +private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class); + +@Nullable +private SailRepository sailRepo = null; + +@Nullable +private SailRepositoryConnection conn = null; + +/** + * Throws an exception if the configured Rya Instance is not already installed + * within the configured database. + * + * @param taskConfig - The configuration values that were provided to the task. (not null) + * @throws ConnectException The configured Rya Instance is not installed to the configured database + * or we were unable to figure out if it is installed. 
+ */ +protected abstract void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException; + +/** + * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured + * Rya Instance. + * + * @param taskConfig - Configures how the Sail object will be created. (not null) + * @return The created Sail object. + * @throws ConnectException The Sail object could not be made. + */ +protected abstract Sail makeSail(final Map taskConfig) throws ConnectException; + +@Override +public String version() { +return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN"; +} + +@Override +public void start(final Map props) throws ConnectException { +requireNonNull(props); + +// Ensure the configured Rya Instance is installed within the configured database. +checkRyaInstanceExists(props); + +// Create the Sail object that is connected to the Rya Instance. +final Sail sail = makeSail(props); +sailRepo = new SailRepository( sail ); +conn = sailRepo.getConnection(); +} + +@Override +public void put(final Collection records) { +requireNonNull(records); + +// Return immediately if there are no records to handle. +if(records.isEmpty()) { +return; +} + +// If a transaction has not been started yet, then start one. +if(!conn.isActive()) { +conn.begin(); +} + +
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472633#comment-16472633 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187730101 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java --- @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api.sink; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.connect.sink.SinkConnector; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Handles the common components required to task {@link RyaSinkTask}s that write to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the Rya implementation. 
+ */ +@DefaultAnnotation(NonNull.class) +public abstract class RyaSinkConnector extends SinkConnector { + +/** + * Get the configuration that will be provided to the tasks when {@link #taskConfigs(int)} is invoked. + * + * Only called after start has been invoked + * + * @return The configuration object for the connector. + * @throws IllegalStateException Thrown if {@link SinkConnector#start(Map)} has not been invoked yet. + */ +protected abstract AbstractConfig getConfig() throws IllegalStateException; + +@Override +public String version() { +return Manifests.exists("Build-Version") ? Manifests.read("Build-Version") : "UNKNOWN"; +} + +@Override +public List
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187730101 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java --- @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api.sink; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.connect.sink.SinkConnector; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Handles the common components required to task {@link RyaSinkTask}s that write to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the Rya implementation. + */ +@DefaultAnnotation(NonNull.class) +public abstract class RyaSinkConnector extends SinkConnector { + +/** + * Get the configuration that will be provided to the tasks when {@link #taskConfigs(int)} is invoked. 
+ * + * Only called after start has been invoked + * + * @return The configuration object for the connector. + * @throws IllegalStateException Thrown if {@link SinkConnector#start(Map)} has not been invoked yet. + */ +protected abstract AbstractConfig getConfig() throws IllegalStateException; + +@Override +public String version() { +return Manifests.exists("Build-Version") ? Manifests.read("Build-Version") : "UNKNOWN"; +} + +@Override +public List<Map<String, String>> taskConfigs(final int maxTasks) { +final List<Map<String, String>> configs = new ArrayList<>(maxTasks); +for(int i = 0; i < maxTasks; i++) { +configs.add( getConfig().originalsStrings() ); +} +return configs; +} + +@Override +public void stop() { +// Nothing to do since the RyaSinkconnector has no background monitoring. --- End diff -- Done. ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472629#comment-16472629 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187729543 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +config.getUsername(), +config.getPassword().toCharArray(), +config.getClusterName(), +config.getZookeepers()); +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} + +} catch (final RyaClientException e) { +throw new ConnectException("Unable to determine if the Rya Instance named " + +config.getRyaInstanceName() + " has been installed.", e); --- End diff -- Done. > Kafka Connect Rya Sink >
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187729813 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java --- @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Deserializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFHandler; +import org.eclipse.rdf4j.rio.RDFHandlerException; +import org.eclipse.rdf4j.rio.RDFParseException; +import org.eclipse.rdf4j.rio.RDFParser; +import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio Binary format serialized + * set of {@link Statement}s. 
+ */ +@DefaultAnnotation(NonNull.class) +public class StatementsDeserializer implements Deserializer<Set<Statement>> { +private static final Logger log = LoggerFactory.getLogger(StatementsDeserializer.class); + +private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory(); + +@Override +public void configure(final Map<String, ?> configs, final boolean isKey) { +// Nothing to do. +} + +@Override +public Set<Statement> deserialize(final String topic, final byte[] data) { +if(data == null || data.length == 0) { +// Return null because that is the contract of this method. +return null; +} + +try { +final RDFParser parser = PARSER_FACTORY.getParser(); +final Set<Statement> statements = new HashSet<>(); + +parser.setRDFHandler(new RDFHandler() { +@Override +public void handleStatement(final Statement statement) throws RDFHandlerException { +log.debug("Statement: " + statement); +statements.add( statement ); +} + +@Override public void startRDF() throws RDFHandlerException { } +@Override public void handleNamespace(final String arg0, final String arg1) throws RDFHandlerException { } +@Override public void handleComment(final String arg0) throws RDFHandlerException { } +@Override public void endRDF() throws RDFHandlerException { } +}); + +parser.parse(new ByteArrayInputStream(data), null); +return statements; + +} catch(final RDFParseException | RDFHandlerException | IOException e) { +log.error("Could not deserialize a Set of VisibilityStatement objects using the RDF4J Rio Binray format.", e); --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187729543 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +config.getUsername(), +config.getPassword().toCharArray(), +config.getClusterName(), +config.getZookeepers()); +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} + +} catch (final RyaClientException e) { +throw new ConnectException("Unable to determine if the Rya Instance named " + +config.getRyaInstanceName() + " has been installed.", e); --- End diff -- Done. ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472627#comment-16472627 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187729286 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +config.getUsername(), +config.getPassword().toCharArray(), +config.getClusterName(), +config.getZookeepers()); +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); --- End diff -- I'm assuming rya.api. Done. > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187729286 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +config.getUsername(), +config.getPassword().toCharArray(), +config.getClusterName(), +config.getZookeepers()); +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); --- End diff -- I'm assuming rya.api. Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187729157 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187727065 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +config.getUsername(), +config.getPassword().toCharArray(), +config.getClusterName(), +config.getZookeepers()); +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} + +} catch (final RyaClientException e) { +throw new ConnectException("Unable to determine if the Rya Instance named " + +config.getRyaInstanceName() + " has been installed.", e); +} +} + +@Override +protected Sail makeSail(final Map taskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new
[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/295#discussion_r187725947 --- Diff: extras/shell/src/test/java/org/apache/rya/shell/AccumuloRyaCommandsIT.java --- @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.shell; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.shell.util.InstallPrompt; +import org.apache.rya.shell.util.PasswordPrompt; +import org.apache.rya.shell.util.SparqlPrompt; +import org.junit.Test; +import org.springframework.context.ApplicationContext; +import org.springframework.shell.Bootstrap; +import org.springframework.shell.core.CommandResult; +import org.springframework.shell.core.JLineShellComponent; + +import com.google.common.base.Optional; + +/** + * Integration tests the methods of {@link RyaCommands}. --- End diff -- Done. ---
[jira] [Commented] (RYA-494) Shell insert and query bug.
[ https://issues.apache.org/jira/browse/RYA-494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472608#comment-16472608 ] ASF GitHub Bot commented on RYA-494: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/295#discussion_r187725795 --- Diff: extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java --- @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; + +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.test.accumulo.AccumuloITBase; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Resource; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.TupleQueryResult; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Integration tests the methods of {@link }. + */ +public class AccumuloExecuteSparqlQueryIT extends AccumuloITBase { + +@Test +public void queryFindsAllLoadedStatements_fromSet() throws Exception { +// Using the Rya Client, install an instance of Rya for the test. +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +getUsername(), +getPassword().toCharArray(), +getInstanceName(), +getZookeepers()); + +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, super.getConnector()); + +final String ryaInstance = UUID.randomUUID().toString().replace('-', '_'); +client.getInstall().install(ryaInstance, InstallConfiguration.builder().build()); + +// Load some data into the instance. 
+final ValueFactory vf = SimpleValueFactory.getInstance(); +final Set statements = Sets.newHashSet( +vf.createStatement(vf.createIRI("urn:Alice"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Bob")), +vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")), +vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Charlie")), +vf.createStatement(vf.createIRI("urn:Charlie"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")), +vf.createStatement(vf.createIRI("urn:David"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Eve")), +vf.createStatement(vf.createIRI("urn:Eve"), vf.createIRI("urn:listensTo"), vf.createIRI("urn:Bob"))); +client.getLoadStatements().loadStatements(ryaInstance, statements); + +// Execute a query. +final Set fetched = new HashSet<>(); +try(final TupleQueryResult result = client.getExecuteSparqlQuery().executeSparqlQuery(ryaInstance, "SELECT * WHERE { ?s ?p ?o }")) { +while(result.hasNext()) { +final BindingSet bs = result.next(); + +// If this is the statement that indicates the Rya version + if(bs.getBinding("p").getValue().equals(vf.createIRI("urn:org.apache.rya/2012/05#version"))) { --- End diff -- Done. > Shell insert and query bug. > --- > > Key: RYA-494 > URL: https://issues.apache.org/jira/browse/RYA-494 > Project: Rya > Issue Type: Bug >
[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/295#discussion_r187725865 --- Diff: extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java --- @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; + +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.test.accumulo.AccumuloITBase; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Resource; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.TupleQueryResult; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Integration tests the methods of {@link }. 
+ */ +public class AccumuloExecuteSparqlQueryIT extends AccumuloITBase { + +@Test +public void queryFindsAllLoadedStatements_fromSet() throws Exception { +// Using the Rya Client, install an instance of Rya for the test. +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +getUsername(), +getPassword().toCharArray(), +getInstanceName(), +getZookeepers()); + +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, super.getConnector()); + +final String ryaInstance = UUID.randomUUID().toString().replace('-', '_'); +client.getInstall().install(ryaInstance, InstallConfiguration.builder().build()); + +// Load some data into the instance. +final ValueFactory vf = SimpleValueFactory.getInstance(); +final Set statements = Sets.newHashSet( +vf.createStatement(vf.createIRI("urn:Alice"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Bob")), +vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")), +vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Charlie")), +vf.createStatement(vf.createIRI("urn:Charlie"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")), +vf.createStatement(vf.createIRI("urn:David"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Eve")), +vf.createStatement(vf.createIRI("urn:Eve"), vf.createIRI("urn:listensTo"), vf.createIRI("urn:Bob"))); +client.getLoadStatements().loadStatements(ryaInstance, statements); + +// Execute a query. +final Set fetched = new HashSet<>(); +try(final TupleQueryResult result = client.getExecuteSparqlQuery().executeSparqlQuery(ryaInstance, "SELECT * WHERE { ?s ?p ?o }")) { +while(result.hasNext()) { +final BindingSet bs = result.next(); + +// If this is the statement that indicates the Rya version + if(bs.getBinding("p").getValue().equals(vf.createIRI("urn:org.apache.rya/2012/05#version"))) { +continue; +} + +// Otherwise add it to the list of fetched statements. 
+fetched.add( vf.createStatement( +(Resource)bs.getBinding("s").getValue(), +(IRI)bs.getBinding("p").getValue(), +bs.getBinding("o").getValue()) ); +} +} + +// Show it resulted in the expected
[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/295#discussion_r187725795 --- Diff: extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java --- @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; + +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.test.accumulo.AccumuloITBase; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Resource; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.TupleQueryResult; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Integration tests the methods of {@link }. 
+ */ +public class AccumuloExecuteSparqlQueryIT extends AccumuloITBase { + +@Test +public void queryFindsAllLoadedStatements_fromSet() throws Exception { +// Using the Rya Client, install an instance of Rya for the test. +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +getUsername(), +getPassword().toCharArray(), +getInstanceName(), +getZookeepers()); + +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, super.getConnector()); + +final String ryaInstance = UUID.randomUUID().toString().replace('-', '_'); +client.getInstall().install(ryaInstance, InstallConfiguration.builder().build()); + +// Load some data into the instance. +final ValueFactory vf = SimpleValueFactory.getInstance(); +final Set statements = Sets.newHashSet( +vf.createStatement(vf.createIRI("urn:Alice"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Bob")), +vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")), +vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Charlie")), +vf.createStatement(vf.createIRI("urn:Charlie"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")), +vf.createStatement(vf.createIRI("urn:David"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Eve")), +vf.createStatement(vf.createIRI("urn:Eve"), vf.createIRI("urn:listensTo"), vf.createIRI("urn:Bob"))); +client.getLoadStatements().loadStatements(ryaInstance, statements); + +// Execute a query. +final Set fetched = new HashSet<>(); +try(final TupleQueryResult result = client.getExecuteSparqlQuery().executeSparqlQuery(ryaInstance, "SELECT * WHERE { ?s ?p ?o }")) { +while(result.hasNext()) { +final BindingSet bs = result.next(); + +// If this is the statement that indicates the Rya version + if(bs.getBinding("p").getValue().equals(vf.createIRI("urn:org.apache.rya/2012/05#version"))) { --- End diff -- Done. ---
[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/295#discussion_r187724995 --- Diff: extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java --- @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; + +import java.nio.file.Paths; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.test.accumulo.AccumuloITBase; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Resource; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.ValueFactory; +import org.eclipse.rdf4j.model.impl.SimpleValueFactory; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.TupleQueryResult; +import org.eclipse.rdf4j.rio.RDFFormat; +import org.junit.Test; + +import com.google.common.collect.Sets; + +/** + * Integration tests the methods of {@link }. --- End diff -- Done. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187717175 --- Diff: extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java --- @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.mongo; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.test.mongo.MongoITBase; +import org.junit.Test; + +/** + * Integration tests the methods of {@link MongoRyaSinkTask}. + */ +public class MongoRyaSinkTaskIT extends MongoITBase { + +@Test +public void instanceExists() throws Exception { +// Install an instance of Rya. 
+final String ryaInstanceName = "rya"; +final MongoConnectionDetails connectionDetails = new MongoConnectionDetails( +super.getMongoHostname(), +super.getMongoPort(), +Optional.empty(), +Optional.empty()); + +final InstallConfiguration installConfig = InstallConfiguration.builder() +.setEnableTableHashPrefix(false) +.setEnableEntityCentricIndex(false) +.setEnableFreeTextIndex(false) +.setEnableTemporalIndex(false) +.setEnablePcjIndex(false) +.setEnableGeoIndex(false) +.build(); + +final RyaClient ryaClient = MongoRyaClientFactory.build(connectionDetails, super.getMongoClient()); +ryaClient.getInstall().install(ryaInstanceName, installConfig); + +// Create the task that will be tested. +final MongoRyaSinkTask task = new MongoRyaSinkTask(); + +try { +// Configure the task to use the embedded Mongo DB instance for Rya. +final Map<String, String> config = new HashMap<>(); +config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname()); +config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort()); +config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "rya"); + +// This will pass because the Rya instance exists. +task.start(config); +} finally { +task.stop(); +} +} + +@Test(expected = ConnectException.class) +public void instanceDoesNotExist() throws Exception { +// Create the task that will be tested. +final MongoRyaSinkTask task = new MongoRyaSinkTask(); + +try { +// Configure the task to use the embedded Mongo DB instance for Rya. +final Map<String, String> config = new HashMap<>(); +config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname()); +config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort()); +config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "instance-does-not-exist"); + +// Starting the task will fail because the Rya instance does not exist. +task.start(config); +} finally { +task.stop(); +} +} + +// TODO show that inserts using visibilities work. --- End diff -- TODO? 
---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472577#comment-16472577 ] ASF GitHub Bot commented on RYA-487: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187718927 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java --- @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.mongo; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkConnector} that uses a Mongo DB Rya backend when creating tasks. 
+ */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkConnector extends RyaSinkConnector { + +@Nullable +private MongoRyaSinkConfig config = null; + +@Override +public void start(final Mapprops) { +this.config = new MongoRyaSinkConfig( props ); +} + +@Override +protected AbstractConfig getConfig() { +if(config == null) { +throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first."); --- End diff -- same start(Map) > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472576#comment-16472576 ] ASF GitHub Bot commented on RYA-487: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187717488 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java --- @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.mongo; + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka Connect configuration that is used to configure {@link MongoRyaSinkConnector}s and {@link MongoRyaSinkTask}s. 
+ */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkConfig extends RyaSinkConfig { + +public static final String HOSTNAME = "mongo.hostname"; +private static final String HOSTNAME_DOC = "The Mongo DB hostname the Sail connections wlll use."; --- End diff -- typo: wlll > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187717488 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java --- @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.mongo; + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka Connect configuration that is used to configure {@link MongoRyaSinkConnector}s and {@link MongoRyaSinkTask}s. + */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkConfig extends RyaSinkConfig { + +public static final String HOSTNAME = "mongo.hostname"; +private static final String HOSTNAME_DOC = "The Mongo DB hostname the Sail connections wlll use."; --- End diff -- typo: wlll ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472575#comment-16472575 ] ASF GitHub Bot commented on RYA-487: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187717175 --- Diff: extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java --- @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.mongo; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.test.mongo.MongoITBase; +import org.junit.Test; + +/** + * Integration tests the methods of {@link MongoRyaSinkTask}. + */ +public class MongoRyaSinkTaskIT extends MongoITBase { + +@Test +public void instanceExists() throws Exception { +// Install an instance of Rya. 
+final String ryaInstanceName = "rya"; +final MongoConnectionDetails connectionDetails = new MongoConnectionDetails( +super.getMongoHostname(), +super.getMongoPort(), +Optional.empty(), +Optional.empty()); + +final InstallConfiguration installConfig = InstallConfiguration.builder() +.setEnableTableHashPrefix(false) +.setEnableEntityCentricIndex(false) +.setEnableFreeTextIndex(false) +.setEnableTemporalIndex(false) +.setEnablePcjIndex(false) +.setEnableGeoIndex(false) +.build(); + +final RyaClient ryaClient = MongoRyaClientFactory.build(connectionDetails, super.getMongoClient()); +ryaClient.getInstall().install(ryaInstanceName, installConfig); + +// Create the task that will be tested. +final MongoRyaSinkTask task = new MongoRyaSinkTask(); + +try { +// Configure the task to use the embedded Mongo DB instance for Rya. +final Mapconfig = new HashMap<>(); +config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname()); +config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort()); +config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "rya"); + +// This will pass because the Rya instance exists. +task.start(config); +} finally { +task.stop(); +} +} + +@Test(expected = ConnectException.class) +public void instanceDoesNotExist() throws Exception { +// Create the task that will be tested. +final MongoRyaSinkTask task = new MongoRyaSinkTask(); + +try { +// Configure the task to use the embedded Mongo DB instance for Rya. +final Map config = new HashMap<>(); +config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname()); +config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort()); +config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "instance-does-not-exist"); + +// Starting the task will fail because the Rya instance does not exist. +task.start(config); +} finally { +task.stop(); +} +} + +// TODO show that inserts using visibilities work. --- End diff -- TODO? > Kafka Connect Rya Sink > -- > > Key: RYA-487 >
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187718927 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java --- @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.mongo; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkConnector} that uses a Mongo DB Rya backend when creating tasks. 
+ */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkConnector extends RyaSinkConnector { + +@Nullable +private MongoRyaSinkConfig config = null; + +@Override +public void start(final Map<String, String> props) { +this.config = new MongoRyaSinkConfig( props ); +} + +@Override +protected AbstractConfig getConfig() { +if(config == null) { +throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first."); --- End diff -- same start(Map) ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472535#comment-16472535 ] ASF GitHub Bot commented on RYA-487: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187698179 --- Diff: dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java --- @@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() { super(); } -public AccumuloRdfConfiguration(Configuration other) { +public AccumuloRdfConfiguration(final Configuration other) { super(other); } -public AccumuloRdfConfigurationBuilder getBuilder() { +public static AccumuloRdfConfigurationBuilder getBuilder() { --- End diff -- isn't the convention usually builder() for static builder functions? > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472536#comment-16472536 ] ASF GitHub Bot commented on RYA-487: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187701955 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( --- End diff -- a quick future work nice to have idea: we have a ConnectionDetails->Configuration, but not the other way around. would make stuff like this pretty easy. > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472540#comment-16472540 ] ASF GitHub Bot commented on RYA-487: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187702555 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +config.getUsername(), +config.getPassword().toCharArray(), +config.getClusterName(), +config.getZookeepers()); +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} + +} catch (final RyaClientException e) { +throw new ConnectException("Unable to determine if the Rya Instance named " + +config.getRyaInstanceName() + " has been installed.", e); +} +} + +@Override +protected Sail
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187698795 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java --- @@ -438,6 +438,9 @@ public static boolean getUsePcjUpdaterIndex(final Configuration conf) { return Optional.fromNullable(conf.get(FLUO_APP_NAME)); } +public static void setUseMongo(final Configuration conf, final boolean useMongo) { --- End diff -- can you add this to the MongoDbRDFConfiguration constructor? If we're going to keep using this field, it would make sense for that constructor to set it. ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187708244 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java --- @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks. 
+ */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkConnector extends RyaSinkConnector { + +@Nullable +private AccumuloRyaSinkConfig config = null; + +@Override +public void start(final Map<String, String> props) { +this.config = new AccumuloRyaSinkConfig( props ); +} + +@Override +protected AbstractConfig getConfig() { +if(config == null) { +throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first."); --- End diff -- usually a method referenced in documentation lists its parameter types, not its parameter names: start(Map) ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472539#comment-16472539 ] ASF GitHub Bot commented on RYA-487: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187709021 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) +public abstract class RyaSinkTask extends SinkTask { +private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class); + +@Nullable +private SailRepository sailRepo = null; + +@Nullable +private SailRepositoryConnection conn = null; + +/** + * Throws an exception if the configured Rya Instance is not already installed + * within the configured database. + * + * @param taskConfig - The configuration values that were provided to the task. (not null) + * @throws ConnectException The configured Rya Instance is not installed to the configured database + * or we were unable to figure out if it is installed. 
+ */ +protected abstract void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException; + +/** + * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured + * Rya Instance. + * + * @param taskConfig - Configures how the Sail object will be created. (not null) + * @return The created Sail object. + * @throws ConnectException The Sail object could not be made. + */ +protected abstract Sail makeSail(final Map taskConfig) throws ConnectException; + +@Override +public String version() { +return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN"; +} + +@Override +public void start(final Map props) throws ConnectException { +requireNonNull(props); + +// Ensure the configured Rya Instance is installed within the configured database. +checkRyaInstanceExists(props); + +// Create the Sail object that is connected to the Rya Instance. +final Sail sail = makeSail(props); +sailRepo = new SailRepository( sail ); +conn = sailRepo.getConnection(); +} + +@Override +public void put(final Collection records) { +requireNonNull(records); + +// Return immediately if there are no records to handle. +
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472534#comment-16472534 ] ASF GitHub Bot commented on RYA-487: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187699543 --- Diff: extras/kafka.connect/README.md --- @@ -0,0 +1,22 @@ + + +The parent project for all Rya Kafka Connect work. All projects thare are part +of that system must use this project's pom as their parent pom. --- End diff -- typo All projects that* are > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187708536 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) --- End diff -- maybe I'm not clear what this annotation does, but again, all the fields are declared Nullable ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187698179 --- Diff: dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java --- @@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() { super(); } -public AccumuloRdfConfiguration(Configuration other) { +public AccumuloRdfConfiguration(final Configuration other) { super(other); } -public AccumuloRdfConfigurationBuilder getBuilder() { +public static AccumuloRdfConfigurationBuilder getBuilder() { --- End diff -- isn't the convention usually builder() for static builder functions? ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472542#comment-16472542 ] ASF GitHub Bot commented on RYA-487: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187702276 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +config.getUsername(), +config.getPassword().toCharArray(), +config.getClusterName(), +config.getZookeepers()); +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); --- End diff -- LogUtils from which package? > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472545#comment-16472545 ] ASF GitHub Bot commented on RYA-487: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187701248 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java --- @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks. + */ +@DefaultAnnotation(NonNull.class) --- End diff -- since there is one field that is marked nullable, is this annotation usefull? 
> Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472537#comment-16472537 ] ASF GitHub Bot commented on RYA-487: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187698795 --- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java --- @@ -438,6 +438,9 @@ public static boolean getUsePcjUpdaterIndex(final Configuration conf) { return Optional.fromNullable(conf.get(FLUO_APP_NAME)); } +public static void setUseMongo(final Configuration conf, final boolean useMongo) { --- End diff -- Can you add this to the MongoDBRdfConfiguration constructor? If we're going to keep using this field, it would make sense for that constructor to set it. > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187702555 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +config.getUsername(), +config.getPassword().toCharArray(), +config.getClusterName(), +config.getZookeepers()); +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} + +} catch (final RyaClientException e) { +throw new ConnectException("Unable to determine if the Rya Instance named " + +config.getRyaInstanceName() + " has been installed.", e); +} +} + +@Override +protected Sail makeSail(final Map taskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187706947 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayOutputStream; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Serializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFWriter; +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s + * using the RDF4J Rio Binary format. --- End diff -- i feel like it might be worthwhile to mention that you use the RDFParser and RDFWriter here and the Deserializer respectively. ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472538#comment-16472538 ] ASF GitHub Bot commented on RYA-487: Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187706947 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayOutputStream; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Serializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFWriter; +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s + * using the RDF4J Rio Binary format. 
--- End diff -- I feel like it might be worthwhile to mention that you use the RDFWriter here and the RDFParser in the Deserializer. > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187709021 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) +public abstract class RyaSinkTask extends SinkTask { +private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class); + +@Nullable +private SailRepository sailRepo = null; + +@Nullable +private SailRepositoryConnection conn = null; + +/** + * Throws an exception if the configured Rya Instance is not already installed + * within the configured database. + * + * @param taskConfig - The configuration values that were provided to the task. (not null) + * @throws ConnectException The configured Rya Instance is not installed to the configured database + * or we were unable to figure out if it is installed. 
+ */ +protected abstract void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException; + +/** + * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured + * Rya Instance. + * + * @param taskConfig - Configures how the Sail object will be created. (not null) + * @return The created Sail object. + * @throws ConnectException The Sail object could not be made. + */ +protected abstract Sail makeSail(final Map taskConfig) throws ConnectException; + +@Override +public String version() { +return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN"; +} + +@Override +public void start(final Map props) throws ConnectException { +requireNonNull(props); + +// Ensure the configured Rya Instance is installed within the configured database. +checkRyaInstanceExists(props); + +// Create the Sail object that is connected to the Rya Instance. +final Sail sail = makeSail(props); +sailRepo = new SailRepository( sail ); +conn = sailRepo.getConnection(); +} + +@Override +public void put(final Collection records) { +requireNonNull(records); + +// Return immediately if there are no records to handle. +if(records.isEmpty()) { +return; +} + +// If a transaction has not been started yet, then start one. +if(!conn.isActive()) { +conn.begin(); +} + +
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187699543 --- Diff: extras/kafka.connect/README.md --- @@ -0,0 +1,22 @@ + + +The parent project for all Rya Kafka Connect work. All projects thare are part +of that system must use this project's pom as their parent pom. --- End diff -- Typo: "thare" should be "that" ("All projects that are part of that system..."). ---
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user isper3at commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187701248 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java --- @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.accumulo; + +import java.util.Map; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks. + */ +@DefaultAnnotation(NonNull.class) --- End diff -- Since there is one field that is marked nullable, is this annotation useful? ---
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472225#comment-16472225 ] ASF GitHub Bot commented on RYA-487: Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187667486 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import com.google.common.base.Strings; +import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws IllegalStateException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig); +@Nullable +final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername(); +@Nullable +final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? 
null : config.getPassword().toCharArray(); + +// Connect a Mongo Client to the configured Mongo DB instance. +final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort()); +final boolean hasCredentials = username != null && password != null; + +try(MongoClient mongoClient = hasCredentials ? +new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) : +new MongoClient(serverAddr)) { +// Use a RyaClient to see if the configured instance exists. +// Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with. +final MongoConnectionDetails connectionDetails = new MongoConnectionDetails( +config.getHostname(), +config.getPort(), +Optional.ofNullable(username), +Optional.ofNullable(password)); + +final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient); +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187667486 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import com.google.common.base.Strings; +import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws IllegalStateException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig); +@Nullable +final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername(); +@Nullable +final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? 
null : config.getPassword().toCharArray(); + +// Connect a Mongo Client to the configured Mongo DB instance. +final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort()); +final boolean hasCredentials = username != null && password != null; + +try(MongoClient mongoClient = hasCredentials ? +new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) : +new MongoClient(serverAddr)) { +// Use a RyaClient to see if the configured instance exists. +// Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with. +final MongoConnectionDetails connectionDetails = new MongoConnectionDetails( +config.getHostname(), +config.getPort(), +Optional.ofNullable(username), +Optional.ofNullable(password)); + +final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient); +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} +} catch(final RyaClientException e) { +throw new ConnectException("Unable to determine if the Rya
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472224#comment-16472224 ] ASF GitHub Bot commented on RYA-487: Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187667235 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +config.getUsername(), +config.getPassword().toCharArray(), +config.getClusterName(), +config.getZookeepers()); +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} + +} catch (final RyaClientException e) { +throw new ConnectException("Unable to determine if the Rya Instance named " + +config.getRyaInstanceName() + " has been installed.", e); +} +} + +@Override +protected Sail
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187667235 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +config.getUsername(), +config.getPassword().toCharArray(), +config.getClusterName(), +config.getZookeepers()); +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} + +} catch (final RyaClientException e) { +throw new ConnectException("Unable to determine if the Rya Instance named " + +config.getRyaInstanceName() + " has been installed.", e); +} +} + +@Override +protected Sail makeSail(final Map taskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472162#comment-16472162 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187657223 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +config.getUsername(), +config.getPassword().toCharArray(), +config.getClusterName(), +config.getZookeepers()); +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} + +} catch (final RyaClientException e) { +throw new ConnectException("Unable to determine if the Rya Instance named " + +config.getRyaInstanceName() + " has been installed.", e); +} +} + +@Override +protected Sail
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187657223 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +config.getUsername(), +config.getPassword().toCharArray(), +config.getClusterName(), +config.getZookeepers()); +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} + +} catch (final RyaClientException e) { +throw new ConnectException("Unable to determine if the Rya Instance named " + +config.getRyaInstanceName() + " has been installed.", e); +} +} + +@Override +protected Sail makeSail(final Map taskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472160#comment-16472160 ] ASF GitHub Bot commented on RYA-487: Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187657023 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import com.google.common.base.Strings; +import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws IllegalStateException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig); +@Nullable +final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername(); +@Nullable +final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? 
null : config.getPassword().toCharArray(); + +// Connect a Mongo Client to the configured Mongo DB instance. +final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort()); +final boolean hasCredentials = username != null && password != null; + +try(MongoClient mongoClient = hasCredentials ? +new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) : +new MongoClient(serverAddr)) { +// Use a RyaClient to see if the configured instance exists. +// Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with. +final MongoConnectionDetails connectionDetails = new MongoConnectionDetails( +config.getHostname(), +config.getPort(), +Optional.ofNullable(username), +Optional.ofNullable(password)); + +final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient); +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +
[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks
Github user kchilton2 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187657023 --- Diff: extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java --- @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.Arrays; +import java.util.Map; +import java.util.Optional; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.mongo.MongoConnectionDetails; +import org.apache.rya.api.client.mongo.MongoRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import com.google.common.base.Strings; +import com.mongodb.MongoClient; +import com.mongodb.MongoCredential; +import com.mongodb.ServerAddress; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class MongoRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws IllegalStateException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig); +@Nullable +final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername(); +@Nullable +final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? 
null : config.getPassword().toCharArray(); + +// Connect a Mongo Client to the configured Mongo DB instance. +final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort()); +final boolean hasCredentials = username != null && password != null; + +try(MongoClient mongoClient = hasCredentials ? +new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) : +new MongoClient(serverAddr)) { +// Use a RyaClient to see if the configured instance exists. +// Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with. +final MongoConnectionDetails connectionDetails = new MongoConnectionDetails( +config.getHostname(), +config.getPort(), +Optional.ofNullable(username), +Optional.ofNullable(password)); + +final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient); +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} +} catch(final RyaClientException e) { +throw new ConnectException("Unable to determine if the Rya
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472088#comment-16472088 ] ASF GitHub Bot commented on RYA-487: Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187628752 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { --- End diff -- final > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472093#comment-16472093 ] ASF GitHub Bot commented on RYA-487: Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187634433 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java --- @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api.sink; + +import static java.util.Objects.requireNonNull; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.repository.sail.SailRepository; +import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection; +import org.eclipse.rdf4j.sail.Sail; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.jcabi.manifests.Manifests; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Handles the common components required to write {@link Statement}s to Rya. + * + * Implementations of this class only need to specify functionality that is specific to the + * Rya implementation. + */ +@DefaultAnnotation(NonNull.class) +public abstract class RyaSinkTask extends SinkTask { +private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class); + +@Nullable +private SailRepository sailRepo = null; + +@Nullable +private SailRepositoryConnection conn = null; + +/** + * Throws an exception if the configured Rya Instance is not already installed + * within the configured database. + * + * @param taskConfig - The configuration values that were provided to the task. (not null) + * @throws ConnectException The configured Rya Instance is not installed to the configured database + * or we were unable to figure out if it is installed. 
+ */ +protected abstract void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException; + +/** + * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured + * Rya Instance. + * + * @param taskConfig - Configures how the Sail object will be created. (not null) + * @return The created Sail object. + * @throws ConnectException The Sail object could not be made. + */ +protected abstract Sail makeSail(final Map taskConfig) throws ConnectException; + +@Override +public String version() { +return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN"; +} + +@Override +public void start(final Map props) throws ConnectException { +requireNonNull(props); + +// Ensure the configured Rya Instance is installed within the configured database. +checkRyaInstanceExists(props); + +// Create the Sail object that is connected to the Rya Instance. +final Sail sail = makeSail(props); +sailRepo = new SailRepository( sail ); +conn = sailRepo.getConnection(); +} + +@Override +public void put(final Collection records) { +requireNonNull(records); + +// Return immediately if there are no records to handle. +
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472095#comment-16472095 ] ASF GitHub Bot commented on RYA-487: Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187640119 --- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java --- @@ -274,16 +274,17 @@ public boolean getUseAggregationPipeline() { * on their child subtrees. * @param value whether to use aggregation pipeline optimization. */ -public void setUseAggregationPipeline(boolean value) { +public void setUseAggregationPipeline(final boolean value) { setBoolean(USE_AGGREGATION_PIPELINE, value); } @Override public ListgetOptimizers() { -List optimizers = super.getOptimizers(); +final List optimizers = super.getOptimizers(); if (getUseAggregationPipeline()) { -Class cl = AggregationPipelineQueryOptimizer.class; +final Class cl = AggregationPipelineQueryOptimizer.class; @SuppressWarnings("unchecked") +final --- End diff -- Put the "final" keyword on the same line as the variable declaration it modifies, rather than on its own line. > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472091#comment-16472091 ] ASF GitHub Bot commented on RYA-487: Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187633086 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java --- @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayOutputStream; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Serializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFWriter; +import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s + * using the RDF4J Rio Binary format. 
+ */ +@DefaultAnnotation(NonNull.class) +public class StatementsSerializer implements Serializer{ +private static final Logger log = LoggerFactory.getLogger(StatementsSerializer.class); + +private static final BinaryRDFWriterFactory WRITER_FACTORY = new BinaryRDFWriterFactory(); + +@Override +public void configure(final Map configs, final boolean isKey) { +// Nothing to do. +} + +@Override +public byte[] serialize(final String topic, final Set data) { +if(data == null) { +// Returning null because that is the contract of this method. +return null; +} + +// Write the statements using a Binary RDF Writer. +final ByteArrayOutputStream boas = new ByteArrayOutputStream(); --- End diff -- change variable name to baos > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472094#comment-16472094 ] ASF GitHub Bot commented on RYA-487: Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187632758 --- Diff: extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java --- @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.api; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.kafka.common.serialization.Deserializer; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.rio.RDFHandler; +import org.eclipse.rdf4j.rio.RDFHandlerException; +import org.eclipse.rdf4j.rio.RDFParseException; +import org.eclipse.rdf4j.rio.RDFParser; +import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio Binary format serialized + * set of {@link Statement}s. + */ +@DefaultAnnotation(NonNull.class) +public class StatementsDeserializer implements Deserializer{ +private static final Logger log = LoggerFactory.getLogger(StatementsDeserializer.class); + +private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory(); + +@Override +public void configure(final Map configs, final boolean isKey) { +// Nothing to do. +} + +@Override +public Set deserialize(final String topic, final byte[] data) { +if(data == null || data.length == 0) { +// Return null because that is the contract of this method. 
+return null; +} + +try { +final RDFParser parser = PARSER_FACTORY.getParser(); +final Set statements = new HashSet<>(); + +parser.setRDFHandler(new RDFHandler() { +@Override +public void handleStatement(final Statement statement) throws RDFHandlerException { +log.debug("Statement: " + statement); +statements.add( statement ); +} + +@Override public void startRDF() throws RDFHandlerException { } +@Override public void handleNamespace(final String arg0, final String arg1) throws RDFHandlerException { } +@Override public void handleComment(final String arg0) throws RDFHandlerException { } +@Override public void endRDF() throws RDFHandlerException { } +}); + +parser.parse(new ByteArrayInputStream(data), null); +return statements; + +} catch(final RDFParseException | RDFHandlerException | IOException e) { +log.error("Could not deserialize a Set of VisibilityStatement objects using the RDF4J Rio Binray format.", e); --- End diff -- typo. Binary > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472102#comment-16472102 ] ASF GitHub Bot commented on RYA-487: Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187636404 --- Diff: extras/kafka.connect/client/pom.xml --- @@ -0,0 +1,135 @@ + + +http://maven.apache.org/POM/4.0.0; + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.rya +rya.kafka.connect.parent +4.0.0-incubating-SNAPSHOT + + +rya.kafka.connect.client + +Apache Rya Kafka Connect - Client +Contains a client that may be used to load Statements into + a Kafka topic to be read by Kafka Connect. + + + + +org.apache.rya +rya.sail + + +org.apache.rya +rya.kafka.connect.api + + + + +org.eclipse.rdf4j +rdf4j-model + + +com.google.guava +guava + + +com.beust +jcommander + + +com.github.stephenc.findbugs +findbugs-annotations + + +org.apache.kafka +kafka-clients + + + --- End diff -- I think pulling in rya.sail as a dependency will give you all the RDF Formats. So, this probably isn't needed. > Kafka Connect Rya Sink > -- > > Key: RYA-487 > URL: https://issues.apache.org/jira/browse/RYA-487 > Project: Rya > Issue Type: New Feature >Affects Versions: 4.0.0 >Reporter: Kevin Chilton >Assignee: Kevin Chilton >Priority: Major > Fix For: 4.0.0 > > > Implement a Kafka Connect Sink that writes to Rya. > This sink does not have to handle visibilities. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (RYA-487) Kafka Connect Rya Sink
[ https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472101#comment-16472101 ] ASF GitHub Bot commented on RYA-487: Github user ejwhite922 commented on a diff in the pull request: https://github.com/apache/incubator-rya/pull/296#discussion_r187631961 --- Diff: extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java --- @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.kafka.connect.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.ZooKeeperInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.kafka.connect.api.sink.RyaSinkTask; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.eclipse.rdf4j.sail.Sail; +import org.eclipse.rdf4j.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data. + */ +@DefaultAnnotation(NonNull.class) +public class AccumuloRyaSinkTask extends RyaSinkTask { + +@Override +protected void checkRyaInstanceExists(final MaptaskConfig) throws ConnectException { +requireNonNull(taskConfig); + +// Parse the configuration object. +final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig); + +// Connect to the instance of Accumulo. 
+final Connector connector; +try { +final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers()); +connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() )); +} catch (AccumuloException | AccumuloSecurityException e) { +throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e); +} + +// Use a RyaClient to see if the configured instance exists. +try { +final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( +config.getUsername(), +config.getPassword().toCharArray(), +config.getClusterName(), +config.getZookeepers()); +final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector); + +if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) { +throw new ConnectException("The Rya Instance named " + +config.getRyaInstanceName() + " has not been installed."); +} + +} catch (final RyaClientException e) { +throw new ConnectException("Unable to determine if the Rya Instance named " + +config.getRyaInstanceName() + " has been installed.", e); +} +} + +@Override +protected Sail