[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183233581
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java
 ---
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+@CapabilityDescription("Consumes messages from Apache Pulsar, specifically built against the Pulsar 1.x Consumer API. "
++ "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_X. Please note that, at this time, "
++ "the Processor assumes that all records retrieved from a given partition have the same schema. If any "
++ "of the Pulsar messages that are pulled cannot be parsed or written with the configured Record Reader or "
++ "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the "
++ "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual "
++ "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the "
++ "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.")
+@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@WritesAttributes({
+@WritesAttribute(attribute = "record.count", description = "The number of records received")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
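The description above states two rules: messages with different schemas never share a FlowFile, and a message that cannot be parsed gets its own FlowFile on 'parse.failure'. Below is a minimal, self-contained sketch of that grouping rule. It is not the PR's implementation. `SchemaBatcher` is a hypothetical name, and its `schemaOf` function is a stand-in for the configured Record Reader that returns null for unparseable payloads.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch, not the PR's code: groups message payloads by schema so
// that no output batch (FlowFile) mixes schemas, and sets aside payloads that
// cannot be parsed so each can be routed to 'parse.failure' on its own.
final class SchemaBatcher {

    private final Map<String, List<String>> bySchema = new LinkedHashMap<>();
    private final List<String> parseFailures = new ArrayList<>();

    // schemaOf stands in for the configured Record Reader: it returns a schema
    // identifier, or null if the payload cannot be parsed (an assumption).
    void add(String payload, Function<String, String> schemaOf) {
        String schema = schemaOf.apply(payload);
        if (schema == null) {
            parseFailures.add(payload);
        } else {
            bySchema.computeIfAbsent(schema, k -> new ArrayList<>()).add(payload);
        }
    }

    // One entry per would-be 'success' FlowFile; each list's size is its record.count.
    Map<String, List<String>> successBatches() {
        return Collections.unmodifiableMap(bySchema);
    }

    // Each payload here would become its own 'parse.failure' FlowFile.
    List<String> parseFailures() {
        return Collections.unmodifiableList(parseFailures);
    }
}
```

Keying batches by schema keeps the "no mixed schemas" guarantee local to one map lookup per message. Real schemas would need a stable identity (for example, a schema name or fingerprint) to serve as the key.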

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234943
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+
+@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"})
+@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. "
++ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
++ "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_X.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to "
++ "FlowFiles that are routed to success.")
+@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class})
+public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor {
+
+private static final List<PropertyDescriptor> PROPERTIES;
+private static final Set<Relationship> RELATIONSHIPS;
+
+static {
+final List<PropertyDescriptor> properties = new ArrayList<>();
+properties.add(PULSAR_CLIENT_SERVICE);
+properties.add(RECORD_READER);
+properties.add(RECORD_WRITER);
+properties.add(TOPIC);
+properties.add(ASYNC_ENABLED);
+properties.add(MAX_ASYNC_REQUESTS);
+properties.add(BATCHING_ENABLED);
+
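The property list above includes ASYNC_ENABLED and MAX_ASYNC_REQUESTS, and the msg.count attribute records how many messages were sent per FlowFile. One way to bound in-flight sends is sketched below using a Semaphore and CompletableFuture. This is an illustrative alternative, not necessarily how the PR implements it (its imports suggest an executor/Callable design). `BoundedAsyncSender` is a hypothetical name, and its `send` function stands in for the Pulsar producer's send call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical sketch: bounds in-flight asynchronous sends the way a
// "max async requests" setting might, and counts successful sends so the
// total could be written to the FlowFile as msg.count.
final class BoundedAsyncSender {

    private final Semaphore permits;
    private final ExecutorService executor;

    BoundedAsyncSender(int maxAsyncRequests, ExecutorService executor) {
        this.permits = new Semaphore(maxAsyncRequests);
        this.executor = executor;
    }

    // 'send' stands in for a Pulsar producer's send call and returns a message id.
    // Returns the number of successful sends; if any send fails, join() throws
    // a CompletionException once all sends have finished.
    long sendAll(List<byte[]> payloads, Function<byte[], String> send) throws InterruptedException {
        AtomicLong sent = new AtomicLong();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (byte[] payload : payloads) {
            permits.acquire(); // waits while maxAsyncRequests sends are still in flight
            pending.add(CompletableFuture
                    .supplyAsync(() -> send.apply(payload), executor)
                    .thenAccept(messageId -> sent.incrementAndGet())
                    .whenComplete((ignored, error) -> permits.release()));
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        return sent.get();
    }
}
```

Releasing the permit in `whenComplete` frees a slot whether the send succeeded or failed, so one failed send does not permanently shrink the concurrency limit.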

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235109
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java
 ---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234762
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar. "
++ "The complementary NiFi processor for sending messages is PublishPulsar_1_X.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor {
+
+private static final List<PropertyDescriptor> PROPERTIES;
+private static final Set<Relationship> RELATIONSHIPS;
+
+static {
+final List<PropertyDescriptor> properties = new ArrayList<>();
+properties.add(PULSAR_CLIENT_SERVICE);
+properties.add(TOPIC);
+properties.add(SUBSCRIPTION);
+properties.add(ASYNC_ENABLED);
+properties.add(MAX_ASYNC_REQUESTS);
+properties.add(ACK_TIMEOUT);
+properties.add(PRIORITY_LEVEL);
+properties.add(RECEIVER_QUEUE_SIZE);
+properties.add(SUBSCRIPTION_TYPE);
+properties.add(MAX_WAIT_TIME);
+
+PROPERTIES = Collections.unmodifiableList(properties);
+
+final Set<Relationship> relationships = new HashSet<>();
+relationships.add(REL_SUCCESS);
+RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+}
+
+@Override
+public Set<Relationship> getRelationships() {
+return RELATIONSHIPS;
+}
+
+@Override
+protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+return PROPERTIES;
+}
+
+@Override
+public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+try {
+if (context.getProperty(ASYNC_ENABLED).asBoolean()) {
+// Launch consumers
+consumeAsync(context, session);
+
+// Handle completed consumers
+handleAsync(context, session);
+
+} else {
+consume(context, session);
+}
+} catch (PulsarClientException e) {
+getLogger().error("Unable to consume from Pulsar Topic ", e);
+context.yield();
+throw new ProcessException(e);
+}
+
+}
+
+private void handleAsync(ProcessContext context, ProcessSession session) {
+
+try {
+Future<Message> done = consumerService.take();
+Message msg = done.get();
+
+if 
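In the asynchronous branch, onTrigger first launches consumers and then handles completed ones. handleAsync waits on `consumerService.take()`, which (assuming consumerService is a CompletionService or a blocking queue) blocks until some receive finishes. The sketch below shows the same launch-then-drain pattern with `java.util.concurrent.ExecutorCompletionService`, but drains with `poll(timeout)` so that a trigger with no ready messages returns instead of blocking indefinitely. `AsyncReceiver` is a hypothetical name, and the sketch assumes it is driven from one thread at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a launch-then-drain pattern: up to maxAsyncRequests
// receive tasks are kept outstanding, and finished ones are collected with a
// bounded wait. Not thread-safe: 'outstanding' assumes a single caller thread.
final class AsyncReceiver<T> {

    private final CompletionService<T> completions;
    private final int maxAsyncRequests;
    private int outstanding = 0;

    AsyncReceiver(ExecutorService executor, int maxAsyncRequests) {
        this.completions = new ExecutorCompletionService<>(executor);
        this.maxAsyncRequests = maxAsyncRequests;
    }

    // Top up the number of outstanding receive tasks to maxAsyncRequests.
    void launch(Callable<T> receive) {
        while (outstanding < maxAsyncRequests) {
            completions.submit(receive);
            outstanding++;
        }
    }

    // Waits up to 'wait' for the first result, then takes any others that are
    // already done without waiting. A task that failed rethrows via get().
    List<T> drain(long wait, TimeUnit unit) throws InterruptedException, ExecutionException {
        List<T> done = new ArrayList<>();
        Future<T> f = completions.poll(wait, unit);
        while (f != null) {
            outstanding--;
            done.add(f.get());
            f = completions.poll();
        }
        return done;
    }
}
```

A bounded wait gives the processor a chance to return (and NiFi a chance to yield or stop it) when the topic is idle. With `take()`, the trigger thread would stay parked until a message arrives.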

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183209203
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of PulsarClientPool.")
+public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor
+.Builder().name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g. localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder()
+.name("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup requests allowed on each broker connection, to prevent "
++ "overloading the broker (default: 5000). It should be set to a higher value only if the client "
++ "needs to produce to or subscribe on thousands of topics.")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("5000")
+.build();
+
+public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder()
+.name("Maximum connections per Pulsar broker")
+.description("Sets the maximum number of connections that the client library will open to a single broker.\n" +
+"By default, the connection pool will use a single connection for all the producers and consumers. " +
+"Increasing this parameter may improve throughput when using many producers over a high-latency connection.")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("1")
+.build();
+
+public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder()
+.name("I/O Threads")
+.description("The number of threads to be used for handling connections to brokers (default: 1 thread)")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+
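PULSAR_SERVICE_URL is validated as a host:port list (e.g. localhost:6650). Pulsar client service URLs, however, are usually written with a scheme, such as pulsar://localhost:6650. A hypothetical helper for bridging the two is sketched below. The `pulsar://` prefix and the comma-joined multi-host form are assumptions to check against the Pulsar client version in use, and the PR may handle this conversion differently. `ServiceUrls` is an illustrative name only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the PR: turns a "host:port[,host:port]"
// property value into a Pulsar-style service URL. The "pulsar://" scheme and
// the comma-joined multi-host form are assumptions, not confirmed client behavior.
final class ServiceUrls {

    static String toServiceUrl(String hostPortList) {
        List<String> hosts = new ArrayList<>();
        for (String part : hostPortList.split(",")) {
            String hp = part.trim();
            int colon = hp.lastIndexOf(':');
            // Require a non-empty host before the colon and a port after it.
            if (colon <= 0 || colon == hp.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got '" + hp + "'");
            }
            // NumberFormatException (an IllegalArgumentException) for non-numeric ports.
            int port = Integer.parseInt(hp.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in '" + hp + "'");
            }
            hosts.add(hp);
        }
        return "pulsar://" + String.join(",", hosts);
    }
}
```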

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234572
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java
 ---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234826
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java
 ---

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235065
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+
+@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"})
+@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. "
+        + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
+        + "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_X.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to "
+        + "FlowFiles that are routed to success.")
+@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class})
+public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor {
+
+    private static final List<PropertyDescriptor> PROPERTIES;
+    private static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(PULSAR_CLIENT_SERVICE);
+        properties.add(RECORD_READER);
+        properties.add(RECORD_WRITER);
+        properties.add(TOPIC);
+        properties.add(ASYNC_ENABLED);
+        properties.add(MAX_ASYNC_REQUESTS);
+        properties.add(BATCHING_ENABLED);
+
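PublishPulsarRecord_1_X sends each record of a FlowFile as its own message and reports the total in the msg.count attribute. A minimal synchronous sketch of that loop is below; the Function standing in for the configured Record Writer and the Consumer standing in for a blocking producer send are hypothetical, not the PR's API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

final class PublishRecordsSketch {

    /**
     * Serializes each record and sends it as one message, then returns the
     * attributes to add to the FlowFile. writer and send are hypothetical stand-ins.
     */
    static Map<String, String> publish(List<String> records,
                                       Function<String, byte[]> writer,
                                       Consumer<byte[]> send) {
        long sent = 0;
        for (String record : records) {
            byte[] payload = writer.apply(record); // one record becomes one message
            send.accept(payload);                  // an exception here propagates to the caller
            sent++;
        }
        Map<String, String> attributes = new HashMap<>();
        attributes.put("msg.count", String.valueOf(sent));
        return attributes;
    }
}
```

In this sketch an exception from send aborts the loop and no attributes are returned; how the real processor routes a partially sent FlowFile is not shown in the quoted diff.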

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183209029
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-nar/src/main/resources/META-INF/NOTICE
 ---
@@ -0,0 +1,612 @@
+nifi-druid-controller-service-api-nar
--- End diff --

This still names nifi-druid-controller-service-api-nar; it needs to be changed to the Pulsar client service.


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183208823
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/PoolableResource.java
 ---
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+/**
+ * Service interface for any object that can be pooled for re-use. It
+ * defines methods for closing the object, effectively marking it as no
+ * longer usable.
+ *
+ * @author david
--- End diff --

Please remove the @author tag.


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183208994
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/ResourceFactory.java
 ---
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar.pool;
+
+import java.util.Properties;
+
+/**
+ * Factory interface for {@link PoolableResource} objects. Concrete implementations
+ * of this interface are responsible for creating {@link PoolableResource} objects
+ * based on the Properties passed in.
+ *
+ * @author david
--- End diff --

Please remove the @author tag.
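The two javadoc comments above describe a PoolableResource that can be closed to mark it unusable and a ResourceFactory that builds such resources from Properties. One way those pieces could fit together with a bounded pool is sketched below; the method names (close, isClosed, create, acquire, release) and the SimpleResourcePool and CountingResource classes are assumptions for illustration, not the PR's API.

```java
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical: a pooled object that is retired for good once closed. */
interface PoolableResource {
    void close();
    boolean isClosed();
}

/** Hypothetical: builds resources from the Properties passed in. */
interface ResourceFactory<R extends PoolableResource> {
    R create(Properties props);
}

/** Illustrative bounded pool; not the PR's ResourcePool. */
final class SimpleResourcePool<R extends PoolableResource> {
    private final BlockingQueue<R> idle;
    private final ResourceFactory<R> factory;
    private final Properties props;

    SimpleResourcePool(ResourceFactory<R> factory, Properties props, int maxIdle) {
        this.factory = factory;
        this.props = props;
        this.idle = new ArrayBlockingQueue<>(maxIdle);
    }

    /** Reuses an idle resource that is still open; otherwise asks the factory for a new one. */
    R acquire() {
        R r;
        while ((r = idle.poll()) != null) {
            if (!r.isClosed()) {
                return r;
            }
        }
        return factory.create(props);
    }

    /** Returns an open resource to the pool, closing it if the pool is already full. */
    void release(R r) {
        if (r.isClosed()) {
            return;
        }
        if (!idle.offer(r)) {
            r.close();
        }
    }
}

/** Toy resource that counts how many instances were created. */
final class CountingResource implements PoolableResource {
    static final AtomicInteger CREATED = new AtomicInteger();
    private final AtomicBoolean closed = new AtomicBoolean();

    CountingResource() {
        CREATED.incrementAndGet();
    }

    @Override
    public void close() {
        closed.set(true);
    }

    @Override
    public boolean isClosed() {
        return closed.get();
    }
}
```

Checking isClosed() on the way out of the pool is what makes "closing marks it no longer usable" safe: a resource closed while idle is silently dropped instead of being handed to a caller.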


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183210297
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java
 ---
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar;
+
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.nifi.pulsar.PulsarProducer;
+import org.apache.nifi.pulsar.cache.LRUCache;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+
+public abstract class AbstractPulsarProducerProcessor extends AbstractPulsarProcessor {
+
+    public static final String MSG_COUNT = "msg.count";
+    public static final String TOPIC_NAME = "topic.name";
+
+    static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression.");
+    static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with the LZ4 algorithm.");
+    static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with the ZLib algorithm.");
+
+    static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition",
+            "Route messages to a custom partition.");
+    static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableValue("RoundRobinPartition", "Round Robin Partition",
+            "Route messages to all partitions in a round robin manner.");
+    static final AllowableValue MESSAGE_ROUTING_MODE_SINGLE_PARTITION = new AllowableValue("SinglePartition", "Single Partition",
+            "Route messages to a single partition.");
+
+    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+            .name("topic")
+            .displayName("Topic Name")
+            .description("The name of the Pulsar Topic.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor ASYNC_ENABLED = new PropertyDescriptor.Builder()
+            .name("Async Enabled")
+            .description("Control whether the messages will be sent asynchronously or not. Messages sent"
+                    + " synchronously will be acknowledged immediately before processing the next

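The MESSAGE_ROUTING_MODE_* values above map a property string to a partition-selection strategy. A simplified sketch of the two non-custom strategies follows, assuming hypothetical names (RoutingSketch, Mode, fromPropertyValue) and a partition count known up front. It does not reproduce the Pulsar client's own routers, which may also take message keys into account, and it rejects "CustomPartition" because that mode needs a user-supplied router.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

final class RoutingSketch {

    enum Mode { ROUND_ROBIN_PARTITION, SINGLE_PARTITION }

    /** Maps the AllowableValue strings used in the diff to a strategy. */
    static Mode fromPropertyValue(String value) {
        switch (value) {
            case "RoundRobinPartition":
                return Mode.ROUND_ROBIN_PARTITION;
            case "SinglePartition":
                return Mode.SINGLE_PARTITION;
            default:
                // "CustomPartition" would need a user-supplied router, which this sketch omits
                throw new IllegalArgumentException("unsupported routing mode: " + value);
        }
    }

    private final Mode mode;
    private final int numPartitions;
    private final AtomicInteger next = new AtomicInteger();
    private final int fixedPartition; // chosen once, used by SINGLE_PARTITION

    RoutingSketch(Mode mode, int numPartitions) {
        this.mode = mode;
        this.numPartitions = numPartitions;
        this.fixedPartition = ThreadLocalRandom.current().nextInt(numPartitions);
    }

    /** Returns the partition index for the next message. */
    int choosePartition() {
        switch (mode) {
            case ROUND_ROBIN_PARTITION:
                // floorMod keeps the index non-negative even after the counter overflows
                return Math.floorMod(next.getAndIncrement(), numPartitions);
            case SINGLE_PARTITION:
            default:
                return fixedPartition;
        }
    }
}
```

Round robin spreads load evenly across partitions; a single partition keeps all messages from one producer in order at the cost of concentrating load.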
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234732
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java
 ---
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+@CapabilityDescription("Consumes messages from Apache Pulsar specifically built against the Pulsar 1.x Consumer API. "
+        + "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_X. Please note that, at this time, "
+        + "the Processor assumes that all records that are retrieved from a given partition have the same schema. If any "
+        + "of the Pulsar messages that are pulled cannot be parsed or written with the configured Record Reader or "
+        + "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the "
+        + "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual "
+        + "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the "
+        + "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.")
+@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@WritesAttributes({
+    @WritesAttribute(attribute = "record.count", description = "The number of records received")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
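The capability description above promises that messages with different schemas never share a FlowFile and that each unparseable message goes to 'parse.failure' on its own. A simplified sketch of that bucketing is below, assuming a hypothetical schemaOf function that returns a schema name or throws for unparseable input; record.count for each bucket is simply the bucket's size.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

final class SchemaBatchingSketch {

    /** Output of one pass; the field names are illustrative. */
    static final class Result {
        /** One bucket per schema, i.e. one success FlowFile per schema. */
        final Map<String, List<String>> successBySchema = new LinkedHashMap<>();
        /** Each entry would become its own 'parse.failure' FlowFile. */
        final List<String> parseFailures = new ArrayList<>();
    }

    static Result batch(List<String> messages, Function<String, String> schemaOf) {
        Result result = new Result();
        for (String msg : messages) {
            String schema;
            try {
                schema = schemaOf.apply(msg);
            } catch (RuntimeException e) {
                result.parseFailures.add(msg);
                continue;
            }
            // Messages with different schemas never land in the same bucket
            result.successBySchema.computeIfAbsent(schema, k -> new ArrayList<>()).add(msg);
        }
        return result;
    }
}
```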

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234849
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar. "
+        + "The complementary NiFi processor for sending messages is PublishPulsar_1_X.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor {
+
+    private static final List<PropertyDescriptor> PROPERTIES;
+    private static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(PULSAR_CLIENT_SERVICE);
+        properties.add(TOPIC);
+        properties.add(SUBSCRIPTION);
+        properties.add(ASYNC_ENABLED);
+        properties.add(MAX_ASYNC_REQUESTS);
+        properties.add(ACK_TIMEOUT);
+        properties.add(PRIORITY_LEVEL);
+        properties.add(RECEIVER_QUEUE_SIZE);
+        properties.add(SUBSCRIPTION_TYPE);
+        properties.add(MAX_WAIT_TIME);
+
+        PROPERTIES = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+        try {
+            if (context.getProperty(ASYNC_ENABLED).asBoolean()) {
+                // Launch consumers
+                consumeAsync(context, session);
+
+                // Handle completed consumers
+                handleAsync(context, session);
+
+            } else {
+                consume(context, session);
+            }
+        } catch (PulsarClientException e) {
+            getLogger().error("Unable to consume from Pulsar Topic ", e);
+            context.yield();
+            throw new ProcessException(e);
+        }
+
+    }
+
+    private void handleAsync(ProcessContext context, ProcessSession session) {
+
+        try {
+            Future<Message> done = consumerService.take();
+            Message msg = done.get();
+
+            if

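For the synchronous branch (consume), a plausible shape is a bounded receive loop that stops once Max Wait Time passes with nothing received. The sketch below uses a BlockingQueue as a stand-in for the Pulsar consumer and a hypothetical ack callback. It acknowledges each message as soon as it is received; a processor may prefer to acknowledge only after the NiFi session commits, so that a failure before the commit does not lose messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class SyncConsumeSketch {

    /**
     * Receives up to maxMessages, waiting at most maxWaitMillis for each one.
     * source and ack are hypothetical stand-ins for the Pulsar consumer.
     */
    static List<String> consume(BlockingQueue<String> source, long maxWaitMillis,
                                int maxMessages, Consumer<String> ack) {
        List<String> out = new ArrayList<>();
        try {
            while (out.size() < maxMessages) {
                String msg = source.poll(maxWaitMillis, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    break; // nothing arrived within the wait time
                }
                out.add(msg);
                ack.accept(msg);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }
}
```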
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235123
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+
+@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"})
+@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. "
+        + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
+        + "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_X.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to "
+        + "FlowFiles that are routed to success.")
+@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class})
+public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor {
+
+    private static final List<PropertyDescriptor> PROPERTIES;
+    private static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(PULSAR_CLIENT_SERVICE);
+        properties.add(RECORD_READER);
+        properties.add(RECORD_WRITER);
+        properties.add(TOPIC);
+        properties.add(ASYNC_ENABLED);
+        properties.add(MAX_ASYNC_REQUESTS);
+        properties.add(BATCHING_ENABLED);
+
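ASYNC_ENABLED together with MAX_ASYNC_REQUESTS suggests capping how many sends are in flight at once. The sketch below shows one way to do that with a Semaphore and CompletableFuture. It assumes a hypothetical sendAsync function that returns a future of the message ID as a String, counts only successful sends, and is not the PR's implementation.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

final class BoundedAsyncPublishSketch {

    /**
     * Sends every payload with at most maxAsyncRequests sends pending at a time
     * and returns how many completed successfully. sendAsync is hypothetical.
     */
    static long publishAll(List<String> payloads, int maxAsyncRequests,
                           Function<String, CompletableFuture<String>> sendAsync) {
        Semaphore inFlight = new Semaphore(maxAsyncRequests);
        AtomicLong succeeded = new AtomicLong();
        CompletableFuture<?>[] all = new CompletableFuture<?>[payloads.size()];
        for (int i = 0; i < payloads.size(); i++) {
            inFlight.acquireUninterruptibly(); // blocks once maxAsyncRequests sends are pending
            all[i] = sendAsync.apply(payloads.get(i))
                    .whenComplete((messageId, error) -> {
                        if (error == null) {
                            succeeded.incrementAndGet();
                        }
                        inFlight.release(); // free the slot whether the send succeeded or not
                    });
        }
        // Wait for every send; failures were already excluded from the count
        CompletableFuture.allOf(all).handle((ignored, error) -> null).join();
        return succeeded.get();
    }
}
```

The semaphore gives back-pressure: the loop simply pauses when the cap is reached instead of queuing an unbounded number of pending sends in memory.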

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234534
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java
 ---
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+
+@CapabilityDescription("Consumes messages from Apache Pulsar specifically built against the Pulsar 1.x Consumer API. "
+        + "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_X. Please note that, at this time, "
+        + "the Processor assumes that all records that are retrieved from a given partition have the same schema. If any "
+        + "of the Pulsar messages that are pulled cannot be parsed or written with the configured Record Reader or "
+        + "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the "
+        + "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual "
+        + "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the "
+        + "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.")
+@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@WritesAttributes({
+    @WritesAttribute(attribute = "record.count", description = "The number of records received")
+})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235163
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsar_1_X.java
 ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the Pulsar 1.X Producer API. "
++ "The messages to send may be individual FlowFiles or may be delimited, using a "
++ "user-specified delimiter, such as a new-line. "
++ "The complementary NiFi processor for fetching messages is ConsumePulsar_1_X.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to "
++ "FlowFiles that are routed to success.")
+public class PublishPulsar_1_X extends AbstractPulsarProducerProcessor {
+
+private static final List<PropertyDescriptor> PROPERTIES;
+private static final Set<Relationship> RELATIONSHIPS;
+
+static {
+final List<PropertyDescriptor> properties = new ArrayList<>();
+properties.add(PULSAR_CLIENT_SERVICE);
+properties.add(TOPIC);
+properties.add(ASYNC_ENABLED);
+properties.add(MAX_ASYNC_REQUESTS);
+properties.add(BATCHING_ENABLED);
+properties.add(BATCHING_MAX_MESSAGES);
+properties.add(BATCH_INTERVAL);
+properties.add(BLOCK_IF_QUEUE_FULL);
+properties.add(COMPRESSION_TYPE);
+properties.add(MESSAGE_ROUTING_MODE);
+properties.add(PENDING_MAX_MESSAGES);
+
+PROPERTIES = Collections.unmodifiableList(properties);
+
+final Set<Relationship> relationships = new HashSet<>();
+relationships.add(REL_SUCCESS);
+relationships.add(REL_FAILURE);
+RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+}
+
+@Override
+public Set<Relationship> getRelationships() {
+return RELATIONSHIPS;
+}
+
+@Override
+protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+return PROPERTIES;
+}
+
+@Override
+public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+FlowFile flowFile = session.get();
+
+if (flowFile == null) {
+return;
+}
+
+final ComponentLog logger = getLogger();
+final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+
+if (StringUtils.isBlank(topic)) {
+
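PublishPulsar_1_X's description says a FlowFile may be sent as a single message or split on a user-specified delimiter, and that 'msg.count' records how many messages were sent. A hedged sketch of only the splitting step follows. `MessageSplitter` is a hypothetical helper, the property list quoted above does not include a delimiter property, and skipping empty segments between consecutive delimiters is an assumption rather than something the diff shows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: turns FlowFile content into the payloads of
// individual Pulsar messages.
final class MessageSplitter {
    private MessageSplitter() {
    }

    // A null or empty delimiter means the whole content is one message.
    static List<byte[]> split(byte[] content, byte[] delimiter) {
        List<byte[]> messages = new ArrayList<>();
        if (delimiter == null || delimiter.length == 0) {
            messages.add(content);
            return messages;
        }
        int start = 0;
        int i = 0;
        while (i <= content.length - delimiter.length) {
            if (matchesAt(content, delimiter, i)) {
                addIfNotEmpty(messages, content, start, i);
                i += delimiter.length;
                start = i;
            } else {
                i++;
            }
        }
        // Whatever follows the last delimiter is the final message.
        addIfNotEmpty(messages, content, start, content.length);
        return messages;
    }

    private static boolean matchesAt(byte[] content, byte[] delimiter, int offset) {
        for (int j = 0; j < delimiter.length; j++) {
            if (content[offset + j] != delimiter[j]) {
                return false;
            }
        }
        return true;
    }

    // Assumption: empty segments (e.g. "\n\n") are not sent as messages.
    private static void addIfNotEmpty(List<byte[]> out, byte[] content, int from, int to) {
        if (to > from) {
            out.add(Arrays.copyOfRange(content, from, to));
        }
    }
}
```

The size of the returned list is the natural value for 'msg.count' when every send succeeds.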

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234539
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java
 ---
@@ -0,0 +1,351 @@

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183208835
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+
+
+@Tags({"Pulsar"})
--- End diff --

Could use other tags as well, such as "client" and "pool".


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234861
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/InFlightMessageMonitor.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Helper class to monitor the asynchronous submission of a large number
+ * of records to Apache Pulsar.
+ *
+ * @author david
--- End diff --

Please remove.
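For context on the class being reviewed: the Javadoc says InFlightMessageMonitor tracks the asynchronous submission of many records to Pulsar, and its imports (CountDownLatch, AtomicInteger, List) suggest a latch-based design. The sketch below shows that pattern under that assumption. `InFlightMonitorSketch` and its method names are hypothetical, not the PR's API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical latch-based monitor for a batch of asynchronous sends.
final class InFlightMonitorSketch<T> {
    // Counts down once per record, whether the send succeeded or failed.
    private final CountDownLatch pending;
    private final AtomicInteger successCount = new AtomicInteger();
    private final List<T> failures = Collections.synchronizedList(new ArrayList<>());

    InFlightMonitorSketch(int expectedRecords) {
        this.pending = new CountDownLatch(expectedRecords);
    }

    // Called from a send callback when the broker acknowledged the record.
    void recordSucceeded() {
        successCount.incrementAndGet();
        pending.countDown();
    }

    // Called from a send callback on failure; keeps the record for routing or retry.
    void recordFailed(T record) {
        failures.add(record);
        pending.countDown();
    }

    // Returns true if every record completed before the timeout elapsed.
    boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        return pending.await(timeout, unit);
    }

    int successes() {
        return successCount.get();
    }

    // Snapshot copy; iterating a synchronizedList requires holding its lock.
    List<T> failedRecords() {
        synchronized (failures) {
            return new ArrayList<>(failures);
        }
    }
}
```

Counting down on both outcomes is the key design choice: the waiting thread is released once every send has finished, not only when all succeed.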


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234779
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar. "
++ "The complementary NiFi processor for sending messages is PublishPulsar_1_X.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor {
+
+private static final List<PropertyDescriptor> PROPERTIES;
+private static final Set<Relationship> RELATIONSHIPS;
+
+static {
+final List<PropertyDescriptor> properties = new ArrayList<>();
+properties.add(PULSAR_CLIENT_SERVICE);
+properties.add(TOPIC);
+properties.add(SUBSCRIPTION);
+properties.add(ASYNC_ENABLED);
+properties.add(MAX_ASYNC_REQUESTS);
+properties.add(ACK_TIMEOUT);
+properties.add(PRIORITY_LEVEL);
+properties.add(RECEIVER_QUEUE_SIZE);
+properties.add(SUBSCRIPTION_TYPE);
+properties.add(MAX_WAIT_TIME);
+
+PROPERTIES = Collections.unmodifiableList(properties);
+
+final Set<Relationship> relationships = new HashSet<>();
+relationships.add(REL_SUCCESS);
+RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+}
+
+@Override
+public Set<Relationship> getRelationships() {
+return RELATIONSHIPS;
+}
+
+@Override
+protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+return PROPERTIES;
+}
+
+@Override
+public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+try {
+if (context.getProperty(ASYNC_ENABLED).asBoolean()) {
+// Launch consumers
+consumeAsync(context, session);
+
+// Handle completed consumers
+handleAsync(context, session);
+
+} else {
+consume(context, session);
+}
+} catch (PulsarClientException e) {
+getLogger().error("Unable to consume from Pulsar Topic ", e);
+context.yield();
+throw new ProcessException(e);
+}
+
+}
+
+private void handleAsync(ProcessContext context, ProcessSession session) {
+
+try {
+Future<Message> done = consumerService.take();
+Message msg = done.get();
+
+if 
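In the async branch above, consumeAsync launches receive tasks, and handleAsync calls consumerService.take() and then get() on the returned Future. That matches how java.util.concurrent.ExecutorCompletionService returns tasks in completion order. The diff does not show how consumerService is declared, so the following self-contained sketch assumes it is such a completion service. The receive task here is a plain Callable rather than a Pulsar Consumer call, and `AsyncReceiveSketch` is a hypothetical name.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch of the launch/handle split in the async branch.
// Assumes a single calling thread; inFlight is not synchronized.
final class AsyncReceiveSketch<T> {
    private final ExecutorService pool;
    private final CompletionService<T> consumerService;
    private int inFlight = 0;

    AsyncReceiveSketch(int threads) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.consumerService = new ExecutorCompletionService<>(pool);
    }

    // "Launch consumers": top up the outstanding receive tasks to maxAsync.
    void launch(Callable<T> receiveTask, int maxAsync) {
        while (inFlight < maxAsync) {
            consumerService.submit(receiveTask);
            inFlight++;
        }
    }

    // "Handle completed consumers": take() blocks until any task finishes
    // and returns tasks in completion order, not submission order.
    T takeOne() throws InterruptedException, ExecutionException {
        Future<T> done = consumerService.take();
        inFlight--;
        return done.get();
    }

    int inFlight() {
        return inFlight;
    }

    void shutdown() {
        pool.shutdownNow();
    }
}
```

One design note: take() parks the onTrigger thread until some receive finishes. CompletionService.poll() is the non-blocking alternative, returning null when nothing has completed yet, which may suit a processor that should yield instead.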

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183209099
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool {
+
+public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor
+.Builder().name("PULSAR_SERVICE_URL")
+.displayName("Pulsar Service URL")
+.description("URL for the Pulsar cluster, e.g. localhost:6650")
+.required(true)
+.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+.build();
+
+public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder()
+.name("Maximum concurrent lookup-requests")
+.description("Number of concurrent lookup requests allowed on each broker connection, to prevent "
++ "overloading the broker (default: 5000). Configure a higher value only if the client "
++ "needs to produce to or subscribe on thousands of topics.")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+.defaultValue("5000")
+.build();
+
+public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder()
+.name("Maximum connections per Pulsar broker")
+.description("Sets the maximum number of connections that the client library will open to a single broker.\n" +
+"By default, the connection pool will use a single connection for all the producers and consumers. " +
+"Increasing this parameter may improve throughput when using many producers over a high-latency connection.")
+.required(false)
+.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)

> Consider adding expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).

In fact, you might want to consider that for all of these properties so you can make it more customizable.
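In NiFi, the suggested change is a single builder call, `.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)`, after which the service reads the value through `context.getProperty(...).evaluateAttributeExpressions()`. To illustrate what that scope means for these connection properties, here is a toy resolver. It is not NiFi's Expression Language engine: a value such as `${pulsar.max.connections}` is resolved against a key/value registry only, with no FlowFile attributes available. `VariableRegistrySketch` and the variable names are hypothetical.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy illustration of variable-registry-scoped evaluation: only ${name}
// references are supported, and they resolve against the registry map.
final class VariableRegistrySketch {
    private static final Pattern REFERENCE = Pattern.compile("\\$\\{([^}]+)\\}");

    private VariableRegistrySketch() {
    }

    // Replaces each ${name} with its registry value; unknown names become "".
    static String evaluate(String propertyValue, Map<String, String> registry) {
        Matcher m = REFERENCE.matcher(propertyValue);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = registry.getOrDefault(m.group(1), "");
            // quoteReplacement keeps '$' or '\' in values from being misread.
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

Because nothing here depends on a FlowFile, the value can be resolved once when the controller service is enabled, which is why this scope fits service-level settings like connection limits.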


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235146
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java
 ---
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER;
+import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.StringUtils;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+
+@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"})
+@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. "
++ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
++ "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_X.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to "
++ "FlowFiles that are routed to success.")
+@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class})
+public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor {
+
+private static final List<PropertyDescriptor> PROPERTIES;
+private static final Set<Relationship> RELATIONSHIPS;
+
+static {
+final List<PropertyDescriptor> properties = new ArrayList<>();
+properties.add(PULSAR_CLIENT_SERVICE);
+properties.add(RECORD_READER);
+properties.add(RECORD_WRITER);
+properties.add(TOPIC);
+properties.add(ASYNC_ENABLED);
+properties.add(MAX_ASYNC_REQUESTS);
+properties.add(BATCHING_ENABLED);
+
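PublishPulsarRecord_1_X exposes ASYNC_ENABLED and MAX_ASYNC_REQUESTS and imports Callable, RejectedExecutionException, and AtomicLong, which hints at a bounded executor for asynchronous sends. The diff is truncated before that code. The following is therefore only one plausible way to cap in-flight sends, not the PR's implementation: a ThreadPoolExecutor with a bounded queue and AbortPolicy, where a rejection signals back-pressure and an AtomicLong counts completed sends (comparable to 'msg.count'). `BoundedAsyncPublisherSketch` and `trySend` are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical bounded publisher: at most `threads` sends run at once and
// at most `maxAsyncRequests` more wait in the queue.
final class BoundedAsyncPublisherSketch {
    private final ThreadPoolExecutor executor;
    private final AtomicLong sentCount = new AtomicLong();

    BoundedAsyncPublisherSketch(int threads, int maxAsyncRequests) {
        // AbortPolicy makes submit() throw RejectedExecutionException once
        // all threads are busy and the bounded queue is full.
        executor = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(maxAsyncRequests), new ThreadPoolExecutor.AbortPolicy());
    }

    // Returns null when the request was rejected, so the caller can yield or penalize.
    Future<Long> trySend(Callable<?> send) {
        try {
            return executor.submit(() -> {
                send.call();
                // Only counted once the send itself returned without throwing.
                return sentCount.incrementAndGet();
            });
        } catch (RejectedExecutionException e) {
            return null;
        }
    }

    long sent() {
        return sentCount.get();
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

Rejecting instead of blocking keeps onTrigger responsive. The trade-off is that the caller must handle a null result, for example by routing the FlowFile back to the queue and yielding.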

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235215
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/RecordBasedConst.java
 ---
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+public final class RecordBasedConst {
+
+public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+.name("record-reader")
+.displayName("Record Reader")
+.description("The Record Reader to use for incoming FlowFiles")
+.identifiesControllerService(RecordReaderFactory.class)
+.expressionLanguageSupported(false)
+.required(true)
+.build();
+
+public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+.name("record-writer")
+.displayName("Record Writer")
+.description("The Record Writer to use in order to serialize the data before sending to Pulsar")
+.identifiesControllerService(RecordSetWriterFactory.class)
+.expressionLanguageSupported(false)
+.required(true)
+.build();
+
+private RecordBasedConst() {
--- End diff --

Not needed.


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235033
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java
 ---
@@ -0,0 +1,294 @@

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183234838
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.pulsar.pubsub;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor;
+import org.apache.nifi.pulsar.PulsarClientPool;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"})
+@CapabilityDescription("Consumes messages from Apache Pulsar "
+        + "The complementary NiFi processor for sending messages is PublishPulsar.")
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor {
+
+    private static final List<PropertyDescriptor> PROPERTIES;
+    private static final Set<Relationship> RELATIONSHIPS;
+
+    static {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(PULSAR_CLIENT_SERVICE);
+        properties.add(TOPIC);
+        properties.add(SUBSCRIPTION);
+        properties.add(ASYNC_ENABLED);
+        properties.add(MAX_ASYNC_REQUESTS);
+        properties.add(ACK_TIMEOUT);
+        properties.add(PRIORITY_LEVEL);
+        properties.add(RECEIVER_QUEUE_SIZE);
+        properties.add(SUBSCRIPTION_TYPE);
+        properties.add(MAX_WAIT_TIME);
+
+        PROPERTIES = Collections.unmodifiableList(properties);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+        try {
+            if (context.getProperty(ASYNC_ENABLED).asBoolean()) {
+                // Launch consumers
+                consumeAsync(context, session);
+
+                // Handle completed consumers
+                handleAsync(context, session);
+
+            } else {
+                consume(context, session);
+            }
+        } catch (PulsarClientException e) {
+            getLogger().error("Unable to consume from Pulsar Topic ", e);
+            context.yield();
+            throw new ProcessException(e);
+        }
+
+    }
+
+    private void handleAsync(ProcessContext context, ProcessSession session) {
+
+        try {
+            Future<Message> done = consumerService.take();
+            Message msg = done.get();
+
+if 
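
The quoted `onTrigger` splits asynchronous consumption into a launch step (`consumeAsync`) and a collection step (`handleAsync`), and `handleAsync` calls `consumerService.take()` followed by `done.get()`. That shape matches `java.util.concurrent.ExecutorCompletionService`, which is assumed here since the excerpt does not show how `consumerService` is declared. Below is a minimal JDK-only sketch of the pattern: plain `String` results stand in for Pulsar `Message` objects, and the class name, method signatures and the in-flight cap (standing in for `MAX_ASYNC_REQUESTS`) are hypothetical.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Hypothetical sketch of the launch/collect split used by the async path.
 * Strings stand in for Pulsar Message objects; no Pulsar API is used.
 */
class AsyncConsumeSketch {

    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    private final ExecutorCompletionService<String> consumerService =
            new ExecutorCompletionService<>(pool);
    private int inFlight = 0; // only touched from the calling thread

    /** Launch step: submit receive tasks until the in-flight cap is reached. */
    void consumeAsync(List<Callable<String>> receivers, int maxAsyncRequests) {
        for (Callable<String> receiver : receivers) {
            if (inFlight >= maxAsyncRequests) {
                break;
            }
            consumerService.submit(receiver);
            inFlight++;
        }
    }

    /** Collect step: block until the next task completes and return its result. */
    String handleAsync() {
        try {
            Future<String> done = consumerService.take();
            inFlight--;
            return done.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a receive", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Receive task failed", e.getCause());
        }
    }

    void shutdown() {
        pool.shutdownNow();
    }
}
```

Because `take()` returns futures in completion order rather than submission order, a caller that needs ordering would have to track it separately.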

[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183209046
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
--- End diff --

Should be fleshed out with more tags.


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235251
  
--- Diff: nifi-nar-bundles/pom.xml ---
@@ -93,6 +93,7 @@
         <module>nifi-spark-bundle</module>
         <module>nifi-atlas-bundle</module>
         <module>nifi-druid-bundle</module>
+        <module>nifi-pulsar-bundle</module>
--- End diff --

You need to add a dependency declaration for the NAR in 
`nifi-assembly/pom.xml`. Just putting that note here since there's no really 
good place to track that in the review.


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183208828
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java
 ---
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+
+
+@Tags({"Pulsar"})
+@CapabilityDescription("Provides the ability to create Pulsar Producer / Consumer instances on demand, based on the configuration."
+        + "properties defined")
+/**
+ * Service definition for apache Pulsar Client ControllerService
+ * responsible for maintaining a pool of @PulsarProducer and
+ * @PulsarConsumer objects.
+ *
+ * Since both of these objects can be reused, in a manner similar
+ * to database connections, and the cost to create these objects is
+ * relatively high. The PulsarClientPool keeps these objects in pools
+ * for re-use.
+ *
+ * @author david
--- End diff --

Please remove.
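
The Javadoc quoted in this diff justifies pooling: Producer and Consumer objects are reusable and relatively expensive to create, much like database connections. A minimal JDK-only sketch of that reuse idea follows. `SimpleResourcePool` is hypothetical and is not the `ResourcePool`/`ResourcePoolImpl` API from the pull request, whose methods are not shown here; it also has no upper bound or eviction, which a real pool would likely need.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

/**
 * Hypothetical minimal pool: hands out idle instances before creating new ones.
 * Unbounded and without eviction or health checks, unlike a production pool.
 */
class SimpleResourcePool<T> {

    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private int created = 0;

    SimpleResourcePool(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Reuse an idle instance if available, otherwise create one via the factory. */
    synchronized T acquire() {
        T resource = idle.pollFirst();
        if (resource == null) {
            resource = factory.get();
            created++;
        }
        return resource;
    }

    /** Hand an instance back so a later acquire() can reuse it. */
    synchronized void release(T resource) {
        idle.offerFirst(resource);
    }

    /** Number of instances the factory has built so far. */
    synchronized int createdCount() {
        return created;
    }
}
```

Releasing an instance and acquiring again returns the same object without calling the factory, which is the cost saving the Javadoc describes.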


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183209058
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool {
+
+    public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor
+            .Builder().name("PULSAR_SERVICE_URL")
+            .displayName("Pulsar Service URL")
+            .description("URL for the Pulsar cluster, e.g localhost:6650")
+            .required(true)
+            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)

Consider adding 
`expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)`.


---


[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183209071
  
--- Diff: 
nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java
 ---
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.pulsar;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
+import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
+import org.apache.nifi.pulsar.pool.ResourcePool;
+import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.pulsar.client.api.ClientConfiguration;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.auth.AuthenticationTls;
+
+@Tags({ "Pulsar"})
+@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.")
+public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool {
+
+    public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor
+            .Builder().name("PULSAR_SERVICE_URL")
+            .displayName("Pulsar Service URL")
+            .description("URL for the Pulsar cluster, e.g localhost:6650")
+            .required(true)
+            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder()
+            .name("Maximum concurrent lookup-requests")
+            .description("Number of concurrent lookup-requests allowed on each broker-connection to prevent "
+                    + "overload on broker. (default: 5000) It should be configured with higher value only in case "
+                    + "of it requires to produce/subscribe on thousands of topics")
+            .required(false)
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("5000")
--- End diff --

Are you sure this is a sane default? Your description suggests that 5,000 
is the upper end of where most users would want to be.
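
The property description says the setting caps concurrent lookup requests on each broker connection to avoid overloading the broker. As a rough illustration of what such a cap means, and not a description of how the Pulsar client enforces it, here is a hypothetical `LookupThrottle` built on `java.util.concurrent.Semaphore`; the constructor argument plays the role of the configured value (5000 by default in the diff).

```java
import java.util.concurrent.Semaphore;

/**
 * Hypothetical per-connection cap on concurrent lookup requests.
 * Illustrative only; not the Pulsar client's implementation.
 */
class LookupThrottle {

    private final Semaphore permits;

    LookupThrottle(int maxConcurrentLookups) {
        this.permits = new Semaphore(maxConcurrentLookups);
    }

    /** Try to start a lookup; returns false when the cap is already reached. */
    boolean tryStartLookup() {
        return permits.tryAcquire();
    }

    /** Call exactly once for every tryStartLookup() that returned true. */
    void finishLookup() {
        permits.release();
    }

    int available() {
        return permits.availablePermits();
    }
}
```

The non-blocking `tryAcquire()` leaves it to the caller to decide whether to wait, retry, or fail once the cap is reached.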


---


[GitHub] nifi pull request #2638: NIFI-5082: Added support for custom Oracle timestam...

2018-04-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2638


---


[jira] [Commented] (NIFI-5082) SQL processors do not handle Avro conversion of Oracle timestamps correctly

2018-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-5082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447423#comment-16447423
 ] 

ASF GitHub Bot commented on NIFI-5082:
--

Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2638


> SQL processors do not handle Avro conversion of Oracle timestamps correctly
> ---
>
> Key: NIFI-5082
> URL: https://issues.apache.org/jira/browse/NIFI-5082
> Project: Apache NiFi
>  Issue Type: Bug
>Reporter: Matt Burgess
>Assignee: Matt Burgess
>Priority: Major
>
> In JdbcCommon (used by such processors as ExecuteSQL and QueryDatabaseTable), 
> if a ResultSet column is not a CLOB or BLOB, its value is retrieved using 
> getObject(), then further processing is done based on the SQL type and/or the 
> Java class of the value.
> However, in Oracle when getObject() is called on a Timestamp column, it 
> returns an Oracle-specific TIMESTAMP class which does not inherit from 
> java.sql.Timestamp or java.sql.Date. Thus the processing "falls through" and 
> its value is attempted to be inserted as a string, which violates the Avro 
> schema (which correctly recognized it as a long of timestamp logical type).
> At least for Oracle, the right way to process a Timestamp column is to call 
> getTimestamp() rather than getObject(), the former returns a 
> java.sql.Timestamp object which would correctly be processed by the current 
> code. I would hope that all drivers would support this but we would want to 
> test on (at least) MySQL, Oracle, and PostgreSQL.
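
The description proposes calling `getTimestamp()` instead of relying on whatever class `getObject()` returns. One way to express that idea is sketched below; it is not the change committed for NIFI-5082. `TimestampNormalizer` and `TimestampReader` are hypothetical names, and the reader is a deferred call so the logic can be exercised without a JDBC driver; in real code it would wrap `ResultSet.getTimestamp(columnIndex)`.

```java
import java.sql.SQLException;
import java.sql.Timestamp;
import java.sql.Types;

/** Hypothetical deferred read of a column as java.sql.Timestamp, e.g. () -> rs.getTimestamp(col). */
@FunctionalInterface
interface TimestampReader {
    Timestamp read() throws SQLException;
}

/**
 * Hypothetical helper: if getObject() returned a vendor class (such as Oracle's
 * TIMESTAMP) for a timestamp column, re-read the value through getTimestamp().
 */
final class TimestampNormalizer {

    private TimestampNormalizer() {
    }

    static Object normalize(int sqlType, Object value, TimestampReader reader) throws SQLException {
        boolean timestampColumn = sqlType == Types.TIMESTAMP
                || sqlType == Types.TIMESTAMP_WITH_TIMEZONE;
        // java.sql.Timestamp and java.sql.Date both extend java.util.Date, so those pass through
        if (timestampColumn && value != null && !(value instanceof java.util.Date)) {
            return reader.read();
        }
        return value;
    }
}
```

In a result-set loop this might be called as `normalize(meta.getColumnType(col), rs.getObject(col), () -> rs.getTimestamp(col))`, with the column index copied into an effectively final local for the lambda.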



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (NIFI-5082) SQL processors do not handle Avro conversion of Oracle timestamps correctly

2018-04-22 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-5082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447422#comment-16447422
 ] 

ASF subversion and git services commented on NIFI-5082:
---

Commit ba32879ec81cdfa3c44734ab8b210b13fa0581ac in nifi's branch 
refs/heads/master from [~ca9mbu]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=ba32879 ]

NIFI-5082: Added support for custom Oracle timestamp types to Avro conversion

This closes #2638

Signed-off-by: Mike Thomsen 


> SQL processors do not handle Avro conversion of Oracle timestamps correctly
> ---
>
> Key: NIFI-5082
> URL: https://issues.apache.org/jira/browse/NIFI-5082
> Project: Apache NiFi
>  Issue Type: Bug
>Reporter: Matt Burgess
>Assignee: Matt Burgess
>Priority: Major
>
> In JdbcCommon (used by such processors as ExecuteSQL and QueryDatabaseTable), 
> if a ResultSet column is not a CLOB or BLOB, its value is retrieved using 
> getObject(), then further processing is done based on the SQL type and/or the 
> Java class of the value.
> However, in Oracle when getObject() is called on a Timestamp column, it 
> returns an Oracle-specific TIMESTAMP class which does not inherit from 
> java.sql.Timestamp or java.sql.Date. Thus the processing "falls through" and 
> its value is attempted to be inserted as a string, which violates the Avro 
> schema (which correctly recognized it as a long of timestamp logical type).
> At least for Oracle, the right way to process a Timestamp column is to call 
> getTimestamp() rather than getObject(), the former returns a 
> java.sql.Timestamp object which would correctly be processed by the current 
> code. I would hope that all drivers would support this but we would want to 
> test on (at least) MySQL, Oracle, and PostgreSQL.





[jira] [Created] (NIFI-5106) Add provenance reporting to GetSolr

2018-04-22 Thread Johannes Peter (JIRA)
Johannes Peter created NIFI-5106:


 Summary: Add provenance reporting to GetSolr
 Key: NIFI-5106
 URL: https://issues.apache.org/jira/browse/NIFI-5106
 Project: Apache NiFi
  Issue Type: Improvement
Reporter: Johannes Peter
Assignee: Johannes Peter








[jira] [Commented] (NIFI-5106) Add provenance reporting to GetSolr

2018-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-5106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447329#comment-16447329
 ] 

ASF GitHub Bot commented on NIFI-5106:
--

Github user JohannesDaniel commented on the issue:

https://github.com/apache/nifi/pull/2650
  
@MikeThomsen Hi, just saw that GetSolr does not create receive provenance 
events when data is retrieved and written into flowfiles. Added that.


> Add provenance reporting to GetSolr
> ---
>
> Key: NIFI-5106
> URL: https://issues.apache.org/jira/browse/NIFI-5106
> Project: Apache NiFi
>  Issue Type: Improvement
>Reporter: Johannes Peter
>Assignee: Johannes Peter
>Priority: Minor
>






[GitHub] nifi issue #2650: NIFI-5106 Add provenance reporting to GetSolr

2018-04-22 Thread JohannesDaniel
Github user JohannesDaniel commented on the issue:

https://github.com/apache/nifi/pull/2650
  
@MikeThomsen Hi, just saw that GetSolr does not create receive provenance 
events when data is retrieved and written into flowfiles. Added that.


---


[GitHub] nifi pull request #2638: NIFI-5082: Added support for custom Oracle timestam...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2638#discussion_r183256566
  
--- Diff: 
nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
 ---
@@ -350,6 +350,8 @@ private static RecordFieldType getFieldType(final int sqlType) {
 return RecordFieldType.TIME;
 case Types.TIMESTAMP:
 case Types.TIMESTAMP_WITH_TIMEZONE:
+case -101: // Oracle's TIMESTAMP WITH TIME ZONE
--- End diff --

@mattyb149 @markap14 Given how proprietary Oracle can be, I don't see how 
this could hurt because it's going to be difficult to say what they'll do in 
the future and how Calcite might respond to that.
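
The diff adds Oracle's vendor-specific type code `-101` (TIMESTAMP WITH TIME ZONE) to the cases that map to a timestamp field type. A self-contained sketch of that mapping follows. `SketchFieldType` is a hypothetical stand-in for NiFi's `RecordFieldType`, only a few cases are shown, and the `STRING` fallback for unknown codes is an assumption of the sketch rather than what `ResultSetRecordSet` does. Naming the constant keeps the magic number documented in one place.

```java
import java.sql.Types;

/** Hypothetical, simplified stand-in for RecordFieldType. */
enum SketchFieldType { TIME, TIMESTAMP, STRING }

final class SqlTypeMapping {

    // Vendor-specific code for Oracle's TIMESTAMP WITH TIME ZONE, as used in the diff
    static final int ORACLE_TIMESTAMP_WITH_TIME_ZONE = -101;

    private SqlTypeMapping() {
    }

    static SketchFieldType getFieldType(int sqlType) {
        switch (sqlType) {
            case Types.TIME:
                return SketchFieldType.TIME;
            case Types.TIMESTAMP:
            case Types.TIMESTAMP_WITH_TIMEZONE:
            case ORACLE_TIMESTAMP_WITH_TIME_ZONE:
                return SketchFieldType.TIMESTAMP;
            default:
                // Assumed fallback for this sketch only
                return SketchFieldType.STRING;
        }
    }
}
```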


---


[jira] [Commented] (NIFI-5082) SQL processors do not handle Avro conversion of Oracle timestamps correctly

2018-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-5082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447421#comment-16447421
 ] 

ASF GitHub Bot commented on NIFI-5082:
--

Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2638#discussion_r183256566
  
--- Diff: 
nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java
 ---
@@ -350,6 +350,8 @@ private static RecordFieldType getFieldType(final int sqlType) {
 return RecordFieldType.TIME;
 case Types.TIMESTAMP:
 case Types.TIMESTAMP_WITH_TIMEZONE:
+case -101: // Oracle's TIMESTAMP WITH TIME ZONE
--- End diff --

@mattyb149 @markap14 Given how proprietary Oracle can be, I don't see how 
this could hurt because it's going to be difficult to say what they'll do in 
the future and how Calcite might respond to that.


> SQL processors do not handle Avro conversion of Oracle timestamps correctly
> ---
>
> Key: NIFI-5082
> URL: https://issues.apache.org/jira/browse/NIFI-5082
> Project: Apache NiFi
>  Issue Type: Bug
>Reporter: Matt Burgess
>Assignee: Matt Burgess
>Priority: Major
>
> In JdbcCommon (used by such processors as ExecuteSQL and QueryDatabaseTable), 
> if a ResultSet column is not a CLOB or BLOB, its value is retrieved using 
> getObject(), then further processing is done based on the SQL type and/or the 
> Java class of the value.
> However, in Oracle when getObject() is called on a Timestamp column, it 
> returns an Oracle-specific TIMESTAMP class which does not inherit from 
> java.sql.Timestamp or java.sql.Date. Thus the processing "falls through" and 
> its value is attempted to be inserted as a string, which violates the Avro 
> schema (which correctly recognized it as a long of timestamp logical type).
> At least for Oracle, the right way to process a Timestamp column is to call 
> getTimestamp() rather than getObject(), the former returns a 
> java.sql.Timestamp object which would correctly be processed by the current 
> code. I would hope that all drivers would support this but we would want to 
> test on (at least) MySQL, Oracle, and PostgreSQL.





[GitHub] nifi pull request #2650: NIFI-5106 Add provenance reporting to GetSolr

2018-04-22 Thread JohannesDaniel
GitHub user JohannesDaniel opened a pull request:

https://github.com/apache/nifi/pull/2650

NIFI-5106 Add provenance reporting to GetSolr

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [ ] Does your PR title start with NIFI- where  is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JohannesDaniel/nifi 
NIFI-5106-provenanceGetSolr

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2650.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2650


commit 43f326e2b78cd5a0d40d1647b3a28b640fe8e784
Author: JohannesDaniel 
Date:   2018-04-22T19:06:33Z

Added provenance reporting




---


[jira] [Commented] (NIFI-5106) Add provenance reporting to GetSolr

2018-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-5106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447327#comment-16447327
 ] 

ASF GitHub Bot commented on NIFI-5106:
--

GitHub user JohannesDaniel opened a pull request:

https://github.com/apache/nifi/pull/2650

NIFI-5106 Add provenance reporting to GetSolr

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced 
 in the commit message?

- [ ] Does your PR title start with NIFI- where  is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.

- [ ] Has your PR been rebased against the latest commit within the target 
branch (typically master)?

- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under [ASF 
2.0](http://www.apache.org/legal/resolved.html#category-a)? 
- [ ] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in 
which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JohannesDaniel/nifi 
NIFI-5106-provenanceGetSolr

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2650.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2650


commit 43f326e2b78cd5a0d40d1647b3a28b640fe8e784
Author: JohannesDaniel 
Date:   2018-04-22T19:06:33Z

Added provenance reporting




> Add provenance reporting to GetSolr
> ---
>
> Key: NIFI-5106
> URL: https://issues.apache.org/jira/browse/NIFI-5106
> Project: Apache NiFi
>  Issue Type: Improvement
>Reporter: Johannes Peter
>Assignee: Johannes Peter
>Priority: Minor
>






[jira] [Commented] (NIFI-5106) Add provenance reporting to GetSolr

2018-04-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/NIFI-5106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447346#comment-16447346
 ] 

ASF GitHub Bot commented on NIFI-5106:
--

Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2650


> Add provenance reporting to GetSolr
> ---
>
> Key: NIFI-5106
> URL: https://issues.apache.org/jira/browse/NIFI-5106
> Project: Apache NiFi
>  Issue Type: Improvement
>Reporter: Johannes Peter
>Assignee: Johannes Peter
>Priority: Minor
>






[GitHub] nifi pull request #2650: NIFI-5106 Add provenance reporting to GetSolr

2018-04-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi/pull/2650


---


[GitHub] nifi-minifi pull request #122: MINIFI-451 Updating Apache parent pom to vers...

2018-04-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/nifi-minifi/pull/122


---


[GitHub] nifi-minifi pull request #117: MINIFI-424 Adding the ability to evaluate boo...

2018-04-22 Thread JPercivall
Github user JPercivall commented on a diff in the pull request:

https://github.com/apache/nifi-minifi/pull/117#discussion_r183258883
  
--- Diff: 
minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java 
---
@@ -1176,11 +1191,18 @@ public boolean accept(final File dir, final String filename) {
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     public void start() throws IOException, InterruptedException {
+        Properties bootstrapProperties = getBootstrapProperties();
+
+        final String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY);
+        final File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY));
+        final Properties transformationConfigProperties = getConfigTransformationProperties(bootstrapProperties);
+
+        for(String name: transformationConfigProperties.stringPropertyNames()) {
+            defaultLogger.error("config property: name:[" + name + "] value:[" + transformationConfigProperties.getProperty(name) + "] ");
--- End diff --

Most definitely better as `info`
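
Following the suggestion to log at `info`, a minimal sketch of the loop is below. It uses `java.util.logging` only to stay dependency-free; the bootstrap's `defaultLogger` is likely a different logging API, and the `ConfigPropertyLogging` class is hypothetical. Names are sorted so repeated runs log in the same order, and the formatted lines are returned so the behavior can be checked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;
import java.util.logging.Logger;

/** Hypothetical sketch: log transformation config properties at INFO rather than ERROR. */
final class ConfigPropertyLogging {

    private static final Logger LOGGER = Logger.getLogger(ConfigPropertyLogging.class.getName());

    private ConfigPropertyLogging() {
    }

    static List<String> logProperties(Properties props) {
        List<String> lines = new ArrayList<>();
        // Sorted copy of the names keeps output order stable between runs
        for (String name : new TreeSet<>(props.stringPropertyNames())) {
            String line = "config property: name:[" + name + "] value:[" + props.getProperty(name) + "]";
            LOGGER.info(line);
            lines.add(line);
        }
        return lines;
    }
}
```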


---


[jira] [Commented] (NIFI-5105) Update AWS SDK (Spring 2018)

2018-04-22 Thread Otto Fowler (JIRA)

[ https://issues.apache.org/jira/browse/NIFI-5105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447269#comment-16447269 ]

Otto Fowler commented on NIFI-5105:
---

I think this is a good idea. In NIFI-5022 I had to update it.

> Update AWS SDK (Spring 2018)
> 
>
> Key: NIFI-5105
> URL: https://issues.apache.org/jira/browse/NIFI-5105
> Project: Apache NiFi
>  Issue Type: Improvement
>Affects Versions: 1.6.0
>Reporter: James Wing
>Priority: Minor
>
> Update the AWS SDK version used by nifi-aws-bundle to a recent SDK, with 
> support for newer AWS features, regions, etc.  As part of the upgrade, we 
> should specify the individual SDK sub-component maven coordinates we actually 
> use, rather than the entire SDK, to reduce the size of binary distributions.





[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235809
  
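--- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/pom.xml ---
@@ -0,0 +1,67 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-pulsar-bundle</artifactId>
+        <version>1.7.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-pulsar-client-service</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-pulsar-client-service-api</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
--- End diff --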

This should be marked as `provided` scope as well.


---


[GitHub] nifi issue #2587: NIFI-4185 Add XML Record Reader

2018-04-22 Thread JohannesDaniel
Github user JohannesDaniel commented on the issue:

https://github.com/apache/nifi/pull/2587
  
@markap14 
- Added Expression Language (EL) support for the record format property
- Removed record tag validation


---


[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service

2018-04-22 Thread MikeThomsen
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2614
  
When I added it to nifi-assembly, I got this error on startup:

```
java.util.ServiceConfigurationError: org.apache.nifi.processor.Processor: Provider org.apache.nifi.processors.pulsar.pubsub.ConsumePulsarRecord_1_X could not be instantiated
    at java.util.ServiceLoader.fail(ServiceLoader.java:232)
    at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at org.apache.nifi.nar.ExtensionManager.loadExtensions(ExtensionManager.java:148)
    at org.apache.nifi.nar.ExtensionManager.discoverExtensions(ExtensionManager.java:123)
    at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:771)
    at org.apache.nifi.NiFi.<init>(NiFi.java:157)
    at org.apache.nifi.NiFi.<init>(NiFi.java:71)
    at org.apache.nifi.NiFi.main(NiFi.java:292)
Caused by: java.lang.NoClassDefFoundError: org/apache/nifi/serialization/MalformedRecordException
    at java.lang.Class.getDeclaredConstructors0(Native Method)
    at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
    at java.lang.Class.getConstructor0(Class.java:3075)
    at java.lang.Class.newInstance(Class.java:412)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
    ... 8 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.apache.nifi.serialization.MalformedRecordException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 13 common frames omitted
```

Your declaration for `nifi-record` looks right, so we'll have to dig deeper 
once you get these changes done.
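The `Caused by` chain indicates that the processor class itself was found, but `MalformedRecordException`, which it references, could not be resolved when `ServiceLoader` reflected over its constructors. Because NiFi loads each NAR through its own class loader, one way to dig deeper is to check which loader can actually see the missing class. This is a minimal sketch, assuming you can run code against the processor's class loader (for example from a debugger or a test harness); `ClassVisibilityProbe` and `isVisible` are hypothetical names, not NiFi APIs.

```java
public class ClassVisibilityProbe {
    /**
     * Returns true if the named class can be loaded through the given class loader.
     * initialize=false avoids running static initializers while probing.
     */
    public static boolean isVisible(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError, raised when the class
            // exists but one of its own dependencies cannot be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // In a real investigation, pass the processor's class loader instead,
        // e.g. ConsumePulsarRecord_1_X.class.getClassLoader() (assumption).
        ClassLoader loader = ClassVisibilityProbe.class.getClassLoader();
        String missing = "org.apache.nifi.serialization.MalformedRecordException";
        System.out.println(missing + " visible: " + isVisible(missing, loader));

        // Print the parent chain to see which loaders are consulted in turn.
        for (ClassLoader cl = loader; cl != null; cl = cl.getParent()) {
            System.out.println("  " + cl);
        }
    }
}
```

If the class is invisible from the processor's loader but present in another NAR, the issue is likely in how the NARs depend on each other rather than in the `nifi-record` declaration itself; that is a hypothesis to verify, not a confirmed diagnosis.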


---


[jira] [Commented] (NIFI-4185) Add XML record reader & writer services

2018-04-22 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/NIFI-4185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447249#comment-16447249 ]

ASF GitHub Bot commented on NIFI-4185:
--

Github user JohannesDaniel commented on the issue:

https://github.com/apache/nifi/pull/2587
  
@markap14 
- Added Expression Language (EL) support for the record format property
- Removed record tag validation


> Add XML record reader & writer services
> ---
>
> Key: NIFI-4185
> URL: https://issues.apache.org/jira/browse/NIFI-4185
> Project: Apache NiFi
>  Issue Type: New Feature
>  Components: Extensions
>Affects Versions: 1.3.0
>Reporter: Andy LoPresto
>Assignee: Johannes Peter
>Priority: Major
>  Labels: json, records, xml
>
> With the addition of the {{RecordReader}} and {{RecordSetWriter}} paradigm, 
> XML conversion has not yet been targeted. This will replace the previous 
> ticket for XML to JSON conversion. 


