[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-04-06 Thread david-streamlio
Github user david-streamlio closed the pull request at:

https://github.com/apache/nifi/pull/2553


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread david-streamlio
Github user david-streamlio commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174898281
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of 
PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService 
implements PulsarClientPool {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor
+.Builder().name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = 
new PropertyDescriptor.Builder()
+.name("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup-requests allowed on 
each broker-connection to prevent "
++ "overload on broker. (default: 5000) It should be 
configured with higher value only in case "
++ "of it requires to produce/subscribe on thousands of 
topics")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("5000")
+.build();
+
+public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new 
PropertyDescriptor.Builder()
+.name("Maximum connects per Pulsar broker")
+.description("Sets the max number of connection that the 
client library will open to a single broker.\n" +
+"By default, the connection pool will use a single 
connection for all the producers and consumers. " +
+"Increasing this parameter may improve throughput when 
using many producers over a high latency connection")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("1")
+.build();
+
+public static final PropertyDescriptor IO_THREADS = new 
PropertyDescriptor.Builder()
+.name("I/O Threads")
+.description("The number of threads to be used for handling 
connections to brokers (default: 1 thread)")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174883410
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
+
+static final AllowableValue EXCLUSIVE = new 
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the 
same topic with the same subscription name");
+static final AllowableValue SHARED = new AllowableValue("Shared", 
"Shared", "Multiple consumer will be able to use the same subscription name and 
the messages");
+static final AllowableValue FAILOVER = new AllowableValue("Failover", 
"Failover", "Multiple consumer will be able to use the same subscription name 
but only 1 consumer "
++ "will receive the messages. If that consumer disconnects, 
one of the other connected consumers will start receiving messages");
+
+protected static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+.name("topic")
+.displayName("Topic Name")
+.description("The name of the Pulsar Topic.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
+.name("Subscription")
+.displayName("Subscription Name")
+.description("The name of the Pulsar subscription to consume 
from.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final 

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174881995
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
+
+static final AllowableValue EXCLUSIVE = new 
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the 
same topic with the same subscription name");
+static final AllowableValue SHARED = new AllowableValue("Shared", 
"Shared", "Multiple consumer will be able to use the same subscription name and 
the messages");
+static final AllowableValue FAILOVER = new AllowableValue("Failover", 
"Failover", "Multiple consumer will be able to use the same subscription name 
but only 1 consumer "
++ "will receive the messages. If that consumer disconnects, 
one of the other connected consumers will start receiving messages");
+
+protected static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+.name("topic")
+.displayName("Topic Name")
+.description("The name of the Pulsar Topic.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
+.name("Subscription")
+.displayName("Subscription Name")
+.description("The name of the Pulsar subscription to consume 
from.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final 

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174874403
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/pool/ResourcePoolImpl.java
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Vector;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+public class ResourcePoolImpl implements 
ResourcePool {
+
+private final Lock lock = new ReentrantLock();
+private final Condition poolAvailable = lock.newCondition();
+private int max_resources;
+private final Vector pool;
+
+private final ResourceExceptionHandler resourceExceptionHandler;
+private final ResourceFactory factory;
+
+public ResourcePoolImpl(ResourceFactory factory, int max_resources) 
{
+this(factory, new ResourceExceptionHandlerImpl(), 
max_resources);
+}
+
+public ResourcePoolImpl(ResourceFactory factory, 
ResourceExceptionHandler handler, int max_resources) {
+lock.lock();
+try {
+this.factory = factory;
+this.resourceExceptionHandler = handler;
+this.max_resources = max_resources;
+this.pool = new Vector(max_resources);
+} finally {
+lock.unlock();
+}
+}
+
+private R createResource(Properties props) {
+R resource = null;
+try {
+
+resource = factory.create(props);
+
+if (resource == null)
--- End diff --

Curly brackets.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174881541
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
+
+static final AllowableValue EXCLUSIVE = new 
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the 
same topic with the same subscription name");
+static final AllowableValue SHARED = new AllowableValue("Shared", 
"Shared", "Multiple consumer will be able to use the same subscription name and 
the messages");
+static final AllowableValue FAILOVER = new AllowableValue("Failover", 
"Failover", "Multiple consumer will be able to use the same subscription name 
but only 1 consumer "
++ "will receive the messages. If that consumer disconnects, 
one of the other connected consumers will start receiving messages");
+
+protected static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+.name("topic")
+.displayName("Topic Name")
+.description("The name of the Pulsar Topic.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
+.name("Subscription")
+.displayName("Subscription Name")
+.description("The name of the Pulsar subscription to consume 
from.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final 

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174881132
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
+
+static final AllowableValue EXCLUSIVE = new 
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the 
same topic with the same subscription name");
+static final AllowableValue SHARED = new AllowableValue("Shared", 
"Shared", "Multiple consumer will be able to use the same subscription name and 
the messages");
+static final AllowableValue FAILOVER = new AllowableValue("Failover", 
"Failover", "Multiple consumer will be able to use the same subscription name 
but only 1 consumer "
++ "will receive the messages. If that consumer disconnects, 
one of the other connected consumers will start receiving messages");
+
+protected static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+.name("topic")
+.displayName("Topic Name")
+.description("The name of the Pulsar Topic.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
+.name("Subscription")
+.displayName("Subscription Name")
+.description("The name of the Pulsar subscription to consume 
from.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final 

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174873036
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of 
PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService 
implements PulsarClientPool {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor
+.Builder().name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = 
new PropertyDescriptor.Builder()
+.name("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup-requests allowed on 
each broker-connection to prevent "
++ "overload on broker. (default: 5000) It should be 
configured with higher value only in case "
++ "of it requires to produce/subscribe on thousands of 
topics")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("5000")
+.build();
+
+public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new 
PropertyDescriptor.Builder()
+.name("Maximum connects per Pulsar broker")
+.description("Sets the max number of connection that the 
client library will open to a single broker.\n" +
+"By default, the connection pool will use a single 
connection for all the producers and consumers. " +
+"Increasing this parameter may improve throughput when 
using many producers over a high latency connection")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("1")
+.build();
+
+public static final PropertyDescriptor IO_THREADS = new 
PropertyDescriptor.Builder()
+.name("I/O Threads")
+.description("The number of threads to be used for handling 
connections to brokers (default: 1 thread)")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174880893
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
+
+static final AllowableValue EXCLUSIVE = new 
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the 
same topic with the same subscription name");
+static final AllowableValue SHARED = new AllowableValue("Shared", 
"Shared", "Multiple consumer will be able to use the same subscription name and 
the messages");
+static final AllowableValue FAILOVER = new AllowableValue("Failover", 
"Failover", "Multiple consumer will be able to use the same subscription name 
but only 1 consumer "
++ "will receive the messages. If that consumer disconnects, 
one of the other connected consumers will start receiving messages");
+
+protected static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+.name("topic")
+.displayName("Topic Name")
+.description("The name of the Pulsar Topic.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
+.name("Subscription")
+.displayName("Subscription Name")
+.description("The name of the Pulsar subscription to consume 
from.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final 

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174872527
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of 
PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService 
implements PulsarClientPool {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor
+.Builder().name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = 
new PropertyDescriptor.Builder()
+.name("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup-requests allowed on 
each broker-connection to prevent "
++ "overload on broker. (default: 5000) It should be 
configured with higher value only in case "
++ "of it requires to produce/subscribe on thousands of 
topics")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("5000")
+.build();
+
+public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new 
PropertyDescriptor.Builder()
+.name("Maximum connects per Pulsar broker")
+.description("Sets the max number of connection that the 
client library will open to a single broker.\n" +
+"By default, the connection pool will use a single 
connection for all the producers and consumers. " +
+"Increasing this parameter may improve throughput when 
using many producers over a high latency connection")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("1")
+.build();
+
+public static final PropertyDescriptor IO_THREADS = new 
PropertyDescriptor.Builder()
+.name("I/O Threads")
+.description("The number of threads to be used for handling 
connections to brokers (default: 1 thread)")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174878320
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestStandardPulsarClientService {
+
+@Before
+public void init() {
--- End diff --

Looks like you can delete this.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174882988
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
+
+static final AllowableValue EXCLUSIVE = new 
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the 
same topic with the same subscription name");
+static final AllowableValue SHARED = new AllowableValue("Shared", 
"Shared", "Multiple consumer will be able to use the same subscription name and 
the messages");
+static final AllowableValue FAILOVER = new AllowableValue("Failover", 
"Failover", "Multiple consumer will be able to use the same subscription name 
but only 1 consumer "
++ "will receive the messages. If that consumer disconnects, 
one of the other connected consumers will start receiving messages");
+
+protected static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+.name("topic")
+.displayName("Topic Name")
+.description("The name of the Pulsar Topic.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
+.name("Subscription")
+.displayName("Subscription Name")
+.description("The name of the Pulsar subscription to consume 
from.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final 

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174884957
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java
 ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarProducer;
+import org.apache.nifi.pulsar.cache.LRUCache;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import 
org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Pulsar using the Pulsar 1.21 Producer API."
++ "The messages to send may be individual FlowFiles or may be 
delimited, using a "
++ "user-specified delimiter, such as a new-line. "
++ "The complementary NiFi processor for fetching messages is 
ConsumePulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Pulsar for this FlowFile. This attribute is added 
only to "
++ "FlowFiles that are routed to success.")
+public class PublishPulsar_1_0 extends AbstractPulsarProcessor {
+
+protected static final String MSG_COUNT = "msg.count";
+
+static final AllowableValue COMPRESSION_TYPE_NONE = new 
AllowableValue("NONE", "None", "No compression");
+static final AllowableValue COMPRESSION_TYPE_LZ4 = new 
AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
+static final AllowableValue COMPRESSION_TYPE_ZLIB = new 
AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
+
+static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = 
new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a 
custom partition");
+static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION 
= new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route 
messages to all "
+   
+ "partitions in a round robin 
manner");
+static final AllowableValue 

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174883778
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java
 ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarProducer;
+import org.apache.nifi.pulsar.cache.LRUCache;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import 
org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Pulsar using the Pulsar 1.21 Producer API."
++ "The messages to send may be individual FlowFiles or may be 
delimited, using a "
++ "user-specified delimiter, such as a new-line. "
++ "The complementary NiFi processor for fetching messages is 
ConsumePulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Pulsar for this FlowFile. This attribute is added 
only to "
++ "FlowFiles that are routed to success.")
+public class PublishPulsar_1_0 extends AbstractPulsarProcessor {
--- End diff --

Consider changing it to `PublishPulsar_1_X`.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174884531
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java
 ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarProducer;
+import org.apache.nifi.pulsar.cache.LRUCache;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import 
org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Pulsar using the Pulsar 1.21 Producer API."
++ "The messages to send may be individual FlowFiles or may be 
delimited, using a "
++ "user-specified delimiter, such as a new-line. "
++ "The complementary NiFi processor for fetching messages is 
ConsumePulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Pulsar for this FlowFile. This attribute is added 
only to "
++ "FlowFiles that are routed to success.")
+public class PublishPulsar_1_0 extends AbstractPulsarProcessor {
+
+protected static final String MSG_COUNT = "msg.count";
+
+static final AllowableValue COMPRESSION_TYPE_NONE = new 
AllowableValue("NONE", "None", "No compression");
+static final AllowableValue COMPRESSION_TYPE_LZ4 = new 
AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
+static final AllowableValue COMPRESSION_TYPE_ZLIB = new 
AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
+
+static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = 
new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a 
custom partition");
+static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION 
= new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route 
messages to all "
+   
+ "partitions in a round robin 
manner");
+static final AllowableValue 

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174879831
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
--- End diff --

Also note that as it is an incubator project and someone really doesn't 
want to track 1.X going forward AND complains that you broke compatibility for 
them by staying up to date with 1.X, that's on them. Incubator projects are by 
definition moving targets and should be handled that way during risk assessment 
by teams using them.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174872630
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of 
PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService 
implements PulsarClientPool {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor
+.Builder().name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = 
new PropertyDescriptor.Builder()
+.name("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup-requests allowed on 
each broker-connection to prevent "
++ "overload on broker. (default: 5000) It should be 
configured with higher value only in case "
++ "of it requires to produce/subscribe on thousands of 
topics")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("5000")
+.build();
+
+public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new 
PropertyDescriptor.Builder()
+.name("Maximum connects per Pulsar broker")
+.description("Sets the max number of connection that the 
client library will open to a single broker.\n" +
+"By default, the connection pool will use a single 
connection for all the producers and consumers. " +
+"Increasing this parameter may improve throughput when 
using many producers over a high latency connection")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("1")
+.build();
+
+public static final PropertyDescriptor IO_THREADS = new 
PropertyDescriptor.Builder()
+.name("I/O Threads")
+.description("The number of threads to be used for handling 
connections to brokers (default: 1 thread)")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174874448
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/pool/ResourcePoolImpl.java
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Vector;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+public class ResourcePoolImpl implements 
ResourcePool {
+
+private final Lock lock = new ReentrantLock();
+private final Condition poolAvailable = lock.newCondition();
+private int max_resources;
+private final Vector pool;
+
+private final ResourceExceptionHandler resourceExceptionHandler;
+private final ResourceFactory factory;
+
+public ResourcePoolImpl(ResourceFactory factory, int max_resources) 
{
+this(factory, new ResourceExceptionHandlerImpl(), 
max_resources);
+}
+
+public ResourcePoolImpl(ResourceFactory factory, 
ResourceExceptionHandler handler, int max_resources) {
+lock.lock();
+try {
+this.factory = factory;
+this.resourceExceptionHandler = handler;
+this.max_resources = max_resources;
+this.pool = new Vector(max_resources);
+} finally {
+lock.unlock();
+}
+}
+
+private R createResource(Properties props) {
+R resource = null;
+try {
+
+resource = factory.create(props);
--- End diff --

There's some extraneous white space around this.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174884100
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java
 ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarProducer;
+import org.apache.nifi.pulsar.cache.LRUCache;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import 
org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Pulsar using the Pulsar 1.21 Producer API."
++ "The messages to send may be individual FlowFiles or may be 
delimited, using a "
++ "user-specified delimiter, such as a new-line. "
++ "The complementary NiFi processor for fetching messages is 
ConsumePulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Pulsar for this FlowFile. This attribute is added 
only to "
++ "FlowFiles that are routed to success.")
+public class PublishPulsar_1_0 extends AbstractPulsarProcessor {
+
+protected static final String MSG_COUNT = "msg.count";
+
+static final AllowableValue COMPRESSION_TYPE_NONE = new 
AllowableValue("NONE", "None", "No compression");
+static final AllowableValue COMPRESSION_TYPE_LZ4 = new 
AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
+static final AllowableValue COMPRESSION_TYPE_ZLIB = new 
AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
+
+static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = 
new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a 
custom partition");
+static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION 
= new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route 
messages to all "
+   
+ "partitions in a round robin 
manner");
+static final AllowableValue 

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174878122
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/pool/ResourcePoolImpl.java
 ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.Vector;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+
+public class ResourcePoolImpl implements 
ResourcePool {
+
+private final Lock lock = new ReentrantLock();
+private final Condition poolAvailable = lock.newCondition();
+private int max_resources;
+private final Vector pool;
+
+private final ResourceExceptionHandler resourceExceptionHandler;
+private final ResourceFactory factory;
+
+public ResourcePoolImpl(ResourceFactory factory, int max_resources) 
{
+this(factory, new ResourceExceptionHandlerImpl(), 
max_resources);
+}
+
+public ResourcePoolImpl(ResourceFactory factory, 
ResourceExceptionHandler handler, int max_resources) {
+lock.lock();
+try {
+this.factory = factory;
+this.resourceExceptionHandler = handler;
+this.max_resources = max_resources;
+this.pool = new Vector(max_resources);
+} finally {
+lock.unlock();
+}
+}
+
+private R createResource(Properties props) {
+R resource = null;
+try {
+
+resource = factory.create(props);
+
+if (resource == null)
+throw new ResourceCreationException("Unable to create 
resource");
+
+} catch (Exception e) {
+resourceExceptionHandler.handle(e);
+}
+return resource;
+}
+
+
+/*
+ * Shutdown the pool and release the resources
+ */
+public void close() {
+
+Iterator itr = pool.iterator();
+while (itr.hasNext()) {
+itr.next().close();
+}
+
+}
+
+public boolean isEmpty() {
+return (pool.isEmpty());
+}
+
+public boolean isFull() {
+return (pool != null && pool.size() == max_resources);
+}
+
+@Override
+public R acquire(Properties props) throws InterruptedException {
+lock.lock();
+try {
+while (max_resources <= 0) {
+poolAvailable.await();
+}
+
+--max_resources;
+
+if (pool != null) {
+int size = pool.size();
+if (size > 0)
+return pool.remove(size - 1);
+}
+return createResource(props);
+} finally {
+lock.unlock();
+}
+}
+
+@Override
+public void evict(R resource) {
+lock.lock();
+try {
+
+// Attempt to close the connection
+if (!resource.isClosed())
--- End diff --

Curly brackets.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174879028
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml ---
@@ -0,0 +1,78 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+
+org.apache.nifi
+nifi-pulsar-bundle
+1.6.0-SNAPSHOT
+
+
+nifi-pulsar-processors
+jar
+
+
+
+org.apache.nifi
+nifi-api
+
+
+org.apache.nifi
+nifi-record-serialization-service-api
+
+
+org.apache.nifi
+nifi-record
+
+
+org.apache.nifi
+nifi-utils
+1.6.0-SNAPSHOT
+
+ 
+org.apache.nifi
+nifi-ssl-context-service-api
+
+
+org.apache.nifi
+nifi-pulsar-client-service-api
+1.6.0-SNAPSHOT
+provided
+   
+
+   org.apache.pulsar
--- End diff --

Broken indentation level.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174872872
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of 
PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService 
implements PulsarClientPool {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor
+.Builder().name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = 
new PropertyDescriptor.Builder()
+.name("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup-requests allowed on 
each broker-connection to prevent "
++ "overload on broker. (default: 5000) It should be 
configured with higher value only in case "
++ "of it requires to produce/subscribe on thousands of 
topics")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("5000")
+.build();
+
+public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new 
PropertyDescriptor.Builder()
+.name("Maximum connects per Pulsar broker")
+.description("Sets the max number of connection that the 
client library will open to a single broker.\n" +
+"By default, the connection pool will use a single 
connection for all the producers and consumers. " +
+"Increasing this parameter may improve throughput when 
using many producers over a high latency connection")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("1")
+.build();
+
+public static final PropertyDescriptor IO_THREADS = new 
PropertyDescriptor.Builder()
+.name("I/O Threads")
+.description("The number of threads to be used for handling 
connections to brokers (default: 1 thread)")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174879478
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
--- End diff --

You should consider changing this to `ConsumePulsar_1_X` to warn users that 
you may be moving the internal client compatibility forward if let's say 1.4 
breaks compatibility with the current 1.2 branch in the incubator.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174866298
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/PoolableResource.java
 ---
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+public interface PoolableResource {
+
+public void close();
--- End diff --

Javadoc.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174882429
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
+
+static final AllowableValue EXCLUSIVE = new 
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the 
same topic with the same subscription name");
+static final AllowableValue SHARED = new AllowableValue("Shared", 
"Shared", "Multiple consumer will be able to use the same subscription name and 
the messages");
+static final AllowableValue FAILOVER = new AllowableValue("Failover", 
"Failover", "Multiple consumer will be able to use the same subscription name 
but only 1 consumer "
++ "will receive the messages. If that consumer disconnects, 
one of the other connected consumers will start receiving messages");
+
+protected static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+.name("topic")
+.displayName("Topic Name")
+.description("The name of the Pulsar Topic.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
+.name("Subscription")
+.displayName("Subscription Name")
+.description("The name of the Pulsar subscription to consume 
from.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final 

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174882522
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java
 ---
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarConsumer;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_0 extends AbstractPulsarProcessor {
+
+static final AllowableValue EXCLUSIVE = new 
AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the 
same topic with the same subscription name");
+static final AllowableValue SHARED = new AllowableValue("Shared", 
"Shared", "Multiple consumer will be able to use the same subscription name and 
the messages");
+static final AllowableValue FAILOVER = new AllowableValue("Failover", 
"Failover", "Multiple consumer will be able to use the same subscription name 
but only 1 consumer "
++ "will receive the messages. If that consumer disconnects, 
one of the other connected consumers will start receiving messages");
+
+protected static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+.name("topic")
+.displayName("Topic Name")
+.description("The name of the Pulsar Topic.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+static final PropertyDescriptor SUBSCRIPTION = new 
PropertyDescriptor.Builder()
+.name("Subscription")
+.displayName("Subscription Name")
+.description("The name of the Pulsar subscription to consume 
from.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+.build();
+
+public static final 

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174884863
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java
 ---
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarProducer;
+import org.apache.nifi.pulsar.cache.LRUCache;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import 
org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Pulsar using the Pulsar 1.21 Producer API."
++ "The messages to send may be individual FlowFiles or may be 
delimited, using a "
++ "user-specified delimiter, such as a new-line. "
++ "The complementary NiFi processor for fetching messages is 
ConsumePulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Pulsar for this FlowFile. This attribute is added 
only to "
++ "FlowFiles that are routed to success.")
+public class PublishPulsar_1_0 extends AbstractPulsarProcessor {
+
+protected static final String MSG_COUNT = "msg.count";
+
+static final AllowableValue COMPRESSION_TYPE_NONE = new 
AllowableValue("NONE", "None", "No compression");
+static final AllowableValue COMPRESSION_TYPE_LZ4 = new 
AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
+static final AllowableValue COMPRESSION_TYPE_ZLIB = new 
AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
+
+static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = 
new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a 
custom partition");
+static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION 
= new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route 
messages to all "
+   
+ "partitions in a round robin 
manner");
+static final AllowableValue 

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174868443
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/ResourceFactory.java
 ---
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+import java.util.Properties;
+
+public interface ResourceFactory {
+
+public R create(Properties props) throws ResourceCreationException;
--- End diff --

Javadoc


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174874129
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of 
PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService 
implements PulsarClientPool {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor
+.Builder().name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = 
new PropertyDescriptor.Builder()
+.name("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup-requests allowed on 
each broker-connection to prevent "
++ "overload on broker. (default: 5000) It should be 
configured with higher value only in case "
++ "of it requires to produce/subscribe on thousands of 
topics")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("5000")
+.build();
+
+public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new 
PropertyDescriptor.Builder()
+.name("Maximum connects per Pulsar broker")
+.description("Sets the max number of connection that the 
client library will open to a single broker.\n" +
+"By default, the connection pool will use a single 
connection for all the producers and consumers. " +
+"Increasing this parameter may improve throughput when 
using many producers over a high latency connection")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("1")
+.build();
+
+public static final PropertyDescriptor IO_THREADS = new 
PropertyDescriptor.Builder()
+.name("I/O Threads")
+.description("The number of threads to be used for handling 
connections to brokers (default: 1 thread)")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+

[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174878824
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java
 ---
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestStandardPulsarClientService {
+
+@Before
+public void init() {
+
+}
+
+@Test
+public void testService() throws InitializationException {
+final TestRunner runner = 
TestRunners.newTestRunner(TestProcessor.class);
+final PulsarClientPool service = new StandardPulsarClientPool();
+runner.addControllerService("test-good", service);
+
+runner.setProperty(service, 
StandardPulsarClientPool.PULSAR_SERVICE_URL, "localhost:6667");
+// runner.enableControllerService(service);
--- End diff --

I think you might actually needs this. If that's not the case, it should be 
removed.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174865149
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/pom.xml ---
@@ -0,0 +1,40 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+
+org.apache.nifi
+nifi-pulsar-bundle
+1.6.0-SNAPSHOT
+
+
+nifi-pulsar-client-service-api
+jar
+
+
+
+org.apache.nifi
+nifi-api
+provided
+
+
+   org.apache.pulsar
--- End diff --

Nit: indent level is broken here.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174866349
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/PoolableResource.java
 ---
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+public interface PoolableResource {
+
+public void close();
+
+public boolean isClosed();
--- End diff --

Javadoc


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174866791
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/ResourceExceptionHandler.java
 ---
@@ -0,0 +1,23 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+public interface ResourceExceptionHandler {
+
+void handle(Exception exc);
--- End diff --

Javadoc


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174865317
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+
+
+@Tags({"Pulsar"})
+@CapabilityDescription("Provides the ability to create Pulsar Producer / 
Consumer instances on demand, based on the configuration."
+ + "properties defined")
+public interface PulsarClientPool extends ControllerService {
+
+public ResourcePool getProducerPool();
--- End diff --

There should be a basic javadoc here.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2553#discussion_r174865373
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java
 ---
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+
+
+@Tags({"Pulsar"})
+@CapabilityDescription("Provides the ability to create Pulsar Producer / 
Consumer instances on demand, based on the configuration."
+ + "properties defined")
+public interface PulsarClientPool extends ControllerService {
+
+public ResourcePool getProducerPool();
+
+public ResourcePool getConsumerPool();
--- End diff --

Same here.


---


[GitHub] nifi pull request #2553: Nifi 4908 rebase

2018-03-15 Thread david-streamlio
GitHub user david-streamlio opened a pull request:

https://github.com/apache/nifi/pull/2553

Nifi 4908 rebase

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [ ] Does your PR title start with NIFI- where  is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/david-streamlio/nifi NIFI-4908-rebase

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2553.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2553


commit e4f8550159132d85d0e19d61f24e087e51742dee
Author: David Kjerrumgaard 
Date:   2018-02-24T01:40:22Z

Added Pulsar processors and Controller Service

commit b8b9601f2469df811b59b1019f0b7052dff9abd3
Author: David Kjerrumgaard 
Date:   2018-02-26T19:19:26Z

Updated all nifi component version references to 1.6.0-SNAPSHOT

commit 2ebdc06394d3d6f5fa71ec07fc5d086e81f4e632
Author: David Kjerrumgaard 
Date:   2018-02-26T19:48:56Z

Merge remote-tracking branch 'upstream/master' into NIFI-4908

commit d4e82ecc49e62596264db206f26b3b6a93f4b021
Author: David Kjerrumgaard 
Date:   2018-02-28T18:18:35Z

Refactored Processors to indicate the supported version of Pulsar, i.e _1_0

commit 8b24b2496d404a65e6427273395ae73e558a5531
Author: David Kjerrumgaard 
Date:   2018-02-24T01:40:22Z

Added Pulsar processors and Controller Service

commit 29ca7f2b653503de9ad13da9f6c49cc2f311eaae
Author: David Kjerrumgaard 
Date:   2018-02-26T19:19:26Z

Updated all nifi component version references to 1.6.0-SNAPSHOT

commit 3e30c9530ab0cd844fb900607dd00ff1a4014221
Author: David Kjerrumgaard 
Date:   2018-02-28T18:18:35Z

Refactored Processors to indicate the supported version of Pulsar, i.e _1_0

commit 20b451d66ba6f7bbcc057f752a25101e33580469
Author: David Kjerrumgaard 
Date:   2018-02-24T01:40:22Z

Added Pulsar processors and Controller Service

commit 475491195d2f1fc2c24a4f403d6fc6cfd30982b0
Author: David Kjerrumgaard 
Date:   2018-02-26T19:19:26Z

Updated all nifi component version references to 1.6.0-SNAPSHOT

commit 046e28703914f66efbea27a6112bc88b33750bdf
Author: David Kjerrumgaard 
Date:   2018-02-28T18:18:35Z

Refactored Processors to indicate the supported version of Pulsar, i.e _1_0

commit f36b0b61680c55a5ac0279a44cdc580b42f20118
Author: David Kjerrumgaard 
Date:   2018-03-10T03:01:06Z

Combined Pulsar components into a single bundle

commit 7de9f99be1724178e768e89737a1b2d57e45f08b
Author: David Kjerrumgaard 
Date:   2018-03-10T05:48:25Z

Merged with HEAD

commit 03e8ef2a19381cd3249eb45c471c36b4ae265566
Author: David Kjerrumgaard 
Date:   2018-03-10T06:26:46Z

Fixed parent.relativePath in POM files

commit e61bc9b559cac45200159efa6d875d68a5713cfa
Author: David Kjerrumgaard 
Date:   2018-03-10T06:29:43Z

Added nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-nar/ 
module

commit 27d3f86315dd91fc8e61d4399caa289304bf5fd2
Author: David Kjerrumgaard 
Date:   2018-03-10T06:41:34Z

Added dependency versions that