[GitHub] incubator-rya issue #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread asfgit
Github user asfgit commented on the issue:

https://github.com/apache/incubator-rya/pull/296
  

Refer to this link for build results (access rights to CI server needed): 

https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/761/



---


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472721#comment-16472721
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187742000
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java
 ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
+ * using the RDF4J Rio Binary format.
--- End diff --

I mean that's true, but I don't think it's worth documenting.


> Kafka Connect Rya Sink
> --
>
> Key: RYA-487
> URL: https://issues.apache.org/jira/browse/RYA-487
> Project: Rya
> Issue Type: New Feature
> Affects Versions: 4.0.0
> Reporter: Kevin Chilton
> Assignee: Kevin Chilton
> Priority: Major
> Fix For: 4.0.0
>
>
> Implement a Kafka Connect Sink that writes to Rya.
> This sink does not have to handle visibilities.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] incubator-rya issue #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread asfgit
Github user asfgit commented on the issue:

https://github.com/apache/incubator-rya/pull/296
  

Refer to this link for build results (access rights to CI server needed): 

https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/760/



---


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472707#comment-16472707
 ] 

ASF GitHub Bot commented on RYA-487:


Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187739885
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
 ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ *
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
--- End diff --

gotcha, thanks for the doc paste dump


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187739974
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
 ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ *
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class);
+
+    @Nullable
+    private SailRepository sailRepo = null;
+
+    @Nullable
+    private SailRepositoryConnection conn = null;
+
+    /**
+     * Throws an exception if the configured Rya Instance is not already installed
+     * within the configured database.
+     *
+     * @param taskConfig - The configuration values that were provided to the task. (not null)
+     * @throws ConnectException The configured Rya Instance is not installed to the configured database
+     *   or we were unable to figure out if it is installed.
+     */
+    protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
+
+    /**
+     * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured
+     * Rya Instance.
+     *
+     * @param taskConfig - Configures how the Sail object will be created. (not null)
+     * @return The created Sail object.
+     * @throws ConnectException The Sail object could not be made.
+     */
+    protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
+
+    @Override
+    public String version() {
+        return Manifests.exists("Build-Version") ? Manifests.read("Build-Version") : "UNKNOWN";
+    }
+
+    @Override
+    public void start(final Map<String, String> props) throws ConnectException {
+        requireNonNull(props);
+
+        // Ensure the configured Rya Instance is installed within the configured database.
+        checkRyaInstanceExists(props);
+
+        // Create the Sail object that is connected to the Rya Instance.
+        final Sail sail = makeSail(props);
+        sailRepo = new SailRepository( sail );
+        conn = sailRepo.getConnection();
+    }
+
+    @Override
+    public void put(final Collection<SinkRecord> records) {
+        requireNonNull(records);
+
+        // Return immediately if there are no records to handle.
+        if(records.isEmpty()) {
+            return;
+        }
+
+        // If a transaction has not been started yet, then start one.
+        if(!conn.isActive()) {
+            conn.begin();
+        }
+  
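
The quoted diff is cut off at this point. The part of put() that it does show (skip empty batches, open a transaction only when one is not already active) can be sketched without Kafka or RDF4J on the classpath. In the sketch below, `InMemoryConnection` and `BatchingSink` are hypothetical stand-ins for `SailRepositoryConnection` and `RyaSinkTask`, records are plain strings, and committing inside `flush()` is an assumption, since the diff ends before any commit logic appears.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for SailRepositoryConnection; this is not the RDF4J API.
class InMemoryConnection {
    private final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();
    private boolean active = false;

    boolean isActive() { return active; }

    void begin() { active = true; }

    void add(final String statement) {
        if (!active) {
            throw new IllegalStateException("No active transaction.");
        }
        pending.add(statement);
    }

    void commit() {
        committed.addAll(pending);
        pending.clear();
        active = false;
    }

    List<String> committed() { return committed; }
}

// Hypothetical stand-in for the sink task's put()/flush() handling.
class BatchingSink {
    private final InMemoryConnection conn;

    BatchingSink(final InMemoryConnection conn) { this.conn = conn; }

    void put(final Collection<String> records) {
        // Return immediately if there are no records to handle.
        if (records.isEmpty()) {
            return;
        }
        // If a transaction has not been started yet, then start one.
        if (!conn.isActive()) {
            conn.begin();
        }
        for (final String record : records) {
            conn.add(record);
        }
    }

    // Assumption: the open transaction is committed when flush is called.
    void flush() {
        if (conn.isActive()) {
            conn.commit();
        }
    }
}
```

With this shape, several put() calls share one transaction and nothing becomes visible until flush() runs, which is one common way to line up database commits with offset commits.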

[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472705#comment-16472705
 ] 

ASF GitHub Bot commented on RYA-487:


Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187739762
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java
 ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
+ * using the RDF4J Rio Binary format.
--- End diff --

fair enough, it just seemed like you were delegating the actual serialization to something else.
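
For context on the delegation being discussed: a serializer of this kind typically owns only the byte buffer and hands the actual encoding to a writer. The following is a minimal sketch of that shape using only the JDK. `SimpleSerializer` is a hypothetical stand-in for Kafka's `Serializer` interface, `DataOutputStream` stands in for the RDF4J Rio binary writer named in the Javadoc, statements are plain strings, and returning an empty array for null input is an assumption.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Set;

// Hypothetical stand-in for Kafka's Serializer interface.
interface SimpleSerializer<T> {
    byte[] serialize(String topic, T data);
}

// Serializes a set of statements by delegating the byte encoding to a writer
// (DataOutputStream here; the PR names an RDF4J Rio binary writer instead).
class StatementSetSerializer implements SimpleSerializer<Set<String>> {
    @Override
    public byte[] serialize(final String topic, final Set<String> data) {
        if (data == null) {
            // Assumption: a null payload becomes an empty message.
            return new byte[0];
        }
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream writer = new DataOutputStream(bytes)) {
            // Write the statement count, then each statement in turn.
            writer.writeInt(data.size());
            for (final String statement : data) {
                writer.writeUTF(statement);
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

The serializer class itself stays small because the wire format lives entirely in the writer it delegates to.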


[jira] [Commented] (RYA-494) Shell insert and query bug.

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472706#comment-16472706
 ] 

ASF GitHub Bot commented on RYA-494:


Github user asfgit commented on the issue:

https://github.com/apache/incubator-rya/pull/295
  

Refer to this link for build results (access rights to CI server needed): 

https://builds.apache.org/job/incubator-rya-master-with-optionals-pull-requests/759/



> Shell insert and query bug.
> ---
>
> Key: RYA-494
> URL: https://issues.apache.org/jira/browse/RYA-494
> Project: Rya
> Issue Type: Bug
> Affects Versions: 4.0.0
> Reporter: Kevin Chilton
> Assignee: Kevin Chilton
> Priority: Major
>
> Using the Rya Shell:
>  # Connect to a cluster of Accumulo.
>  # Install an instance of Rya that has all secondary indexers and sharding turned off.
>  # Connect to that instance of Rya.
>  # Load the following N-Triples file into it:
>  ## 
> {code:java}
>   .
>   .
>   .
>   .
>   .
>   .{code}
>  # Query for all statements using the following query:
>  ## select * where \{ ?s ?p ?o .}
> You will see the following results:
> {code:java}
> p,s,o
> urn:talksTo,urn:Alice,urn:Bob
> urn:talksTo,urn:Bob,urn:Charlie
> urn:talksTo,urn:Charlie,urn:Alice
> urn:talksTo,urn:David,urn:Eve
> urn:listensTo,urn:Eve,urn:Bob
> urn:org.apache.rya/2012/05#version,urn:org.apache.rya/2012/05#rts,"3.0.0"{code}
> The following statement is missing:
> {code:java}
>   .{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472704#comment-16472704
 ] 

ASF GitHub Bot commented on RYA-487:


Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187739445
  
--- Diff: 
extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java
 ---
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link AccumuloRyaSinkConfig}.
+ */
+public class AccumuloRyaSinkConfigTest {
+
+    @Test
+    public void parses() {
--- End diff --

gotcha, the test just seemed to be lacking an error case


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187739297
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java 
---
@@ -438,6 +438,9 @@ public static boolean getUsePcjUpdaterIndex(final Configuration conf) {
         return Optional.fromNullable(conf.get(FLUO_APP_NAME));
     }
 
+    public static void setUseMongo(final Configuration conf, final boolean useMongo) {
--- End diff --

sure makes sense


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187739254
  
--- Diff: 
dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java
 ---
@@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() {
         super();
     }
 
-    public AccumuloRdfConfiguration(Configuration other) {
+    public AccumuloRdfConfiguration(final Configuration other) {
         super(other);
     }
 
-    public AccumuloRdfConfigurationBuilder getBuilder() {
+    public static AccumuloRdfConfigurationBuilder getBuilder() {
--- End diff --

👍 
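
As a note on why the `static` change in this hunk matters: a non-static `getBuilder()` can only be reached through an existing configuration object, which defeats the purpose of a builder. A minimal sketch of the static form follows. `ExampleConfiguration` and its `instanceName` field are hypothetical and are not taken from `AccumuloRdfConfiguration`.

```java
// Hypothetical configuration class used only for illustration.
class ExampleConfiguration {
    private final String instanceName;

    private ExampleConfiguration(final String instanceName) {
        this.instanceName = instanceName;
    }

    String getInstanceName() { return instanceName; }

    // Static, so callers do not need an existing configuration to obtain a builder.
    static Builder getBuilder() {
        return new Builder();
    }

    static final class Builder {
        private String instanceName = "";

        Builder setInstanceName(final String instanceName) {
            this.instanceName = instanceName;
            return this;
        }

        ExampleConfiguration build() {
            return new ExampleConfiguration(instanceName);
        }
    }
}
```

A caller can then write `ExampleConfiguration.getBuilder().setInstanceName("rya_").build()` without constructing a throwaway instance first.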


---


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472685#comment-16472685
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187736288
  
--- Diff: 
extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java
 ---
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses a Mongo DB Rya backend when creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkConnector extends RyaSinkConnector {
+
+    @Nullable
+    private MongoRyaSinkConfig config = null;
+
+    @Override
+    public void start(final Map<String, String> props) {
+        this.config = new MongoRyaSinkConfig( props );
+    }
+
+    @Override
+    protected AbstractConfig getConfig() {
+        if(config == null) {
+            throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first.");
--- End diff --

Done.
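
The guard in the quoted getConfig() is a common way to make a lifecycle ordering mistake fail loudly instead of surfacing later as a NullPointerException. Below is a minimal, JDK-only sketch of the same pattern. `GuardedConnector` is hypothetical and does not extend any Kafka Connect class, and the defensive copy in `start` is an addition that does not appear in the diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical connector-like class; it does not use the Kafka Connect API.
class GuardedConnector {
    // Null until start(props) has been invoked.
    private Map<String, String> config = null;

    void start(final Map<String, String> props) {
        // Copy defensively so later changes to props do not leak in.
        this.config = Collections.unmodifiableMap(new HashMap<>(props));
    }

    Map<String, String> getConfig() {
        if (config == null) {
            throw new IllegalStateException(
                    "The configuration has not been set yet. Invoke start(props) first.");
        }
        return config;
    }
}
```

Calling `getConfig()` before `start(props)` throws immediately with a message that names the fix.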


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472680#comment-16472680
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187735910
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
 ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ *
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class);
+
+    @Nullable
+    private SailRepository sailRepo = null;
+
+    @Nullable
+    private SailRepositoryConnection conn = null;
+
+    /**
+     * Throws an exception if the configured Rya Instance is not already installed
+     * within the configured database.
+     *
+     * @param taskConfig - The configuration values that were provided to the task. (not null)
+     * @throws ConnectException The configured Rya Instance is not installed to the configured database
+     *   or we were unable to figure out if it is installed.
+     */
+    protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
+
+    /**
+     * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured
+     * Rya Instance.
+     *
+     * @param taskConfig - Configures how the Sail object will be created. (not null)
+     * @return The created Sail object.
+     * @throws ConnectException The Sail object could not be made.
+     */
+    protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
+
+    @Override
+    public String version() {
+        return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN";
+    }
+
+    @Override
+    public void start(final Map<String, String> props) throws ConnectException {
+        requireNonNull(props);
+
+        // Ensure the configured Rya Instance is installed within the configured database.
+        checkRyaInstanceExists(props);
+
+        // Create the Sail object that is connected to the Rya Instance.
+        final Sail sail = makeSail(props);
+        sailRepo = new SailRepository( sail );
+        conn = sailRepo.getConnection();
+    }
+
+    @Override
+    public void put(final Collection<SinkRecord> records) {
+        requireNonNull(records);
+
+        // Return immediately if there are no records to handle.
+

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187736164
  
--- Diff: 
extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java
 ---
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka Connect configuration that is used to configure {@link 
MongoRyaSinkConnector}s and {@link MongoRyaSinkTask}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkConfig extends RyaSinkConfig {
+
+public static final String HOSTNAME = "mongo.hostname";
+private static final String HOSTNAME_DOC = "The Mongo DB hostname the 
Sail connections wlll use.";
--- End diff --

Done.


---


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472672#comment-16472672
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187735067
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
 ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ *
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
--- End diff --

"/**
 * Indicates that all members of the class or package should be annotated with the default value of the supplied
 * annotation class. This would be used for behavior annotations such as @NonNull, @CheckForNull,
 * or @CheckReturnValue. In particular, you can use @DefaultAnnotation(NonNull.class) on a class or package,
 * and then use @Nullable only on those parameters, methods or fields that you want to allow to be null.
 */"


> Kafka Connect Rya Sink
> --
>
> Key: RYA-487
> URL: https://issues.apache.org/jira/browse/RYA-487
> Project: Rya
>  Issue Type: New Feature
>Affects Versions: 4.0.0
>Reporter: Kevin Chilton
>Assignee: Kevin Chilton
>Priority: Major
> Fix For: 4.0.0
>
>
> Implement a Kafka Connect Sink that writes to Rya.
> This sink does not have to handle visibilities.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187734887
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
 ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ *
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
--- End diff --

It indicates that, by default, the parameters are not null.


---


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472669#comment-16472669
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187734692
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java
 ---
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkConnector extends RyaSinkConnector {
+
+    @Nullable
+    private AccumuloRyaSinkConfig config = null;
+
+    @Override
+    public void start(final Map<String, String> props) {
+        this.config = new AccumuloRyaSinkConfig( props );
+    }
+
+    @Override
+    protected AbstractConfig getConfig() {
+        if(config == null) {
+            throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first.");
--- End diff --

Done.




[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187734448
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java
 ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
+ * using the RDF4J Rio Binary format.
--- End diff --

I think stating that this uses the RDF4J Rio Binary format is more useful than 
describing how I went about doing that, since the code itself already shows the how.


---


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472665#comment-16472665
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187734176
  
--- Diff: 
extras/kafka.connect/accumulo/src/test/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConfigTest.java
 ---
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+import org.junit.Test;
+
+/**
+ * Unit tests the methods of {@link AccumuloRyaSinkConfig}.
+ */
+public class AccumuloRyaSinkConfigTest {
+
+@Test
+public void parses() {
--- End diff --

Could you give an example of a malformed field? Do you just mean fields 
that are not part of the schema? That's not illegal. They just get ignored.
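
To make the distinction concrete, here is a small sketch of schema-driven 
parsing in which undeclared keys are skipped while a declared key with a bad 
value is rejected. It is a plain Java stand-in and not Kafka's ConfigDef; the 
class name, the parse(...) helper, and the "example.*" keys are all 
hypothetical and exist only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for schema-driven configuration parsing.
// This is NOT Kafka's ConfigDef; it only models the behavior discussed above.
final class SchemaParsingSketch {

    /** Parses only the keys declared in the schema; undeclared keys are skipped. */
    static Map<String, Object> parse(final Map<String, Class<?>> schema, final Map<String, String> raw) {
        final Map<String, Object> parsed = new HashMap<>();
        for (final Map.Entry<String, Class<?>> field : schema.entrySet()) {
            final String name = field.getKey();
            final String value = raw.get(name);
            if (value == null) {
                throw new IllegalArgumentException("Missing required field: " + name);
            }
            if (field.getValue() == Integer.class) {
                try {
                    parsed.put(name, Integer.valueOf(value.trim()));
                } catch (final NumberFormatException e) {
                    // A declared field with an unparseable value is what "malformed" means here.
                    throw new IllegalArgumentException("Malformed integer for field: " + name, e);
                }
            } else {
                parsed.put(name, value);
            }
        }
        return parsed;
    }

    public static void main(final String[] args) {
        final Map<String, Class<?>> schema = new HashMap<>();
        schema.put("example.hostname", String.class);
        schema.put("example.port", Integer.class);

        final Map<String, String> raw = new HashMap<>();
        raw.put("example.hostname", "localhost");
        raw.put("example.port", "1234");
        raw.put("not.in.schema", "ignored"); // undeclared, so never parsed

        System.out.println(parse(schema, raw).keySet());
    }
}
```

Under this reading, a test for malformed input would target a declared field 
with an unusable value rather than an extra key.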




[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187733815
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+        // Connect to the instance of Accumulo.
+        final Connector connector;
+        try {
+            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+        }
+
+        // Use a RyaClient to see if the configured instance exists.
+        try {
+            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
--- End diff --

That's true. Feel free to write an improvement ticket for that.


---


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472658#comment-16472658
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187733573
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java
 ---
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
--- End diff --

That field is nullable because this is a stateful object, but the 
parameters into the start(...) function may not be null. I'll add a null check 
there.
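
A minimal sketch of that pattern, assuming a plain Map in place of 
AccumuloRyaSinkConfig: the field stays null until start(...) runs, the 
parameter is checked because parameters are non-null by default, and the 
getter fails loudly if it is called too early. The class name is hypothetical, 
and the FindBugs annotations appear only in comments so the example compiles 
without extra dependencies.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a connector whose configuration only exists after start(...).
final class StatefulConnectorSketch {

    // Null until start(...) is invoked; the real class would mark this field @Nullable.
    private Map<String, String> config = null;

    /** Parameters are non-null by default, so a null argument is rejected up front. */
    public void start(final Map<String, String> props) {
        Objects.requireNonNull(props, "props must not be null");
        this.config = Collections.unmodifiableMap(new HashMap<>(props));
    }

    /** Fails with a descriptive error if the object is used before start(...). */
    public Map<String, String> getConfig() {
        if (config == null) {
            throw new IllegalStateException(
                    "The configuration has not been set yet. Invoke start(props) first.");
        }
        return config;
    }

    public static void main(final String[] args) {
        final StatefulConnectorSketch connector = new StatefulConnectorSketch();

        final Map<String, String> props = new HashMap<>();
        props.put("example.key", "value"); // illustrative key, not a real Rya setting
        connector.start(props);

        System.out.println(connector.getConfig());
    }
}
```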




[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472656#comment-16472656
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187733216
  
--- Diff: extras/kafka.connect/README.md ---
@@ -0,0 +1,22 @@
+
+
+The parent project for all Rya Kafka Connect work. All projects thare are 
part 
+of that system must use this project's pom as their parent pom.
--- End diff --

Done.




[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472654#comment-16472654
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187732940
  
--- Diff: extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java ---
@@ -438,6 +438,9 @@ public static boolean getUsePcjUpdaterIndex(final Configuration conf) {
         return Optional.fromNullable(conf.get(FLUO_APP_NAME));
     }
 
+    public static void setUseMongo(final Configuration conf, final boolean useMongo) {
--- End diff --

I don't want to change how our configuration objects are initialized within 
the scope of this ticket.





[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472649#comment-16472649
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187732007
  
--- Diff: dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java ---
@@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() {
         super();
     }
 
-    public AccumuloRdfConfiguration(Configuration other) {
+    public AccumuloRdfConfiguration(final Configuration other) {
         super(other);
     }
 
-    public AccumuloRdfConfigurationBuilder getBuilder() {
+    public static AccumuloRdfConfigurationBuilder getBuilder() {
--- End diff --

For me it is. I don't really want to rename the method everywhere in 
this review, though. I just needed it to be static so that I could call it 
without an instance.
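
To illustrate the point about the method being static: an instance method can only be called on an object that already exists, whereas a static factory can be called before any configuration object has been built. A minimal sketch, using hypothetical `ExampleConfig` and `ExampleConfigBuilder` classes rather than the real Rya types:

```java
/** Hypothetical configuration class; not the real AccumuloRdfConfiguration. */
class ExampleConfig {
    private final String instanceName;

    ExampleConfig(final String instanceName) {
        this.instanceName = instanceName;
    }

    /** Static, so callers can obtain a builder without first creating an ExampleConfig. */
    public static ExampleConfigBuilder getBuilder() {
        return new ExampleConfigBuilder();
    }

    public String getInstanceName() {
        return instanceName;
    }
}

/** Hypothetical builder that collects values before constructing the configuration. */
class ExampleConfigBuilder {
    private String instanceName = "";

    public ExampleConfigBuilder setInstanceName(final String instanceName) {
        this.instanceName = instanceName;
        return this;
    }

    public ExampleConfig build() {
        return new ExampleConfig(instanceName);
    }
}
```

With the static form a caller can write `ExampleConfig.getBuilder().setInstanceName("rya_").build()` without constructing a throwaway configuration first.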





[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472648#comment-16472648
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187731624
  
--- Diff: dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java ---
@@ -274,16 +274,17 @@ public boolean getUseAggregationPipeline() {
      * on their child subtrees.
      * @param value whether to use aggregation pipeline optimization.
      */
-    public void setUseAggregationPipeline(boolean value) {
+    public void setUseAggregationPipeline(final boolean value) {
         setBoolean(USE_AGGREGATION_PIPELINE, value);
     }
 
     @Override
     public List getOptimizers() {
-        List optimizers = super.getOptimizers();
+        final List optimizers = super.getOptimizers();
         if (getUseAggregationPipeline()) {
-            Class cl = AggregationPipelineQueryOptimizer.class;
+            final Class cl = AggregationPipelineQueryOptimizer.class;
             @SuppressWarnings("unchecked")
+            final
--- End diff --

Done.





[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187731454
  
--- Diff: 
extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java
 ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import com.google.common.base.Strings;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map taskConfig) throws IllegalStateException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+        @Nullable
+        final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
+        @Nullable
+        final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
+
+        // Connect a Mongo Client to the configured Mongo DB instance.
+        final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
+        final boolean hasCredentials = username != null && password != null;
+
+        try(MongoClient mongoClient = hasCredentials ?
+                new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
+                new MongoClient(serverAddr)) {
+            // Use a RyaClient to see if the configured instance exists.
+            // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
+            final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+                    config.getHostname(),
+                    config.getPort(),
+                    Optional.ofNullable(username),
+                    Optional.ofNullable(password));
+
+            final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
+            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+                throw new ConnectException("The Rya Instance named " +
+                        config.getRyaInstanceName() + " has not been installed.");
+            }
+        } catch(final RyaClientException e) {
+            throw new ConnectException("Unable to determine if the Rya 

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187731377
  
--- Diff: 
extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java
 ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import com.google.common.base.Strings;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map taskConfig) throws IllegalStateException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+        @Nullable
+        final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
+        @Nullable
+        final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
+
+        // Connect a Mongo Client to the configured Mongo DB instance.
+        final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
+        final boolean hasCredentials = username != null && password != null;
+
+        try(MongoClient mongoClient = hasCredentials ?
+                new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
+                new MongoClient(serverAddr)) {
+            // Use a RyaClient to see if the configured instance exists.
+            // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
+            final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+                    config.getHostname(),
+                    config.getPort(),
+                    Optional.ofNullable(username),
+                    Optional.ofNullable(password));
+
+            final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
+            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+                throw new ConnectException("The Rya Instance named " +
+                        config.getRyaInstanceName() + " has not been installed.");
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187731146
  
--- Diff: 
extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java
 ---
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.client.command;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.kafka.connect.api.StatementsSerializer;
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand;
+import org.apache.rya.rdftriplestore.utils.RdfFormatUtils;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFFormat;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.Rio;
+import org.eclipse.rdf4j.rio.UnsupportedRDFormatException;
+import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Writes {@link Statement}s to a Kafka topic using the Rya Kafka Connect Sink format.
+ */
+@DefaultAnnotation(NonNull.class)
+public class WriteStatementsCommand implements RyaKafkaClientCommand {
+    private static final Logger log = LoggerFactory.getLogger(WriteStatementsCommand.class);
+
+    /**
+     * Command line parameters that are used by this command to configure itself.
+     */
+    public static class WriteParameters extends KafkaParameters {
+        @Parameter(names = {"--statementsFile", "-f"}, required = true, description = "The file of RDF statements to load into Rya Streams.")
+        public String statementsFile;
+    }
+
+    @Override
+    public String getCommand() {
+        return "write";
+    }
+
+    @Override
+    public String getDescription() {
+        return "Writes Statements to the specified Kafka topic.";
+    }
+
+    @Override
+    public boolean validArguments(final String[] args) {
+        boolean valid = true;
+        try {
+            new JCommander(new WriteParameters(), args);
+        } catch(final ParameterException e) {
+            valid = false;
+        }
+        return valid;
+    }
+
+    /**
+     * @return Describes what arguments may be provided to the command.
+     */
+    @Override
+    public String getUsage() {
+        final JCommander parser = new JCommander(new WriteParameters());
+
+        final StringBuilder usage = new StringBuilder();
+        parser.usage(usage);
+        return usage.toString();
+    }
+
+    @Override
+    public void execute(final String[] args) throws ArgumentsException, ExecutionException {
+        requireNonNull(args);
+
+        // Parse the command line arguments.
+        final WriteParameters params = new WriteParameters();
+        try {
+            new JCommander(params, args);
+        } catch(final ParameterException e) {
+            throw new ArgumentsException("Could not stream the query's results 

[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472640#comment-16472640
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187731146
  
--- Diff: 
extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/command/WriteStatementsCommand.java
 ---
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.client.command;
+
+import static java.util.Objects.requireNonNull;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.kafka.connect.api.StatementsSerializer;
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand;
+import org.apache.rya.rdftriplestore.utils.RdfFormatUtils;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFFormat;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.Rio;
+import org.eclipse.rdf4j.rio.UnsupportedRDFormatException;
+import org.eclipse.rdf4j.rio.helpers.AbstractRDFHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Writes {@link Statement}s to a Kafka topic using the Rya Kafka Connect 
Sink format.
+ */
+@DefaultAnnotation(NonNull.class)
+public class WriteStatementsCommand implements RyaKafkaClientCommand {
+private static final Logger log = 
LoggerFactory.getLogger(WriteStatementsCommand.class);
+
+/**
+ * Command line parameters that are used by this command to configure 
itself.
+ */
+public static class WriteParameters extends KafkaParameters {
+@Parameter(names = {"--statementsFile", "-f"}, required = true, 
description = "The file of RDF statements to load into Rya Streams.")
+public String statementsFile;
+}
+
+@Override
+public String getCommand() {
+return "write";
+}
+
+@Override
+public String getDescription() {
+return "Writes Statements to the specified Kafka topic.";
+}
+
+@Override
+public boolean validArguments(final String[] args) {
+boolean valid = true;
+try {
+new JCommander(new WriteParameters(), args);
+} catch(final ParameterException e) {
+valid = false;
+}
+return valid;
+}
+
+/**
+ * @return Describes what arguments may be provided to the command.
+ */
+@Override
+public String getUsage() {
+final JCommander parser = new JCommander(new WriteParameters());
+
+final StringBuilder usage = new StringBuilder();
+parser.usage(usage);
+return usage.toString();
+}
+
+@Override
+public void execute(final String[] args) throws ArgumentsException, ExecutionException {
+requireNonNull(args);
+
+// Parse the command line arguments.
+final WriteParameters 

[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472637#comment-16472637
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187730865
  
--- Diff: extras/kafka.connect/client/pom.xml ---
@@ -0,0 +1,135 @@
+
+
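+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.kafka.connect.parent</artifactId>
+        <version>4.0.0-incubating-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>rya.kafka.connect.client</artifactId>
+
+    <name>Apache Rya Kafka Connect - Client</name>
+    <description>Contains a client that may be used to load Statements into
+                 a Kafka topic to be read by Kafka Connect.</description>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.sail</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.kafka.connect.api</artifactId>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.eclipse.rdf4j</groupId>
+            <artifactId>rdf4j-model</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.beust</groupId>
+            <artifactId>jcommander</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.github.stephenc.findbugs</groupId>
+            <artifactId>findbugs-annotations</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>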
+
+
--- End diff --

It does, good call. Done.


> Kafka Connect Rya Sink
> --
>
> Key: RYA-487
> URL: https://issues.apache.org/jira/browse/RYA-487
> Project: Rya
>  Issue Type: New Feature
>Affects Versions: 4.0.0
>Reporter: Kevin Chilton
>Assignee: Kevin Chilton
>Priority: Major
> Fix For: 4.0.0
>
>
> Implement a Kafka Connect Sink that writes to Rya.
> This sink does not have to handle visibilities.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472639#comment-16472639
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187731028
  
--- Diff: 
extras/kafka.connect/client/src/main/java/org/apache/rya/kafka/connect/client/CLIDriver.java
 ---
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.client;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ArgumentsException;
+import org.apache.rya.kafka.connect.client.RyaKafkaClientCommand.ExecutionException;
+import org.apache.rya.kafka.connect.client.command.ReadStatementsCommand;
+import org.apache.rya.kafka.connect.client.command.WriteStatementsCommand;
+import org.eclipse.rdf4j.model.Statement;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A CLI tool used to read/write {@link Statement}s to/from a Kafka topic 
using the format
+ * the Rya Kafka Connect Sinks expect.
+ */
+@DefaultAnnotation(NonNull.class)
+public class CLIDriver {
+
+/**
+ * Maps from command strings to the object that performs the command.
+ */
+private static final ImmutableMap<String, RyaKafkaClientCommand> COMMANDS;
+static {
+final Set<Class<? extends RyaKafkaClientCommand>> commandClasses = new HashSet<>();
+commandClasses.add(ReadStatementsCommand.class);
+commandClasses.add(WriteStatementsCommand.class);
+final ImmutableMap.Builder<String, RyaKafkaClientCommand> builder = ImmutableMap.builder();
+for(final Class<? extends RyaKafkaClientCommand> commandClass : commandClasses) {
+try {
+final RyaKafkaClientCommand command = commandClass.newInstance();
+builder.put(command.getCommand(), command);
+} catch (InstantiationException | IllegalAccessException e) {
+System.err.println("Could not run the application because a RyaKafkaClientCommand is missing its empty constructor.");
+e.printStackTrace();
+}
+}
+COMMANDS = builder.build();
+}
+
+private static final String USAGE = makeUsage(COMMANDS);
+
+public static void main(final String[] args) {
+// If no command provided or the command isn't recognized, then 
print the usage.
+if (args.length == 0 || !COMMANDS.containsKey(args[0])) {
+System.out.println(USAGE);
+System.exit(1);
+}
+
+// Fetch the command that will be executed.
+final String command = args[0];
+final String[] commandArgs = Arrays.copyOfRange(args, 1, 
args.length);
+final RyaKafkaClientCommand clientCommand = COMMANDS.get(command);
+
+// Print usage if the arguments are invalid for the command.
+if(!clientCommand.validArguments(commandArgs)) {
+System.out.println(clientCommand.getUsage());
+System.exit(1);
+}
+
+// Execute the command.
+try {
--- End diff --

Done.
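
For readers following the CLIDriver diff above, the dispatch flow it shows (look up the command by its first argument, validate the remaining arguments, then execute) can be sketched with plain JDK types. This is only an illustration under stated assumptions: ClientCommand, EchoCommand and DispatchSketch are hypothetical stand-ins, not classes from the pull request, and the sketch returns usage strings instead of printing them and calling System.exit(1).

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for RyaKafkaClientCommand; only what the dispatcher needs. */
interface ClientCommand {
    String getCommand();
    boolean validArguments(String[] args);
    String execute(String[] args);
}

/** Hypothetical command that joins its arguments; not part of the pull request. */
final class EchoCommand implements ClientCommand {
    @Override public String getCommand() { return "echo"; }
    @Override public boolean validArguments(final String[] args) { return args.length > 0; }
    @Override public String execute(final String[] args) { return String.join(" ", args); }
}

final class DispatchSketch {
    /** Maps from command strings to the object that performs the command. */
    private static final Map<String, ClientCommand> COMMANDS = new LinkedHashMap<>();
    static {
        final ClientCommand echo = new EchoCommand();
        COMMANDS.put(echo.getCommand(), echo);
    }

    /** Returns the command's result, or a usage string if the command or its arguments are invalid. */
    static String dispatch(final String[] args) {
        // No command provided, or the command is not recognized.
        if (args.length == 0 || !COMMANDS.containsKey(args[0])) {
            return "usage: <command> [args], where <command> is one of " + COMMANDS.keySet();
        }

        // Split the command name from the arguments that belong to it.
        final ClientCommand command = COMMANDS.get(args[0]);
        final String[] commandArgs = Arrays.copyOfRange(args, 1, args.length);

        // Let the command decide whether its own arguments are valid.
        if (!command.validArguments(commandArgs)) {
            return "usage: " + command.getCommand() + " <args>";
        }
        return command.execute(commandArgs);
    }

    public static void main(final String[] args) {
        System.out.println(dispatch(new String[] {"echo", "hello", "world"}));
        System.out.println(dispatch(new String[] {"unknown"}));
    }
}
```

Keeping the registry keyed by getCommand() means a new command only has to be added to the map; the dispatcher itself does not change.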



[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187730515
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
 ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to 
Rya.
+ * 
+ * Implementations of this class only need to specify functionality that 
is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkTask extends SinkTask {
+private static final Logger log = 
LoggerFactory.getLogger(RyaSinkTask.class);
+
+@Nullable
+private SailRepository sailRepo = null;
+
+@Nullable
+private SailRepositoryConnection conn = null;
+
+/**
+ * Throws an exception if the configured Rya Instance is not already 
installed
+ * within the configured database.
+ *
+ * @param taskConfig - The configuration values that were provided to 
the task. (not null)
+ * @throws ConnectException The configured Rya Instance is not 
installed to the configured database
+ *   or we were unable to figure out if it is installed.
+ */
+protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
+
+/**
+ * Creates an initialized {@link Sail} object that may be used to 
write {@link Statement}s to the configured
+ * Rya Instance.
+ *
+ * @param taskConfig - Configures how the Sail object will be created. 
(not null)
+ * @return The created Sail object.
+ * @throws ConnectException The Sail object could not be made.
+ */
+protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
+
+@Override
+public String version() {
+return Manifests.exists("Build-Version") ? 
Manifests.read("Build-Version"): "UNKNOWN";
+}
+
+@Override
+public void start(final Map<String, String> props) throws ConnectException {
+requireNonNull(props);
+
+// Ensure the configured Rya Instance is installed within the 
configured database.
+checkRyaInstanceExists(props);
+
+// Create the Sail object that is connected to the Rya Instance.
+final Sail sail = makeSail(props);
+sailRepo = new SailRepository( sail );
+conn = sailRepo.getConnection();
+}
+
+@Override
+public void put(final Collection<SinkRecord> records) {
+requireNonNull(records);
+
+// Return immediately if there are no records to handle.
+if(records.isEmpty()) {
+return;
+}
+
+// If a transaction has not been started yet, then start one.
+if(!conn.isActive()) {
+conn.begin();
+}
+
+ 

[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472633#comment-16472633
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187730101
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkConnector.java
 ---
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.connect.sink.SinkConnector;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Handles the common components required to task {@link RyaSinkTask}s 
that write to Rya.
+ * 
+ * Implementations of this class only need to specify functionality that 
is specific to the Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkConnector extends SinkConnector {
+
+/**
+ * Get the configuration that will be provided to the tasks when 
{@link #taskConfigs(int)} is invoked.
+ * 
+ * Only called after start has been invoked
+ *
+ * @return The configuration object for the connector.
+ * @throws IllegalStateException Thrown if {@link 
SinkConnector#start(Map)} has not been invoked yet.
+ */
+protected abstract AbstractConfig getConfig() throws 
IllegalStateException;
+
+@Override
+public String version() {
+return Manifests.exists("Build-Version") ? 
Manifests.read("Build-Version") : "UNKNOWN";
+}
+
+@Override
+public List<Map<String, String>> taskConfigs(final int maxTasks) {
+final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
+for(int i = 0; i < maxTasks; i++) {
+configs.add( getConfig().originalsStrings() );
+}
+return configs;
+}
+
+@Override
+public void stop() {
+// Nothing to do since the RyaSinkconnector has no background 
monitoring.
--- End diff --

Done.
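
As a reading aid for the taskConfigs method in the diff above: every task receives the same connector settings, so the method returns maxTasks entries that all carry the connector's configuration. The sketch below shows that shape with plain JDK maps. It is an assumption-laden illustration, not the pull request's code: TaskConfigsSketch and its connectorConfig parameter are hypothetical, and a HashMap copy stands in for the getConfig().originalsStrings() call used in the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TaskConfigsSketch {

    /**
     * Builds one configuration map per task. Each task gets its own copy of the
     * connector's settings, so a change to one copy does not leak into another.
     */
    static List<Map<String, String>> taskConfigs(final Map<String, String> connectorConfig, final int maxTasks) {
        final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
        for (int i = 0; i < maxTasks; i++) {
            configs.add(new HashMap<>(connectorConfig));
        }
        return configs;
    }

    public static void main(final String[] args) {
        final Map<String, String> connectorConfig = new HashMap<>();
        // Hypothetical setting name, used only for this illustration.
        connectorConfig.put("example.instance.name", "demo");

        System.out.println(taskConfigs(connectorConfig, 3));
    }
}
```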



[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472629#comment-16472629
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187729543
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to 
store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+@Override
+protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+requireNonNull(taskConfig);
+
+// Parse the configuration object.
+final AccumuloRyaSinkConfig config = new 
AccumuloRyaSinkConfig(taskConfig);
+
+// Connect to the instance of Accumulo.
+final Connector connector;
+try {
+final Instance instance = new 
ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+connector = instance.getConnector(config.getUsername(), new 
PasswordToken( config.getPassword() ));
+} catch (AccumuloException | AccumuloSecurityException e) {
+throw new ConnectException("Could not create a Connector to 
the configured Accumulo instance.", e);
+}
+
+// Use a RyaClient to see if the configured instance exists.
+try {
+final AccumuloConnectionDetails connectionDetails = new 
AccumuloConnectionDetails(
+config.getUsername(),
+config.getPassword().toCharArray(),
+config.getClusterName(),
+config.getZookeepers());
+final RyaClient client = 
AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+if(!client.getInstanceExists().exists( 
config.getRyaInstanceName() )) {
+throw new ConnectException("The Rya Instance named " +
+config.getRyaInstanceName() + " has not been 
installed.");
+}
+
+} catch (final RyaClientException e) {
+throw new ConnectException("Unable to determine if the Rya 
Instance named " +
+config.getRyaInstanceName() + " has been installed.", 
e);
--- End diff --

Done.
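
The diff above reports two different failures with two different messages: the Rya instance is known to be missing, or the check itself could not be completed. A minimal JDK-only sketch of that distinction follows. All names in it (InstanceCheckSketch, InstanceLookup, CheckFailedException) are hypothetical; in the pull request the lookup is done through RyaClient and the failures are reported as ConnectException.

```java
final class InstanceCheckSketch {

    /** Hypothetical lookup that may itself fail, e.g. because the database is unreachable. */
    interface InstanceLookup {
        boolean exists(String instanceName) throws Exception;
    }

    /** Hypothetical stand-in for Kafka Connect's ConnectException. */
    static final class CheckFailedException extends RuntimeException {
        CheckFailedException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    /** Throws if the instance is missing or if its presence could not be determined. */
    static void checkInstanceExists(final InstanceLookup lookup, final String instanceName) {
        final boolean exists;
        try {
            exists = lookup.exists(instanceName);
        } catch (final Exception e) {
            // The lookup failed, so nothing is known about the instance; keep the cause.
            throw new CheckFailedException(
                    "Unable to determine if the Rya Instance named " + instanceName + " has been installed.", e);
        }

        if (!exists) {
            // The lookup succeeded and the instance is definitely absent.
            throw new CheckFailedException(
                    "The Rya Instance named " + instanceName + " has not been installed.", null);
        }
    }

    public static void main(final String[] args) {
        checkInstanceExists(name -> true, "demo");
        System.out.println("Instance check passed.");
    }
}
```

Keeping the two messages separate tells an operator whether to install the instance or to look at connectivity and credentials first.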



[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187729813
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java
 ---
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFHandler;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio 
Binary format serialized
+ * set of {@link Statement}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsDeserializer implements Deserializer<Set<Statement>> {
+private static final Logger log = 
LoggerFactory.getLogger(StatementsDeserializer.class);
+
+private static final BinaryRDFParserFactory PARSER_FACTORY = new 
BinaryRDFParserFactory();
+
+@Override
+public void configure(final Map<String, ?> configs, final boolean isKey) {
+// Nothing to do.
+}
+
+@Override
+public Set<Statement> deserialize(final String topic, final byte[] data) {
+if(data == null || data.length == 0) {
+// Return null because that is the contract of this method.
+return null;
+}
+
+try {
+final RDFParser parser = PARSER_FACTORY.getParser();
+final Set statements = new HashSet<>();
+
+parser.setRDFHandler(new RDFHandler() {
+@Override
+public void handleStatement(final Statement statement) 
throws RDFHandlerException {
+log.debug("Statement: " + statement);
+statements.add( statement );
+}
+
+@Override public void startRDF() throws 
RDFHandlerException { }
+@Override public void handleNamespace(final String arg0, 
final String arg1) throws RDFHandlerException { }
+@Override public void handleComment(final String arg0) 
throws RDFHandlerException { }
+@Override public void endRDF() throws RDFHandlerException 
{ }
+});
+
+parser.parse(new ByteArrayInputStream(data), null);
+return statements;
+
+} catch(final RDFParseException | RDFHandlerException | 
IOException e) {
+log.error("Could not deserialize a Set of VisibilityStatement 
objects using the RDF4J Rio Binray format.", e);
--- End diff --

Done.
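
For readers following the thread, the null-handling guard at the top of deserialize() in the diff above can be sketched in isolation. This is only an illustration under stated assumptions: the class name NullSafeDeserializeSketch and the newline-separated UTF-8 payload are hypothetical stand-ins, while the class in the diff parses the RDF4J Rio Binary format and returns a set of Statement objects.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical stand-in that mirrors only the null/empty guard of the deserializer in the diff. */
final class NullSafeDeserializeSketch {

    /**
     * Returns null for null or empty input, as the guard in the diff does.
     * The newline-separated UTF-8 payload is an assumption made for illustration;
     * it is not the RDF4J Rio Binary format.
     */
    static Set<String> deserialize(final byte[] data) {
        if (data == null || data.length == 0) {
            return null;
        }
        final String text = new String(data, StandardCharsets.UTF_8);
        // A set drops duplicate entries, just as the Set of statements would.
        return new LinkedHashSet<>(Arrays.asList(text.split("\n")));
    }

    public static void main(final String[] args) {
        System.out.println(deserialize(null));
        System.out.println(deserialize("a\nb\na".getBytes(StandardCharsets.UTF_8)));
    }
}
```

Returning null rather than an empty set for an empty payload keeps tombstone records (null values) distinguishable from a record that carries zero entries.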


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187729543
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+        // Connect to the instance of Accumulo.
+        final Connector connector;
+        try {
+            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+        }
+
+        // Use a RyaClient to see if the configured instance exists.
+        try {
+            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                    config.getUsername(),
+                    config.getPassword().toCharArray(),
+                    config.getClusterName(),
+                    config.getZookeepers());
+            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+                throw new ConnectException("The Rya Instance named " +
+                        config.getRyaInstanceName() + " has not been installed.");
+            }
+
+        } catch (final RyaClientException e) {
+            throw new ConnectException("Unable to determine if the Rya Instance named " +
+                    config.getRyaInstanceName() + " has been installed.", e);
--- End diff --

Done.


---


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472627#comment-16472627
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187729286
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+        // Connect to the instance of Accumulo.
+        final Connector connector;
+        try {
+            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+        }
+
+        // Use a RyaClient to see if the configured instance exists.
+        try {
+            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                    config.getUsername(),
+                    config.getPassword().toCharArray(),
+                    config.getClusterName(),
+                    config.getZookeepers());
+            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+                throw new ConnectException("The Rya Instance named " +
+                        config.getRyaInstanceName() + " has not been installed.");
--- End diff --

I'm assuming rya.api. Done.


> Kafka Connect Rya Sink
> --
>
> Key: RYA-487
> URL: https://issues.apache.org/jira/browse/RYA-487
> Project: Rya
>  Issue Type: New Feature
>Affects Versions: 4.0.0
>Reporter: Kevin 

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187729157
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+        // Connect to the instance of Accumulo.
+        final Connector connector;
+        try {
+            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+        } catch (AccumuloException | AccumuloSecurityException e) {
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187727065
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+        // Connect to the instance of Accumulo.
+        final Connector connector;
+        try {
+            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+        }
+
+        // Use a RyaClient to see if the configured instance exists.
+        try {
+            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                    config.getUsername(),
+                    config.getPassword().toCharArray(),
+                    config.getClusterName(),
+                    config.getZookeepers());
+            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+                throw new ConnectException("The Rya Instance named " +
+                        config.getRyaInstanceName() + " has not been installed.");
+            }
+
+        } catch (final RyaClientException e) {
+            throw new ConnectException("Unable to determine if the Rya Instance named " +
+                    config.getRyaInstanceName() + " has been installed.", e);
+        }
+    }
+
+    @Override
+    protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new 

[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/295#discussion_r187725947
  
--- Diff: 
extras/shell/src/test/java/org/apache/rya/shell/AccumuloRyaCommandsIT.java ---
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.shell;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.shell.util.InstallPrompt;
+import org.apache.rya.shell.util.PasswordPrompt;
+import org.apache.rya.shell.util.SparqlPrompt;
+import org.junit.Test;
+import org.springframework.context.ApplicationContext;
+import org.springframework.shell.Bootstrap;
+import org.springframework.shell.core.CommandResult;
+import org.springframework.shell.core.JLineShellComponent;
+
+import com.google.common.base.Optional;
+
+/**
+ * Integration tests the methods of {@link RyaCommands}.
--- End diff --

Done.


---


[jira] [Commented] (RYA-494) Shell insert and query bug.

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-494?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472608#comment-16472608
 ] 

ASF GitHub Bot commented on RYA-494:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/295#discussion_r187725795
  
--- Diff: 
extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java
 ---
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.test.accumulo.AccumuloITBase;
+import org.eclipse.rdf4j.model.IRI;
+import org.eclipse.rdf4j.model.Resource;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.model.ValueFactory;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+import org.eclipse.rdf4j.query.BindingSet;
+import org.eclipse.rdf4j.query.TupleQueryResult;
+import org.eclipse.rdf4j.rio.RDFFormat;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Integration tests the methods of {@link }.
+ */
+public class AccumuloExecuteSparqlQueryIT extends AccumuloITBase {
+
+    @Test
+    public void queryFindsAllLoadedStatements_fromSet() throws Exception {
+        // Using the Rya Client, install an instance of Rya for the test.
+        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                getUsername(),
+                getPassword().toCharArray(),
+                getInstanceName(),
+                getZookeepers());
+
+        final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, super.getConnector());
+
+        final String ryaInstance = UUID.randomUUID().toString().replace('-', '_');
+        client.getInstall().install(ryaInstance, InstallConfiguration.builder().build());
+
+        // Load some data into the instance.
+        final ValueFactory vf = SimpleValueFactory.getInstance();
+        final Set<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createIRI("urn:Alice"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Bob")),
+                vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")),
+                vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Charlie")),
+                vf.createStatement(vf.createIRI("urn:Charlie"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")),
+                vf.createStatement(vf.createIRI("urn:David"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Eve")),
+                vf.createStatement(vf.createIRI("urn:Eve"), vf.createIRI("urn:listensTo"), vf.createIRI("urn:Bob")));
+        client.getLoadStatements().loadStatements(ryaInstance, statements);
+
+        // Execute a query.
+        final Set<Statement> fetched = new HashSet<>();
+        try(final TupleQueryResult result = client.getExecuteSparqlQuery().executeSparqlQuery(ryaInstance, "SELECT * WHERE { ?s ?p ?o }")) {
+            while(result.hasNext()) {
+                final BindingSet bs = result.next();
+
+                // If this is the statement that indicates the Rya version
+                if(bs.getBinding("p").getValue().equals(vf.createIRI("urn:org.apache.rya/2012/05#version"))) {
--- End diff --

Done.
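
As a reading aid for the quoted test, here is a small stdlib-only sketch of two steps it performs: building a throwaway instance name from a UUID, and skipping the Rya version triple while collecting query results. The class name VersionTripleFilterSketch and the String[] triple representation are hypothetical; the test itself works with RDF4J BindingSet and Statement objects. The hyphen replacement is presumably there because the name is used for Accumulo tables, but that is an assumption, not something the thread states.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Hypothetical, stdlib-only stand-in for the name generation and filtering steps in the test. */
final class VersionTripleFilterSketch {

    /** Predicate IRI that the test skips; copied from the quoted diff. */
    static final String VERSION_PREDICATE = "urn:org.apache.rya/2012/05#version";

    /** Builds an instance name the way the test does: a random UUID with '-' replaced by '_'. */
    static String randomInstanceName() {
        return UUID.randomUUID().toString().replace('-', '_');
    }

    /** Returns the triples whose predicate (index 1) is not the version predicate. */
    static List<String[]> withoutVersionTriples(final List<String[]> triples) {
        final List<String[]> kept = new ArrayList<>();
        for (final String[] triple : triples) {
            if (VERSION_PREDICATE.equals(triple[1])) {
                continue;
            }
            kept.add(triple);
        }
        return kept;
    }

    public static void main(final String[] args) {
        System.out.println(randomInstanceName());
    }
}
```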


> Shell insert and query bug.
> ---
>
> Key: RYA-494
> URL: https://issues.apache.org/jira/browse/RYA-494
> Project: Rya
>  Issue Type: Bug
>

[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/295#discussion_r187725865
  
--- Diff: 
extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java
 ---
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.test.accumulo.AccumuloITBase;
+import org.eclipse.rdf4j.model.IRI;
+import org.eclipse.rdf4j.model.Resource;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.model.ValueFactory;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+import org.eclipse.rdf4j.query.BindingSet;
+import org.eclipse.rdf4j.query.TupleQueryResult;
+import org.eclipse.rdf4j.rio.RDFFormat;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Integration tests the methods of {@link }.
+ */
+public class AccumuloExecuteSparqlQueryIT extends AccumuloITBase {
+
+    @Test
+    public void queryFindsAllLoadedStatements_fromSet() throws Exception {
+        // Using the Rya Client, install an instance of Rya for the test.
+        final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                getUsername(),
+                getPassword().toCharArray(),
+                getInstanceName(),
+                getZookeepers());
+
+        final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, super.getConnector());
+
+        final String ryaInstance = UUID.randomUUID().toString().replace('-', '_');
+        client.getInstall().install(ryaInstance, InstallConfiguration.builder().build());
+
+        // Load some data into the instance.
+        final ValueFactory vf = SimpleValueFactory.getInstance();
+        final Set<Statement> statements = Sets.newHashSet(
+                vf.createStatement(vf.createIRI("urn:Alice"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Bob")),
+                vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")),
+                vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Charlie")),
+                vf.createStatement(vf.createIRI("urn:Charlie"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")),
+                vf.createStatement(vf.createIRI("urn:David"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Eve")),
+                vf.createStatement(vf.createIRI("urn:Eve"), vf.createIRI("urn:listensTo"), vf.createIRI("urn:Bob")));
+        client.getLoadStatements().loadStatements(ryaInstance, statements);
+
+        // Execute a query.
+        final Set<Statement> fetched = new HashSet<>();
+        try(final TupleQueryResult result = client.getExecuteSparqlQuery().executeSparqlQuery(ryaInstance, "SELECT * WHERE { ?s ?p ?o }")) {
+            while(result.hasNext()) {
+                final BindingSet bs = result.next();
+
+                // If this is the statement that indicates the Rya version
+                if(bs.getBinding("p").getValue().equals(vf.createIRI("urn:org.apache.rya/2012/05#version"))) {
+                    continue;
+                }
+
+                // Otherwise add it to the list of fetched statements.
+                fetched.add( vf.createStatement(
+                        (Resource)bs.getBinding("s").getValue(),
+                        (IRI)bs.getBinding("p").getValue(),
+                        bs.getBinding("o").getValue()) );
+            }
+        }
+
+        // Show it resulted in the expected 

[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/295#discussion_r187725795
  
--- Diff: 
extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java
 ---
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.test.accumulo.AccumuloITBase;
+import org.eclipse.rdf4j.model.IRI;
+import org.eclipse.rdf4j.model.Resource;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.model.ValueFactory;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+import org.eclipse.rdf4j.query.BindingSet;
+import org.eclipse.rdf4j.query.TupleQueryResult;
+import org.eclipse.rdf4j.rio.RDFFormat;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Integration tests the methods of {@link }.
+ */
+public class AccumuloExecuteSparqlQueryIT extends AccumuloITBase {
+
+@Test
+public void queryFindsAllLoadedStatements_fromSet() throws Exception {
+// Using the Rya Client, install an instance of Rya for the test.
+final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+getUsername(),
+getPassword().toCharArray(),
+getInstanceName(),
+getZookeepers());
+
+final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, super.getConnector());
+
+final String ryaInstance = UUID.randomUUID().toString().replace('-', '_');
+client.getInstall().install(ryaInstance, InstallConfiguration.builder().build());
+
+// Load some data into the instance.
+final ValueFactory vf = SimpleValueFactory.getInstance();
+final Set<Statement> statements = Sets.newHashSet(
+vf.createStatement(vf.createIRI("urn:Alice"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Bob")),
+vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")),
+vf.createStatement(vf.createIRI("urn:Bob"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Charlie")),
+vf.createStatement(vf.createIRI("urn:Charlie"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Alice")),
+vf.createStatement(vf.createIRI("urn:David"), vf.createIRI("urn:talksTo"), vf.createIRI("urn:Eve")),
+vf.createStatement(vf.createIRI("urn:Eve"), vf.createIRI("urn:listensTo"), vf.createIRI("urn:Bob")));
+client.getLoadStatements().loadStatements(ryaInstance, statements);
+
+// Execute a query.
+final Set<Statement> fetched = new HashSet<>();
+try(final TupleQueryResult result = client.getExecuteSparqlQuery().executeSparqlQuery(ryaInstance, "SELECT * WHERE { ?s ?p ?o }")) {
+while(result.hasNext()) {
+final BindingSet bs = result.next();
+
+// If this is the statement that indicates the Rya version
+if(bs.getBinding("p").getValue().equals(vf.createIRI("urn:org.apache.rya/2012/05#version"))) {
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #295: RYA-494 Fixed a bug where the shell was not...

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/295#discussion_r187724995
  
--- Diff: 
extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloExecuteSparqlQueryIT.java
 ---
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.api.client.accumulo;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.test.accumulo.AccumuloITBase;
+import org.eclipse.rdf4j.model.IRI;
+import org.eclipse.rdf4j.model.Resource;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.model.ValueFactory;
+import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+import org.eclipse.rdf4j.query.BindingSet;
+import org.eclipse.rdf4j.query.TupleQueryResult;
+import org.eclipse.rdf4j.rio.RDFFormat;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Integration tests the methods of {@link }.
--- End diff --

Done.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187717175
  
--- Diff: 
extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java
 ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.test.mongo.MongoITBase;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link MongoRyaSinkTask}.
+ */
+public class MongoRyaSinkTaskIT extends MongoITBase {
+
+@Test
+public void instanceExists() throws Exception {
+// Install an instance of Rya.
+final String ryaInstanceName = "rya";
+final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+super.getMongoHostname(),
+super.getMongoPort(),
+Optional.empty(),
+Optional.empty());
+
+final InstallConfiguration installConfig = InstallConfiguration.builder()
+.setEnableTableHashPrefix(false)
+.setEnableEntityCentricIndex(false)
+.setEnableFreeTextIndex(false)
+.setEnableTemporalIndex(false)
+.setEnablePcjIndex(false)
+.setEnableGeoIndex(false)
+.build();
+
+final RyaClient ryaClient = MongoRyaClientFactory.build(connectionDetails, super.getMongoClient());
+ryaClient.getInstall().install(ryaInstanceName, installConfig);
+
+// Create the task that will be tested.
+final MongoRyaSinkTask task = new MongoRyaSinkTask();
+
+try {
+// Configure the task to use the embedded Mongo DB instance for Rya.
+final Map<String, String> config = new HashMap<>();
+config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname());
+config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
+config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "rya");
+
+// This will pass because the Rya instance exists.
+task.start(config);
+} finally {
+task.stop();
+}
+}
+
+@Test(expected = ConnectException.class)
+public void instanceDoesNotExist() throws Exception {
+// Create the task that will be tested.
+final MongoRyaSinkTask task = new MongoRyaSinkTask();
+
+try {
+// Configure the task to use the embedded Mongo DB instance for Rya.
+final Map<String, String> config = new HashMap<>();
+config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname());
+config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
+config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "instance-does-not-exist");
+
+// Starting the task will fail because the Rya instance does not exist.
+task.start(config);
+} finally {
+task.stop();
+}
+}
+
+// TODO show that inserts using visibilities work.
--- End diff --

TODO?


---


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472577#comment-16472577
 ] 

ASF GitHub Bot commented on RYA-487:


Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187718927
  
--- Diff: 
extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java
 ---
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses a Mongo DB Rya backend when creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkConnector extends RyaSinkConnector {
+
+@Nullable
+private MongoRyaSinkConfig config = null;
+
+@Override
+public void start(final Map<String, String> props) {
+this.config = new MongoRyaSinkConfig( props );
+}
+
+@Override
+protected AbstractConfig getConfig() {
+if(config == null) {
+throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first.");
--- End diff --

same start(Map)


> Kafka Connect Rya Sink
> --
>
> Key: RYA-487
> URL: https://issues.apache.org/jira/browse/RYA-487
> Project: Rya
>  Issue Type: New Feature
>Affects Versions: 4.0.0
>Reporter: Kevin Chilton
>Assignee: Kevin Chilton
>Priority: Major
> Fix For: 4.0.0
>
>
> Implement a Kafka Connect Sink that writes to Rya.
> This sink does not have to handle visibilities.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472576#comment-16472576
 ] 

ASF GitHub Bot commented on RYA-487:


Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187717488
  
--- Diff: 
extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java
 ---
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka Connect configuration that is used to configure {@link MongoRyaSinkConnector}s and {@link MongoRyaSinkTask}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkConfig extends RyaSinkConfig {
+
+public static final String HOSTNAME = "mongo.hostname";
+private static final String HOSTNAME_DOC = "The Mongo DB hostname the Sail connections wlll use.";
--- End diff --

typo: wlll


> Kafka Connect Rya Sink
> --
>
> Key: RYA-487
> URL: https://issues.apache.org/jira/browse/RYA-487
> Project: Rya
>  Issue Type: New Feature
>Affects Versions: 4.0.0
>Reporter: Kevin Chilton
>Assignee: Kevin Chilton
>Priority: Major
> Fix For: 4.0.0
>
>
> Implement a Kafka Connect Sink that writes to Rya.
> This sink does not have to handle visibilities.





[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187717488
  
--- Diff: 
extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConfig.java
 ---
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConfig;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka Connect configuration that is used to configure {@link MongoRyaSinkConnector}s and {@link MongoRyaSinkTask}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkConfig extends RyaSinkConfig {
+
+public static final String HOSTNAME = "mongo.hostname";
+private static final String HOSTNAME_DOC = "The Mongo DB hostname the Sail connections wlll use.";
--- End diff --

typo: wlll


---


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472575#comment-16472575
 ] 

ASF GitHub Bot commented on RYA-487:


Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187717175
  
--- Diff: 
extras/kafka.connect/mongo-it/src/test/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTaskIT.java
 ---
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.Install.InstallConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.test.mongo.MongoITBase;
+import org.junit.Test;
+
+/**
+ * Integration tests the methods of {@link MongoRyaSinkTask}.
+ */
+public class MongoRyaSinkTaskIT extends MongoITBase {
+
+@Test
+public void instanceExists() throws Exception {
+// Install an instance of Rya.
+final String ryaInstanceName = "rya";
+final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+super.getMongoHostname(),
+super.getMongoPort(),
+Optional.empty(),
+Optional.empty());
+
+final InstallConfiguration installConfig = InstallConfiguration.builder()
+.setEnableTableHashPrefix(false)
+.setEnableEntityCentricIndex(false)
+.setEnableFreeTextIndex(false)
+.setEnableTemporalIndex(false)
+.setEnablePcjIndex(false)
+.setEnableGeoIndex(false)
+.build();
+
+final RyaClient ryaClient = MongoRyaClientFactory.build(connectionDetails, super.getMongoClient());
+ryaClient.getInstall().install(ryaInstanceName, installConfig);
+
+// Create the task that will be tested.
+final MongoRyaSinkTask task = new MongoRyaSinkTask();
+
+try {
+// Configure the task to use the embedded Mongo DB instance for Rya.
+final Map<String, String> config = new HashMap<>();
+config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname());
+config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
+config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "rya");
+
+// This will pass because the Rya instance exists.
+task.start(config);
+} finally {
+task.stop();
+}
+}
+
+@Test(expected = ConnectException.class)
+public void instanceDoesNotExist() throws Exception {
+// Create the task that will be tested.
+final MongoRyaSinkTask task = new MongoRyaSinkTask();
+
+try {
+// Configure the task to use the embedded Mongo DB instance for Rya.
+final Map<String, String> config = new HashMap<>();
+config.put(MongoRyaSinkConfig.HOSTNAME, super.getMongoHostname());
+config.put(MongoRyaSinkConfig.PORT, "" + super.getMongoPort());
+config.put(MongoRyaSinkConfig.RYA_INSTANCE_NAME, "instance-does-not-exist");
+
+// Starting the task will fail because the Rya instance does not exist.
+task.start(config);
+} finally {
+task.stop();
+}
+}
+
+// TODO show that inserts using visibilities work.
--- End diff --

TODO?


> Kafka Connect Rya Sink
> --
>
> Key: RYA-487
>  

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187718927
  
--- Diff: 
extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkConnector.java
 ---
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses a Mongo DB Rya backend when creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkConnector extends RyaSinkConnector {
+
+@Nullable
+private MongoRyaSinkConfig config = null;
+
+@Override
+public void start(final Map<String, String> props) {
+this.config = new MongoRyaSinkConfig( props );
+}
+
+@Override
+protected AbstractConfig getConfig() {
+if(config == null) {
+throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first.");
--- End diff --

same start(Map)


---


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472535#comment-16472535
 ] 

ASF GitHub Bot commented on RYA-487:


Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187698179
  
--- Diff: 
dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java
 ---
@@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() {
 super();
 }
 
-public AccumuloRdfConfiguration(Configuration other) {
+public AccumuloRdfConfiguration(final Configuration other) {
 super(other);
 }
 
-public AccumuloRdfConfigurationBuilder getBuilder() {
+public static AccumuloRdfConfigurationBuilder getBuilder() {
--- End diff --

Isn't the convention usually builder() for static builder methods?
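
For context, a minimal sketch of the naming convention being referred to: a
static factory method named builder() that returns a fresh builder, the same
shape as the InstallConfiguration.builder() call quoted in the test diffs in
this thread. ExampleConfiguration, its fields, and its setters are hypothetical
stand-ins invented for illustration; this is not the real
AccumuloRdfConfiguration API.

```java
// Hypothetical stand-in class; it only illustrates the static builder() naming convention.
final class ExampleConfiguration {
    private final String instanceName;
    private final boolean useMock;

    private ExampleConfiguration(final Builder builder) {
        this.instanceName = builder.instanceName;
        this.useMock = builder.useMock;
    }

    // Static factory named builder() rather than getBuilder().
    static Builder builder() {
        return new Builder();
    }

    String getInstanceName() {
        return instanceName;
    }

    boolean isUseMock() {
        return useMock;
    }

    // Fluent builder: each setter returns this so calls can be chained.
    static final class Builder {
        private String instanceName = "";
        private boolean useMock = false;

        Builder setInstanceName(final String instanceName) {
            this.instanceName = instanceName;
            return this;
        }

        Builder setUseMock(final boolean useMock) {
            this.useMock = useMock;
            return this;
        }

        ExampleConfiguration build() {
            return new ExampleConfiguration(this);
        }
    }
}
```

A caller would then write
ExampleConfiguration.builder().setInstanceName("rya").build(), which reads the
same way as the existing InstallConfiguration usage.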


> Kafka Connect Rya Sink
> --
>
> Key: RYA-487
> URL: https://issues.apache.org/jira/browse/RYA-487
> Project: Rya
>  Issue Type: New Feature
>Affects Versions: 4.0.0
>Reporter: Kevin Chilton
>Assignee: Kevin Chilton
>Priority: Major
> Fix For: 4.0.0
>
>
> Implement a Kafka Connect Sink that writes to Rya.
> This sink does not have to handle visibilities.





[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472536#comment-16472536
 ] 

ASF GitHub Bot commented on RYA-487:


Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187701955
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+@Override
+protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+requireNonNull(taskConfig);
+
+// Parse the configuration object.
+final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+// Connect to the instance of Accumulo.
+final Connector connector;
+try {
+final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+} catch (AccumuloException | AccumuloSecurityException e) {
+throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+}
+
+// Use a RyaClient to see if the configured instance exists.
+try {
+final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
--- End diff --

A quick nice-to-have idea for future work: we have a
ConnectionDetails->Configuration conversion, but not the other way around.
The reverse conversion would make code like this pretty easy.
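
A rough sketch of what such a reverse conversion could look like. Everything
here is hypothetical: ExampleConnectionDetails, toConfiguration,
fromConfiguration, and the property keys are invented for illustration, and a
plain Map stands in for the real Configuration type. It is not the existing
Rya API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for a connection details class; not the real Rya API.
final class ExampleConnectionDetails {
    // Hypothetical property keys, invented for this sketch.
    static final String USERNAME = "example.username";
    static final String PASSWORD = "example.password";
    static final String INSTANCE = "example.instance";
    static final String ZOOKEEPERS = "example.zookeepers";

    private final String username;
    private final char[] password;
    private final String instanceName;
    private final String zookeepers;

    ExampleConnectionDetails(final String username, final char[] password,
            final String instanceName, final String zookeepers) {
        this.username = Objects.requireNonNull(username);
        this.password = Objects.requireNonNull(password).clone();
        this.instanceName = Objects.requireNonNull(instanceName);
        this.zookeepers = Objects.requireNonNull(zookeepers);
    }

    String getUsername() { return username; }
    char[] getPassword() { return password.clone(); }
    String getInstanceName() { return instanceName; }
    String getZookeepers() { return zookeepers; }

    // Forward direction: connection details -> configuration (a Map in this sketch).
    Map<String, String> toConfiguration() {
        final Map<String, String> conf = new HashMap<>();
        conf.put(USERNAME, username);
        conf.put(PASSWORD, new String(password));
        conf.put(INSTANCE, instanceName);
        conf.put(ZOOKEEPERS, zookeepers);
        return conf;
    }

    // Reverse direction: configuration -> connection details. Fails fast on missing keys.
    static ExampleConnectionDetails fromConfiguration(final Map<String, String> conf) {
        Objects.requireNonNull(conf);
        return new ExampleConnectionDetails(
                Objects.requireNonNull(conf.get(USERNAME), "missing " + USERNAME),
                Objects.requireNonNull(conf.get(PASSWORD), "missing " + PASSWORD).toCharArray(),
                Objects.requireNonNull(conf.get(INSTANCE), "missing " + INSTANCE),
                Objects.requireNonNull(conf.get(ZOOKEEPERS), "missing " + ZOOKEEPERS));
    }
}
```

With something along these lines, a sink task that already holds a parsed
configuration could build its connection details in one call instead of
passing each field by hand.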


> Kafka Connect Rya Sink
> --
>
> Key: RYA-487
> URL: https://issues.apache.org/jira/browse/RYA-487
> Project: Rya
>  Issue Type: New Feature
>Affects Versions: 4.0.0
>Reporter: Kevin Chilton
>Assignee: Kevin Chilton
>Priority: Major
> Fix For: 4.0.0
>
>
> Implement a Kafka Connect Sink that writes to Rya.
> This sink does not have to handle visibilities.





[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472540#comment-16472540
 ] 

ASF GitHub Bot commented on RYA-487:


Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187702555
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+@Override
+protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+requireNonNull(taskConfig);
+
+// Parse the configuration object.
+final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+// Connect to the instance of Accumulo.
+final Connector connector;
+try {
+final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+} catch (AccumuloException | AccumuloSecurityException e) {
+throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+}
+
+// Use a RyaClient to see if the configured instance exists.
+try {
+final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+config.getUsername(),
+config.getPassword().toCharArray(),
+config.getClusterName(),
+config.getZookeepers());
+final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+throw new ConnectException("The Rya Instance named " +
+config.getRyaInstanceName() + " has not been installed.");
+}
+
+} catch (final RyaClientException e) {
+throw new ConnectException("Unable to determine if the Rya Instance named " +
+config.getRyaInstanceName() + " has been installed.", e);
+}
+}
+
+@Override
+protected Sail 

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187698795
  
--- Diff: 
extras/indexing/src/main/java/org/apache/rya/indexing/accumulo/ConfigUtils.java 
---
@@ -438,6 +438,9 @@ public static boolean getUsePcjUpdaterIndex(final Configuration conf) {
         return Optional.fromNullable(conf.get(FLUO_APP_NAME));
     }
 
+    public static void setUseMongo(final Configuration conf, final boolean useMongo) {
--- End diff --

Can you call this from the MongoDbRDFConfiguration constructor? If we're 
going to keep using this field, it would make sense for the constructor to 
set it.
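
A minimal sketch of what this suggestion could look like. Everything here is a stand-in chosen for illustration: `SimpleConfiguration` is a map-backed substitute for a Hadoop-style `Configuration`, and `ExampleConfigUtils`, `ExampleMongoConfiguration`, and the key `example.use.mongo` are assumed names, not the actual Rya code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; none of these types are the real Rya or Hadoop classes.
final class UseMongoSketch {

    /** Map-backed stand-in for a Hadoop-style Configuration. */
    static class SimpleConfiguration {
        private final Map<String, String> values = new HashMap<>();

        void setBoolean(final String key, final boolean value) {
            values.put(key, Boolean.toString(value));
        }

        boolean getBoolean(final String key, final boolean defaultValue) {
            final String raw = values.get(key);
            return raw == null ? defaultValue : Boolean.parseBoolean(raw);
        }
    }

    /** Stand-in for a static utility like the setUseMongo(...) under review. */
    static final class ExampleConfigUtils {
        static final String USE_MONGO = "example.use.mongo"; // assumed key name

        static void setUseMongo(final SimpleConfiguration conf, final boolean useMongo) {
            conf.setBoolean(USE_MONGO, useMongo);
        }

        static boolean getUseMongo(final SimpleConfiguration conf) {
            return conf.getBoolean(USE_MONGO, false);
        }
    }

    /** A Mongo-specific configuration that sets the flag itself when constructed. */
    static class ExampleMongoConfiguration extends SimpleConfiguration {
        ExampleMongoConfiguration() {
            // Callers no longer need to remember to set the flag themselves.
            ExampleConfigUtils.setUseMongo(this, true);
        }
    }
}
```

With the flag set in the constructor, any code that builds the Mongo-specific configuration gets a consistent value without a separate call.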


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187708244
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java
 ---
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkConnector extends RyaSinkConnector {
+
+    @Nullable
+    private AccumuloRyaSinkConfig config = null;
+
+    @Override
+    public void start(final Map<String, String> props) {
+        this.config = new AccumuloRyaSinkConfig( props );
+    }
+
+    @Override
+    protected AbstractConfig getConfig() {
+        if(config == null) {
+            throw new IllegalStateException("The configuration has not been set yet. Invoke start(props) first.");
--- End diff --

Usually a documented method reference gives the parameter types, not the 
parameter names: start(Map)
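
A small sketch of that convention, assuming a hypothetical `StartReferenceSketch` class loosely modeled on the connector in the diff. The Javadoc `{@link #start(Map)}` form names the method by its parameter type, and the exception message follows the same style.

```java
import java.util.Map;

// Hypothetical class used only to illustrate the naming convention.
final class StartReferenceSketch {
    private Map<String, String> config = null;

    /** Stores the provided properties. */
    void start(final Map<String, String> props) {
        this.config = props;
    }

    /**
     * Returns the stored properties.
     *
     * @throws IllegalStateException if {@link #start(Map)} has not been invoked yet.
     */
    Map<String, String> getConfig() {
        if (config == null) {
            // The method is referenced by parameter type, matching the Javadoc link form.
            throw new IllegalStateException(
                    "The configuration has not been set yet. Invoke start(Map) first.");
        }
        return config;
    }
}
```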


---


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472534#comment-16472534
 ] 

ASF GitHub Bot commented on RYA-487:


Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187699543
  
--- Diff: extras/kafka.connect/README.md ---
@@ -0,0 +1,22 @@
+
+
+The parent project for all Rya Kafka Connect work. All projects thare are 
part 
+of that system must use this project's pom as their parent pom.
--- End diff --

Typo: "All projects that are"


> Kafka Connect Rya Sink
> --
>
> Key: RYA-487
> URL: https://issues.apache.org/jira/browse/RYA-487
> Project: Rya
>  Issue Type: New Feature
>Affects Versions: 4.0.0
>Reporter: Kevin Chilton
>Assignee: Kevin Chilton
>Priority: Major
> Fix For: 4.0.0
>
>
> Implement a Kafka Connect Sink that writes to Rya.
> This sink does not have to handle visibilities.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187708536
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
 ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ * 
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
--- End diff --

Maybe I'm not clear on what this annotation does, but again, all the fields 
are declared Nullable.
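
For context, FindBugs documents `@DefaultAnnotation(NonNull.class)` as applying `@NonNull` to every member of the class that does not carry its own nullness annotation, so `@Nullable` fields are per-member exceptions while parameters and return values keep the default. A minimal sketch of that reading follows. The annotations are declared locally as stand-ins (an assumption, so the snippet compiles without the FindBugs jar), and `ExampleTask` is hypothetical.

```java
import java.lang.annotation.Annotation;
import java.util.Objects;

final class DefaultNonNullSketch {
    // Local stand-ins for the FindBugs annotations; declared here only for illustration.
    @interface NonNull {}
    @interface Nullable {}
    @interface DefaultAnnotation { Class<? extends Annotation>[] value(); }

    @DefaultAnnotation(NonNull.class)
    static class ExampleTask {
        // Explicit exception to the class-wide default: unset until start(...) runs.
        @Nullable
        private String connection = null;

        // No annotation: under the default, 'name' is treated as @NonNull.
        void start(final String name) {
            connection = "conn-" + Objects.requireNonNull(name);
        }

        // No annotation: under the default, the return value is treated as @NonNull.
        String describe() {
            return connection == null ? "not started" : connection;
        }
    }
}
```

Under this reading the class-level annotation still does work even when every field is `@Nullable`, because it covers method parameters and return values.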


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187698179
  
--- Diff: 
dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/AccumuloRdfConfiguration.java
 ---
@@ -62,14 +62,14 @@ public AccumuloRdfConfiguration() {
         super();
     }
 
-    public AccumuloRdfConfiguration(Configuration other) {
+    public AccumuloRdfConfiguration(final Configuration other) {
         super(other);
     }
 
-    public AccumuloRdfConfigurationBuilder getBuilder() {
+    public static AccumuloRdfConfigurationBuilder getBuilder() {
--- End diff --

Isn't the convention usually builder() for static builder functions?
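
A minimal sketch of the naming convention in question, using a hypothetical `ExampleConfiguration` rather than the real `AccumuloRdfConfiguration`. The static factory is called `builder()` and the fluent object it returns ends with `build()`.

```java
final class BuilderNamingSketch {

    /** Hypothetical configuration type with a static builder() factory. */
    static final class ExampleConfiguration {
        private final String instanceName;

        private ExampleConfiguration(final String instanceName) {
            this.instanceName = instanceName;
        }

        String getInstanceName() {
            return instanceName;
        }

        // Conventional name for a static factory that returns a new builder.
        static Builder builder() {
            return new Builder();
        }

        static final class Builder {
            private String instanceName = "default";

            Builder setInstanceName(final String instanceName) {
                this.instanceName = instanceName;
                return this;
            }

            ExampleConfiguration build() {
                return new ExampleConfiguration(instanceName);
            }
        }
    }
}
```

A call site then reads `ExampleConfiguration.builder().setInstanceName("rya_").build()`, which is the shape many Java libraries use for static builders.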


---


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472542#comment-16472542
 ] 

ASF GitHub Bot commented on RYA-487:


Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187702276
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+        // Connect to the instance of Accumulo.
+        final Connector connector;
+        try {
+            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+        }
+
+        // Use a RyaClient to see if the configured instance exists.
+        try {
+            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                    config.getUsername(),
+                    config.getPassword().toCharArray(),
+                    config.getClusterName(),
+                    config.getZookeepers());
+            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+                throw new ConnectException("The Rya Instance named " +
+                        config.getRyaInstanceName() + " has not been installed.");
--- End diff --

LogUtils from which package?  



[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472545#comment-16472545
 ] 

ASF GitHub Bot commented on RYA-487:


Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187701248
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java
 ---
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
--- End diff --

Since there is one field that is marked nullable, is this annotation 
useful?




[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187702555
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+        // Connect to the instance of Accumulo.
+        final Connector connector;
+        try {
+            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+        }
+
+        // Use a RyaClient to see if the configured instance exists.
+        try {
+            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                    config.getUsername(),
+                    config.getPassword().toCharArray(),
+                    config.getClusterName(),
+                    config.getZookeepers());
+            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+                throw new ConnectException("The Rya Instance named " +
+                        config.getRyaInstanceName() + " has not been installed.");
+            }
+
+        } catch (final RyaClientException e) {
+            throw new ConnectException("Unable to determine if the Rya Instance named " +
+                    config.getRyaInstanceName() + " has been installed.", e);
+        }
+    }
+
+    @Override
+    protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187706947
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java
 ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
+ * using the RDF4J Rio Binary format.
--- End diff --

I feel it would be worthwhile to mention that you use the RDFWriter here 
and the RDFParser in the Deserializer, respectively.


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187709021
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
 ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ *
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkTask extends SinkTask {
+    private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class);
+
+    @Nullable
+    private SailRepository sailRepo = null;
+
+    @Nullable
+    private SailRepositoryConnection conn = null;
+
+    /**
+     * Throws an exception if the configured Rya Instance is not already installed
+     * within the configured database.
+     *
+     * @param taskConfig - The configuration values that were provided to the task. (not null)
+     * @throws ConnectException The configured Rya Instance is not installed to the configured database
+     *   or we were unable to figure out if it is installed.
+     */
+    protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
+
+    /**
+     * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured
+     * Rya Instance.
+     *
+     * @param taskConfig - Configures how the Sail object will be created. (not null)
+     * @return The created Sail object.
+     * @throws ConnectException The Sail object could not be made.
+     */
+    protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
+
+    @Override
+    public String version() {
+        return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN";
+    }
+
+    @Override
+    public void start(final Map<String, String> props) throws ConnectException {
+        requireNonNull(props);
+
+        // Ensure the configured Rya Instance is installed within the configured database.
+        checkRyaInstanceExists(props);
+
+        // Create the Sail object that is connected to the Rya Instance.
+        final Sail sail = makeSail(props);
+        sailRepo = new SailRepository( sail );
+        conn = sailRepo.getConnection();
+    }
+
+    @Override
+    public void put(final Collection<SinkRecord> records) {
+        requireNonNull(records);
+
+        // Return immediately if there are no records to handle.
+        if(records.isEmpty()) {
+            return;
+        }
+
+        // If a transaction has not been started yet, then start one.
+        if(!conn.isActive()) {
+            conn.begin();
+        }
+
+  
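
The quoted diff is cut off right after put() opens a transaction. As a rough illustration of the batching pattern that comment describes, here is a minimal sketch. It assumes records are added inside the open transaction and committed by a later flush, which the truncated diff does not show. TxConnection, InMemoryConnection and BatchingSinkSketch are hypothetical stand-ins, not the RDF4J or Kafka Connect API, and the real RyaSinkTask may well differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class BatchingSinkSketch {

    /** Hypothetical stand-in for a transactional repository connection. */
    interface TxConnection {
        boolean isActive();
        void begin();
        void add(String statement);
        void commit();
    }

    /** In-memory implementation, present only so the sketch can run. */
    static final class InMemoryConnection implements TxConnection {
        private final List<String> pending = new ArrayList<>();
        final List<String> committed = new ArrayList<>();
        private boolean active = false;

        @Override public boolean isActive() { return active; }
        @Override public void begin() { active = true; }

        @Override public void add(final String statement) {
            if (!active) {
                throw new IllegalStateException("No active transaction.");
            }
            pending.add(statement);
        }

        @Override public void commit() {
            // Make everything written since begin() visible, then close the transaction.
            committed.addAll(pending);
            pending.clear();
            active = false;
        }
    }

    private final TxConnection conn;

    BatchingSinkSketch(final TxConnection conn) {
        this.conn = conn;
    }

    /** Adds records to the open transaction, starting one if needed. */
    void put(final Collection<String> records) {
        if (records.isEmpty()) {
            return;
        }
        if (!conn.isActive()) {
            conn.begin();
        }
        for (final String record : records) {
            conn.add(record);
        }
    }

    /** Assumed commit point: ends the transaction if one is open. */
    void flush() {
        if (conn.isActive()) {
            conn.commit();
        }
    }

    public static void main(final String[] args) {
        final InMemoryConnection conn = new InMemoryConnection();
        final BatchingSinkSketch sink = new BatchingSinkSketch(conn);
        sink.put(Arrays.asList("s1", "s2"));
        sink.put(Arrays.asList("s3"));
        System.out.println("committed before flush: " + conn.committed.size());
        sink.flush();
        System.out.println("committed after flush: " + conn.committed.size());
    }
}
```

Several put() calls share one transaction here, so nothing is visible until flush() commits the batch.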

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187699543
  
--- Diff: extras/kafka.connect/README.md ---
@@ -0,0 +1,22 @@
+
+
+The parent project for all Rya Kafka Connect work. All projects thare are part 
+of that system must use this project's pom as their parent pom.
--- End diff --

Typo: "All projects that* are".


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread isper3at
Github user isper3at commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187701248
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkConnector.java
 ---
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkConnector;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkConnector} that uses an Accumulo Rya backend when creating tasks.
+ */
+@DefaultAnnotation(NonNull.class)
--- End diff --

Since there is one field that is marked nullable, is this annotation useful?
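
For context on the question, a class-level default and a field-level override are not mutually exclusive: the default covers every member that carries no annotation of its own, and the explicit one wins where present. Below is a minimal sketch of that reading. DefaultNonNull and Nullable are hypothetical runtime-retained stand-ins written for this example, not the FindBugs annotations, and mayBeNull is an illustrative helper rather than anything a static analyzer exposes.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

public class NullnessDefaultSketch {

    /** Hypothetical stand-in for a class-wide "non-null unless stated otherwise" default. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface DefaultNonNull {}

    /** Hypothetical stand-in for an explicit per-member override. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface Nullable {}

    @DefaultNonNull
    static final class Connector {
        @Nullable
        String config = null;      // explicit override: may be null

        String name = "rya-sink";  // no annotation: the class default applies
    }

    /** Returns true if the named field may hold null under the rules sketched above. */
    static boolean mayBeNull(final Class<?> type, final String fieldName) {
        final Field field;
        try {
            field = type.getDeclaredField(fieldName);
        } catch (final NoSuchFieldException e) {
            throw new IllegalArgumentException("Unknown field: " + fieldName, e);
        }
        // An explicit annotation on the member takes precedence over the class default.
        if (field.isAnnotationPresent(Nullable.class)) {
            return true;
        }
        // Without a class-level default, nothing is promised about the field.
        return !type.isAnnotationPresent(DefaultNonNull.class);
    }

    public static void main(final String[] args) {
        System.out.println(mayBeNull(Connector.class, "config")); // true
        System.out.println(mayBeNull(Connector.class, "name"));   // false
    }
}
```

Under this reading the default still does work for parameters, return values and any fields added later, even when the only field present today is marked nullable.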


---


[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187667486
  
--- Diff: 
extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java
 ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import com.google.common.base.Strings;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+        @Nullable
+        final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
+        @Nullable
+        final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
+
+        // Connect a Mongo Client to the configured Mongo DB instance.
+        final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
+        final boolean hasCredentials = username != null && password != null;
+
+        try(MongoClient mongoClient = hasCredentials ?
+                new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
+                new MongoClient(serverAddr)) {
+            // Use a RyaClient to see if the configured instance exists.
+            // Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
+            final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+                    config.getHostname(),
+                    config.getPort(),
+                    Optional.ofNullable(username),
+                    Optional.ofNullable(password));
+
+            final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
+            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+                throw new ConnectException("The Rya Instance named " +
+                        config.getRyaInstanceName() + " has not been installed.");
+            }
+        } catch(final RyaClientException e) {
+            throw new ConnectException("Unable to determine if the Rya 

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread ejwhite922
Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187667235
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+        // Connect to the instance of Accumulo.
+        final Connector connector;
+        try {
+            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+        }
+
+        // Use a RyaClient to see if the configured instance exists.
+        try {
+            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                    config.getUsername(),
+                    config.getPassword().toCharArray(),
+                    config.getClusterName(),
+                    config.getZookeepers());
+            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+                throw new ConnectException("The Rya Instance named " +
+                        config.getRyaInstanceName() + " has not been installed.");
+            }
+
+        } catch (final RyaClientException e) {
+            throw new ConnectException("Unable to determine if the Rya Instance named " +
+                    config.getRyaInstanceName() + " has been installed.", e);
+        }
+    }
+
+    @Override
+    protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new 

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187657223
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+    @Override
+    protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+        // Connect to the instance of Accumulo.
+        final Connector connector;
+        try {
+            final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+            connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+        }
+
+        // Use a RyaClient to see if the configured instance exists.
+        try {
+            final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+                    config.getUsername(),
+                    config.getPassword().toCharArray(),
+                    config.getClusterName(),
+                    config.getZookeepers());
+            final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+            if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+                throw new ConnectException("The Rya Instance named " +
+                        config.getRyaInstanceName() + " has not been installed.");
+            }
+
+        } catch (final RyaClientException e) {
+            throw new ConnectException("Unable to determine if the Rya Instance named " +
+                    config.getRyaInstanceName() + " has been installed.", e);
+        }
+    }
+
+    @Override
+    protected Sail makeSail(final Map<String, String> taskConfig) throws ConnectException {
+        requireNonNull(taskConfig);
+
+        // Parse the configuration object.
+        final AccumuloRyaSinkConfig config = new 

[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472160#comment-16472160
 ] 

ASF GitHub Bot commented on RYA-487:


Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187657023
  
--- Diff: 
extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java
 ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import com.google.common.base.Strings;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkTask extends RyaSinkTask {
+
+@Override
+protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
+requireNonNull(taskConfig);
+
+// Parse the configuration object.
+final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+@Nullable
+final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
+@Nullable
+final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
+
+// Connect a Mongo Client to the configured Mongo DB instance.
+final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
+final boolean hasCredentials = username != null && password != null;
+
+try(MongoClient mongoClient = hasCredentials ?
+new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
+new MongoClient(serverAddr)) {
+// Use a RyaClient to see if the configured instance exists.
+// Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
+final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+config.getHostname(),
+config.getPort(),
+Optional.ofNullable(username),
+Optional.ofNullable(password));
+
+final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
+if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+throw new ConnectException("The Rya Instance named " +
+ 

[GitHub] incubator-rya pull request #296: RYA-487 Kafka Connect Sinks

2018-05-11 Thread kchilton2
Github user kchilton2 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187657023
  
--- Diff: 
extras/kafka.connect/mongo/src/main/java/org/apache/rya/kafka/connect/mongo/MongoRyaSinkTask.java
 ---
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.mongo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.mongo.MongoConnectionDetails;
+import org.apache.rya.api.client.mongo.MongoRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import com.google.common.base.Strings;
+import com.mongodb.MongoClient;
+import com.mongodb.MongoCredential;
+import com.mongodb.ServerAddress;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * A {@link RyaSinkTask} that uses the Mongo DB implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class MongoRyaSinkTask extends RyaSinkTask {
+
+@Override
+protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws IllegalStateException {
+requireNonNull(taskConfig);
+
+// Parse the configuration object.
+final MongoRyaSinkConfig config = new MongoRyaSinkConfig(taskConfig);
+@Nullable
+final String username = Strings.isNullOrEmpty(config.getUsername()) ? null : config.getUsername();
+@Nullable
+final char[] password = Strings.isNullOrEmpty(config.getPassword()) ? null : config.getPassword().toCharArray();
+
+// Connect a Mongo Client to the configured Mongo DB instance.
+final ServerAddress serverAddr = new ServerAddress(config.getHostname(), config.getPort());
+final boolean hasCredentials = username != null && password != null;
+
+try(MongoClient mongoClient = hasCredentials ?
+new MongoClient(serverAddr, Arrays.asList(MongoCredential.createCredential(username, config.getRyaInstanceName(), password))) :
+new MongoClient(serverAddr)) {
+// Use a RyaClient to see if the configured instance exists.
+// Create the Mongo Connection Details that describe the Mongo DB Server we are interacting with.
+final MongoConnectionDetails connectionDetails = new MongoConnectionDetails(
+config.getHostname(),
+config.getPort(),
+Optional.ofNullable(username),
+Optional.ofNullable(password));
+
+final RyaClient client = MongoRyaClientFactory.build(connectionDetails, mongoClient);
+if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+throw new ConnectException("The Rya Instance named " +
+config.getRyaInstanceName() + " has not been installed.");
+}
+} catch(final RyaClientException e) {
+throw new ConnectException("Unable to determine if the Rya 

[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472088#comment-16472088
 ] 

ASF GitHub Bot commented on RYA-487:


Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187628752
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+@Override
+protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+requireNonNull(taskConfig);
+
+// Parse the configuration object.
+final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+// Connect to the instance of Accumulo.
+final Connector connector;
+try {
+final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+} catch (AccumuloException | AccumuloSecurityException e) {
--- End diff --

Add final to the caught exception.
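
A minimal sketch of the style this comment asks for, using JDK exception types as stand-ins for the Accumulo ones. The class and method names (FinalCatchExample, parsePort) are hypothetical and are not part of the pull request.

```java
import java.io.IOException;

final class FinalCatchExample {
    /** Hypothetical helper: reports either failure as one runtime exception. */
    static int parsePort(final String raw) {
        try {
            if (raw == null) {
                throw new IOException("No port was configured.");
            }
            return Integer.parseInt(raw.trim());
        } catch (final IOException | NumberFormatException e) {
            // The explicit final keeps the catch clause consistent with the
            // other catch blocks in the reviewed file.
            throw new IllegalStateException("Could not read the port.", e);
        }
    }
}
```

In a multi-catch clause the parameter is already implicitly final, so the keyword here is a matter of consistency rather than behavior.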


> Kafka Connect Rya Sink
> --
>
> Key: RYA-487
> URL: https://issues.apache.org/jira/browse/RYA-487
> Project: Rya
>  Issue Type: New Feature
>Affects Versions: 4.0.0
>Reporter: Kevin Chilton
>Assignee: Kevin Chilton
>Priority: Major
> Fix For: 4.0.0
>
>
> Implement a Kafka Connect Sink that writes to Rya.
> This sink does not have to handle visibilities.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472093#comment-16472093
 ] 

ASF GitHub Bot commented on RYA-487:


Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187634433
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/sink/RyaSinkTask.java
 ---
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api.sink;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.repository.sail.SailRepository;
+import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
+import org.eclipse.rdf4j.sail.Sail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.jcabi.manifests.Manifests;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
+/**
+ * Handles the common components required to write {@link Statement}s to Rya.
+ * 
+ * Implementations of this class only need to specify functionality that is specific to the
+ * Rya implementation.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class RyaSinkTask extends SinkTask {
+private static final Logger log = LoggerFactory.getLogger(RyaSinkTask.class);
+
+@Nullable
+private SailRepository sailRepo = null;
+
+@Nullable
+private SailRepositoryConnection conn = null;
+
+/**
+ * Throws an exception if the configured Rya Instance is not already installed
+ * within the configured database.
+ *
+ * @param taskConfig - The configuration values that were provided to the task. (not null)
+ * @throws ConnectException The configured Rya Instance is not installed to the configured database
+ *   or we were unable to figure out if it is installed.
+ */
+protected abstract void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException;
+
+/**
+ * Creates an initialized {@link Sail} object that may be used to write {@link Statement}s to the configured
+ * Rya Instance.
+ *
+ * @param taskConfig - Configures how the Sail object will be created. (not null)
+ * @return The created Sail object.
+ * @throws ConnectException The Sail object could not be made.
+ */
+protected abstract Sail makeSail(final Map<String, String> taskConfig) throws ConnectException;
+
+@Override
+public String version() {
+return Manifests.exists("Build-Version") ? Manifests.read("Build-Version"): "UNKNOWN";
+}
+
+@Override
+public void start(final Map<String, String> props) throws ConnectException {
+requireNonNull(props);
+
+// Ensure the configured Rya Instance is installed within the configured database.
+checkRyaInstanceExists(props);
+
+// Create the Sail object that is connected to the Rya Instance.
+final Sail sail = makeSail(props);
+sailRepo = new SailRepository( sail );
+conn = sailRepo.getConnection();
+}
+
+@Override
+public void put(final Collection<SinkRecord> records) {
+requireNonNull(records);
+
+// Return immediately if there are no records to handle.
+

[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472095#comment-16472095
 ] 

ASF GitHub Bot commented on RYA-487:


Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187640119
  
--- Diff: 
dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRdfConfiguration.java
 ---
@@ -274,16 +274,17 @@ public boolean getUseAggregationPipeline() {
  * on their child subtrees.
  * @param value whether to use aggregation pipeline optimization.
  */
-public void setUseAggregationPipeline(boolean value) {
+public void setUseAggregationPipeline(final boolean value) {
 setBoolean(USE_AGGREGATION_PIPELINE, value);
 }
 
 @Override
 public List getOptimizers() {
-List optimizers = super.getOptimizers();
+final List optimizers = super.getOptimizers();
 if (getUseAggregationPipeline()) {
-Class cl = AggregationPipelineQueryOptimizer.class;
+final Class cl = AggregationPipelineQueryOptimizer.class;
 @SuppressWarnings("unchecked")
+final
--- End diff --

Put final on the same line as its variable
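
A small sketch of the layout being requested: the annotation on its own line, and final on the same line as the declaration it modifies. The class, method and variable names are hypothetical, and the types are JDK stand-ins rather than the optimizer classes from the diff.

```java
import java.util.ArrayList;
import java.util.List;

final class FinalPlacementExample {
    /** Hypothetical method that needs an unchecked cast on a Class object. */
    static List<Class<? extends Number>> numberTypes() {
        final List<Class<? extends Number>> types = new ArrayList<>();
        final Class<?> raw = Integer.class;
        // The annotation stays on its own line; final begins the declaration line.
        @SuppressWarnings("unchecked")
        final Class<? extends Number> typed = (Class<? extends Number>) raw;
        types.add(typed);
        return types;
    }
}
```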



[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472091#comment-16472091
 ] 

ASF GitHub Bot commented on RYA-487:


Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187633086
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsSerializer.java
 ---
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayOutputStream;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFWriter;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFWriterFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize a set of {@link Statement}s
+ * using the RDF4J Rio Binary format.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsSerializer implements Serializer<Set<Statement>> {
+private static final Logger log = LoggerFactory.getLogger(StatementsSerializer.class);
+
+private static final BinaryRDFWriterFactory WRITER_FACTORY = new BinaryRDFWriterFactory();
+
+@Override
+public void configure(final Map<String, ?> configs, final boolean isKey) {
+// Nothing to do.
+}
+
+@Override
+public byte[] serialize(final String topic, final Set<Statement> data) {
+if(data == null) {
+// Returning null because that is the contract of this method.
+return null;
+}
+
+// Write the statements using a Binary RDF Writer.
+final ByteArrayOutputStream boas = new ByteArrayOutputStream();
--- End diff --

Change the variable name to baos.
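
For illustration, a sketch of the same null-in, null-out serializer contract written with JDK classes only, with the stream named baos as suggested. It does not use RDF4J or Kafka; SetCodec and its simple length-prefixed encoding are assumptions made for the example, not the format used in the pull request.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

final class SetCodec {
    /** Writes the set as a count followed by each string; null maps to null. */
    static byte[] serialize(final Set<String> data) {
        if (data == null) {
            return null;
        }
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(baos)) {
            out.writeInt(data.size());
            for (final String value : data) {
                out.writeUTF(value);
            }
        } catch (final IOException e) {
            throw new IllegalStateException("Could not serialize the set.", e);
        }
        return baos.toByteArray();
    }

    /** Reads back what serialize wrote; null or empty input maps to null. */
    static Set<String> deserialize(final byte[] data) {
        if (data == null || data.length == 0) {
            return null;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            final int count = in.readInt();
            final Set<String> values = new HashSet<>();
            for (int i = 0; i < count; i++) {
                values.add(in.readUTF());
            }
            return values;
        } catch (final IOException e) {
            throw new IllegalStateException("Could not deserialize the set.", e);
        }
    }
}
```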



[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472094#comment-16472094
 ] 

ASF GitHub Bot commented on RYA-487:


Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187632758
  
--- Diff: 
extras/kafka.connect/api/src/main/java/org/apache/rya/kafka/connect/api/StatementsDeserializer.java
 ---
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.eclipse.rdf4j.model.Statement;
+import org.eclipse.rdf4j.rio.RDFHandler;
+import org.eclipse.rdf4j.rio.RDFHandlerException;
+import org.eclipse.rdf4j.rio.RDFParseException;
+import org.eclipse.rdf4j.rio.RDFParser;
+import org.eclipse.rdf4j.rio.binary.BinaryRDFParserFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Deserializer} that is able to deserialize an RDF4J Rio Binary format serialized
+ * set of {@link Statement}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class StatementsDeserializer implements Deserializer<Set<Statement>> {
+private static final Logger log = LoggerFactory.getLogger(StatementsDeserializer.class);
+
+private static final BinaryRDFParserFactory PARSER_FACTORY = new BinaryRDFParserFactory();
+
+@Override
+public void configure(final Map<String, ?> configs, final boolean isKey) {
+// Nothing to do.
+}
+
+@Override
+public Set<Statement> deserialize(final String topic, final byte[] data) {
+if(data == null || data.length == 0) {
+// Return null because that is the contract of this method.
+return null;
+}
+
+try {
+final RDFParser parser = PARSER_FACTORY.getParser();
+final Set<Statement> statements = new HashSet<>();
+
+parser.setRDFHandler(new RDFHandler() {
+@Override
+public void handleStatement(final Statement statement) throws RDFHandlerException {
+log.debug("Statement: " + statement);
+statements.add( statement );
+}
+
+@Override public void startRDF() throws RDFHandlerException { }
+@Override public void handleNamespace(final String arg0, final String arg1) throws RDFHandlerException { }
+@Override public void handleComment(final String arg0) throws RDFHandlerException { }
+@Override public void endRDF() throws RDFHandlerException { }
+});
+
+parser.parse(new ByteArrayInputStream(data), null);
+return statements;
+
+} catch(final RDFParseException | RDFHandlerException | IOException e) {
+log.error("Could not deserialize a Set of VisibilityStatement objects using the RDF4J Rio Binray format.", e);
--- End diff --

Typo: Binray should be Binary.



[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472102#comment-16472102
 ] 

ASF GitHub Bot commented on RYA-487:


Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187636404
  
--- Diff: extras/kafka.connect/client/pom.xml ---
@@ -0,0 +1,135 @@
+
+
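+<project xmlns="http://maven.apache.org/POM/4.0.0" 
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<modelVersion>4.0.0</modelVersion>
+
+<parent>
+<groupId>org.apache.rya</groupId>
+<artifactId>rya.kafka.connect.parent</artifactId>
+<version>4.0.0-incubating-SNAPSHOT</version>
+</parent>
+
+<artifactId>rya.kafka.connect.client</artifactId>
+
+<name>Apache Rya Kafka Connect - Client</name>
+<description>Contains a client that may be used to load Statements into 
+ a Kafka topic to be read by Kafka Connect.</description>
+
+<dependencies>
+
+<dependency>
+<groupId>org.apache.rya</groupId>
+<artifactId>rya.sail</artifactId>
+</dependency>
+<dependency>
+<groupId>org.apache.rya</groupId>
+<artifactId>rya.kafka.connect.api</artifactId>
+</dependency>
+
+
+<dependency>
+<groupId>org.eclipse.rdf4j</groupId>
+<artifactId>rdf4j-model</artifactId>
+</dependency>
+<dependency>
+<groupId>com.google.guava</groupId>
+<artifactId>guava</artifactId>
+</dependency>
+<dependency>
+<groupId>com.beust</groupId>
+<artifactId>jcommander</artifactId>
+</dependency>
+<dependency>
+<groupId>com.github.stephenc.findbugs</groupId>
+<artifactId>findbugs-annotations</artifactId>
+</dependency>
+<dependency>
+<groupId>org.apache.kafka</groupId>
+<artifactId>kafka-clients</artifactId>
+</dependency>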
+
+
--- End diff --

I think pulling in rya.sail as a dependency will give you all the RDF formats, so this probably isn't needed.



[jira] [Commented] (RYA-487) Kafka Connect Rya Sink

2018-05-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/RYA-487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16472101#comment-16472101
 ] 

ASF GitHub Bot commented on RYA-487:


Github user ejwhite922 commented on a diff in the pull request:

https://github.com/apache/incubator-rya/pull/296#discussion_r187631961
  
--- Diff: 
extras/kafka.connect/accumulo/src/main/java/org/apache/rya/kafka/connect/accumulo/AccumuloRyaSinkTask.java
 ---
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.kafka.connect.accumulo;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.Map;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.client.RyaClient;
+import org.apache.rya.api.client.RyaClientException;
+import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
+import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.kafka.connect.api.sink.RyaSinkTask;
+import org.apache.rya.rdftriplestore.inference.InferenceEngineException;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.eclipse.rdf4j.sail.Sail;
+import org.eclipse.rdf4j.sail.SailException;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A {@link RyaSinkTask} that uses the Accumulo implementation of Rya to store data.
+ */
+@DefaultAnnotation(NonNull.class)
+public class AccumuloRyaSinkTask extends RyaSinkTask {
+
+@Override
+protected void checkRyaInstanceExists(final Map<String, String> taskConfig) throws ConnectException {
+requireNonNull(taskConfig);
+
+// Parse the configuration object.
+final AccumuloRyaSinkConfig config = new AccumuloRyaSinkConfig(taskConfig);
+
+// Connect to the instance of Accumulo.
+final Connector connector;
+try {
+final Instance instance = new ZooKeeperInstance(config.getClusterName(), config.getZookeepers());
+connector = instance.getConnector(config.getUsername(), new PasswordToken( config.getPassword() ));
+} catch (AccumuloException | AccumuloSecurityException e) {
+throw new ConnectException("Could not create a Connector to the configured Accumulo instance.", e);
+}
+
+// Use a RyaClient to see if the configured instance exists.
+try {
+final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails(
+config.getUsername(),
+config.getPassword().toCharArray(),
+config.getClusterName(),
+config.getZookeepers());
+final RyaClient client = AccumuloRyaClientFactory.build(connectionDetails, connector);
+
+if(!client.getInstanceExists().exists( config.getRyaInstanceName() )) {
+throw new ConnectException("The Rya Instance named " +
+config.getRyaInstanceName() + " has not been installed.");
+}
+
+} catch (final RyaClientException e) {
+throw new ConnectException("Unable to determine if the Rya Instance named " +
+config.getRyaInstanceName() + " has been installed.", e);
+}
+}
+
+@Override
+protected Sail 
