[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors

2018-11-20 Thread david-streamlio
Github user david-streamlio commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3178#discussion_r235123524
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+public class StandardPulsarClientService extends AbstractControllerService 
implements PulsarClientService {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor.Builder()
+.name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.build();
+
+public static final PropertyDescriptor 
ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER = new PropertyDescriptor.Builder()
+.name("ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER")
+.displayName("Allow TLS insecure connection")
+.description("If a valid trusted certificate is provided in 
the 'TLS Trust Certs File Path' property of this controller service,"
++ " then, by default, all communication between this 
controller service and the Apache Pulsar broker will be secured via"
++ " TLS and only use the trusted TLS certificate from 
broker. Setting this property to 'false' will allow this controller"
++ " service to accept an untrusted TLS certificate 
from broker as well. This property should only be set to false if you trust"
++ " the broker you are connecting to, but do not have 
access to the TLS certificate file.")
+.required(false)
+.allowableValues("true", "false")
+.defaultValue("false")
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = 
new PropertyDescriptor.Builder()
+.name("CONCURRENT_LOOKUP_REQUESTS")
+.displayName("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup-requests allowed on 
each broker-connection.")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.defaultValue("5000")
+.build();
+
+public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new 
PropertyDescriptor.Builder()
+.name("CONNECTIONS_PER_BROKER")
+ 

[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors

2018-11-20 Thread david-streamlio
Github user david-streamlio commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3178#discussion_r235123261
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java
 ---
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientService;
+import org.apache.nifi.pulsar.cache.LRUCache;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import 
org.apache.pulsar.shade.org.apache.commons.collections.CollectionUtils;
+
+public abstract class AbstractPulsarProducerProcessor extends 
AbstractProcessor {
+
+public static final String MSG_COUNT = "msg.count";
+public static final String TOPIC_NAME = "topic.name";
+
+static final AllowableValue COMPRESSION_TYPE_NONE = new 
AllowableValue("NONE", "None", "No compression");
+static final AllowableValue COMPRESSION_TYPE_LZ4 = new 
AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
+static final AllowableValue COMPRESSION_TYPE_ZLIB = new 
AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
+
+static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = 
new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a 
custom partition");
+static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION 
= new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route 
messages to all "
+   
+ "partitions in a round robin 
manner");
+static final AllowableValue MESSAGE_ROUTING_MODE_SINGLE_PARTITION = 
new AllowableValue("SinglePartition", "Single Partition", "Route messages to a 
single partition");
+
+public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
+.name("success")
+.description("FlowFiles for which all content was sent to 
Pulsar.")
+.build();
+
+public static final Relationship REL_FAILURE = new 
Relationship.Builder()
+.name("failure")
+.description("Any FlowFile that cannot be sent to Pulsar will 
be routed to this Relationship")
+.build();
+
+public static final PropertyDescriptor PULSAR_CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
+.name("PULSAR_CLIENT_SERVICE")
+ 

[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors

2018-11-20 Thread joewitt
Github user joewitt commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3178#discussion_r235108402
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord.java
 ---
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@CapabilityDescription("Consumes messages from Apache Pulsar. "
++ "The complementary NiFi processor for sending messages is 
PublishPulsarRecord. Please note that, at this time, "
++ "the Processor assumes that all records that are retrieved have 
the same schema. If any of the Pulsar messages "
++ "that are pulled but cannot be parsed or written with the 
configured Record Reader or Record Writer, the contents "
++ "of the message will be written to a separate FlowFile, and that 
FlowFile will be transferred to the 'parse.failure' "
++ "relationship. Otherwise, each FlowFile is sent to the 'success' 
relationship and may contain many individual "
++ "messages within the single FlowFile. A 'record.count' attribute 
is added to indicate how many messages are contained in the "
++ "FlowFile. No two Pulsar messages will be placed into the same 
FlowFile if they have different schemas.")
+@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", 
"Ingress", "Topic", "PubSub", "Consume"})
+@WritesAttributes({
+@WritesAttribute(attribute = "record.count", description = "The number 
of records received")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@SeeAlso({PublishPulsar.class, ConsumePulsar.class, 
PublishPulsarRecord.class})
+public class ConsumePulsarRecord extends 
AbstractPulsarConsumerProcessor {
+
+public static final String MSG_COUNT = "record.count";
+
+public static final PropertyDescriptor 

[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors

2018-11-20 Thread joewitt
Github user joewitt commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3178#discussion_r235107847
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar.java
 ---
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@SeeAlso({PublishPulsar.class, ConsumePulsarRecord.class, 
PublishPulsarRecord.class})
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar. The 
complementary NiFi processor for sending messages is PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar extends AbstractPulsarConsumerProcessor 
{
+
+@Override
+public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+try {
+Consumer consumer = getConsumer(context, 
getConsumerId(context, session.get()));
+
+if (consumer == null) {
+context.yield();
+return;
+}
+
+if (context.getProperty(ASYNC_ENABLED).asBoolean()) {
+consumeAsync(consumer, context, session);
+handleAsync(consumer, context, session);
+} else {
+consume(consumer, context, session);
+}
+} catch (PulsarClientException e) {
+getLogger().error("Unable to consume from Pulsar Topic ", e);
+context.yield();
+throw new ProcessException(e);
+}
+}
+
+private void handleAsync(final Consumer consumer, 
ProcessContext context, ProcessSession session) {
+try {
+Future<Message<byte[]>> done = getConsumerService().poll(50, TimeUnit.MILLISECONDS);
+
+if (done != null) {
+   Message msg = done.get();
+
+   if (msg != null) {
+  FlowFile flowFile = null;
+  final byte[] value = msg.getData();
+  if (value != null && value.length > 0) {
+  flowFile = session.create();
+  flowFile = session.write(flowFile, out -> {
+  out.write(value);
+  });
+
+ session.getProvenanceReporter().receive(flowFile, 
getPulsarClientService().getPulsarBrokerRootURL() + "/" + consumer.getTopic());
+ session.transfer(flowFile, REL_SUCCESS);
+ session.commit();
+  }
+  // Acknowledge consuming the message
+  getAckService().submit(new Callable() {
+  @Override
+  public Object call() throws Exception {
+ return consumer.acknowledgeAsync(msg).get();
+  }
+   });
+  }
+} else {
+  

[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors

2018-11-20 Thread david-streamlio
Github user david-streamlio commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3178#discussion_r235107594
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+public class StandardPulsarClientService extends AbstractControllerService 
implements PulsarClientService {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor.Builder()
+.name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.build();
+
+public static final PropertyDescriptor 
ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER = new PropertyDescriptor.Builder()
--- End diff --

Ok, I will remove that property


---


[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors

2018-11-20 Thread david-streamlio
Github user david-streamlio commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3178#discussion_r235107339
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/LRUCache.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.cache;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class LRUCache {
--- End diff --

@joewitt The LRU cache code wasn't copied from anywhere; it is just my own 
naive implementation of an LRU cache. Most Java-based examples I have seen 
use a LinkedHashMap, and mine does not.

I do appreciate your need to ensure 100% Apache compliance for the code, 
so I can provide an implementation of the LRU cache based on the Apache 
Commons Collections LRUMap class instead, if you think that would be 
better/safer from a licensing perspective:

https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/map/LRUMap.html
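
For reference, the LinkedHashMap approach that most Java examples use can be sketched as below. This is only an illustration of that common pattern, not the LRUCache class from this pull request and not the Commons Collections LRUMap; the class name `LruCacheSketch` and its constructor are hypothetical. The diff imports `java.io.Closeable`, which may mean evicted entries need to be closed; the sketch leaves that out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch of an LRU cache built on LinkedHashMap.
 * Not thread-safe; callers would need external synchronization.
 */
final class LruCacheSketch<K, V> extends LinkedHashMap<K, V> {

    private static final long serialVersionUID = 1L;

    private final int capacity;

    LruCacheSketch(final int capacity) {
        // accessOrder = true makes iteration order least-recently-accessed first
        super(16, 0.75f, true);
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
        // Called by put(); returning true evicts the least recently used entry
        return size() > capacity;
    }
}
```

With a capacity of 2, inserting "a" and "b", reading "a", and then inserting "c" should evict "b", because "a" was touched more recently.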


---


[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors

2018-11-20 Thread joewitt
Github user joewitt commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3178#discussion_r235106784
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java
 ---
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientService;
+import org.apache.nifi.pulsar.cache.LRUCache;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerBuilder;
+import org.apache.pulsar.client.api.PulsarClientException;
+import 
org.apache.pulsar.shade.org.apache.commons.collections.CollectionUtils;
+
+public abstract class AbstractPulsarProducerProcessor extends 
AbstractProcessor {
+
+public static final String MSG_COUNT = "msg.count";
+public static final String TOPIC_NAME = "topic.name";
+
+static final AllowableValue COMPRESSION_TYPE_NONE = new 
AllowableValue("NONE", "None", "No compression");
+static final AllowableValue COMPRESSION_TYPE_LZ4 = new 
AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
+static final AllowableValue COMPRESSION_TYPE_ZLIB = new 
AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
+
+static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = 
new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a 
custom partition");
+static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION 
= new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route 
messages to all "
+   
+ "partitions in a round robin 
manner");
+static final AllowableValue MESSAGE_ROUTING_MODE_SINGLE_PARTITION = 
new AllowableValue("SinglePartition", "Single Partition", "Route messages to a 
single partition");
+
+public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
+.name("success")
+.description("FlowFiles for which all content was sent to 
Pulsar.")
+.build();
+
+public static final Relationship REL_FAILURE = new 
Relationship.Builder()
+.name("failure")
+.description("Any FlowFile that cannot be sent to Pulsar will 
be routed to this Relationship")
+.build();
+
+public static final PropertyDescriptor PULSAR_CLIENT_SERVICE = new 
PropertyDescriptor.Builder()
+.name("PULSAR_CLIENT_SERVICE")
+

[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors

2018-11-20 Thread joewitt
Github user joewitt commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3178#discussion_r235102894
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml ---
@@ -0,0 +1,84 @@
+
+
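+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-pulsar-bundle</artifactId>
+        <version>1.9.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-pulsar-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.9.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-pulsar-client-service-api</artifactId>
+            <version>1.9.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client</artifactId>
+            <version>${pulsar.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+            <version>1.9.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.7</version>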

Assuming we really need this dependency.


---


[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors

2018-11-20 Thread joewitt
Github user joewitt commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3178#discussion_r235102831
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml ---
@@ -0,0 +1,84 @@
+
+
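+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-pulsar-bundle</artifactId>
+        <version>1.9.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-pulsar-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+            <version>1.9.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-ssl-context-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-pulsar-client-service-api</artifactId>
+            <version>1.9.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client</artifactId>
+            <version>${pulsar.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+            <version>1.9.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.7</version>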

This should move up to the current release, which I think is 3.8.1.


---


[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors

2018-11-20 Thread joewitt
Github user joewitt commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3178#discussion_r235102049
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor.Builder()
+.name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.build();
+
+public static final PropertyDescriptor ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER = new PropertyDescriptor.Builder()
+.name("ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER")
+.displayName("Allow TLS insecure connection")
+.description("If a valid trusted certificate is provided in the 'TLS Trust Certs File Path' property of this controller service,"
++ " then, by default, all communication between this controller service and the Apache Pulsar broker will be secured via"
++ " TLS and only use the trusted TLS certificate from broker. Setting this property to 'false' will allow this controller"
++ " service to accept an untrusted TLS certificate from broker as well. This property should only be set to false if you trust"
++ " the broker you are connecting to, but do not have access to the TLS certificate file.")
+.required(false)
+.allowableValues("true", "false")
+.defaultValue("false")
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder()
+.name("CONCURRENT_LOOKUP_REQUESTS")
+.displayName("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup-requests allowed on each broker-connection.")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.defaultValue("5000")
+.build();
+
+public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder()
+.name("CONNECTIONS_PER_BROKER")
+

[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors

2018-11-20 Thread joewitt
Github user joewitt commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3178#discussion_r235101491
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java
 ---
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor.Builder()
+.name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+.build();
+
+public static final PropertyDescriptor ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER = new PropertyDescriptor.Builder()
--- End diff --

@david-streamlio this is the property I am saying we should eliminate.
It's fine that Pulsar supports this, but we don't need to support it from NiFi.
We spend a ton of time and energy on security and we need to get more serious
about it. I see no reason to keep this here. If we later find this to be a real
problem, then we could talk about ways to improve the situation by helping them
generate proper certs, etc. Also, I don't think we can claim this is OK 'if you
trust the broker you're connecting to': the point is that it could be any
broker. This is inherently not a good thing to use. We should solve the
underlying problem, and we can do that later.


---


[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors

2018-11-20 Thread joewitt
Github user joewitt commented on a diff in the pull request:

https://github.com/apache/nifi/pull/3178#discussion_r235099937
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/LRUCache.java
 ---
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.cache;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class LRUCache {
--- End diff --

@david-streamlio can you confirm that this class was written specifically for
this purpose and not taken from elsewhere? I ask because I've had the
displeasure of finding copied code before. It makes for build/release messes,
and this is the type of class that often triggers that problem. Is this fresh,
clean-room code?
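
For readers without the full diff: the imports quoted above (LinkedHashMap, Closeable) suggest the common JDK idiom of an access-ordered LinkedHashMap that evicts its eldest entry. The following is only a minimal sketch of that idiom, written from the standard library documentation. The class name LruCacheSketch, its methods, and the close-on-evict behavior are assumptions for illustration and are not taken from the LRUCache class in the pull request.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; NOT the LRUCache class under review.
class LruCacheSketch<K, V extends Closeable> {

    private final Map<K, V> entries;

    LruCacheSketch(final int capacity) {
        // accessOrder=true moves an entry to the end of the iteration order on
        // every get/put, so the "eldest" entry is the least recently used one.
        this.entries = new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                if (size() <= capacity) {
                    return false;
                }
                // Assumption: evicted values hold resources and should be closed.
                closeQuietly(eldest.getValue());
                return true;
            }
        };
    }

    synchronized V get(K key) {
        return entries.get(key);
    }

    synchronized void put(K key, V value) {
        V previous = entries.put(key, value);
        if (previous != null && previous != value) {
            closeQuietly(previous);
        }
    }

    synchronized int size() {
        return entries.size();
    }

    private static void closeQuietly(Closeable resource) {
        try {
            resource.close();
        } catch (IOException e) {
            // Ignored in this sketch; real code would at least log the failure.
        }
    }
}
```

With a capacity of 2, putting "a" and "b", reading "a", and then putting "c" would evict and close the value stored under "b", because "a" was touched more recently.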


---


[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors

2018-11-19 Thread david-streamlio
GitHub user david-streamlio opened a pull request:

https://github.com/apache/nifi/pull/3178

NIFI-4914: Add Apache Pulsar processors

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/david-streamlio/nifi NIFI-4914

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/3178.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3178






---