This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 027b424  Pulsar Connect (#1520)
027b424 is described below

commit 027b424fd914db306bc3a9223ca2b36030421ce8
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Mon Apr 9 17:08:06 2018 -0700

    Pulsar Connect (#1520)
    
    * Added Pulsar Connect interfaces that define connectors that push data into pulsar and take data from pulsar
    
    * Added Twitter connector
    
    * Added hbc core version mapping
    
    * Addressed comments
    
    * Fixed build
    
    * Fixed license header
---
 pom.xml                                            |   3 +
 pulsar-connect/core/pom.xml                        |  33 ++++
 .../org/apache/pulsar/connect/core/PushSource.java |  52 +++++++
 .../java/org/apache/pulsar/connect/core/Sink.java  |  49 ++++++
 pulsar-connect/pom.xml                             |  39 +++++
 pulsar-connect/twitter/pom.xml                     |  61 ++++++++
 .../pulsar/connect/twitter/TwitterFireHose.java    | 167 +++++++++++++++++++++
 .../connect/twitter/TwitterFireHoseConfig.java     |  63 ++++++++
 8 files changed, 467 insertions(+)

diff --git a/pom.xml b/pom.xml
index 30b3d5e..2ea3f52 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,8 @@ flexible messaging model and an intuitive client 
API.</description>
     <module>pulsar-log4j2-appender</module>
     <!-- functions related modules -->
     <module>pulsar-functions</module>
+    <!-- connector related modules -->
+    <module>pulsar-connect</module>
   </modules>
 
   <issueManagement>
@@ -139,6 +141,7 @@ flexible messaging model and an intuitive client 
API.</description>
     <gson.version>2.8.2</gson.version>
     <sketches.version>0.8.3</sketches.version>
     <jctools.version>2.1.1</jctools.version>
+    <hbc-core.version>2.2.0</hbc-core.version>
 
     <!-- test dependencies -->
     <disruptor.version>3.4.0</disruptor.version>
diff --git a/pulsar-connect/core/pom.xml b/pulsar-connect/core/pom.xml
new file mode 100644
index 0000000..dea6d62
--- /dev/null
+++ b/pulsar-connect/core/pom.xml
@@ -0,0 +1,33 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-connect</artifactId>
+    <version>2.0.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-connect-core</artifactId>
+  <name>Pulsar Connect :: Connect</name>
+
+</project>
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
new file mode 100644
index 0000000..65b006b
--- /dev/null
+++ 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.core;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+/**
+ * Pulsar's Push Source interface. A PushSource reads data from
+ * external sources (database changes, twitter firehose, etc.)
+ * and publishes it to a Pulsar topic. It is called "Push"
+ * because a PushSource gets passed a consumer Function that it
+ * invokes whenever it has data to be published to Pulsar.
+ * The lifecycle of a PushSource is to open it, passing any config it needs
+ * to initialize (open a network connection, authenticate, etc.).
+ * A consumer Function is then attached to it, which the source invokes whenever
+ * there is data to be published. Once all data has been read, close can be
+ * called at the end of the session to do any cleanup.
+ *
+ * @param <T> type of the records handed to the consumer Function
+ */
+public interface PushSource<T> extends AutoCloseable {
+    /**
+     * Open connector with configuration
+     *
+     * @param config initialization config
+     * @throws Exception IO type exceptions when opening a connector
+     */
+    void open(final Map<String, String> config) throws Exception;
+
+    /**
+     * Attach a consumer function to this Source. The implementation invokes it
+     * to pass messages whenever there is data to be pushed to Pulsar.
+     *
+     * @param consumer function called with each record; it returns a
+     *                 CompletableFuture for that record (presumably completed once
+     *                 the publish finishes -- confirm against the runtime that
+     *                 supplies the function)
+     */
+    void setConsumer(Function<T, CompletableFuture<Void>> consumer);
+}
\ No newline at end of file
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
new file mode 100644
index 0000000..e22eb0f
--- /dev/null
+++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.core;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Pulsar's Sink interface. A Sink reads data from
+ * a Pulsar topic and writes it to external sinks (kv store, database, filesystem, etc.).
+ * The lifecycle of a Sink is to open it, passing any config it needs
+ * to initialize (open a network connection, authenticate, etc.).
+ * On every message from the designated Pulsar topic, the write method is
+ * invoked, which writes the message to the external sink. close can be called
+ * at the end of the session to do any cleanup.
+ *
+ * @param <T> type of the messages written to the sink
+ */
+public interface Sink<T> extends AutoCloseable {
+    /**
+     * Open connector with configuration
+     *
+     * @param config initialization config
+     * @throws Exception IO type exceptions when opening a connector
+     */
+    void open(final Map<String, String> config) throws Exception;
+
+    /**
+     * Attempt to publish a single message to the sink
+     *
+     * @param message Object to publish to the sink
+     * @return Completable future for the async publish request
+     */
+    CompletableFuture<Void> write(final T message);
+}
\ No newline at end of file
diff --git a/pulsar-connect/pom.xml b/pulsar-connect/pom.xml
new file mode 100644
index 0000000..bf1303f
--- /dev/null
+++ b/pulsar-connect/pom.xml
@@ -0,0 +1,39 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <packaging>pom</packaging>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar</artifactId>
+    <version>2.0.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-connect</artifactId>
+  <name>Pulsar Connect :: Parent</name>
+
+  <modules>
+    <module>core</module>
+    <module>twitter</module>
+  </modules>
+
+</project>
diff --git a/pulsar-connect/twitter/pom.xml b/pulsar-connect/twitter/pom.xml
new file mode 100644
index 0000000..ae92214
--- /dev/null
+++ b/pulsar-connect/twitter/pom.xml
@@ -0,0 +1,61 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar-connect</artifactId>
+    <version>2.0.0-incubating-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-connect-twitter</artifactId>
+  <name>Pulsar Connect :: Twitter</name>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-connect-core</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+      <version>${jackson.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>hbc-core</artifactId>
+      <version>${hbc-core.version}</version>
+    </dependency>
+
+  </dependencies>
+
+</project>
diff --git 
a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
new file mode 100644
index 0000000..9f6ebbe
--- /dev/null
+++ 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.twitter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import org.apache.pulsar.connect.core.PushSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint;
+import com.twitter.hbc.core.endpoint.StreamingEndpoint;
+import com.twitter.hbc.ClientBuilder;
+import com.twitter.hbc.common.DelimitedStreamReader;
+import com.twitter.hbc.core.Constants;
+import com.twitter.hbc.core.processor.HosebirdMessageProcessor;
+import com.twitter.hbc.httpclient.BasicClient;
+import com.twitter.hbc.httpclient.auth.Authentication;
+import com.twitter.hbc.httpclient.auth.OAuth1;
+
+/**
+ * Simple Push based Twitter FireHose Source.
+ *
+ * <p>{@link #open(Map)} validates the OAuth settings and starts a background thread that
+ * connects a hosebird client to the sample statuses endpoint. Every line read from the
+ * stream is handed to the Function registered through {@link #setConsumer(Function)}.
+ * {@link #close()} wakes the background thread, which then stops the client.
+ */
+public class TwitterFireHose implements PushSource<String> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TwitterFireHose.class);
+
+    // ----- Fields set by the constructor
+
+    // ----- Runtime fields
+    // Monitor the runner thread parks on. Created eagerly so that close() is safe
+    // even when open() was never called or failed validation.
+    private final Object waitObject = new Object();
+    // Shutdown request flag, guarded by waitObject. Keeping the request in a flag means a
+    // close() that happens before the runner thread starts waiting is not lost.
+    private boolean closed;
+    // Written by setConsumer() and read on the client's processing thread.
+    private volatile Function<String, CompletableFuture<Void>> consumeFunction;
+
+    /**
+     * Loads the connector config, checks that all four OAuth properties are present and
+     * starts the background thread that runs the Twitter client.
+     *
+     * @param config initialization config; must provide consumerKey, consumerSecret,
+     *               token and tokenSecret
+     * @throws IOException if the config cannot be converted to a TwitterFireHoseConfig
+     * @throws IllegalArgumentException if any of the required properties is missing
+     */
+    @Override
+    public void open(Map<String, String> config) throws IOException {
+        TwitterFireHoseConfig hoseConfig = TwitterFireHoseConfig.load(config);
+        if (hoseConfig.getConsumerKey() == null
+                || hoseConfig.getConsumerSecret() == null
+                || hoseConfig.getToken() == null
+                || hoseConfig.getTokenSecret() == null) {
+            throw new IllegalArgumentException("Required property not set.");
+        }
+        // Clear a shutdown request left over from an earlier close() so the source can be reopened.
+        synchronized (waitObject) {
+            closed = false;
+        }
+        startThread(hoseConfig);
+    }
+
+    /**
+     * Registers the function that receives every line read from the stream.
+     *
+     * @param consumeFunction function invoked with each line; its returned future is
+     *                        currently not inspected
+     */
+    @Override
+    public void setConsumer(Function<String, CompletableFuture<Void>> consumeFunction) {
+        this.consumeFunction = consumeFunction;
+    }
+
+    /**
+     * Requests shutdown: wakes the runner thread, which stops the Twitter client and ends.
+     * Does not wait for the runner thread to finish.
+     */
+    @Override
+    public void close() throws Exception {
+        stopThread();
+    }
+
+    // ------ Custom endpoints
+
+    /**
+     * Implementing this interface allows users of this source to set a custom endpoint.
+     */
+    public interface EndpointInitializer {
+        StreamingEndpoint createEndpoint();
+    }
+
+    /**
+     * Required for Twitter Client
+     */
+    private static class SampleStatusesEndpoint implements EndpointInitializer, Serializable {
+        @Override
+        public StreamingEndpoint createEndpoint() {
+            // this default endpoint initializer returns the sample endpoint:
+            // Returning a sample from the firehose (all tweets)
+            StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint();
+            endpoint.stallWarnings(false);
+            endpoint.delimited(false);
+            return endpoint;
+        }
+    }
+
+    /**
+     * Builds the hosebird client from the given config and starts the
+     * "TwitterFireHoseRunner" thread, which connects the client, waits until
+     * {@link #stopThread()} requests shutdown and then stops the client.
+     *
+     * @param config validated connector configuration
+     */
+    private void startThread(TwitterFireHoseConfig config) {
+        Authentication auth = new OAuth1(config.getConsumerKey(),
+                config.getConsumerSecret(),
+                config.getToken(),
+                config.getTokenSecret());
+
+        BasicClient client = new ClientBuilder()
+                .name(config.getClientName())
+                .hosts(config.getClientHosts())
+                .endpoint(new SampleStatusesEndpoint().createEndpoint())
+                .authentication(auth)
+                .processor(new HosebirdMessageProcessor() {
+                    private DelimitedStreamReader reader;
+
+                    @Override
+                    public void setup(InputStream input) {
+                        reader = new DelimitedStreamReader(input, Constants.DEFAULT_CHARSET,
+                            config.getClientBufferSize());
+                    }
+
+                    @Override
+                    public boolean process() throws IOException, InterruptedException {
+                        String line = reader.readLine();
+                        try {
+                            // We don't really care if the future succeeds or not.
+                            // However might be in the future to count failures
+                            // TODO:- Figure out the metrics story for connectors
+                            consumeFunction.apply(line);
+                        } catch (Exception e) {
+                            // Keep the stream running, but record what actually went wrong.
+                            LOG.error("Exception thrown", e);
+                        }
+                        return true;
+                    }
+                })
+                .build();
+
+        Thread runnerThread = new Thread(() -> {
+            LOG.info("Started the Twitter FireHose Runner Thread");
+            client.connect();
+            LOG.info("Twitter Streaming API connection established successfully");
+
+            // just wait now
+            try {
+                synchronized (waitObject) {
+                    // Loop on the flag: guards against spurious wake-ups and against a
+                    // close() that ran before this thread got here.
+                    while (!closed) {
+                        waitObject.wait();
+                    }
+                }
+            } catch (InterruptedException e) {
+                // The interrupt flag is deliberately not re-asserted: this thread ends right
+                // after the shutdown below, and a pending interrupt could cut client.stop() short.
+                LOG.info("Got a exception in waitObject", e);
+            }
+            LOG.debug("Closing Twitter Streaming API connection");
+            client.stop();
+            LOG.info("Twitter Streaming API connection closed");
+            LOG.info("Twitter FireHose Runner Thread ending");
+        });
+        runnerThread.setName("TwitterFireHoseRunner");
+        runnerThread.start();
+    }
+
+    /**
+     * Records the shutdown request and wakes every runner thread waiting on the monitor.
+     */
+    private void stopThread() {
+        LOG.info("Source closed");
+        synchronized (waitObject) {
+            closed = true;
+            waitObject.notifyAll();
+        }
+    }
+
+}
diff --git 
a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java
 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java
new file mode 100644
index 0000000..f0614bd
--- /dev/null
+++ 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.connect.twitter;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+import com.twitter.hbc.core.Constants;
+import lombok.*;
+import lombok.experimental.Accessors;
+
+/**
+ * Configuration of the Twitter FireHose source: the four OAuth credentials plus
+ * optional client settings that carry defaults.
+ *
+ * <p>The two secrets are excluded from {@code toString()} so that logging a config
+ * instance does not print credentials.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString(exclude = {"consumerSecret", "tokenSecret"})
+@Accessors(chain = true)
+public class TwitterFireHoseConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private String consumerKey;
+    private String consumerSecret;
+    private String token;
+    private String tokenSecret;
+
+    // ------ Optional property keys
+
+    private String clientName = "openconnector-twitter-source";
+    private String clientHosts = Constants.STREAM_HOST;
+    private int clientBufferSize = 50000;
+
+    /**
+     * Reads the configuration from a YAML file.
+     *
+     * @param yamlFile path of the YAML file to read
+     * @return the parsed configuration
+     * @throws IOException if the file cannot be read or mapped to this class
+     */
+    public static TwitterFireHoseConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), TwitterFireHoseConfig.class);
+    }
+
+    /**
+     * Builds the configuration from a key/value map by serializing the map to JSON
+     * and reading that JSON back as a TwitterFireHoseConfig.
+     *
+     * @param map property names mapped to their values
+     * @return the resulting configuration
+     * @throws IOException if the map cannot be mapped to this class
+     */
+    public static TwitterFireHoseConfig load(Map<String, String> map) throws IOException {
+        // One mapper is enough for both directions of the round-trip.
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(mapper.writeValueAsString(map), TwitterFireHoseConfig.class);
+    }
+}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to