This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 027b424 Pulsar Connect (#1520) 027b424 is described below commit 027b424fd914db306bc3a9223ca2b36030421ce8 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Mon Apr 9 17:08:06 2018 -0700 Pulsar Connect (#1520) * Added Pulsar Connect interfaces that define connectors that push data into pulsar and take data from pulsar * Added Twitter connector * Added hbc core version mapping * Addressed comments * Fixed build * Fixed license header --- pom.xml | 3 + pulsar-connect/core/pom.xml | 33 ++++ .../org/apache/pulsar/connect/core/PushSource.java | 52 +++++++ .../java/org/apache/pulsar/connect/core/Sink.java | 49 ++++++ pulsar-connect/pom.xml | 39 +++++ pulsar-connect/twitter/pom.xml | 61 ++++++++ .../pulsar/connect/twitter/TwitterFireHose.java | 167 +++++++++++++++++++++ .../connect/twitter/TwitterFireHoseConfig.java | 63 ++++++++ 8 files changed, 467 insertions(+) diff --git a/pom.xml b/pom.xml index 30b3d5e..2ea3f52 100644 --- a/pom.xml +++ b/pom.xml @@ -103,6 +103,8 @@ flexible messaging model and an intuitive client API.</description> <module>pulsar-log4j2-appender</module> <!-- functions related modules --> <module>pulsar-functions</module> + <!-- connector related modules --> + <module>pulsar-connect</module> </modules> <issueManagement> @@ -139,6 +141,7 @@ flexible messaging model and an intuitive client API.</description> <gson.version>2.8.2</gson.version> <sketches.version>0.8.3</sketches.version> <jctools.version>2.1.1</jctools.version> + <hbc-core.version>2.2.0</hbc-core.version> <!-- test dependencies --> <disruptor.version>3.4.0</disruptor.version> diff --git a/pulsar-connect/core/pom.xml b/pulsar-connect/core/pom.xml new file mode 100644 index 0000000..dea6d62 --- /dev/null +++ b/pulsar-connect/core/pom.xml @@ -0,0 +1,33 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-connect</artifactId> + <version>2.0.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>pulsar-connect-core</artifactId> + <name>Pulsar Connect :: Connect</name> + +</project> diff --git a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java new file mode 100644 index 0000000..65b006b --- /dev/null +++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.connect.core; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +/** + * Pulsar's Push Source interface. A PushSource reads data from + * external sources (database changes, twitter firehose, etc) + * and publishes it to a Pulsar topic. The reason it is called Push is + * because PushSources get passed a consumption Function that they + * invoke whenever they have data to be published to Pulsar. + * The lifecycle of a PushSource is to open it passing any config needed + * by it to initialize (like open network connection, authenticate, etc). + * A consumer Function is then attached to it, which is invoked by the source whenever + * there is data to be published. Once all data has been read, one can use close + * at the end of the session to do any cleanup. + */ +public interface PushSource<T> extends AutoCloseable { + /** + * Open connector with configuration + * + * @param config initialization config + * @throws Exception IO type exceptions when opening a connector + */ + void open(final Map<String, String> config) throws Exception; + + /** + * Attach a consumer function to this Source. This is invoked by the implementation + * to pass messages whenever there is data to be pushed to Pulsar.
+ * @param consumer + */ + void setConsumer(Function<T, CompletableFuture<Void>> consumer); +} \ No newline at end of file diff --git a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java new file mode 100644 index 0000000..e22eb0f --- /dev/null +++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.connect.core; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +/** + * Pulsar's Sink interface. A Sink reads data from + * a Pulsar topic and writes it to external sinks (kv store, database, filesystem, etc). + * The lifecycle of a Sink is to open it passing any config needed + * by it to initialize (like open network connection, authenticate, etc). + * On every message from the designated PulsarTopic, the write method is + * invoked which writes the message to the external sink.
One can use close + * at the end of the session to do any cleanup + */ +public interface Sink<T> extends AutoCloseable { + /** + * Open connector with configuration + * + * @param config initialization config + * @throws Exception IO type exceptions when opening a connector + */ + void open(final Map<String, String> config) throws Exception; + + /** + * Attempt to publish a type-safe message to the sink + * + * @param message Object to publish to the sink + * @return Completable future for async publish request + */ + CompletableFuture<Void> write(final T message); +} \ No newline at end of file diff --git a/pulsar-connect/pom.xml b/pulsar-connect/pom.xml new file mode 100644 index 0000000..bf1303f --- /dev/null +++ b/pulsar-connect/pom.xml @@ -0,0 +1,39 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License.
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <packaging>pom</packaging> + <parent> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar</artifactId> + <version>2.0.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>pulsar-connect</artifactId> + <name>Pulsar Connect :: Parent</name> + + <modules> + <module>core</module> + <module>twitter</module> + </modules> + +</project> diff --git a/pulsar-connect/twitter/pom.xml b/pulsar-connect/twitter/pom.xml new file mode 100644 index 0000000..ae92214 --- /dev/null +++ b/pulsar-connect/twitter/pom.xml @@ -0,0 +1,61 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-connect</artifactId> + <version>2.0.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>pulsar-connect-twitter</artifactId> + <name>Pulsar Connect :: Twitter</name> + + <dependencies> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-connect-core</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>${jackson.version}</version> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-yaml</artifactId> + <version>${jackson.version}</version> + </dependency> + + <dependency> + <groupId>com.twitter</groupId> + <artifactId>hbc-core</artifactId> + <version>${hbc-core.version}</version> + </dependency> + + </dependencies> + +</project> diff --git a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java new file mode 100644 index 0000000..9f6ebbe --- /dev/null +++ b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.connect.twitter; + +import java.io.IOException; +import java.io.InputStream; +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +import org.apache.pulsar.connect.core.PushSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.twitter.hbc.core.endpoint.StatusesSampleEndpoint; +import com.twitter.hbc.core.endpoint.StreamingEndpoint; +import com.twitter.hbc.ClientBuilder; +import com.twitter.hbc.common.DelimitedStreamReader; +import com.twitter.hbc.core.Constants; +import com.twitter.hbc.core.processor.HosebirdMessageProcessor; +import com.twitter.hbc.httpclient.BasicClient; +import com.twitter.hbc.httpclient.auth.Authentication; +import com.twitter.hbc.httpclient.auth.OAuth1; + +/** + * Simple Push based Twitter FireHose Source + */ +public class TwitterFireHose implements PushSource<String> { + + private static final Logger LOG = LoggerFactory.getLogger(TwitterFireHose.class); + + // ----- Fields set by the constructor + + // ----- Runtime fields + private Object waitObject; + private Function<String, CompletableFuture<Void>> consumeFunction; + + @Override + public void open(Map<String, String> config) throws IOException { + TwitterFireHoseConfig hoseConfig = TwitterFireHoseConfig.load(config); + if (hoseConfig.getConsumerKey() == null + || hoseConfig.getConsumerSecret() == null + || hoseConfig.getToken() != null + || hoseConfig.getTokenSecret() == null) { + throw new IllegalArgumentException("Required 
property not set."); + } + waitObject = new Object(); + startThread(hoseConfig); + } + + @Override + public void setConsumer(Function<String, CompletableFuture<Void>> consumeFunction) { + this.consumeFunction = consumeFunction; + } + + @Override + public void close() throws Exception { + stopThread(); + } + + // ------ Custom endpoints + + /** + * Implementing this interface allows users of this source to set a custom endpoint. + */ + public interface EndpointInitializer { + StreamingEndpoint createEndpoint(); + } + + /** + * Required for Twitter Client + */ + private static class SampleStatusesEndpoint implements EndpointInitializer, Serializable { + @Override + public StreamingEndpoint createEndpoint() { + // this default endpoint initializer returns the sample endpoint: Returning a sample from the firehose (all tweets) + StatusesSampleEndpoint endpoint = new StatusesSampleEndpoint(); + endpoint.stallWarnings(false); + endpoint.delimited(false); + return endpoint; + } + } + + private void startThread(TwitterFireHoseConfig config) { + Authentication auth = new OAuth1(config.getConsumerKey(), + config.getConsumerSecret(), + config.getToken(), + config.getTokenSecret()); + + BasicClient client = new ClientBuilder() + .name(config.getClientName()) + .hosts(config.getClientHosts()) + .endpoint(new SampleStatusesEndpoint().createEndpoint()) + .authentication(auth) + .processor(new HosebirdMessageProcessor() { + public DelimitedStreamReader reader; + + @Override + public void setup(InputStream input) { + reader = new DelimitedStreamReader(input, Constants.DEFAULT_CHARSET, + config.getClientBufferSize()); + } + + @Override + public boolean process() throws IOException, InterruptedException { + String line = reader.readLine(); + try { + // We don't really care if the future succeeds or not. 
+ // However might be in the future to count failures + // TODO:- Figure out the metrics story for connectors + consumeFunction.apply(line); + } catch (Exception e) { + LOG.error("Exception thrown"); + } + return true; + } + }) + .build(); + + Thread runnerThread = new Thread(() -> { + LOG.info("Started the Twitter FireHose Runner Thread"); + client.connect(); + LOG.info("Twitter Streaming API connection established successfully"); + + // just wait now + try { + synchronized (waitObject) { + waitObject.wait(); + } + } catch (Exception e) { + LOG.info("Got a exception in waitObject"); + } + LOG.debug("Closing Twitter Streaming API connection"); + client.stop(); + LOG.info("Twitter Streaming API connection closed"); + LOG.info("Twitter FireHose Runner Thread ending"); + }); + runnerThread.setName("TwitterFireHoseRunner"); + runnerThread.start(); + } + + private void stopThread() { + LOG.info("Source closed"); + synchronized (waitObject) { + waitObject.notify(); + } + } + +} diff --git a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java new file mode 100644 index 0000000..f0614bd --- /dev/null +++ b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.connect.twitter; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; + +import com.twitter.hbc.core.Constants; +import lombok.*; +import lombok.experimental.Accessors; + +@Data +@Setter +@Getter +@EqualsAndHashCode +@ToString +@Accessors(chain = true) +public class TwitterFireHoseConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private String consumerKey; + private String consumerSecret; + private String token; + private String tokenSecret; + + // ------ Optional property keys + + private String clientName = "openconnector-twitter-source"; + private String clientHosts = Constants.STREAM_HOST; + private int clientBufferSize = 50000; + + public static TwitterFireHoseConfig load(String yamlFile) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return mapper.readValue(new File(yamlFile), TwitterFireHoseConfig.class); + } + + public static TwitterFireHoseConfig load(Map<String, String> map) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(new ObjectMapper().writeValueAsString(map), TwitterFireHoseConfig.class); + } +} \ No newline at end of file -- To stop receiving notification emails like this one, please contact mme...@apache.org.