This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 83a3377 Remove Aerospike Connector (#2456) 83a3377 is described below commit 83a3377e39cf17c2ebdf4c92f68eac26696356ee Author: Ali Ahmed <alahmed...@gmail.com> AuthorDate: Tue Aug 28 00:10:52 2018 -0700 Remove Aerospike Connector (#2456) --- distribution/io/src/assemble/io.xml | 6 - pom.xml | 1 - pulsar-io/aerospike/pom.xml | 75 --------- .../pulsar/io/aerospike/AerospikeAbstractSink.java | 169 --------------------- .../pulsar/io/aerospike/AerospikeSinkConfig.java | 64 -------- .../pulsar/io/aerospike/AerospikeStringSink.java | 35 ----- .../resources/META-INF/services/pulsar-io.yaml | 22 --- pulsar-io/pom.xml | 1 - site2/docs/deploy-bare-metal.md | 1 - site2/docs/getting-started-standalone.md | 1 - site2/docs/io-aerospike.md | 21 --- site2/docs/io-connectors.md | 1 - site2/docs/io-overview.md | 3 +- site2/docs/io-quickstart.md | 3 +- 14 files changed, 2 insertions(+), 401 deletions(-) diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml index f69d5ce..28f9ad6 100644 --- a/distribution/io/src/assemble/io.xml +++ b/distribution/io/src/assemble/io.xml @@ -74,11 +74,5 @@ <outputDirectory>connectors</outputDirectory> <fileMode>644</fileMode> </file> - - <file> - <source>${basedir}/../../pulsar-io/aerospike/target/pulsar-io-aerospike-${project.version}.nar</source> - <outputDirectory>connectors</outputDirectory> - <fileMode>644</fileMode> - </file> </files> </assembly> diff --git a/pom.xml b/pom.xml index 66d371e..f7a93f1 100644 --- a/pom.xml +++ b/pom.xml @@ -161,7 +161,6 @@ flexible messaging model and an intuitive client API.</description> <sketches.version>0.8.3</sketches.version> <hbc-core.version>2.2.0</hbc-core.version> <cassandra-driver-core.version>3.4.0</cassandra-driver-core.version> - <aerospike-client.version>4.1.5</aerospike-client.version> <kafka-client.version>0.10.2.1</kafka-client.version> <rabbitmq-client.version>5.1.1</rabbitmq-client.version> <aws-sdk.version>1.11.297</aws-sdk.version> diff --git a/pulsar-io/aerospike/pom.xml b/pulsar-io/aerospike/pom.xml deleted file mode 100644 index 5f678be..0000000 --- a/pulsar-io/aerospike/pom.xml +++ /dev/null @@ -1,75 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.pulsar</groupId> - <artifactId>pulsar-io</artifactId> - <version>2.2.0-incubating-SNAPSHOT</version> - </parent> - - <artifactId>pulsar-io-aerospike</artifactId> - <name>Pulsar IO :: Aerospike</name> - - <dependencies> - - <dependency> - <groupId>${project.groupId}</groupId> - <artifactId>pulsar-io-core</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - </dependency> - - <dependency> - <groupId>com.fasterxml.jackson.dataformat</groupId> - <artifactId>jackson-dataformat-yaml</artifactId> - </dependency> - - <dependency> - <groupId>com.aerospike</groupId> - <artifactId>aerospike-client</artifactId> - <version>${aerospike-client.version}</version> - <exclusions> - <exclusion> - <groupId>org.gnu</groupId> - <artifactId>gnu-crypto</artifactId> - </exclusion> - </exclusions> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-nar-maven-plugin</artifactId> - </plugin> - </plugins> - </build> - - -</project> diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java deleted file mode 100644 index fe3787a..0000000 --- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java +++ /dev/null @@ -1,169 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.io.aerospike; - -import com.aerospike.client.AerospikeClient; -import com.aerospike.client.AerospikeException; -import com.aerospike.client.Bin; -import com.aerospike.client.Host; -import com.aerospike.client.Key; -import com.aerospike.client.Value; -import com.aerospike.client.async.EventLoop; -import com.aerospike.client.async.EventPolicy; -import com.aerospike.client.async.NioEventLoops; -import com.aerospike.client.listener.WriteListener; -import com.aerospike.client.policy.ClientPolicy; -import com.aerospike.client.policy.WritePolicy; - -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; - -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.KeyValue; -import org.apache.pulsar.io.core.Sink; -import org.apache.pulsar.io.core.SinkContext; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A Simple abstract class for Aerospike sink - * Users need to implement extractKeyValue function to use this sink - */ -public abstract class AerospikeAbstractSink<K, V> implements Sink<byte[]> { - - private static final Logger LOG = LoggerFactory.getLogger(AerospikeAbstractSink.class); - - // ----- Runtime fields - private AerospikeSinkConfig aerospikeSinkConfig; - private AerospikeClient client; - private WritePolicy writePolicy; - private BlockingQueue<AWriteListener> queue; - private NioEventLoops eventLoops; - private EventLoop eventLoop; - - @Override - public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { - aerospikeSinkConfig = AerospikeSinkConfig.load(config); - if (aerospikeSinkConfig.getSeedHosts() == null - || aerospikeSinkConfig.getKeyspace() == null - || aerospikeSinkConfig.getColumnName() == null) { - throw new IllegalArgumentException("Required property not set."); - } - - writePolicy = new WritePolicy(); - writePolicy.maxRetries = aerospikeSinkConfig.getRetries(); - writePolicy.setTimeout(aerospikeSinkConfig.getTimeoutMs()); - createClient(); - queue = new LinkedBlockingDeque<>(aerospikeSinkConfig.getMaxConcurrentRequests()); - for (int i = 0; i < aerospikeSinkConfig.getMaxConcurrentRequests(); ++i) { - queue.put(new AWriteListener(queue)); - } - - eventLoops = new NioEventLoops(new EventPolicy(), 1); - eventLoop = eventLoops.next(); - } - - @Override - public void close() throws Exception { - if (client != null) { - client.close(); - } - - if (eventLoops != null) { - eventLoops.close(); - } - LOG.info("Connection Closed"); - } - - @Override - public void write(Record<byte[]> record) { - KeyValue<K, V> keyValue = extractKeyValue(record); - Key key = new Key(aerospikeSinkConfig.getKeyspace(), aerospikeSinkConfig.getKeySet(), keyValue.getKey().toString()); - Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), Value.getAsBlob(keyValue.getValue())); - AWriteListener listener = null; - try { - listener = queue.take(); - } catch (InterruptedException ex) { - record.fail(); - return; - } - listener.setContext(record); - client.put(eventLoop, listener, writePolicy, key, bin); - } - - private void createClient() { - String[] hosts = aerospikeSinkConfig.getSeedHosts().split(","); - if (hosts.length <= 0) { - throw new RuntimeException("Invalid Seed Hosts"); - } - Host[] aeroSpikeHosts = new Host[hosts.length]; - for (int i = 0; i < hosts.length; ++i) { - String[] hostPort = hosts[i].split(":"); - aeroSpikeHosts[i] = new Host(hostPort[0], Integer.valueOf(hostPort[1])); - } - ClientPolicy policy = new ClientPolicy(); - if (aerospikeSinkConfig.getUserName() != null && !aerospikeSinkConfig.getUserName().isEmpty() - && aerospikeSinkConfig.getPassword() != null && !aerospikeSinkConfig.getPassword().isEmpty()) { - policy.user = aerospikeSinkConfig.getUserName(); - policy.password = aerospikeSinkConfig.getPassword(); - } - client = new AerospikeClient(policy, aeroSpikeHosts); - } - - private class AWriteListener implements WriteListener { - private Record<byte[]> context; - private BlockingQueue<AWriteListener> queue; - - public AWriteListener(BlockingQueue<AWriteListener> queue) { - this.queue = queue; - } - - public void setContext(Record<byte[]> record) { - this.context = record; - } - - @Override - public void onSuccess(Key key) { - if (context != null) { - context.ack(); - } - try { - queue.put(this); - } catch (InterruptedException ex) { - throw new RuntimeException("Interrupted while being added to the queue" ,ex); - } - } - - @Override - public void onFailure(AerospikeException e) { - if (context != null) { - context.fail(); - } - try { - queue.put(this); - } catch (InterruptedException ex) { - throw new RuntimeException("Interrupted while being added to the queue", ex); - } - } - } - - public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> message); -} \ No newline at end of file diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSinkConfig.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSinkConfig.java deleted file mode 100644 index 931d280..0000000 --- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSinkConfig.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.io.aerospike; - -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; -import lombok.*; -import lombok.experimental.Accessors; - -import java.io.File; -import java.io.IOException; -import java.io.Serializable; -import java.util.Map; - -@Data -@Setter -@Getter -@EqualsAndHashCode -@ToString -@Accessors(chain = true) -public class AerospikeSinkConfig implements Serializable { - - private static final long serialVersionUID = 1L; - - private String seedHosts; - private String keyspace; - private String columnName; - - // Optional - private String userName; - private String password; - private String keySet; - private int maxConcurrentRequests = 100; - private int timeoutMs = 100; - private int retries = 1; - - - public static AerospikeSinkConfig load(String yamlFile) throws IOException { - ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); - return mapper.readValue(new File(yamlFile), AerospikeSinkConfig.class); - } - - public static AerospikeSinkConfig load(Map<String, Object> map) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - return mapper.readValue(new ObjectMapper().writeValueAsString(map), AerospikeSinkConfig.class); - } -} \ No newline at end of file diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java deleted file mode 100644 index bac07a0..0000000 --- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.pulsar.io.aerospike; - -import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.KeyValue; - -/** - * Aerospike sink that treats incoming messages on the input topic as Strings - * and write identical key/value pairs. - */ -public class AerospikeStringSink extends AerospikeAbstractSink<String, String> { - @Override - public KeyValue<String, String> extractKeyValue(Record<byte[]> record) { - String key = record.getKey().orElseGet(() -> new String(record.getValue())); - return new KeyValue<>(key, new String(record.getValue())); - } -} \ No newline at end of file diff --git a/pulsar-io/aerospike/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/aerospike/src/main/resources/META-INF/services/pulsar-io.yaml deleted file mode 100644 index f2a7ab5..0000000 --- a/pulsar-io/aerospike/src/main/resources/META-INF/services/pulsar-io.yaml +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -name: aerospike -description: Aerospike database sink -sinkClass: org.apache.pulsar.io.aerospike.AerospikeStringSink diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml index 4307e8f..67aec42 100644 --- a/pulsar-io/pom.xml +++ b/pulsar-io/pom.xml @@ -35,7 +35,6 @@ <module>core</module> <module>twitter</module> <module>cassandra</module> - <module>aerospike</module> <module>kafka</module> <module>rabbitmq</module> <module>kinesis</module> diff --git a/site2/docs/deploy-bare-metal.md b/site2/docs/deploy-bare-metal.md index b5793c6..b0590a0 100644 --- a/site2/docs/deploy-bare-metal.md +++ b/site2/docs/deploy-bare-metal.md @@ -125,7 +125,6 @@ $ tar xvfz apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz $ mv apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors $ ls connectors -pulsar-io-aerospike-{{pulsar:version}}.nar pulsar-io-cassandra-{{pulsar:version}}.nar pulsar-io-kafka-{{pulsar:version}}.nar pulsar-io-kinesis-{{pulsar:version}}.nar diff --git a/site2/docs/getting-started-standalone.md b/site2/docs/getting-started-standalone.md index 0dcf024..6207cff 100644 --- a/site2/docs/getting-started-standalone.md +++ b/site2/docs/getting-started-standalone.md @@ -87,7 +87,6 @@ $ tar xvfz /path/to/apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz $ cd apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors $ ls connectors -pulsar-io-aerospike-{{pulsar:version}}.nar pulsar-io-cassandra-{{pulsar:version}}.nar pulsar-io-kafka-{{pulsar:version}}.nar pulsar-io-kinesis-{{pulsar:version}}.nar diff --git a/site2/docs/io-aerospike.md b/site2/docs/io-aerospike.md deleted file mode 100644 index b23e2e3..0000000 --- a/site2/docs/io-aerospike.md +++ /dev/null @@ -1,21 +0,0 @@ ---- -id: io-aerospike -title: Aerospike Sink Connector -sidebar_label: Aerospike Sink Connector ---- - -The Aerospike Sink connector is used to write messages to an Aerospike Cluster. - -## Sink Configuration Options - -The following configuration options are specific to the Aerospike Connector: - -| Name | Required | Default | Description | -|------|----------|---------|-------------| -| `seedHosts` | `true` | `null` | Comma seperated list of one or more Aerospike cluster hosts; each host can be specified as a valid IP address or hostname followed by an optional port number (default is 3000). | -| `keyspace` | `true` | `null` | Aerospike namespace to use. | -| `keySet` | `false` | `null` | Aerospike set name to use. | -| `columnName` | `true` | `null` | Aerospike bin name to use. | -| `maxConcurrentRequests` | `false` | `100` | Maximum number of concurrent Aerospike transactions that a Sink can open. | -| `timeoutMs` | `false` | `100` | A single timeout value controls `socketTimeout` and `totalTimeout` for Aerospike transactions. | -| `retries` | `false` | `1` | Maximum number of retries before aborting a write transaction to Aerospike. | diff --git a/site2/docs/io-connectors.md b/site2/docs/io-connectors.md index 5a76998..6f7d4b3 100644 --- a/site2/docs/io-connectors.md +++ b/site2/docs/io-connectors.md @@ -9,7 +9,6 @@ These connectors import and export data from some of the most commonly used data as easy as writing a simple connector configuration and running the connector locally or submitting the connector to a Pulsar Functions cluster. -- [Aerospike Sink Connector](io-aerospike.md) - [Cassandra Sink Connector](io-cassandra.md) - [Kafka Sink Connector](io-kafka.md#sink) - [Kafka Source Connector](io-kafka.md#source) diff --git a/site2/docs/io-overview.md b/site2/docs/io-overview.md index 0c55716..be8792c 100644 --- a/site2/docs/io-overview.md +++ b/site2/docs/io-overview.md @@ -4,7 +4,7 @@ title: Pulsar IO Overview sidebar_label: Overview --- -Messaging systems are most powerful when you can easily use them in conjunction with external systems like databases and other messaging systems. **Pulsar IO** is a feature of Pulsar that enables you to easily create, deploy, and manage Pulsar **connectors** that interact with external systems, such as [Apache Cassandra](https://cassandra.apache.org), [Aerospike](https://www.aerospike.com), and many others. +Messaging systems are most powerful when you can easily use them in conjunction with external systems like databases and other messaging systems. **Pulsar IO** is a feature of Pulsar that enables you to easily create, deploy, and manage Pulsar **connectors** that interact with external systems, such as [Apache Cassandra](https://cassandra.apache.org), and many others. > #### Pulsar IO and Pulsar Functions > Under the hood, Pulsar IO connectors are specialized [Pulsar > Functions](functions-overview.md) purpose-built to interface with external > systems. The [administrative interface](io-quickstart.md) for Pulsar IO is, > in fact, quite similar to that of Pulsar Functions. @@ -30,7 +30,6 @@ The following connectors are currently available for Pulsar: |Name|Java Class|Documentation| |---|---|---| -|[Aerospike sink](https://www.aerospike.com/)|[`org.apache.pulsar.io.aerospike.AerospikeSink`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java)|[Documentation](io-aerospike.md)| |[Cassandra sink](https://cassandra.apache.org)|[`org.apache.pulsar.io.cassandra.CassandraSink`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java)|[Documentation](io-cassandra.md)| |[Kafka source](https://kafka.apache.org)|[`org.apache.pulsar.io.kafka.KafkaSource`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java)|[Documentation](io-kafka.md#source)| |[Kafka sink](https://kafka.apache.org)|[`org.apache.pulsar.io.kafka.KafkaSink`](https://github.com/apache/incubator-pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java)|[Documentation](io-kafka.md#sink)| diff --git a/site2/docs/io-quickstart.md b/site2/docs/io-quickstart.md index 914429b..d426a1c 100644 --- a/site2/docs/io-quickstart.md +++ b/site2/docs/io-quickstart.md @@ -69,7 +69,6 @@ $ tar xvfz /path/to/apache-pulsar-io-connectors-{{pulsar:version}}-bin.tar.gz $ cp -r apache-pulsar-io-connectors-{{pulsar:version}}/connectors connectors $ ls connectors -pulsar-io-aerospike-{{pulsar:version}}.nar pulsar-io-cassandra-{{pulsar:version}}.nar pulsar-io-kafka-{{pulsar:version}}.nar pulsar-io-kinesis-{{pulsar:version}}.nar @@ -123,7 +122,7 @@ curl -s http://localhost:8080/admin/v2/functions/connectors Example output: ```json -[{"name":"aerospike","description":"Aerospike database sink","sinkClass":"org.apache.pulsar.io.aerospike.AerospikeStringSink"},{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis sink connect [...] +[{"name":"cassandra","description":"Writes data into Cassandra","sinkClass":"org.apache.pulsar.io.cassandra.CassandraStringSink"},{"name":"kafka","description":"Kafka source and sink connector","sourceClass":"org.apache.pulsar.io.kafka.KafkaStringSource","sinkClass":"org.apache.pulsar.io.kafka.KafkaStringSink"},{"name":"kinesis","description":"Kinesis sink connector","sinkClass":"org.apache.pulsar.io.kinesis.KinesisSink"},{"name":"rabbitmq","description":"RabbitMQ source connector","sour [...] ``` If an error occurred while starting Pulsar service, you may be able to seen exception at the terminal you are running `pulsar/standalone`,