[GitHub] nifi pull request #2882: NIFI-4914
Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r234765596 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ALLOW_TLS_INSECURE_CONNECTION = new PropertyDescriptor.Builder() --- End diff -- This property should be removed as its meaning is too vague. There are a lot of ways a TLS connection could be considered or made insecure. If you mean to allow for untrusted certs, and I really dont understand why we would, then it should be clear to what it is doing. ---
[GitHub] nifi pull request #2882: NIFI-4914
Github user david-streamlio closed the pull request at: https://github.com/apache/nifi/pull/2882 ---
[GitHub] nifi pull request #2882: NIFI-4914
Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r219709641 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml --- @@ -0,0 +1,78 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.8.0-SNAPSHOT + + +nifi-pulsar-processors +jar + + + +org.apache.nifi +nifi-api + + +org.apache.nifi +nifi-record-serialization-service-api + + +org.apache.nifi +nifi-record + + +org.apache.nifi +nifi-utils +1.8.0-SNAPSHOT + + +org.apache.nifi +nifi-ssl-context-service-api + + +org.apache.nifi +nifi-pulsar-client-service-api +1.8.0-SNAPSHOT +provided + + +org.apache.pulsar +pulsar-client +2.0.0-rc1-incubating --- End diff -- Fixed ---
[GitHub] nifi pull request #2882: NIFI-4914
Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r219709639 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java --- @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientService; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +public abstract class AbstractPulsarConsumerProcessor extends AbstractProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +static final AllowableValue CONSUME = new AllowableValue(ConsumerCryptoFailureAction.CONSUME.name(), "Consume", +"Mark the message as consumed despite being unable to decrypt the contents"); +static final AllowableValue DISCARD = new AllowableValue(ConsumerCryptoFailureAction.DISCARD.name(), "Discard", +"Discard the message and don't perform any addtional processing on the message"); +static final AllowableValue FAIL = new AllowableValue(ConsumerCryptoFailureAction.FAIL.name(), "Fail", +"Report a failure condition, and the route the message contents to the FAILED relationship."); --- End diff -- Fixed ---
[GitHub] nifi pull request #2882: NIFI-4914
Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r219709622 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java --- @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientService; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +public abstract class AbstractPulsarConsumerProcessor extends AbstractProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +static final AllowableValue CONSUME = new AllowableValue(ConsumerCryptoFailureAction.CONSUME.name(), "Consume", +"Mark the message as consumed despite being unable to decrypt the contents"); +static final AllowableValue DISCARD = new AllowableValue(ConsumerCryptoFailureAction.DISCARD.name(), "Discard", +"Discard the message and don't perform any addtional processing on the message"); +static final AllowableValue FAIL = new AllowableValue(ConsumerCryptoFailureAction.FAIL.name(), "Fail", +"Report a failure condition, and the route the message contents to the FAILED relationship."); + +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("FlowFiles for which all content was consumed from Pulsar.") +.build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder() --- End diff -- Good point. I have moved this relationship to the ConsumePulsarRecord class, which will route data that it receives, but cannot read with the configured
[GitHub] nifi pull request #2882: NIFI-4914
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r218858965 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java --- @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientService; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +public abstract class AbstractPulsarConsumerProcessor extends AbstractProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +static final AllowableValue CONSUME = new AllowableValue(ConsumerCryptoFailureAction.CONSUME.name(), "Consume", +"Mark the message as consumed despite being unable to decrypt the contents"); +static final AllowableValue DISCARD = new AllowableValue(ConsumerCryptoFailureAction.DISCARD.name(), "Discard", +"Discard the message and don't perform any addtional processing on the message"); +static final AllowableValue FAIL = new AllowableValue(ConsumerCryptoFailureAction.FAIL.name(), "Fail", +"Report a failure condition, and the route the message contents to the FAILED relationship."); --- End diff -- typo: "and then route the message content" ---
[GitHub] nifi pull request #2882: NIFI-4914
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r218857973 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/.gitignore --- @@ -0,0 +1 @@ +/target/ --- End diff -- same here ---
[GitHub] nifi pull request #2882: NIFI-4914
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r218858305 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml --- @@ -0,0 +1,78 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.8.0-SNAPSHOT + + +nifi-pulsar-processors +jar + + + +org.apache.nifi +nifi-api + + +org.apache.nifi +nifi-record-serialization-service-api + + +org.apache.nifi +nifi-record + + +org.apache.nifi +nifi-utils +1.8.0-SNAPSHOT + + +org.apache.nifi +nifi-ssl-context-service-api + + +org.apache.nifi +nifi-pulsar-client-service-api +1.8.0-SNAPSHOT +provided + + +org.apache.pulsar +pulsar-client +2.0.0-rc1-incubating --- End diff -- can we set the pulsar version as a property in the root pom of the bundle and reference that version? Also upgrade to 2.1.1 if possible? ---
[GitHub] nifi pull request #2882: NIFI-4914
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r218851139 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/.gitignore --- @@ -0,0 +1 @@ +/target/ --- End diff -- same here ---
[GitHub] nifi pull request #2882: NIFI-4914
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r218857914 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-nar/.gitignore --- @@ -0,0 +1,2 @@ +/target/ --- End diff -- same comment here ---
[GitHub] nifi pull request #2882: NIFI-4914
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r218860810 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java --- @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientService; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +public abstract class AbstractPulsarConsumerProcessor extends AbstractProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +static final AllowableValue CONSUME = new AllowableValue(ConsumerCryptoFailureAction.CONSUME.name(), "Consume", +"Mark the message as consumed despite being unable to decrypt the contents"); +static final AllowableValue DISCARD = new AllowableValue(ConsumerCryptoFailureAction.DISCARD.name(), "Discard", +"Discard the message and don't perform any addtional processing on the message"); +static final AllowableValue FAIL = new AllowableValue(ConsumerCryptoFailureAction.FAIL.name(), "Fail", +"Report a failure condition, and the route the message contents to the FAILED relationship."); + +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("FlowFiles for which all content was consumed from Pulsar.") +.build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder() --- End diff -- If consumer does not allow input - in what case do we route flow files in that relationship and what will be the content/attributes of that flow file? ---
[GitHub] nifi pull request #2882: NIFI-4914
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r218850426 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/pom.xml --- @@ -0,0 +1,40 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.8.0-SNAPSHOT + + +nifi-pulsar-client-service-api +jar + + + +org.apache.nifi +nifi-api +provided + + +org.apache.pulsar +pulsar-client +2.0.1-incubating --- End diff -- 2.1.1-incubating has been released 2 days ago - should be available in mvn repo shortly ---
[GitHub] nifi pull request #2882: NIFI-4914
Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r218849480 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/.gitignore --- @@ -0,0 +1 @@ +/target/ --- End diff -- we probably don't want that file, no? ---
[GitHub] nifi pull request #2882: NIFI-4914
Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r207644839 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ALLOW_TLS_INSECURE_CONNECTION = new PropertyDescriptor.Builder() +.name("Allow TLS insecure connection") +.displayName("Allow TLS insecure connection") +.description("Whether the Pulsar client will accept untrusted TLS certificate from broker or not.") +.required(false) +.allowableValues("true", "false") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.defaultValue("false") +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection.") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.defaultValue("1") +.build(); +
[GitHub] nifi pull request #2882: NIFI-4914
Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r207643777 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ALLOW_TLS_INSECURE_CONNECTION = new PropertyDescriptor.Builder() --- End diff -- This property allows the Pulsar client to accept an untrusted TLS certificate from broker, which may be an edge use case, but is still a situation that the Pulsar client API exposes, and I want to enable users to configure the client as they would in any other situation. ---
[GitHub] nifi pull request #2882: NIFI-4914
Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r207611496 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ALLOW_TLS_INSECURE_CONNECTION = new PropertyDescriptor.Builder() +.name("Allow TLS insecure connection") +.displayName("Allow TLS insecure connection") +.description("Whether the Pulsar client will accept untrusted TLS certificate from broker or not.") +.required(false) +.allowableValues("true", "false") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.defaultValue("false") +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection.") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.defaultValue("1") +.build(); + +
[GitHub] nifi pull request #2882: NIFI-4914
Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r207611364 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ALLOW_TLS_INSECURE_CONNECTION = new PropertyDescriptor.Builder() --- End diff -- This probably should not be an option and the security context service should encapsulate such things. ---