[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors
Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235123524 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor.Builder() +.name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER = new PropertyDescriptor.Builder() +.name("ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER") +.displayName("Allow TLS insecure connection") +.description("If a valid trusted certificate is provided in the 'TLS Trust Certs File Path' property of this controller 
service," ++ " then, by default, all communication between this controller service and the Apache Pulsar broker will be secured via" ++ " TLS and only use the trusted TLS certificate from broker. Setting this property to 'false' will allow this controller" ++ " service to accept an untrusted TLS certificate from broker as well. This property should only be set to false if you trust" ++ " the broker you are connecting to, but do not have access to the TLS certificate file.") +.required(false) +.allowableValues("true", "false") +.defaultValue("false") +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("CONCURRENT_LOOKUP_REQUESTS") +.displayName("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection.") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("CONNECTIONS_PER_BROKER") +
[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors
Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235123261 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java --- @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientService; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.shade.org.apache.commons.collections.CollectionUtils; + +public abstract class AbstractPulsarProducerProcessor extends AbstractProcessor { + +public static final String MSG_COUNT = "msg.count"; +public static final String TOPIC_NAME = "topic.name"; + +static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression"); +static final 
AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm."); +static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm"); + +static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition"); +static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route messages to all " + + "partitions in a round robin manner"); +static final AllowableValue MESSAGE_ROUTING_MODE_SINGLE_PARTITION = new AllowableValue("SinglePartition", "Single Partition", "Route messages to a single partition"); + +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("FlowFiles for which all content was sent to Pulsar.") +.build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder() +.name("failure") +.description("Any FlowFile that cannot be sent to Pulsar will be routed to this Relationship") +.build(); + +public static final PropertyDescriptor PULSAR_CLIENT_SERVICE = new PropertyDescriptor.Builder() +.name("PULSAR_CLIENT_SERVICE") +
[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors
Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235108402 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord.java --- @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import 
org.apache.pulsar.client.api.PulsarClientException; + +@CapabilityDescription("Consumes messages from Apache Pulsar. " ++ "The complementary NiFi processor for sending messages is PublishPulsarRecord. Please note that, at this time, " ++ "the Processor assumes that all records that are retrieved have the same schema. If any of the Pulsar messages " ++ "that are pulled but cannot be parsed or written with the configured Record Reader or Record Writer, the contents " ++ "of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the 'parse.failure' " ++ "relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual " ++ "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the " ++ "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.") +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@WritesAttributes({ +@WritesAttribute(attribute = "record.count", description = "The number of records received") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@SeeAlso({PublishPulsar.class, ConsumePulsar.class, PublishPulsarRecord.class}) +public class ConsumePulsarRecord extends AbstractPulsarConsumerProcessor { + +public static final String MSG_COUNT = "record.count"; + +public static final PropertyDescriptor
[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors
Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235107847 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar.java --- @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +@SeeAlso({PublishPulsar.class, ConsumePulsarRecord.class, PublishPulsarRecord.class}) +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar. 
The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar extends AbstractPulsarConsumerProcessor { + +@Override +public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { +try { +Consumer consumer = getConsumer(context, getConsumerId(context, session.get())); + +if (consumer == null) { +context.yield(); +return; +} + +if (context.getProperty(ASYNC_ENABLED).asBoolean()) { +consumeAsync(consumer, context, session); +handleAsync(consumer, context, session); +} else { +consume(consumer, context, session); +} +} catch (PulsarClientException e) { +getLogger().error("Unable to consume from Pulsar Topic ", e); +context.yield(); +throw new ProcessException(e); +} +} + +private void handleAsync(final Consumer consumer, ProcessContext context, ProcessSession session) { +try { +Future> done = getConsumerService().poll(50, TimeUnit.MILLISECONDS); + +if (done != null) { + Message msg = done.get(); + + if (msg != null) { + FlowFile flowFile = null; + final byte[] value = msg.getData(); + if (value != null && value.length > 0) { + flowFile = session.create(); + flowFile = session.write(flowFile, out -> { + out.write(value); + }); + + session.getProvenanceReporter().receive(flowFile, getPulsarClientService().getPulsarBrokerRootURL() + "/" + consumer.getTopic()); + session.transfer(flowFile, REL_SUCCESS); + session.commit(); + } + // Acknowledge consuming the message + getAckService().submit(new Callable() { + @Override + public Object call() throws Exception { + return consumer.acknowledgeAsync(msg).get(); + } + }); + } +} else { +
[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors
Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235107594 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor.Builder() +.name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER = new PropertyDescriptor.Builder() --- End diff -- Ok, I will remove that property ---
[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors
Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235107339 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/LRUCache.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.cache; + +import java.io.Closeable; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +public class LRUCache { --- End diff -- @joewitt The LRU Cache code wasn't copied from anywhere; it is just my own naive implementation of an LRU cache. Most Java-based examples I have seen use a LinkedHashMap, and mine does not. I do appreciate your need to ensure 100% Apache compliance with the code, so I can provide an implementation of the LRU cache based on the https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/map/LRUMap.html class instead, if you think that would be better or safer from a licensing perspective. ---
[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors
Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235106784 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java --- @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientService; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.shade.org.apache.commons.collections.CollectionUtils; + +public abstract class AbstractPulsarProducerProcessor extends AbstractProcessor { + +public static final String MSG_COUNT = "msg.count"; +public static final String TOPIC_NAME = "topic.name"; + +static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression"); +static final 
AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm."); +static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm"); + +static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition"); +static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route messages to all " + + "partitions in a round robin manner"); +static final AllowableValue MESSAGE_ROUTING_MODE_SINGLE_PARTITION = new AllowableValue("SinglePartition", "Single Partition", "Route messages to a single partition"); + +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("FlowFiles for which all content was sent to Pulsar.") +.build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder() +.name("failure") +.description("Any FlowFile that cannot be sent to Pulsar will be routed to this Relationship") +.build(); + +public static final PropertyDescriptor PULSAR_CLIENT_SERVICE = new PropertyDescriptor.Builder() +.name("PULSAR_CLIENT_SERVICE") +
[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors
Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235102894 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml --- @@ -0,0 +1,84 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.9.0-SNAPSHOT + + +nifi-pulsar-processors +jar + + + +org.apache.nifi +nifi-api + + +org.apache.nifi +nifi-record-serialization-service-api + + +org.apache.nifi +nifi-record + + +org.apache.nifi +nifi-utils +1.9.0-SNAPSHOT + + +org.apache.nifi +nifi-ssl-context-service-api + + +org.apache.nifi +nifi-pulsar-client-service-api +1.9.0-SNAPSHOT +provided + + +org.apache.pulsar +pulsar-client +${pulsar.version} + + +org.apache.nifi +nifi-mock +test +1.9.0-SNAPSHOT + + +org.slf4j +slf4j-simple +test + + +junit +junit +test + + + org.apache.commons + commons-lang3 + 3.7 --- End diff -- Assuming we really need it. ---
[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors
Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235102831 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml --- @@ -0,0 +1,84 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.9.0-SNAPSHOT + + +nifi-pulsar-processors +jar + + + +org.apache.nifi +nifi-api + + +org.apache.nifi +nifi-record-serialization-service-api + + +org.apache.nifi +nifi-record + + +org.apache.nifi +nifi-utils +1.9.0-SNAPSHOT + + +org.apache.nifi +nifi-ssl-context-service-api + + +org.apache.nifi +nifi-pulsar-client-service-api +1.9.0-SNAPSHOT +provided + + +org.apache.pulsar +pulsar-client +${pulsar.version} + + +org.apache.nifi +nifi-mock +test +1.9.0-SNAPSHOT + + +org.slf4j +slf4j-simple +test + + +junit +junit +test + + + org.apache.commons + commons-lang3 + 3.7 --- End diff -- This should move up to the current release, which I think is 3.8.1. ---
[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors
Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235102049 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor.Builder() +.name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER = new PropertyDescriptor.Builder() +.name("ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER") +.displayName("Allow TLS insecure connection") +.description("If a valid trusted certificate is provided in the 'TLS Trust Certs File Path' property of this controller 
service," ++ " then, by default, all communication between this controller service and the Apache Pulsar broker will be secured via" ++ " TLS and only use the trusted TLS certificate from broker. Setting this property to 'false' will allow this controller" ++ " service to accept an untrusted TLS certificate from broker as well. This property should only be set to false if you trust" ++ " the broker you are connecting to, but do not have access to the TLS certificate file.") +.required(false) +.allowableValues("true", "false") +.defaultValue("false") +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("CONCURRENT_LOOKUP_REQUESTS") +.displayName("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection.") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("CONNECTIONS_PER_BROKER") +
[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors
Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235101491 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor.Builder() +.name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER = new PropertyDescriptor.Builder() --- End diff -- @david-streamlio this is the property i am saying we should eliminate. It's fine that pulsar supports this but we dont need to from nifi. 
We spend a ton of time/energy on security, and we need to get more serious about it. I see no reason to keep this here. If we later find this to be a real problem, then we could talk about ways to improve the situation by helping them generate proper certs, etc. Also, I don't think we should claim this is OK 'if you trust the broker you're connecting to' - the point is it could be any broker... This is inherently not a good thing to use. We should solve the problem. We can do that later. ---
[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors
Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235099937 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/LRUCache.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.cache; + +import java.io.Closeable; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +public class LRUCache { --- End diff -- @david-streamlio can you confirm this is a uniquely written class for this purpose and not taken from elsewhere? I ask because I've had the displeasure of finding copied code before and it makes for some build/release messes and this is the type of thing that often hits that trigger. This is fresh/clean room stuff? ---
[GitHub] nifi pull request #3178: NIFI-4914: Add Apache Pulsar processors
GitHub user david-streamlio opened a pull request: https://github.com/apache/nifi/pull/3178 NIFI-4914: Add Apache Pulsar processors Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution, we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/david-streamlio/nifi NIFI-4914 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3178.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3178 ---