[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user david-streamlio closed the pull request at: https://github.com/apache/nifi/pull/2553 ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174898281 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection to 
prevent " ++ "overload on broker. (default: 5000) It should be configured with higher value only in case " ++ "of it requires to produce/subscribe on thousands of topics") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder() +.name("I/O Threads") +.description("The number of threads to be used for handling connections to brokers (default: 1 thread)") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174883410 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") 
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() +.name("Subscription") +.displayName("Subscription Name") +.description("The name of the Pulsar subscription to consume from.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174881995 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") 
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() +.name("Subscription") +.displayName("Subscription Name") +.description("The name of the Pulsar subscription to consume from.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174874403 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/pool/ResourcePoolImpl.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar.pool; + +import java.util.Iterator; +import java.util.Properties; +import java.util.Vector; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +public class ResourcePoolImpl implements ResourcePool { + +private final Lock lock = new ReentrantLock(); +private final Condition poolAvailable = lock.newCondition(); +private int max_resources; +private final Vector pool; + +private final ResourceExceptionHandler resourceExceptionHandler; +private final ResourceFactory factory; + +public ResourcePoolImpl(ResourceFactory factory, int max_resources) { +this(factory, new ResourceExceptionHandlerImpl(), max_resources); +} + +public ResourcePoolImpl(ResourceFactory factory, ResourceExceptionHandler handler, int max_resources) { +lock.lock(); +try { +this.factory = factory; +this.resourceExceptionHandler = handler; +this.max_resources = max_resources; +this.pool = new Vector(max_resources); +} finally { +lock.unlock(); +} +} + +private R createResource(Properties props) { +R resource = null; +try { + +resource = factory.create(props); + +if (resource == null) --- End diff -- Please add curly brackets around the body of this `if` statement. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174881541 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") 
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() +.name("Subscription") +.displayName("Subscription Name") +.description("The name of the Pulsar subscription to consume from.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174881132 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") 
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() +.name("Subscription") +.displayName("Subscription Name") +.description("The name of the Pulsar subscription to consume from.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174873036 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection to 
prevent " ++ "overload on broker. (default: 5000) It should be configured with higher value only in case " ++ "of it requires to produce/subscribe on thousands of topics") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder() +.name("I/O Threads") +.description("The number of threads to be used for handling connections to brokers (default: 1 thread)") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174880893 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") 
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() +.name("Subscription") +.displayName("Subscription Name") +.description("The name of the Pulsar subscription to consume from.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174872527 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection to 
prevent " ++ "overload on broker. (default: 5000) It should be configured with higher value only in case " ++ "of it requires to produce/subscribe on thousands of topics") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder() +.name("I/O Threads") +.description("The number of threads to be used for handling connections to brokers (default: 1 thread)") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174878320 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestStandardPulsarClientService { + +@Before +public void init() { --- End diff -- Looks like you can delete this. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174882988 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") 
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() +.name("Subscription") +.displayName("Subscription Name") +.description("The name of the Pulsar subscription to consume from.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174884957 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java --- @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarProducer; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the 
Pulsar 1.21 Producer API." ++ "The messages to send may be individual FlowFiles or may be delimited, using a " ++ "user-specified delimiter, such as a new-line. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +public class PublishPulsar_1_0 extends AbstractPulsarProcessor { + +protected static final String MSG_COUNT = "msg.count"; + +static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression"); +static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm."); +static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm"); + +static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition"); +static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route messages to all " + + "partitions in a round robin manner"); +static final AllowableValue
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174883778 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java --- @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarProducer; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the 
Pulsar 1.21 Producer API." ++ "The messages to send may be individual FlowFiles or may be delimited, using a " ++ "user-specified delimiter, such as a new-line. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +public class PublishPulsar_1_0 extends AbstractPulsarProcessor { --- End diff -- Consider changing it to `PublishPulsar_1_X`. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174884531 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java --- @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarProducer; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the 
Pulsar 1.21 Producer API." ++ "The messages to send may be individual FlowFiles or may be delimited, using a " ++ "user-specified delimiter, such as a new-line. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +public class PublishPulsar_1_0 extends AbstractPulsarProcessor { + +protected static final String MSG_COUNT = "msg.count"; + +static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression"); +static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm."); +static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm"); + +static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition"); +static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route messages to all " + + "partitions in a round robin manner"); +static final AllowableValue
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174879831 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") 
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { --- End diff -- Also note that, since it is an incubator project, if someone really doesn't want to track 1.X going forward AND complains that you broke compatibility for them by staying up to date with 1.X, that's on them. Incubator projects are by definition moving targets and should be handled that way during risk assessment by teams using them. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174872630 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection to 
prevent " ++ "overload on broker. (default: 5000) It should be configured with higher value only in case " ++ "of it requires to produce/subscribe on thousands of topics") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder() +.name("I/O Threads") +.description("The number of threads to be used for handling connections to brokers (default: 1 thread)") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174874448 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/pool/ResourcePoolImpl.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar.pool; + +import java.util.Iterator; +import java.util.Properties; +import java.util.Vector; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +public class ResourcePoolImpl implements ResourcePool { + +private final Lock lock = new ReentrantLock(); +private final Condition poolAvailable = lock.newCondition(); +private int max_resources; +private final Vector pool; + +private final ResourceExceptionHandler resourceExceptionHandler; +private final ResourceFactory factory; + +public ResourcePoolImpl(ResourceFactory factory, int max_resources) { +this(factory, new ResourceExceptionHandlerImpl(), max_resources); +} + +public ResourcePoolImpl(ResourceFactory factory, ResourceExceptionHandler handler, int max_resources) { +lock.lock(); +try { +this.factory = factory; +this.resourceExceptionHandler = handler; +this.max_resources = max_resources; +this.pool = new Vector(max_resources); +} finally { +lock.unlock(); +} +} + +private R createResource(Properties props) { +R resource = null; +try { + +resource = factory.create(props); --- End diff -- There's some extraneous white space around this. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174884100 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java --- @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarProducer; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the 
Pulsar 1.21 Producer API." ++ "The messages to send may be individual FlowFiles or may be delimited, using a " ++ "user-specified delimiter, such as a new-line. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +public class PublishPulsar_1_0 extends AbstractPulsarProcessor { + +protected static final String MSG_COUNT = "msg.count"; + +static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression"); +static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm."); +static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm"); + +static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition"); +static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route messages to all " + + "partitions in a round robin manner"); +static final AllowableValue
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174878122 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/pool/ResourcePoolImpl.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar.pool; + +import java.util.Iterator; +import java.util.Properties; +import java.util.Vector; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + + +public class ResourcePoolImpl implements ResourcePool { + +private final Lock lock = new ReentrantLock(); +private final Condition poolAvailable = lock.newCondition(); +private int max_resources; +private final Vector pool; + +private final ResourceExceptionHandler resourceExceptionHandler; +private final ResourceFactory factory; + +public ResourcePoolImpl(ResourceFactory factory, int max_resources) { +this(factory, new ResourceExceptionHandlerImpl(), max_resources); +} + +public ResourcePoolImpl(ResourceFactory factory, ResourceExceptionHandler handler, int max_resources) { +lock.lock(); +try { +this.factory = factory; +this.resourceExceptionHandler = handler; +this.max_resources = max_resources; +this.pool = new Vector(max_resources); +} finally { +lock.unlock(); +} +} + +private R createResource(Properties props) { +R resource = null; +try { + +resource = factory.create(props); + +if (resource == null) +throw new ResourceCreationException("Unable to create resource"); + +} catch (Exception e) { +resourceExceptionHandler.handle(e); +} +return resource; +} + + +/* + * Shutdown the pool and release the resources + */ +public void close() { + +Iterator itr = pool.iterator(); +while (itr.hasNext()) { +itr.next().close(); +} + +} + +public boolean isEmpty() { +return (pool.isEmpty()); +} + +public boolean isFull() { +return (pool != null && pool.size() == max_resources); +} + +@Override +public R acquire(Properties props) throws InterruptedException { +lock.lock(); +try { +while (max_resources <= 0) { +poolAvailable.await(); +} + +--max_resources; + +if (pool != null) { +int size = pool.size(); +if (size > 0) +return pool.remove(size - 1); +} +return createResource(props); +} finally { +lock.unlock(); 
+} +} + +@Override +public void evict(R resource) { +lock.lock(); +try { + +// Attempt to close the connection +if (!resource.isClosed()) --- End diff -- Please add curly brackets around the body of this `if` statement. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174879028 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml --- @@ -0,0 +1,78 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.6.0-SNAPSHOT + + +nifi-pulsar-processors +jar + + + +org.apache.nifi +nifi-api + + +org.apache.nifi +nifi-record-serialization-service-api + + +org.apache.nifi +nifi-record + + +org.apache.nifi +nifi-utils +1.6.0-SNAPSHOT + + +org.apache.nifi +nifi-ssl-context-service-api + + +org.apache.nifi +nifi-pulsar-client-service-api +1.6.0-SNAPSHOT +provided + + + org.apache.pulsar --- End diff -- Broken indentation level. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174872872 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection to 
prevent " ++ "overload on broker. (default: 5000) It should be configured with higher value only in case " ++ "of it requires to produce/subscribe on thousands of topics") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder() +.name("I/O Threads") +.description("The number of threads to be used for handling connections to brokers (default: 1 thread)") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174879478 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") 
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { --- End diff -- You should consider changing this to `ConsumePulsar_1_X` to warn users that you may move the internal client compatibility forward if, say, 1.4 breaks compatibility with the current 1.2 branch in the incubator. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174866298 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/PoolableResource.java --- @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.pool; + +public interface PoolableResource { + +public void close(); --- End diff -- Javadoc. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174882429 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") 
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() +.name("Subscription") +.displayName("Subscription Name") +.description("The name of the Pulsar subscription to consume from.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174882522 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/ConsumePulsar_1_0.java --- @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarConsumer; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") 
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_0 extends AbstractPulsarProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +protected static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +static final PropertyDescriptor SUBSCRIPTION = new PropertyDescriptor.Builder() +.name("Subscription") +.displayName("Subscription Name") +.description("The name of the Pulsar subscription to consume from.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +.build(); + +public static final
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174884863 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/PublishPulsar_1_0.java --- @@ -0,0 +1,373 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarProducer; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the 
Pulsar 1.21 Producer API." ++ "The messages to send may be individual FlowFiles or may be delimited, using a " ++ "user-specified delimiter, such as a new-line. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +public class PublishPulsar_1_0 extends AbstractPulsarProcessor { + +protected static final String MSG_COUNT = "msg.count"; + +static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression"); +static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm."); +static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm"); + +static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition"); +static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route messages to all " + + "partitions in a round robin manner"); +static final AllowableValue
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174868443 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/ResourceFactory.java --- @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.pool; + +import java.util.Properties; + +public interface ResourceFactory { + +public R create(Properties props) throws ResourceCreationException; --- End diff -- Javadoc ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174874129 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection to 
prevent " ++ "overload on broker. (default: 5000) It should be configured with higher value only in case " ++ "of it requires to produce/subscribe on thousands of topics") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder() +.name("I/O Threads") +.description("The number of threads to be used for handling connections to brokers (default: 1 thread)") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174878824 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/test/java/org/apache/nifi/pulsar/TestStandardPulsarClientService.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestStandardPulsarClientService { + +@Before +public void init() { + +} + +@Test +public void testService() throws InitializationException { +final TestRunner runner = TestRunners.newTestRunner(TestProcessor.class); +final PulsarClientPool service = new StandardPulsarClientPool(); +runner.addControllerService("test-good", service); + +runner.setProperty(service, StandardPulsarClientPool.PULSAR_SERVICE_URL, "localhost:6667"); +// runner.enableControllerService(service); --- End diff -- I think you might actually need this. If that's not the case, it should be removed. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174865149 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/pom.xml --- @@ -0,0 +1,40 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.6.0-SNAPSHOT + + +nifi-pulsar-client-service-api +jar + + + +org.apache.nifi +nifi-api +provided + + + org.apache.pulsar --- End diff -- Nit: indent level is broken here. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174866349 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/PoolableResource.java --- @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.pool; + +public interface PoolableResource { + +public void close(); + +public boolean isClosed(); --- End diff -- Javadoc ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174866791 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/ResourceExceptionHandler.java --- @@ -0,0 +1,23 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.pool; + +public interface ResourceExceptionHandler { + +void handle(Exception exc); --- End diff -- Javadoc ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174865317 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.pulsar.pool.ResourcePool; + + +@Tags({"Pulsar"}) +@CapabilityDescription("Provides the ability to create Pulsar Producer / Consumer instances on demand, based on the configuration." + + "properties defined") +public interface PulsarClientPool extends ControllerService { + +public ResourcePool getProducerPool(); --- End diff -- There should be a basic javadoc here. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2553#discussion_r174865373 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.pulsar.pool.ResourcePool; + + +@Tags({"Pulsar"}) +@CapabilityDescription("Provides the ability to create Pulsar Producer / Consumer instances on demand, based on the configuration." + + "properties defined") +public interface PulsarClientPool extends ControllerService { + +public ResourcePool getProducerPool(); + +public ResourcePool getConsumerPool(); --- End diff -- Same here. ---
[GitHub] nifi pull request #2553: Nifi 4908 rebase
GitHub user david-streamlio opened a pull request: https://github.com/apache/nifi/pull/2553 Nifi 4908 rebase Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/david-streamlio/nifi NIFI-4908-rebase Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/2553.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2553 commit e4f8550159132d85d0e19d61f24e087e51742dee Author: David Kjerrumgaard Date: 2018-02-24T01:40:22Z Added Pulsar processors and Controller Service commit b8b9601f2469df811b59b1019f0b7052dff9abd3 Author: David Kjerrumgaard Date: 2018-02-26T19:19:26Z Updated all nifi component version references to 1.6.0-SNAPSHOT commit 2ebdc06394d3d6f5fa71ec07fc5d086e81f4e632 Author: David Kjerrumgaard Date: 2018-02-26T19:48:56Z Merge remote-tracking branch 'upstream/master' into NIFI-4908 commit d4e82ecc49e62596264db206f26b3b6a93f4b021 Author: David Kjerrumgaard Date: 2018-02-28T18:18:35Z Refactored Processors to indicate the supported version of Pulsar, i.e _1_0 commit 8b24b2496d404a65e6427273395ae73e558a5531 Author: David Kjerrumgaard Date: 2018-02-24T01:40:22Z Added Pulsar processors and Controller Service commit 29ca7f2b653503de9ad13da9f6c49cc2f311eaae Author: David Kjerrumgaard Date: 2018-02-26T19:19:26Z Updated all nifi component version references to 1.6.0-SNAPSHOT commit 3e30c9530ab0cd844fb900607dd00ff1a4014221 Author: David Kjerrumgaard Date: 2018-02-28T18:18:35Z Refactored Processors to indicate the supported version of Pulsar, i.e _1_0 commit 20b451d66ba6f7bbcc057f752a25101e33580469 Author: David Kjerrumgaard Date: 2018-02-24T01:40:22Z Added Pulsar processors and Controller Service commit 475491195d2f1fc2c24a4f403d6fc6cfd30982b0 Author: David Kjerrumgaard Date: 2018-02-26T19:19:26Z Updated all nifi component version references to 1.6.0-SNAPSHOT commit 046e28703914f66efbea27a6112bc88b33750bdf Author: David Kjerrumgaard Date: 2018-02-28T18:18:35Z Refactored Processors to indicate the 
supported version of Pulsar, i.e _1_0 commit f36b0b61680c55a5ac0279a44cdc580b42f20118 Author: David Kjerrumgaard Date: 2018-03-10T03:01:06Z Combined Pulsar components into a single bundle commit 7de9f99be1724178e768e89737a1b2d57e45f08b Author: David Kjerrumgaard Date: 2018-03-10T05:48:25Z Merged with HEAD commit 03e8ef2a19381cd3249eb45c471c36b4ae265566 Author: David Kjerrumgaard Date: 2018-03-10T06:26:46Z Fixed parent.relativePath in POM files commit e61bc9b559cac45200159efa6d875d68a5713cfa Author: David Kjerrumgaard Date: 2018-03-10T06:29:43Z Added nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-nar/ module commit 27d3f86315dd91fc8e61d4399caa289304bf5fd2 Author: David Kjerrumgaard Date: 2018-03-10T06:41:34Z Added dependency versions that