[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-05-14 Thread david-streamlio
Github user david-streamlio closed the pull request at:

https://github.com/apache/nifi/pull/2614


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235809
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/pom.xml ---
@@ -0,0 +1,67 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+
+org.apache.nifi
+nifi-pulsar-bundle
+1.7.0-SNAPSHOT
+
+
+nifi-pulsar-client-service
+jar
+
+
+
+org.apache.nifi
+nifi-pulsar-client-service-api
+1.7.0-SNAPSHOT
--- End diff --

This should be marked as provided as well.


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234826
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor {
+
+private static final List PROPERTIES;
+private static final Set RELATIONSHIPS;
+
+static {
+final List properties = new ArrayList<>();
+properties.add(PULSAR_CLIENT_SERVICE);
+properties.add(TOPIC);
+properties.add(SUBSCRIPTION);
+properties.add(ASYNC_ENABLED);
+properties.add(MAX_ASYNC_REQUESTS);
+properties.add(ACK_TIMEOUT);
+properties.add(PRIORITY_LEVEL);
+properties.add(RECEIVER_QUEUE_SIZE);
+properties.add(SUBSCRIPTION_TYPE);
+properties.add(MAX_WAIT_TIME);
+
+PROPERTIES = Collections.unmodifiableList(properties);
+
+final Set relationships = new HashSet<>();
+relationships.add(REL_SUCCESS);
+RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+}
+
+@Override
+public Set getRelationships() {
+return RELATIONSHIPS;
+}
+
+@Override
+protected List getSupportedPropertyDescriptors() {
+return PROPERTIES;
+}
+
+@Override
+public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+
+try {
+if (context.getProperty(ASYNC_ENABLED).asBoolean()) {
+// Launch consumers
+consumeAsync(context, session);
+
+// Handle completed consumers
+handleAsync(context, session);
+
+} else {
+consume(context, session);
+}
+} catch (PulsarClientException e) {
+getLogger().error("Unable to consume from Pulsar Topic ", e);
+context.yield();
+throw new ProcessException(e);
+}
+
+}
+
+private void handleAsync(ProcessContext context, ProcessSession 
session) {
+
+try {
+Future done = consumerService.take();
+Message msg = done.get();
+
+if 

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235065
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+
+@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", 
"Send", "Message", "PubSub", "1.0"})
+@CapabilityDescription("Sends the contents of a FlowFile as individual 
records to Apache Pulsar using the Pulsar 1.x client API. "
++ "The contents of the FlowFile are expected to be record-oriented 
data that can be read by the configured Record Reader. "
++ "The complementary NiFi processor for fetching messages is 
ConsumePulsarRecord_1_0.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Pulsar for this FlowFile. This attribute is added 
only to "
++ "FlowFiles that are routed to success.")
+@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, 
ConsumePulsarRecord_1_X.class})
+public class PublishPulsarRecord_1_X extends 
AbstractPulsarProducerProcessor {
+
+private static final List PROPERTIES;
+private static final Set RELATIONSHIPS;
+
+static {
+final List properties = new ArrayList<>();
+properties.add(PULSAR_CLIENT_SERVICE);
+properties.add(RECORD_READER);
+properties.add(RECORD_WRITER);
+properties.add(TOPIC);
+properties.add(ASYNC_ENABLED);
+properties.add(MAX_ASYNC_REQUESTS);
+properties.add(BATCHING_ENABLED);
+

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183233581
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java
 ---
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+@CapabilityDescription("Consumes messages from Apache Pulsar specifically 
built against the Pulsar 1.x Consumer API. "
++ "The complementary NiFi processor for sending messages is 
PublishPulsarRecord_1_0. Please note that, at this time, "
++ "the Processor assumes that all records that are retrieved from 
a given partition have the same schema. If any "
++ "of the Pulsar messages that are pulled but cannot be parsed or 
written with the configured Record Reader or "
++ "Record Writer, the contents of the message will be written to a 
separate FlowFile, and that FlowFile will be transferred to the "
++ "'parse.failure' relationship. Otherwise, each FlowFile is sent 
to the 'success' relationship and may contain many individual "
++ "messages within the single FlowFile. A 'record.count' attribute 
is added to indicate how many messages are contained in the "
++ "FlowFile. No two Pulsar messages will be placed into the same 
FlowFile if they have different schemas.")
+@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", 
"Ingress", "Topic", "PubSub", "Consume"})
+@WritesAttributes({
+@WritesAttribute(attribute = "record.count", description = "The number 
of records received")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234943
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+
+@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", 
"Send", "Message", "PubSub", "1.0"})
+@CapabilityDescription("Sends the contents of a FlowFile as individual 
records to Apache Pulsar using the Pulsar 1.x client API. "
++ "The contents of the FlowFile are expected to be record-oriented 
data that can be read by the configured Record Reader. "
++ "The complementary NiFi processor for fetching messages is 
ConsumePulsarRecord_1_0.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Pulsar for this FlowFile. This attribute is added 
only to "
++ "FlowFiles that are routed to success.")
+@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, 
ConsumePulsarRecord_1_X.class})
+public class PublishPulsarRecord_1_X extends 
AbstractPulsarProducerProcessor {
+
+private static final List PROPERTIES;
+private static final Set RELATIONSHIPS;
+
+static {
+final List properties = new ArrayList<>();
+properties.add(PULSAR_CLIENT_SERVICE);
+properties.add(RECORD_READER);
+properties.add(RECORD_WRITER);
+properties.add(TOPIC);
+properties.add(ASYNC_ENABLED);
+properties.add(MAX_ASYNC_REQUESTS);
+properties.add(BATCHING_ENABLED);
+

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235109
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+
+@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", 
"Send", "Message", "PubSub", "1.0"})
+@CapabilityDescription("Sends the contents of a FlowFile as individual 
records to Apache Pulsar using the Pulsar 1.x client API. "
++ "The contents of the FlowFile are expected to be record-oriented 
data that can be read by the configured Record Reader. "
++ "The complementary NiFi processor for fetching messages is 
ConsumePulsarRecord_1_0.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Pulsar for this FlowFile. This attribute is added 
only to "
++ "FlowFiles that are routed to success.")
+@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, 
ConsumePulsarRecord_1_X.class})
+public class PublishPulsarRecord_1_X extends 
AbstractPulsarProducerProcessor {
+
+private static final List PROPERTIES;
+private static final Set RELATIONSHIPS;
+
+static {
+final List properties = new ArrayList<>();
+properties.add(PULSAR_CLIENT_SERVICE);
+properties.add(RECORD_READER);
+properties.add(RECORD_WRITER);
+properties.add(TOPIC);
+properties.add(ASYNC_ENABLED);
+properties.add(MAX_ASYNC_REQUESTS);
+properties.add(BATCHING_ENABLED);
+

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234762
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor {
+
+private static final List PROPERTIES;
+private static final Set RELATIONSHIPS;
+
+static {
+final List properties = new ArrayList<>();
+properties.add(PULSAR_CLIENT_SERVICE);
+properties.add(TOPIC);
+properties.add(SUBSCRIPTION);
+properties.add(ASYNC_ENABLED);
+properties.add(MAX_ASYNC_REQUESTS);
+properties.add(ACK_TIMEOUT);
+properties.add(PRIORITY_LEVEL);
+properties.add(RECEIVER_QUEUE_SIZE);
+properties.add(SUBSCRIPTION_TYPE);
+properties.add(MAX_WAIT_TIME);
+
+PROPERTIES = Collections.unmodifiableList(properties);
+
+final Set relationships = new HashSet<>();
+relationships.add(REL_SUCCESS);
+RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+}
+
+@Override
+public Set getRelationships() {
+return RELATIONSHIPS;
+}
+
+@Override
+protected List getSupportedPropertyDescriptors() {
+return PROPERTIES;
+}
+
+@Override
+public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+
+try {
+if (context.getProperty(ASYNC_ENABLED).asBoolean()) {
+// Launch consumers
+consumeAsync(context, session);
+
+// Handle completed consumers
+handleAsync(context, session);
+
+} else {
+consume(context, session);
+}
+} catch (PulsarClientException e) {
+getLogger().error("Unable to consume from Pulsar Topic ", e);
+context.yield();
+throw new ProcessException(e);
+}
+
+}
+
+private void handleAsync(ProcessContext context, ProcessSession 
session) {
+
+try {
+Future done = consumerService.take();
+Message msg = done.get();
+
+if 

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183209203
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of 
PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService 
implements PulsarClientPool {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor
+.Builder().name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = 
new PropertyDescriptor.Builder()
+.name("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup-requests allowed on 
each broker-connection to prevent "
++ "overload on broker. (default: 5000) It should be 
configured with higher value only in case "
++ "of it requires to produce/subscribe on thousands of 
topics")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("5000")
+.build();
+
+public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new 
PropertyDescriptor.Builder()
+.name("Maximum connects per Pulsar broker")
+.description("Sets the max number of connection that the 
client library will open to a single broker.\n" +
+"By default, the connection pool will use a single 
connection for all the producers and consumers. " +
+"Increasing this parameter may improve throughput when 
using many producers over a high latency connection")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("1")
+.build();
+
+public static final PropertyDescriptor IO_THREADS = new 
PropertyDescriptor.Builder()
+.name("I/O Threads")
+.description("The number of threads to be used for handling 
connections to brokers (default: 1 thread)")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234572
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java
 ---
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+@CapabilityDescription("Consumes messages from Apache Pulsar specifically 
built against the Pulsar 1.x Consumer API. "
++ "The complementary NiFi processor for sending messages is 
PublishPulsarRecord_1_0. Please note that, at this time, "
++ "the Processor assumes that all records that are retrieved from 
a given partition have the same schema. If any "
++ "of the Pulsar messages that are pulled but cannot be parsed or 
written with the configured Record Reader or "
++ "Record Writer, the contents of the message will be written to a 
separate FlowFile, and that FlowFile will be transferred to the "
++ "'parse.failure' relationship. Otherwise, each FlowFile is sent 
to the 'success' relationship and may contain many individual "
++ "messages within the single FlowFile. A 'record.count' attribute 
is added to indicate how many messages are contained in the "
++ "FlowFile. No two Pulsar messages will be placed into the same 
FlowFile if they have different schemas.")
+@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", 
"Ingress", "Topic", "PubSub", "Consume"})
+@WritesAttributes({
+@WritesAttribute(attribute = "record.count", description = "The number 
of records received")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183209029
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-nar/src/main/resources/META-INF/NOTICE
 ---
@@ -0,0 +1,612 @@
+nifi-druid-controller-service-api-nar
--- End diff --

Needs to be changed to the pulsar client service.


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183208823
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/PoolableResource.java
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+/**
+ * Service interface for any object that can be pooled for re-use., which
+ * defines methods for closing the object, effectively marking it no longer
+ * usable.
+ *
+ * @author david
--- End diff --

Please remove.


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235163
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsar_1_X.java
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to 
Apache Pulsar using the Pulsar 1.X Producer API."
++ "The messages to send may be individual FlowFiles or may be 
delimited, using a "
++ "user-specified delimiter, such as a new-line. "
++ "The complementary NiFi processor for fetching messages is 
ConsumePulsar_1_X.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Pulsar for this FlowFile. This attribute is added 
only to "
++ "FlowFiles that are routed to success.")
+public class PublishPulsar_1_X extends AbstractPulsarProducerProcessor {
+
+private static final List PROPERTIES;
+private static final Set RELATIONSHIPS;
+
+static {
+final List properties = new ArrayList<>();
+properties.add(PULSAR_CLIENT_SERVICE);
+properties.add(TOPIC);
+properties.add(ASYNC_ENABLED);
+properties.add(MAX_ASYNC_REQUESTS);
+properties.add(BATCHING_ENABLED);
+properties.add(BATCHING_MAX_MESSAGES);
+properties.add(BATCH_INTERVAL);
+properties.add(BLOCK_IF_QUEUE_FULL);
+properties.add(COMPRESSION_TYPE);
+properties.add(MESSAGE_ROUTING_MODE);
+properties.add(PENDING_MAX_MESSAGES);
+
+PROPERTIES = Collections.unmodifiableList(properties);
+
+final Set relationships = new HashSet<>();
+relationships.add(REL_SUCCESS);
+relationships.add(REL_FAILURE);
+RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+}
+
+@Override
+public Set getRelationships() {
+return RELATIONSHIPS;
+}
+
+@Override
+protected List getSupportedPropertyDescriptors() {
+return PROPERTIES;
+}
+
+@Override
+public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+
+FlowFile flowFile = session.get();
+
+if (flowFile == null) {
+return;
+}
+
+final ComponentLog logger = getLogger();
+final String topic = 
context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+
+if (StringUtils.isBlank(topic)) {
+

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183208994
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/ResourceFactory.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+import java.util.Properties;
+
+/**
+ * Factory pattern interface for @PoolableResource objects. Concrete 
implementations
+ * of this interface will be responsible for the creation of 
@PoolableResource objects
+ * based on the Properties passed in.
+ *
+ * @author david
--- End diff --

Please remove


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183210297
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java
 ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarProducer;
+import org.apache.nifi.pulsar.cache.LRUCache;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.PulsarClientException;
+import 
org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+
+public abstract class AbstractPulsarProducerProcessor extends 
AbstractPulsarProcessor {
+
+public static final String MSG_COUNT = "msg.count";
+public static final String TOPIC_NAME = "topic.name";
+
+static final AllowableValue COMPRESSION_TYPE_NONE = new 
AllowableValue("NONE", "None", "No compression");
+static final AllowableValue COMPRESSION_TYPE_LZ4 = new 
AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm.");
+static final AllowableValue COMPRESSION_TYPE_ZLIB = new 
AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm");
+
+static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = 
new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a 
custom partition");
+static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION 
= new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route 
messages to all "
+   
+ "partitions in a round robin 
manner");
+static final AllowableValue MESSAGE_ROUTING_MODE_SINGLE_PARTITION = 
new AllowableValue("SinglePartition", "Single Partition", "Route messages to a 
single partition");
+
+public static final PropertyDescriptor TOPIC = new 
PropertyDescriptor.Builder()
+.name("topic")
+.displayName("Topic Name")
+.description("The name of the Pulsar Topic.")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+.build();
+
+public static final PropertyDescriptor ASYNC_ENABLED = new 
PropertyDescriptor.Builder()
+.name("Async Enabled")
+.description("Control whether the messages will be sent 
asyncronously or not. Messages sent"
++ " syncronously will be acknowledged immediately 
before processing the next 

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234732
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java
 ---
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+@CapabilityDescription("Consumes messages from Apache Pulsar specifically 
built against the Pulsar 1.x Consumer API. "
++ "The complementary NiFi processor for sending messages is 
PublishPulsarRecord_1_0. Please note that, at this time, "
++ "the Processor assumes that all records that are retrieved from 
a given partition have the same schema. If any "
++ "of the Pulsar messages that are pulled but cannot be parsed or 
written with the configured Record Reader or "
++ "Record Writer, the contents of the message will be written to a 
separate FlowFile, and that FlowFile will be transferred to the "
++ "'parse.failure' relationship. Otherwise, each FlowFile is sent 
to the 'success' relationship and may contain many individual "
++ "messages within the single FlowFile. A 'record.count' attribute 
is added to indicate how many messages are contained in the "
++ "FlowFile. No two Pulsar messages will be placed into the same 
FlowFile if they have different schemas.")
+@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", 
"Ingress", "Topic", "PubSub", "Consume"})
+@WritesAttributes({
+@WritesAttribute(attribute = "record.count", description = "The number 
of records received")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234849
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor {
+
+private static final List PROPERTIES;
+private static final Set RELATIONSHIPS;
+
+static {
+final List properties = new ArrayList<>();
+properties.add(PULSAR_CLIENT_SERVICE);
+properties.add(TOPIC);
+properties.add(SUBSCRIPTION);
+properties.add(ASYNC_ENABLED);
+properties.add(MAX_ASYNC_REQUESTS);
+properties.add(ACK_TIMEOUT);
+properties.add(PRIORITY_LEVEL);
+properties.add(RECEIVER_QUEUE_SIZE);
+properties.add(SUBSCRIPTION_TYPE);
+properties.add(MAX_WAIT_TIME);
+
+PROPERTIES = Collections.unmodifiableList(properties);
+
+final Set relationships = new HashSet<>();
+relationships.add(REL_SUCCESS);
+RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+}
+
+@Override
+public Set getRelationships() {
+return RELATIONSHIPS;
+}
+
+@Override
+protected List getSupportedPropertyDescriptors() {
+return PROPERTIES;
+}
+
+@Override
+public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+
+try {
+if (context.getProperty(ASYNC_ENABLED).asBoolean()) {
+// Launch consumers
+consumeAsync(context, session);
+
+// Handle completed consumers
+handleAsync(context, session);
+
+} else {
+consume(context, session);
+}
+} catch (PulsarClientException e) {
+getLogger().error("Unable to consume from Pulsar Topic ", e);
+context.yield();
+throw new ProcessException(e);
+}
+
+}
+
+private void handleAsync(ProcessContext context, ProcessSession 
session) {
+
+try {
+Future done = consumerService.take();
+Message msg = done.get();
+
+if 

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235146
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+
+@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", 
"Send", "Message", "PubSub", "1.0"})
+@CapabilityDescription("Sends the contents of a FlowFile as individual 
records to Apache Pulsar using the Pulsar 1.x client API. "
++ "The contents of the FlowFile are expected to be record-oriented 
data that can be read by the configured Record Reader. "
++ "The complementary NiFi processor for fetching messages is 
ConsumePulsarRecord_1_0.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Pulsar for this FlowFile. This attribute is added 
only to "
++ "FlowFiles that are routed to success.")
+@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, 
ConsumePulsarRecord_1_X.class})
+public class PublishPulsarRecord_1_X extends 
AbstractPulsarProducerProcessor {
+
+private static final List PROPERTIES;
+private static final Set RELATIONSHIPS;
+
+static {
+final List properties = new ArrayList<>();
+properties.add(PULSAR_CLIENT_SERVICE);
+properties.add(RECORD_READER);
+properties.add(RECORD_WRITER);
+properties.add(TOPIC);
+properties.add(ASYNC_ENABLED);
+properties.add(MAX_ASYNC_REQUESTS);
+properties.add(BATCHING_ENABLED);
+

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235215
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/RecordBasedConst.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+public final class RecordBasedConst {
+
+public static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+.name("record-reader")
+.displayName("Record Reader")
+.description("The Record Reader to use for incoming FlowFiles")
+.identifiesControllerService(RecordReaderFactory.class)
+.expressionLanguageSupported(false)
+.required(true)
+.build();
+
+public static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+.name("record-writer")
+.displayName("Record Writer")
+.description("The Record Writer to use in order to serialize 
the data before sending to Pulsar")
+.identifiesControllerService(RecordSetWriterFactory.class)
+.expressionLanguageSupported(false)
+.required(true)
+.build();
+
+private RecordBasedConst() {
--- End diff --

Not needed.


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235123
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+
+@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", 
"Send", "Message", "PubSub", "1.0"})
+@CapabilityDescription("Sends the contents of a FlowFile as individual 
records to Apache Pulsar using the Pulsar 1.x client API. "
++ "The contents of the FlowFile are expected to be record-oriented 
data that can be read by the configured Record Reader. "
++ "The complementary NiFi processor for fetching messages is 
ConsumePulsarRecord_1_0.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Pulsar for this FlowFile. This attribute is added 
only to "
++ "FlowFiles that are routed to success.")
+@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, 
ConsumePulsarRecord_1_X.class})
+public class PublishPulsarRecord_1_X extends 
AbstractPulsarProducerProcessor {
+
+private static final List PROPERTIES;
+private static final Set RELATIONSHIPS;
+
+static {
+final List properties = new ArrayList<>();
+properties.add(PULSAR_CLIENT_SERVICE);
+properties.add(RECORD_READER);
+properties.add(RECORD_WRITER);
+properties.add(TOPIC);
+properties.add(ASYNC_ENABLED);
+properties.add(MAX_ASYNC_REQUESTS);
+properties.add(BATCHING_ENABLED);
+

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234534
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java
 ---
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+@CapabilityDescription("Consumes messages from Apache Pulsar specifically 
built against the Pulsar 1.x Consumer API. "
++ "The complementary NiFi processor for sending messages is 
PublishPulsarRecord_1_0. Please note that, at this time, "
++ "the Processor assumes that all records that are retrieved from 
a given partition have the same schema. If any "
++ "of the Pulsar messages that are pulled but cannot be parsed or 
written with the configured Record Reader or "
++ "Record Writer, the contents of the message will be written to a 
separate FlowFile, and that FlowFile will be transferred to the "
++ "'parse.failure' relationship. Otherwise, each FlowFile is sent 
to the 'success' relationship and may contain many individual "
++ "messages within the single FlowFile. A 'record.count' attribute 
is added to indicate how many messages are contained in the "
++ "FlowFile. No two Pulsar messages will be placed into the same 
FlowFile if they have different schemas.")
+@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", 
"Ingress", "Topic", "PubSub", "Consume"})
+@WritesAttributes({
+@WritesAttribute(attribute = "record.count", description = "The number 
of records received")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234861
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/InFlightMessageMonitor.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Helper class to monitor the asynchronous submission of a large number
+ * of records to Apache Pulsar.
+ *
+ * @author david
--- End diff --

Please remove.


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234779
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor {
+
+private static final List PROPERTIES;
+private static final Set RELATIONSHIPS;
+
+static {
+final List properties = new ArrayList<>();
+properties.add(PULSAR_CLIENT_SERVICE);
+properties.add(TOPIC);
+properties.add(SUBSCRIPTION);
+properties.add(ASYNC_ENABLED);
+properties.add(MAX_ASYNC_REQUESTS);
+properties.add(ACK_TIMEOUT);
+properties.add(PRIORITY_LEVEL);
+properties.add(RECEIVER_QUEUE_SIZE);
+properties.add(SUBSCRIPTION_TYPE);
+properties.add(MAX_WAIT_TIME);
+
+PROPERTIES = Collections.unmodifiableList(properties);
+
+final Set relationships = new HashSet<>();
+relationships.add(REL_SUCCESS);
+RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+}
+
+@Override
+public Set getRelationships() {
+return RELATIONSHIPS;
+}
+
+@Override
+protected List getSupportedPropertyDescriptors() {
+return PROPERTIES;
+}
+
+@Override
+public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+
+try {
+if (context.getProperty(ASYNC_ENABLED).asBoolean()) {
+// Launch consumers
+consumeAsync(context, session);
+
+// Handle completed consumers
+handleAsync(context, session);
+
+} else {
+consume(context, session);
+}
+} catch (PulsarClientException e) {
+getLogger().error("Unable to consume from Pulsar Topic ", e);
+context.yield();
+throw new ProcessException(e);
+}
+
+}
+
+private void handleAsync(ProcessContext context, ProcessSession 
session) {
+
+try {
+Future done = consumerService.take();
+Message msg = done.get();
+
+if 

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234539
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java
 ---
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+@CapabilityDescription("Consumes messages from Apache Pulsar specifically 
built against the Pulsar 1.x Consumer API. "
++ "The complementary NiFi processor for sending messages is 
PublishPulsarRecord_1_0. Please note that, at this time, "
++ "the Processor assumes that all records that are retrieved from 
a given partition have the same schema. If any "
++ "of the Pulsar messages that are pulled but cannot be parsed or 
written with the configured Record Reader or "
++ "Record Writer, the contents of the message will be written to a 
separate FlowFile, and that FlowFile will be transferred to the "
++ "'parse.failure' relationship. Otherwise, each FlowFile is sent 
to the 'success' relationship and may contain many individual "
++ "messages within the single FlowFile. A 'record.count' attribute 
is added to indicate how many messages are contained in the "
++ "FlowFile. No two Pulsar messages will be placed into the same 
FlowFile if they have different schemas.")
+@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", 
"Ingress", "Topic", "PubSub", "Consume"})
+@WritesAttributes({
+@WritesAttribute(attribute = "record.count", description = "The number 
of records received")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183209099
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of 
PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService 
implements PulsarClientPool {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor
+.Builder().name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = 
new PropertyDescriptor.Builder()
+.name("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup-requests allowed on 
each broker-connection to prevent "
++ "overload on broker. (default: 5000) It should be 
configured with higher value only in case "
++ "of it requires to produce/subscribe on thousands of 
topics")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("5000")
+.build();
+
+public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new 
PropertyDescriptor.Builder()
+.name("Maximum connects per Pulsar broker")
+.description("Sets the max number of connection that the 
client library will open to a single broker.\n" +
+"By default, the connection pool will use a single 
connection for all the producers and consumers. " +
+"Increasing this parameter may improve throughput when 
using many producers over a high latency connection")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
--- End diff --

> Consider adding 
expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).

In fact, you might want to consider that for all of these properties so you 
can make it more customizable.


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235251
  
--- Diff: nifi-nar-bundles/pom.xml ---
@@ -93,6 +93,7 @@
 nifi-spark-bundle
 nifi-atlas-bundle
 nifi-druid-bundle
+nifi-pulsar-bundle
--- End diff --

You need to add a dependency declaration for the NAR in 
`nifi-assembly/pom.xml`. Just putting that note here since there's no really 
good place to track that in the review.


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183208828
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+
+
+@Tags({"Pulsar"})
+@CapabilityDescription("Provides the ability to create Pulsar Producer / 
Consumer instances on demand, based on the configuration."
+ + "properties defined")
+/**
+ * Service definition for apache Pulsar Client ControllerService
+ * responsible for maintaining a pool of @PulsarProducer and
+ * @PulsarConsumer objects.
+ *
+ * Since both of these objects can be reused, in a manner similar
+ * to database connections, and the cost to create these objects is
+ * relatively high. The PulsarClientPool keeps these objects in pools
+ * for re-use.
+ *
+ * @author david
--- End diff --

Please remove.


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183209058
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of 
PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService 
implements PulsarClientPool {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor
+.Builder().name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
--- End diff --

Consider adding 
`expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)`.


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183208835
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+
+
+@Tags({"Pulsar"})
--- End diff --

Could use others like "client" and "pool."


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235033
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static 
org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+
+@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", 
"Send", "Message", "PubSub", "1.0"})
+@CapabilityDescription("Sends the contents of a FlowFile as individual 
records to Apache Pulsar using the Pulsar 1.x client API. "
++ "The contents of the FlowFile are expected to be record-oriented 
data that can be read by the configured Record Reader. "
++ "The complementary NiFi processor for fetching messages is 
ConsumePulsarRecord_1_0.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of 
messages that were sent to Pulsar for this FlowFile. This attribute is added 
only to "
++ "FlowFiles that are routed to success.")
+@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, 
ConsumePulsarRecord_1_X.class})
+public class PublishPulsarRecord_1_X extends 
AbstractPulsarProducerProcessor {
+
+private static final List PROPERTIES;
+private static final Set RELATIONSHIPS;
+
+static {
+final List properties = new ArrayList<>();
+properties.add(PULSAR_CLIENT_SERVICE);
+properties.add(RECORD_READER);
+properties.add(RECORD_WRITER);
+properties.add(TOPIC);
+properties.add(ASYNC_ENABLED);
+properties.add(MAX_ASYNC_REQUESTS);
+properties.add(BATCHING_ENABLED);
+

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234838
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
++ "The complementary NiFi processor for sending messages is 
PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor {
+
+private static final List PROPERTIES;
+private static final Set RELATIONSHIPS;
+
+static {
+final List properties = new ArrayList<>();
+properties.add(PULSAR_CLIENT_SERVICE);
+properties.add(TOPIC);
+properties.add(SUBSCRIPTION);
+properties.add(ASYNC_ENABLED);
+properties.add(MAX_ASYNC_REQUESTS);
+properties.add(ACK_TIMEOUT);
+properties.add(PRIORITY_LEVEL);
+properties.add(RECEIVER_QUEUE_SIZE);
+properties.add(SUBSCRIPTION_TYPE);
+properties.add(MAX_WAIT_TIME);
+
+PROPERTIES = Collections.unmodifiableList(properties);
+
+final Set relationships = new HashSet<>();
+relationships.add(REL_SUCCESS);
+RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+}
+
+@Override
+public Set getRelationships() {
+return RELATIONSHIPS;
+}
+
+@Override
+protected List getSupportedPropertyDescriptors() {
+return PROPERTIES;
+}
+
+@Override
+public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
+
+try {
+if (context.getProperty(ASYNC_ENABLED).asBoolean()) {
+// Launch consumers
+consumeAsync(context, session);
+
+// Handle completed consumers
+handleAsync(context, session);
+
+} else {
+consume(context, session);
+}
+} catch (PulsarClientException e) {
+getLogger().error("Unable to consume from Pulsar Topic ", e);
+context.yield();
+throw new ProcessException(e);
+}
+
+}
+
+private void handleAsync(ProcessContext context, ProcessSession 
session) {
+
+try {
+Future done = consumerService.take();
+Message msg = done.get();
+
+if 

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183209046
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
--- End diff --

Should be fleshed out with more tags.


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183209071
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import 
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of 
PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService 
implements PulsarClientPool {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new 
PropertyDescriptor
+.Builder().name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = 
new PropertyDescriptor.Builder()
+.name("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup-requests allowed on 
each broker-connection to prevent "
++ "overload on broker. (default: 5000) It should be 
configured with higher value only in case "
++ "of it requires to produce/subscribe on thousands of 
topics")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("5000")
--- End diff --

Are you sure this is a sane default? Your description suggests that 5,000 
is the upper end of where most users would want to be.


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-10 Thread pvillard31
Github user pvillard31 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r180328774
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/pom.xml ---
@@ -0,0 +1,67 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+
+org.apache.nifi
+nifi-pulsar-bundle
+1.7.0-SNAPSHOT
+
+
+nifi-pulsar-client-service
+jar
+
+
+
+org.apache.nifi
+nifi-pulsar-client-service-api
+1.6.0-SNAPSHOT
--- End diff --

Needs to be updated to 1.7.0-SNAPSHOT


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-10 Thread pvillard31
Github user pvillard31 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r180328802
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/pom.xml ---
@@ -0,0 +1,67 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+
+org.apache.nifi
+nifi-pulsar-bundle
+1.7.0-SNAPSHOT
+
+
+nifi-pulsar-client-service
+jar
+
+
+
+org.apache.nifi
+nifi-pulsar-client-service-api
+1.6.0-SNAPSHOT
+
+
+org.apache.nifi
+nifi-api
+provided
+
+
+org.apache.nifi
+nifi-processor-utils
+1.7.0-SNAPSHOT
+provided
+
+
+org.apache.nifi
+nifi-ssl-context-service-api
+provided
+
+
+org.apache.nifi
+nifi-mock
+1.6.0-SNAPSHOT
--- End diff --

Needs to be updated to 1.7.0-SNAPSHOT


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-10 Thread pvillard31
Github user pvillard31 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r180329047
  
--- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-nar/pom.xml ---
@@ -0,0 +1,50 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+
+org.apache.nifi
+nifi-pulsar-bundle
+1.7.0-SNAPSHOT
+
+
+nifi-pulsar-nar
+nar
+
+
+
+org.apache.nifi
+nifi-pulsar-client-service-api-nar
+1.6.0-SNAPSHOT
+nar
+   
+   
+
+org.apache.nifi
+nifi-pulsar-processors
+1.6.0-SNAPSHOT
+
+
+   
+   org.apache.nifi
+   nifi-pulsar-client-service
+   1.6.0-SNAPSHOT
--- End diff --

Needs to be updated to 1.7.0-SNAPSHOT
and indentation to fix


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-10 Thread pvillard31
Github user pvillard31 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r180329019
  
--- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-nar/pom.xml ---
@@ -0,0 +1,50 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+
+org.apache.nifi
+nifi-pulsar-bundle
+1.7.0-SNAPSHOT
+
+
+nifi-pulsar-nar
+nar
+
+
+
+org.apache.nifi
+nifi-pulsar-client-service-api-nar
+1.6.0-SNAPSHOT
+nar
+   
+   
+
+org.apache.nifi
+nifi-pulsar-processors
+1.6.0-SNAPSHOT
--- End diff --

Needs to be updated to 1.7.0-SNAPSHOT


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-10 Thread pvillard31
Github user pvillard31 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r180328729
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-nar/pom.xml ---
@@ -0,0 +1,35 @@
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+  4.0.0
+  
+  
+org.apache.nifi
+nifi-pulsar-bundle
+1.7.0-SNAPSHOT
+  
+  
+  nifi-pulsar-client-service-api-nar
+  nar
+  
+  
+   
+org.apache.nifi
+nifi-pulsar-client-service-api
+1.6.0-SNAPSHOT
--- End diff --

Needs to be updated to 1.7.0-SNAPSHOT


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-10 Thread pvillard31
Github user pvillard31 commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r180328937
  
--- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-nar/pom.xml ---
@@ -0,0 +1,50 @@
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+4.0.0
+
+
+org.apache.nifi
+nifi-pulsar-bundle
+1.7.0-SNAPSHOT
+
+
+nifi-pulsar-nar
+nar
+
+
+
+org.apache.nifi
+nifi-pulsar-client-service-api-nar
+1.6.0-SNAPSHOT
--- End diff --

Needs to be updated to 1.7.0-SNAPSHOT
and indentation to fix


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-06 Thread david-streamlio
GitHub user david-streamlio opened a pull request:

https://github.com/apache/nifi/pull/2614

Added Apache Pulsar Processors and Controller Service

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [ ] Does your PR title start with NIFI- where  is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/david-streamlio/nifi NIFI-4914

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2614.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2614


commit bcfdae70506e4b1f2aab03a8bcfc9f55f859597c
Author: David Kjerrumgaard 
Date:   2018-04-06T22:42:30Z

Added Apache Pulsar Processors and Controller Service




---