[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user david-streamlio closed the pull request at: https://github.com/apache/nifi/pull/2614 ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183235809 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/pom.xml --- @@ -0,0 +1,67 @@ + + +http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.7.0-SNAPSHOT + + +nifi-pulsar-client-service +jar + + + +org.apache.nifi +nifi-pulsar-client-service-api +1.7.0-SNAPSHOT --- End diff -- This should be marked as provided as well. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234826 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(TOPIC); +properties.add(SUBSCRIPTION); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(ACK_TIMEOUT); +properties.add(PRIORITY_LEVEL); +properties.add(RECEIVER_QUEUE_SIZE); +properties.add(SUBSCRIPTION_TYPE); +properties.add(MAX_WAIT_TIME); + 
+PROPERTIES = Collections.unmodifiableList(properties); + +final Set relationships = new HashSet<>(); +relationships.add(REL_SUCCESS); +RELATIONSHIPS = Collections.unmodifiableSet(relationships); +} + +@Override +public Set getRelationships() { +return RELATIONSHIPS; +} + +@Override +protected List getSupportedPropertyDescriptors() { +return PROPERTIES; +} + +@Override +public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + +try { +if (context.getProperty(ASYNC_ENABLED).asBoolean()) { +// Launch consumers +consumeAsync(context, session); + +// Handle completed consumers +handleAsync(context, session); + +} else { +consume(context, session); +} +} catch (PulsarClientException e) { +getLogger().error("Unable to consume from Pulsar Topic ", e); +context.yield(); +throw new ProcessException(e); +} + +} + +private void handleAsync(ProcessContext context, ProcessSession session) { + +try { +Future done = consumerService.take(); +Message msg = done.get(); + +if
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183235065 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; + + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.StreamUtils; 
+import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; + +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"}) +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. " ++ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class}) +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(RECORD_READER); +properties.add(RECORD_WRITER); +properties.add(TOPIC); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(BATCHING_ENABLED); +
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183233581 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java --- @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import 
org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; + +@CapabilityDescription("Consumes messages from Apache Pulsar specifically built against the Pulsar 1.x Consumer API. " ++ "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_0. Please note that, at this time, " ++ "the Processor assumes that all records that are retrieved from a given partition have the same schema. If any " ++ "of the Pulsar messages that are pulled but cannot be parsed or written with the configured Record Reader or " ++ "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the " ++ "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual " ++ "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the " ++ "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.") +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@WritesAttributes({ +@WritesAttribute(attribute = "record.count", description = "The number of records received") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234943 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; + + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.StreamUtils; 
+import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; + +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"}) +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. " ++ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class}) +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(RECORD_READER); +properties.add(RECORD_WRITER); +properties.add(TOPIC); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(BATCHING_ENABLED); +
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183235109 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; + + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.StreamUtils; 
+import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; + +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"}) +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. " ++ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class}) +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(RECORD_READER); +properties.add(RECORD_WRITER); +properties.add(TOPIC); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(BATCHING_ENABLED); +
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234762 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(TOPIC); +properties.add(SUBSCRIPTION); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(ACK_TIMEOUT); +properties.add(PRIORITY_LEVEL); +properties.add(RECEIVER_QUEUE_SIZE); +properties.add(SUBSCRIPTION_TYPE); +properties.add(MAX_WAIT_TIME); + 
+PROPERTIES = Collections.unmodifiableList(properties); + +final Set relationships = new HashSet<>(); +relationships.add(REL_SUCCESS); +RELATIONSHIPS = Collections.unmodifiableSet(relationships); +} + +@Override +public Set getRelationships() { +return RELATIONSHIPS; +} + +@Override +protected List getSupportedPropertyDescriptors() { +return PROPERTIES; +} + +@Override +public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + +try { +if (context.getProperty(ASYNC_ENABLED).asBoolean()) { +// Launch consumers +consumeAsync(context, session); + +// Handle completed consumers +handleAsync(context, session); + +} else { +consume(context, session); +} +} catch (PulsarClientException e) { +getLogger().error("Unable to consume from Pulsar Topic ", e); +context.yield(); +throw new ProcessException(e); +} + +} + +private void handleAsync(ProcessContext context, ProcessSession session) { + +try { +Future done = consumerService.take(); +Message msg = done.get(); + +if
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183209203 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection to 
prevent " ++ "overload on broker. (default: 5000) It should be configured with higher value only in case " ++ "of it requires to produce/subscribe on thousands of topics") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder() +.name("I/O Threads") +.description("The number of threads to be used for handling connections to brokers (default: 1 thread)") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234572 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java --- @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import 
org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; + +@CapabilityDescription("Consumes messages from Apache Pulsar specifically built against the Pulsar 1.x Consumer API. " ++ "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_0. Please note that, at this time, " ++ "the Processor assumes that all records that are retrieved from a given partition have the same schema. If any " ++ "of the Pulsar messages that are pulled but cannot be parsed or written with the configured Record Reader or " ++ "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the " ++ "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual " ++ "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the " ++ "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.") +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@WritesAttributes({ +@WritesAttribute(attribute = "record.count", description = "The number of records received") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183209029 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-nar/src/main/resources/META-INF/NOTICE --- @@ -0,0 +1,612 @@ +nifi-druid-controller-service-api-nar --- End diff -- Needs to be changed to the pulsar client service. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183208823 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/PoolableResource.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.pool; + +/** + * Service interface for any object that can be pooled for re-use., which + * defines methods for closing the object, effectively marking it no longer + * usable. + * + * @author david --- End diff -- Please remove. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183235163 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsar_1_X.java --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor; +import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the Pulsar 1.X Producer API." ++ "The messages to send may be individual FlowFiles or may be delimited, using a " ++ "user-specified delimiter, such as a new-line. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsar_1_X.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. 
This attribute is added only to " ++ "FlowFiles that are routed to success.") +public class PublishPulsar_1_X extends AbstractPulsarProducerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(TOPIC); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(BATCHING_ENABLED); +properties.add(BATCHING_MAX_MESSAGES); +properties.add(BATCH_INTERVAL); +properties.add(BLOCK_IF_QUEUE_FULL); +properties.add(COMPRESSION_TYPE); +properties.add(MESSAGE_ROUTING_MODE); +properties.add(PENDING_MAX_MESSAGES); + +PROPERTIES = Collections.unmodifiableList(properties); + +final Set relationships = new HashSet<>(); +relationships.add(REL_SUCCESS); +relationships.add(REL_FAILURE); +RELATIONSHIPS = Collections.unmodifiableSet(relationships); +} + +@Override +public Set getRelationships() { +return RELATIONSHIPS; +} + +@Override +protected List getSupportedPropertyDescriptors() { +return PROPERTIES; +} + +@Override +public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + +FlowFile flowFile = session.get(); + +if (flowFile == null) { +return; +} + +final ComponentLog logger = getLogger(); +final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); + +if (StringUtils.isBlank(topic)) { +
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183208994 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/ResourceFactory.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.pool; + +import java.util.Properties; + +/** + * Factory pattern interface for @PoolableResource objects. Concrete implementations + * of this interface will be responsible for the creation of @PoolableResource objects + * based on the Properties passed in. + * + * @author david --- End diff -- Please remove ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183210297 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java --- @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarProducer; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; + +public abstract class AbstractPulsarProducerProcessor extends AbstractPulsarProcessor { + +public static final String MSG_COUNT = "msg.count"; +public static final String TOPIC_NAME = "topic.name"; + +static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression"); +static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm."); +static final AllowableValue 
COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm"); + +static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition"); +static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route messages to all " + + "partitions in a round robin manner"); +static final AllowableValue MESSAGE_ROUTING_MODE_SINGLE_PARTITION = new AllowableValue("SinglePartition", "Single Partition", "Route messages to a single partition"); + +public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.build(); + +public static final PropertyDescriptor ASYNC_ENABLED = new PropertyDescriptor.Builder() +.name("Async Enabled") +.description("Control whether the messages will be sent asyncronously or not. Messages sent" ++ " syncronously will be acknowledged immediately before processing the next
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234732 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java --- @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import 
org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; + +@CapabilityDescription("Consumes messages from Apache Pulsar specifically built against the Pulsar 1.x Consumer API. " ++ "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_0. Please note that, at this time, " ++ "the Processor assumes that all records that are retrieved from a given partition have the same schema. If any " ++ "of the Pulsar messages that are pulled but cannot be parsed or written with the configured Record Reader or " ++ "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the " ++ "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual " ++ "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the " ++ "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.") +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@WritesAttributes({ +@WritesAttribute(attribute = "record.count", description = "The number of records received") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234849 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(TOPIC); +properties.add(SUBSCRIPTION); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(ACK_TIMEOUT); +properties.add(PRIORITY_LEVEL); +properties.add(RECEIVER_QUEUE_SIZE); +properties.add(SUBSCRIPTION_TYPE); +properties.add(MAX_WAIT_TIME); + 
+PROPERTIES = Collections.unmodifiableList(properties); + +final Set relationships = new HashSet<>(); +relationships.add(REL_SUCCESS); +RELATIONSHIPS = Collections.unmodifiableSet(relationships); +} + +@Override +public Set getRelationships() { +return RELATIONSHIPS; +} + +@Override +protected List getSupportedPropertyDescriptors() { +return PROPERTIES; +} + +@Override +public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + +try { +if (context.getProperty(ASYNC_ENABLED).asBoolean()) { +// Launch consumers +consumeAsync(context, session); + +// Handle completed consumers +handleAsync(context, session); + +} else { +consume(context, session); +} +} catch (PulsarClientException e) { +getLogger().error("Unable to consume from Pulsar Topic ", e); +context.yield(); +throw new ProcessException(e); +} + +} + +private void handleAsync(ProcessContext context, ProcessSession session) { + +try { +Future done = consumerService.take(); +Message msg = done.get(); + +if
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183235146 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; + + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.StreamUtils; 
+import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; + +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"}) +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. " ++ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class}) +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(RECORD_READER); +properties.add(RECORD_WRITER); +properties.add(TOPIC); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(BATCHING_ENABLED); +
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183235215 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/RecordBasedConst.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; + +public final class RecordBasedConst { + +public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() +.name("record-reader") +.displayName("Record Reader") +.description("The Record Reader to use for incoming FlowFiles") +.identifiesControllerService(RecordReaderFactory.class) +.expressionLanguageSupported(false) +.required(true) +.build(); + +public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() +.name("record-writer") +.displayName("Record Writer") +.description("The Record Writer to use in order to serialize the data before sending to Pulsar") +.identifiesControllerService(RecordSetWriterFactory.class) +.expressionLanguageSupported(false) +.required(true) +.build(); + +private RecordBasedConst() { --- End diff -- Not needed. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183235123 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; + + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.StreamUtils; 
+import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; + +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"}) +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. " ++ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class}) +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(RECORD_READER); +properties.add(RECORD_WRITER); +properties.add(TOPIC); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(BATCHING_ENABLED); +
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234534 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java --- @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import 
org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; + +@CapabilityDescription("Consumes messages from Apache Pulsar specifically built against the Pulsar 1.x Consumer API. " ++ "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_0. Please note that, at this time, " ++ "the Processor assumes that all records that are retrieved from a given partition have the same schema. If any " ++ "of the Pulsar messages that are pulled but cannot be parsed or written with the configured Record Reader or " ++ "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the " ++ "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual " ++ "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the " ++ "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.") +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@WritesAttributes({ +@WritesAttribute(attribute = "record.count", description = "The number of records received") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234861 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/InFlightMessageMonitor.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar.pubsub; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Helper class to monitor the asynchronous submission of a large number + * of records to Apache Pulsar. + * + * @author david --- End diff -- Please remove. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234779 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(TOPIC); +properties.add(SUBSCRIPTION); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(ACK_TIMEOUT); +properties.add(PRIORITY_LEVEL); +properties.add(RECEIVER_QUEUE_SIZE); +properties.add(SUBSCRIPTION_TYPE); +properties.add(MAX_WAIT_TIME); + 
+PROPERTIES = Collections.unmodifiableList(properties); + +final Set relationships = new HashSet<>(); +relationships.add(REL_SUCCESS); +RELATIONSHIPS = Collections.unmodifiableSet(relationships); +} + +@Override +public Set getRelationships() { +return RELATIONSHIPS; +} + +@Override +protected List getSupportedPropertyDescriptors() { +return PROPERTIES; +} + +@Override +public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + +try { +if (context.getProperty(ASYNC_ENABLED).asBoolean()) { +// Launch consumers +consumeAsync(context, session); + +// Handle completed consumers +handleAsync(context, session); + +} else { +consume(context, session); +} +} catch (PulsarClientException e) { +getLogger().error("Unable to consume from Pulsar Topic ", e); +context.yield(); +throw new ProcessException(e); +} + +} + +private void handleAsync(ProcessContext context, ProcessSession session) { + +try { +Future done = consumerService.take(); +Message msg = done.get(); + +if
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234539 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java --- @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import 
org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; + +@CapabilityDescription("Consumes messages from Apache Pulsar specifically built against the Pulsar 1.x Consumer API. " ++ "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_0. Please note that, at this time, " ++ "the Processor assumes that all records that are retrieved from a given partition have the same schema. If any " ++ "of the Pulsar messages that are pulled but cannot be parsed or written with the configured Record Reader or " ++ "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the " ++ "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual " ++ "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the " ++ "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.") +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@WritesAttributes({ +@WritesAttribute(attribute = "record.count", description = "The number of records received") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183209099 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection to 
prevent " ++ "overload on broker. (default: 5000) It should be configured with higher value only in case " ++ "of it requires to produce/subscribe on thousands of topics") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) --- End diff -- > Consider adding expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY). In fact, you might want to consider that for all of these properties so you can make it more customizable. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183235251 --- Diff: nifi-nar-bundles/pom.xml --- @@ -93,6 +93,7 @@ nifi-spark-bundle nifi-atlas-bundle nifi-druid-bundle +nifi-pulsar-bundle --- End diff -- You also need to add a dependency declaration for the NAR in `nifi-assembly/pom.xml`. I'm noting it here because there's no better place in the review to track it. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183208828 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.pulsar.pool.ResourcePool; + + +@Tags({"Pulsar"}) +@CapabilityDescription("Provides the ability to create Pulsar Producer / Consumer instances on demand, based on the configuration." + + "properties defined") +/** + * Service definition for apache Pulsar Client ControllerService + * responsible for maintaining a pool of @PulsarProducer and + * @PulsarConsumer objects. + * + * Since both of these objects can be reused, in a manner similar + * to database connections, and the cost to create these objects is + * relatively high. The PulsarClientPool keeps these objects in pools + * for re-use. + * + * @author david --- End diff -- Please remove. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183209058 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) --- End diff -- Consider adding `expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)`. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183208835 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.pulsar.pool.ResourcePool; + + +@Tags({"Pulsar"}) --- End diff -- Consider adding more tags here, such as "client" and "pool". ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183235033 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; + + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.StreamUtils; 
+import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; + +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"}) +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. " ++ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class}) +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(RECORD_READER); +properties.add(RECORD_WRITER); +properties.add(TOPIC); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(BATCHING_ENABLED); +
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234838 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(TOPIC); +properties.add(SUBSCRIPTION); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(ACK_TIMEOUT); +properties.add(PRIORITY_LEVEL); +properties.add(RECEIVER_QUEUE_SIZE); +properties.add(SUBSCRIPTION_TYPE); +properties.add(MAX_WAIT_TIME); + 
+PROPERTIES = Collections.unmodifiableList(properties); + +final Set relationships = new HashSet<>(); +relationships.add(REL_SUCCESS); +RELATIONSHIPS = Collections.unmodifiableSet(relationships); +} + +@Override +public Set getRelationships() { +return RELATIONSHIPS; +} + +@Override +protected List getSupportedPropertyDescriptors() { +return PROPERTIES; +} + +@Override +public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + +try { +if (context.getProperty(ASYNC_ENABLED).asBoolean()) { +// Launch consumers +consumeAsync(context, session); + +// Handle completed consumers +handleAsync(context, session); + +} else { +consume(context, session); +} +} catch (PulsarClientException e) { +getLogger().error("Unable to consume from Pulsar Topic ", e); +context.yield(); +throw new ProcessException(e); +} + +} + +private void handleAsync(ProcessContext context, ProcessSession session) { + +try { +Future done = consumerService.take(); +Message msg = done.get(); + +if
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183209046 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) --- End diff -- Should be fleshed out with more tags. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183209071 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection to 
prevent " ++ "overload on broker. (default: 5000) It should be configured with higher value only in case " ++ "of it requires to produce/subscribe on thousands of topics") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("5000") --- End diff -- Are you sure this is a sane default? Your description suggests that 5,000 is the upper end of where most users would want to be. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r180328774 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/pom.xml --- @@ -0,0 +1,67 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.7.0-SNAPSHOT + + +nifi-pulsar-client-service +jar + + + +org.apache.nifi +nifi-pulsar-client-service-api +1.6.0-SNAPSHOT --- End diff -- Needs to be updated to 1.7.0-SNAPSHOT ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r180328802 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/pom.xml --- @@ -0,0 +1,67 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.7.0-SNAPSHOT + + +nifi-pulsar-client-service +jar + + + +org.apache.nifi +nifi-pulsar-client-service-api +1.6.0-SNAPSHOT + + +org.apache.nifi +nifi-api +provided + + +org.apache.nifi +nifi-processor-utils +1.7.0-SNAPSHOT +provided + + +org.apache.nifi +nifi-ssl-context-service-api +provided + + +org.apache.nifi +nifi-mock +1.6.0-SNAPSHOT --- End diff -- Needs to be updated to 1.7.0-SNAPSHOT ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r180329047 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-nar/pom.xml --- @@ -0,0 +1,50 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.7.0-SNAPSHOT + + +nifi-pulsar-nar +nar + + + +org.apache.nifi +nifi-pulsar-client-service-api-nar +1.6.0-SNAPSHOT +nar + + + +org.apache.nifi +nifi-pulsar-processors +1.6.0-SNAPSHOT + + + + org.apache.nifi + nifi-pulsar-client-service + 1.6.0-SNAPSHOT --- End diff -- Needs to be updated to 1.7.0-SNAPSHOT, and the indentation needs to be fixed. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r180329019 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-nar/pom.xml --- @@ -0,0 +1,50 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.7.0-SNAPSHOT + + +nifi-pulsar-nar +nar + + + +org.apache.nifi +nifi-pulsar-client-service-api-nar +1.6.0-SNAPSHOT +nar + + + +org.apache.nifi +nifi-pulsar-processors +1.6.0-SNAPSHOT --- End diff -- Needs to be updated to 1.7.0-SNAPSHOT ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r180328729 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-nar/pom.xml --- @@ -0,0 +1,35 @@ + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> + 4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.7.0-SNAPSHOT + + + nifi-pulsar-client-service-api-nar + nar + + + +org.apache.nifi +nifi-pulsar-client-service-api +1.6.0-SNAPSHOT --- End diff -- Needs to be updated to 1.7.0-SNAPSHOT ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r180328937 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-nar/pom.xml --- @@ -0,0 +1,50 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.7.0-SNAPSHOT + + +nifi-pulsar-nar +nar + + + +org.apache.nifi +nifi-pulsar-client-service-api-nar +1.6.0-SNAPSHOT --- End diff -- Needs to be updated to 1.7.0-SNAPSHOT, and the indentation needs to be fixed. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
GitHub user david-streamlio opened a pull request: https://github.com/apache/nifi/pull/2614 Added Apache Pulsar Processors and Controller Service Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with NIFI-XXXX, where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that the format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/david-streamlio/nifi NIFI-4914 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2614.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2614 commit bcfdae70506e4b1f2aab03a8bcfc9f55f859597c Author: David Kjerrumgaard Date: 2018-04-06T22:42:30Z Added Apache Pulsar Processors and Controller Service ---