[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693755#comment-16693755 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/3178 @joewitt What was the command you used to generate the error above? I want to attempt to reproduce it locally if possible. The basic issue is that since this test message was sent asynchronously, there is a lag between when it is processed (and throws the error), moved to the failureQueue, and then eventually routed to the FAILURE relationship. For my local tests, I had to introduce a lag in the process by setting the number of iterations on the run command to 100. Apparently, this value needs to be larger for a parallel build, so I will increase the number of iterations to 5K. But I would like to test it using the values you used to ensure that value is sufficient > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693683#comment-16693683 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on the issue: https://github.com/apache/nifi/pull/3178 the output log had "14:11:44.805 [pool-67-thread-1] ERROR org.apache.nifi.processors.pulsar.pubsub.PublishPulsarRecord - PublishPulsarRecord[id=9c298cea-0e2a-4a59-8864-92e95563d7f4] Unable to publish to topic " not sure if that is from the same test or not > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693679#comment-16693679 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on the issue: https://github.com/apache/nifi/pull/3178 build on large machine now has a unit test failure [ERROR] Failures: [ERROR] TestAsyncPublishPulsarRecord.pulsarClientExceptionTest:56 expected:<1> but was:<0> [INFO] [ERROR] Tests run: 70, Failures: 1, Errors: 0, Skipped: 0 > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693652#comment-16693652 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235123524 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor.Builder() +.name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER = new PropertyDescriptor.Builder() +.name("ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER") +.displayName("Allow TLS insecure connection") +.description("If a valid trusted certificate is provided in the 'TLS Trust Certs File Path' property of this controller service," ++ " then, by default, all communication between this controller service and the Apache Pulsar broker will be secured via" ++ " TLS and only use the trusted TLS certificate from broker. Setting this property to 'false' will allow this controller" ++ " service to accept an untrusted TLS certificate from broker as well. This property should only be set to false if you trust" ++ " the broker you are connecting to, but do not have access to the TLS certificate file.") +.required(false) +.allowableValues("true", "false") +.defaultValue("false") +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("CONCURRENT_LOOKUP_REQUESTS") +.displayName("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection.") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693651#comment-16693651 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235123261 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java --- @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientService; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.shade.org.apache.commons.collections.CollectionUtils; + +public abstract class AbstractPulsarProducerProcessor extends AbstractProcessor { + +public static final String MSG_COUNT = "msg.count"; +public static final String TOPIC_NAME = "topic.name"; + +static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression"); +static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm."); +static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm"); + +static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition"); +static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route messages to all " + + "partitions in a round robin manner"); +static final AllowableValue MESSAGE_ROUTING_MODE_SINGLE_PARTITION = new AllowableValue("SinglePartition", "Single Partition", "Route messages to a single partition"); + +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("FlowFiles for which all content was sent to Pulsar.") +.build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder() +.name("failure") +.description("Any FlowFile that cannot be sent to
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693593#comment-16693593 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235107847 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar.java --- @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar.pubsub; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +@SeeAlso({PublishPulsar.class, ConsumePulsarRecord.class, PublishPulsarRecord.class}) +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar. The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar extends AbstractPulsarConsumerProcessor { + +@Override +public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { +try { +Consumer consumer = getConsumer(context, getConsumerId(context, session.get())); + +if (consumer == null) { +context.yield(); +return; +} + +if (context.getProperty(ASYNC_ENABLED).asBoolean()) { +consumeAsync(consumer, context, session); +handleAsync(consumer, context, session); +} else { +consume(consumer, context, session); +} +} catch (PulsarClientException e) { +getLogger().error("Unable to consume from Pulsar Topic ", e); +context.yield(); +throw new ProcessException(e); +} +} + +private void handleAsync(final Consumer consumer, ProcessContext context, ProcessSession session) { +try { +Future> done = getConsumerService().poll(50, TimeUnit.MILLISECONDS); + +if (done != null) { + Message msg = done.get(); + + if (msg != null) { + FlowFile flowFile = null; + final byte[] value = msg.getData(); + if (value != null && value.length > 0) { + flowFile = session.create(); + flowFile = session.write(flowFile, out -> { + out.write(value); + }); + + session.getProvenanceReporter().receive(flowFile, getPulsarClientService().getPulsarBrokerRootURL() + "/" + consumer.getTopic()); + session.transfer(flowFile, REL_SUCCESS); + session.commit(); + } + // Acknowledge consuming the message + getAckService().submit(new Callable() { + @Override +
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693588#comment-16693588 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235107339 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/LRUCache.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.cache; + +import java.io.Closeable; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +public class LRUCache { --- End diff -- @joewitt The LRU Cache code wasn't copied from anywhere, and is just my naive implementation of the LRU Cache. Most java-based examples I see utilize a LinkedHashMap and I do not. I do appreciate your need to ensure 100% Apache compliance with the code, so I can provide an implementation of the LRU cache based on the https://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/map/LRUMap.html class instead, if you think that would be better/safer from a licensing perspective > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693595#comment-16693595 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235108402 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord.java --- @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar.pubsub; + + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +@CapabilityDescription("Consumes messages from Apache Pulsar. " ++ "The complementary NiFi processor for sending messages is PublishPulsarRecord. Please note that, at this time, " ++ "the Processor assumes that all records that are retrieved have the same schema. If any of the Pulsar messages " ++ "that are pulled but cannot be parsed or written with the configured Record Reader or Record Writer, the contents " ++ "of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the 'parse.failure' " ++ "relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual " ++ "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the " ++ "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.") +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@WritesAttributes({ +@WritesAttribute(attribute = "record.count", description = "The number of records received") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@SeeAlso({PublishPulsar.class,
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693590#comment-16693590 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235107594 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor.Builder() +.name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER = new PropertyDescriptor.Builder() --- End diff -- Ok, I will remove that property > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693586#comment-16693586 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235106784 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java --- @@ -0,0 +1,472 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientService; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageRoutingMode; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.shade.org.apache.commons.collections.CollectionUtils; + +public abstract class AbstractPulsarProducerProcessor extends AbstractProcessor { + +public static final String MSG_COUNT = "msg.count"; +public static final String TOPIC_NAME = "topic.name"; + +static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression"); +static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm."); +static final AllowableValue COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm"); + +static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition"); +static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route messages to all " + + "partitions in a round robin manner"); +static final AllowableValue MESSAGE_ROUTING_MODE_SINGLE_PARTITION = new AllowableValue("SinglePartition", "Single Partition", "Route messages to a single partition"); + +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("FlowFiles for which all content was sent to Pulsar.") +.build(); + +public static final Relationship REL_FAILURE = new Relationship.Builder() +.name("failure") +.description("Any FlowFile that cannot be sent to Pulsar will
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693566#comment-16693566 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235102831 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml --- @@ -0,0 +1,84 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.9.0-SNAPSHOT + + +nifi-pulsar-processors +jar + + + +org.apache.nifi +nifi-api + + +org.apache.nifi +nifi-record-serialization-service-api + + +org.apache.nifi +nifi-record + + +org.apache.nifi +nifi-utils +1.9.0-SNAPSHOT + + +org.apache.nifi +nifi-ssl-context-service-api + + +org.apache.nifi +nifi-pulsar-client-service-api +1.9.0-SNAPSHOT +provided + + +org.apache.pulsar +pulsar-client +${pulsar.version} + + +org.apache.nifi +nifi-mock +test +1.9.0-SNAPSHOT + + +org.slf4j +slf4j-simple +test + + +junit +junit +test + + + org.apache.commons + commons-lang3 + 3.7 --- End diff -- this should move up to current release which I think is 3.8.1 > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693567#comment-16693567 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235102894 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml --- @@ -0,0 +1,84 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.9.0-SNAPSHOT + + +nifi-pulsar-processors +jar + + + +org.apache.nifi +nifi-api + + +org.apache.nifi +nifi-record-serialization-service-api + + +org.apache.nifi +nifi-record + + +org.apache.nifi +nifi-utils +1.9.0-SNAPSHOT + + +org.apache.nifi +nifi-ssl-context-service-api + + +org.apache.nifi +nifi-pulsar-client-service-api +1.9.0-SNAPSHOT +provided + + +org.apache.pulsar +pulsar-client +${pulsar.version} + + +org.apache.nifi +nifi-mock +test +1.9.0-SNAPSHOT + + +org.slf4j +slf4j-simple +test + + +junit +junit +test + + + org.apache.commons + commons-lang3 + 3.7 --- End diff -- assuming we really need it > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693561#comment-16693561 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235102049 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor.Builder() +.name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER = new PropertyDescriptor.Builder() +.name("ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER") +.displayName("Allow TLS insecure connection") +.description("If a valid trusted certificate is provided in the 'TLS Trust Certs File Path' property of this controller service," ++ " then, by default, all communication between this controller service and the Apache Pulsar broker will be secured via" ++ " TLS and only use the trusted TLS certificate from broker. Setting this property to 'false' will allow this controller" ++ " service to accept an untrusted TLS certificate from broker as well. This property should only be set to false if you trust" ++ " the broker you are connecting to, but do not have access to the TLS certificate file.") +.required(false) +.allowableValues("true", "false") +.defaultValue("false") +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("CONCURRENT_LOOKUP_REQUESTS") +.displayName("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection.") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693557#comment-16693557 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235101491 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor.Builder() +.name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ACCEPT_UNTRUSTED_TLS_CERTIFICATE_FROM_BROKER = new PropertyDescriptor.Builder() --- End diff -- @david-streamlio this is the property i am saying we should eliminate. It's fine that pulsar supports this but we dont need to from nifi. We spend a ton of time/energy on security and we need to get more serious on it. I see no reason to keep this here. If we later find this to be a real problem then we could talk about ways to improve the situation by helping them generate proper certs/etc.. Also, I dont think we claim this is ok 'if you trust the broker you're connecting to' - the point is it could be any broker... This is inherently not a good thing to use. We should solve the problem. We can do that later. > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693545#comment-16693545 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/3178#discussion_r235099937 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/cache/LRUCache.java --- @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.cache; + +import java.io.Closeable; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.Map; + +public class LRUCache { --- End diff -- @david-streamlio can you confirm this is a uniquely written class for this purpose and not taken from elsewhere? I ask because I've had the displeasure of finding copied code before and it makes for some build/release messes and this is the type of thing that often hits that trigger. This is fresh/clean room stuff? > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693535#comment-16693535 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on the issue: https://github.com/apache/nifi/pull/3178 cool thanks. can you please take a look at my comment regarding the security related property. > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693443#comment-16693443 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/3178 @joewitt I dug into the build logs and found that the issue with the parallel build was due to having the wrong version for both of the Pulsar NAR files defined in the NiFi-assembly/pom.xml file. Which I fixed and pushed in the second [commit](https://github.com/apache/nifi/pull/3178/commits/0c85ce122b1bf06149320a2475aa396838de9493). ` org.apache.nifi nifi-pulsar-client-service-nar 1.9.0-SNAPSHOT nar org.apache.nifi nifi-pulsar-nar 1.9.0-SNAPSHOT nar ` > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693424#comment-16693424 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on the issue: https://github.com/apache/nifi/pull/3178 just did a full clean build with 6 threads on an older macbook and no problems. i'll try to reproduce my build issue from yesterday on a much higher thread build/machine and advise if issue i saw remains > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693412#comment-16693412 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/3178 @joewitt I have incorporated all of the changes / corrections that came from both your comments as well as @pvillard31's [changes](https://github.com/apache/nifi/pull/2882#issuecomment-431839080) from September 21st. I am more of an (git idiot) than a git expert so the last two times I tried to rebase, squash, etc. I ended up either in a state where the PR wouldn't merge, or I had brought in several hundred commits from other people, or the branch was rejected, etc. Since this module is self-contained, I found it easier to just start with a fresh branch off of the latest master, so I have been doing that for the 1.6.x, 1.7.x, 1.8.x, and now 1.9.x release. > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16693397#comment-16693397 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on the issue: https://github.com/apache/nifi/pull/3178 @david-streamlio i had comments on the previous PR. Can you please review those/pull them forward to this. In the future you could rebase, squash, and force push the PR if you want to get back to a clean state but the history could all stay on the same PR. It helps both you and reviewer. I'd love to help you get this thing in but it is a bit elusive to get the timing right between contrib, review cycles, pr resets, etc.. > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692506#comment-16692506 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2882 Created new PR after rebasing to NiFi 1.9.0-Snapshot branch. https://github.com/apache/nifi/pull/3178 > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692505#comment-16692505 ] ASF GitHub Bot commented on NIFI-4914: -- GitHub user david-streamlio opened a pull request: https://github.com/apache/nifi/pull/3178 NIFI-4914: Add Apache Pulsar processors Thank you for submitting a contribution to Apache NiFi. In order to streamline the review of the contribution we ask you to ensure the following steps have been taken: ### For all changes: - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message? - [ ] Does your PR title start with NIFI- where is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character. - [ ] Has your PR been rebased against the latest commit within the target branch (typically master)? - [ ] Is your initial contribution a single, squashed commit? ### For code changes: - [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder? - [ ] Have you written or updated unit tests to verify your changes? - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? - [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly? - [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly? - [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties? ### For documentation related changes: - [ ] Have you ensured that format looks appropriate for the output in which it is rendered? ### Note: Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible. You can merge this pull request into a Git repository by running: $ git pull https://github.com/david-streamlio/nifi NIFI-4914 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/nifi/pull/3178.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3178 > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692201#comment-16692201 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r234765596 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ALLOW_TLS_INSECURE_CONNECTION = new PropertyDescriptor.Builder() --- End diff -- This property should be removed as its meaning is too vague. There are a lot of ways a TLS connection could be considered or made insecure. If you mean to allow for untrusted certs, and I really dont understand why we would, then it should be clear to what it is doing. > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692199#comment-16692199 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on the issue: https://github.com/apache/nifi/pull/2882 @david-streamlio its not likely we'll put a feature like this into an already released line. It doesn't fit our semver scheme. Also, on my local branch i have the changes i needed from your last commit so this would actually build against master. However, if I do parallel builds the build gets hung up. Single threaded builds are fine. I cannot tell why yet. If I do a full clean build parallel just on master things are fine. So, i believe there is something in the way these tests run that break parallel builds > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692132#comment-16692132 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2882 @rumbin I am going to submit a new PR, it is just easier to create a fresh branch than rebasing to a new version of NiFi. > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692107#comment-16692107 ] ASF GitHub Bot commented on NIFI-4914: -- Github user rumbin commented on the issue: https://github.com/apache/nifi/pull/2882 @david-streamlio why did you close it? Will you submit a new PR? > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16692081#comment-16692081 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio closed the pull request at: https://github.com/apache/nifi/pull/2882 > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690657#comment-16690657 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2882 Is there a way to get this into a 1.8.x release rather than the 1.9.x? > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16690008#comment-16690008 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on the issue: https://github.com/apache/nifi/pull/2882 am building against master - is 1.9.0. changing it to 1.9.0-SNAPSHOT and ignoring last commit addresses it. travis fails for a similar reason i'd guess. Probably makes sense to ditch the merge commits, squash, rebase to latest. But in any case is building now i think > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689991#comment-16689991 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2882 As a sanity check I did the following on my local MacBook, and was able to build NiFi successfully from my branch: cd /tmp git clone g...@github.com:david-streamlio/nifi.git cd nifi/ git checkout NIFI-4914 mvn clean install -DskipTests ... [INFO] [INFO] BUILD SUCCESS [INFO] [INFO] Total time: 09:48 min [INFO] Finished at: 2018-11-16T12:45:01-08:00 [INFO] Final Memory: 318M/1815M [INFO] > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689975#comment-16689975 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2882 @joewitt So the build doesn't work for you locally? I am able to build it locally. The latest commit was trying suggestion number 2 from the following wiki page that is given in the build output: https://cwiki.apache.org//confluence/display/MAVEN/ProjectBuildingException https://cwiki.apache.org/confluence/display/MAVEN/UnresolvableModelException "You intent to use a parent POM from the local filesystem but Maven didn't use that. Please verify that the element in the child is properly set and that the POM at that location has actually the version you want to use." > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689959#comment-16689959 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on the issue: https://github.com/apache/nifi/pull/2882 the build doesn't work...at least for me or travis. also not with the new patch. i've not looked into why yet but we dont use relative paths anywhere that i know of so not sure what the latest change was for/doing > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689934#comment-16689934 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2882 Thanks @joewitt I was double comparing the nifi-pulsar-bundle/pom.xml to other bundles in the nifi-nar-bundles, e.g. nifi-redis-bundle/pom.xml, nifi-rethinkdb-bundle/pom.xml, and nidi-parquet-bundle/pom.xml and they all define their parent module as follows: org.apache.nifi nifi-parquet-bundle 1.8.0-SNAPSHOT Which is what I have as the definition in the nifi-pulsar-bundle/pom.xml file > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16689931#comment-16689931 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on the issue: https://github.com/apache/nifi/pull/2882 travis and local builds dont work due to some parent relative path issues. can you please confirm you're following the pattern that other components follow with regards to pom contents > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16675584#comment-16675584 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2882 @rumbin I am merging in changes from @pvillard31, and correcting some minor issues that cause the processors to hang in certain situations. ETA is by Monday 11/12 for my next commit with these changes > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16674700#comment-16674700 ] ASF GitHub Bot commented on NIFI-4914: -- Github user rumbin commented on the issue: https://github.com/apache/nifi/pull/2882 Is there an estimation when this may be merged? I'm also very much interested in using Pulsar within NiFi, so I can provide some application testing, if needed... > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16659403#comment-16659403 ] ASF GitHub Bot commented on NIFI-4914: -- Github user pvillard31 commented on the issue: https://github.com/apache/nifi/pull/2882 Hi @david-streamlio - sorry it took so long, I've been busy with a lot of things... I've not been able to complete all my tests successfully but I wanted to give a status with my tests so far. All the code changes I did are available here: 8ca2db6712b9fc843a3d2ee7917ecc6db2dd54f2 Most of the changes are just to be consistent with the code base but some have been required to make things work. In addition to that, I have the following comments: if we leave the ack timeout to 0sec, we get the following error 2018-10-22 08:23:18,998 WARN [Timer-Driven Process Thread-8] o.a.n.controller.tasks.ConnectableTask Administratively Yielding ConsumePulsar[id=1a70a923-0166-1000-e698-63a0160cc922] due to uncaught Exception: java.lang.IllegalArgumentException: Ack timeout should be should be greater than 1000 ms java.lang.IllegalArgumentException: Ack timeout should be should be greater than 1000 ms at org.apache.pulsar.shade.com.google.common.base.Preconditions.checkArgument(Preconditions.java:122) at org.apache.pulsar.client.impl.ConsumerBuilderImpl.ackTimeout(ConsumerBuilderImpl.java:147) at org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor.getConsumerBulder(AbstractPulsarConsumerProcessor.java:361) at org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor.getConsumer(AbstractPulsarConsumerProcessor.java:338) at org.apache.nifi.processors.pulsar.pubsub.ConsumePulsar.onTrigger(ConsumePulsar.java:46) at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27) at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165) at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203) at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Is it something that changed with the latest version of Pulsar? In the consume processors, I'd add the topic name as attribute of flowfile and possibly other attributes that could be useful for later routing/filtering. When stopping the processors or stopping NiFi, some processors does not stop cleanly and it requires a process kill for NiFi to shutdown. During my last test, it was on ConsumePulsar but it could be on other processors as well. > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625241#comment-16625241 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r219709641 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml --- @@ -0,0 +1,78 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.8.0-SNAPSHOT + + +nifi-pulsar-processors +jar + + + +org.apache.nifi +nifi-api + + +org.apache.nifi +nifi-record-serialization-service-api + + +org.apache.nifi +nifi-record + + +org.apache.nifi +nifi-utils +1.8.0-SNAPSHOT + + +org.apache.nifi +nifi-ssl-context-service-api + + +org.apache.nifi +nifi-pulsar-client-service-api +1.8.0-SNAPSHOT +provided + + +org.apache.pulsar +pulsar-client +2.0.0-rc1-incubating --- End diff -- Fixed > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625240#comment-16625240 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r219709639 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java --- @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientService; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +public abstract class AbstractPulsarConsumerProcessor extends AbstractProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +static final AllowableValue CONSUME = new AllowableValue(ConsumerCryptoFailureAction.CONSUME.name(), "Consume", +"Mark the message as consumed despite being unable to decrypt the contents"); +static final AllowableValue DISCARD = new AllowableValue(ConsumerCryptoFailureAction.DISCARD.name(), "Discard", +"Discard the message and don't perform any addtional processing on the message"); +static final AllowableValue FAIL = new AllowableValue(ConsumerCryptoFailureAction.FAIL.name(), "Fail", +"Report a failure condition, and the route the message contents to the FAILED relationship."); --- End diff -- Fixed > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 >
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16625239#comment-16625239 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r219709622 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java --- @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientService; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +public abstract class AbstractPulsarConsumerProcessor extends AbstractProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +static final AllowableValue CONSUME = new AllowableValue(ConsumerCryptoFailureAction.CONSUME.name(), "Consume", +"Mark the message as consumed despite being unable to decrypt the contents"); +static final AllowableValue DISCARD = new AllowableValue(ConsumerCryptoFailureAction.DISCARD.name(), "Discard", +"Discard the message and don't perform any addtional processing on the message"); +static final AllowableValue FAIL = new AllowableValue(ConsumerCryptoFailureAction.FAIL.name(), "Fail", +"Report a failure condition, and the route the message contents to the FAILED relationship."); + +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("FlowFiles for which all content was consumed from Pulsar.") +.build(); + +public static final
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623477#comment-16623477 ] ASF GitHub Bot commented on NIFI-4914: -- Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r218858305 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml --- @@ -0,0 +1,78 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.8.0-SNAPSHOT + + +nifi-pulsar-processors +jar + + + +org.apache.nifi +nifi-api + + +org.apache.nifi +nifi-record-serialization-service-api + + +org.apache.nifi +nifi-record + + +org.apache.nifi +nifi-utils +1.8.0-SNAPSHOT + + +org.apache.nifi +nifi-ssl-context-service-api + + +org.apache.nifi +nifi-pulsar-client-service-api +1.8.0-SNAPSHOT +provided + + +org.apache.pulsar +pulsar-client +2.0.0-rc1-incubating --- End diff -- can we set the pulsar version as a property in the root pom of the bundle and reference that version? Also upgrade to 2.1.1 if possible? > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623473#comment-16623473 ] ASF GitHub Bot commented on NIFI-4914: -- Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r218850426 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/pom.xml --- @@ -0,0 +1,40 @@ + + +http://maven.apache.org/POM/4.0.0; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd;> +4.0.0 + + +org.apache.nifi +nifi-pulsar-bundle +1.8.0-SNAPSHOT + + +nifi-pulsar-client-service-api +jar + + + +org.apache.nifi +nifi-api +provided + + +org.apache.pulsar +pulsar-client +2.0.1-incubating --- End diff -- 2.1.1-incubating has been released 2 days ago - should be available in mvn repo shortly > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623480#comment-16623480 ] ASF GitHub Bot commented on NIFI-4914: -- Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r218857973 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/.gitignore --- @@ -0,0 +1 @@ +/target/ --- End diff -- same here > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623475#comment-16623475 ] ASF GitHub Bot commented on NIFI-4914: -- Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r218849480 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/.gitignore --- @@ -0,0 +1 @@ +/target/ --- End diff -- we probably don't want that file, no? > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623479#comment-16623479 ] ASF GitHub Bot commented on NIFI-4914: -- Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r218860810 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java --- @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientService; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +public abstract class AbstractPulsarConsumerProcessor extends AbstractProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +static final AllowableValue CONSUME = new AllowableValue(ConsumerCryptoFailureAction.CONSUME.name(), "Consume", +"Mark the message as consumed despite being unable to decrypt the contents"); +static final AllowableValue DISCARD = new AllowableValue(ConsumerCryptoFailureAction.DISCARD.name(), "Discard", +"Discard the message and don't perform any addtional processing on the message"); +static final AllowableValue FAIL = new AllowableValue(ConsumerCryptoFailureAction.FAIL.name(), "Fail", +"Report a failure condition, and the route the message contents to the FAILED relationship."); + +public static final Relationship REL_SUCCESS = new Relationship.Builder() +.name("success") +.description("FlowFiles for which all content was consumed from Pulsar.") +.build(); + +public static final
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623476#comment-16623476 ] ASF GitHub Bot commented on NIFI-4914: -- Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r218858965 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java --- @@ -0,0 +1,412 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.pulsar; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientService; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; + +public abstract class AbstractPulsarConsumerProcessor extends AbstractProcessor { + +static final AllowableValue EXCLUSIVE = new AllowableValue("Exclusive", "Exclusive", "There can be only 1 consumer on the same topic with the same subscription name"); +static final AllowableValue SHARED = new AllowableValue("Shared", "Shared", "Multiple consumer will be able to use the same subscription name and the messages"); +static final AllowableValue FAILOVER = new AllowableValue("Failover", "Failover", "Multiple consumer will be able to use the same subscription name but only 1 consumer " ++ "will receive the messages. If that consumer disconnects, one of the other connected consumers will start receiving messages"); + +static final AllowableValue CONSUME = new AllowableValue(ConsumerCryptoFailureAction.CONSUME.name(), "Consume", +"Mark the message as consumed despite being unable to decrypt the contents"); +static final AllowableValue DISCARD = new AllowableValue(ConsumerCryptoFailureAction.DISCARD.name(), "Discard", +"Discard the message and don't perform any addtional processing on the message"); +static final AllowableValue FAIL = new AllowableValue(ConsumerCryptoFailureAction.FAIL.name(), "Fail", +"Report a failure condition, and the route the message contents to the FAILED relationship."); --- End diff -- typo: "and then route the message content" > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key:
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623478#comment-16623478 ] ASF GitHub Bot commented on NIFI-4914: -- Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r218857914 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-nar/.gitignore --- @@ -0,0 +1,2 @@ +/target/ --- End diff -- same comment here > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16623474#comment-16623474 ] ASF GitHub Bot commented on NIFI-4914: -- Github user pvillard31 commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r218851139 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/.gitignore --- @@ -0,0 +1 @@ +/target/ --- End diff -- same here > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16599184#comment-16599184 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2882 FWIW, this PR includes the following Record-Oriented processors: org.apache.nifi.processors.pulsar.pubsub.ConsumePulsarRecord org.apache.nifi.processors.pulsar.pubsub.PublishPulsarRecord As for the security issue you mentioned, we are actually adding an additional layer of security between Pulsar and NiFi by enabling connections to be secured with user supplied TLS certificates > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16599156#comment-16599156 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on the issue: https://github.com/apache/nifi/pull/2882 not clear when we'll be cutting a 1.8 release so i'm not sure about safety but this is clearly a cool capability and it is just a matter of finding a committer to review it with sufficient bandwidth and expertise. things impacting security are super important and not being record oriented makes it less useful for sure. I haven't looked at the details in a while to see if you added that. I'd go so far as to recommend not offering a non record approach but i wouldnt say that is a rule - just a recommendation > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16599144#comment-16599144 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2882 Any update on this? Am I safe to assume we are going to make the 1.8 release? > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568649#comment-16568649 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r207644839 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ALLOW_TLS_INSECURE_CONNECTION = new PropertyDescriptor.Builder() +.name("Allow TLS insecure connection") +.displayName("Allow TLS insecure connection") +.description("Whether the Pulsar client will accept untrusted TLS certificate from broker or not.") +.required(false) +.allowableValues("true", "false") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.defaultValue("false") +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection.") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568646#comment-16568646 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r207643777 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ALLOW_TLS_INSECURE_CONNECTION = new PropertyDescriptor.Builder() --- End diff -- This property allows the Pulsar client to accept an untrusted TLS certificate from broker, which may be an edge use case, but is still a situation that the Pulsar client API exposes, and I want to enable users to configure the client as they would in any other situation. > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568496#comment-16568496 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r207611496 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ALLOW_TLS_INSECURE_CONNECTION = new PropertyDescriptor.Builder() +.name("Allow TLS insecure connection") +.displayName("Allow TLS insecure connection") +.description("Whether the Pulsar client will accept untrusted TLS certificate from broker or not.") +.required(false) +.allowableValues("true", "false") + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.defaultValue("false") +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection.") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false) +
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568494#comment-16568494 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on a diff in the pull request: https://github.com/apache/nifi/pull/2882#discussion_r207611364 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientService.java --- @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.annotation.lifecycle.OnShutdown; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +public class StandardPulsarClientService extends AbstractControllerService implements PulsarClientService { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) +.build(); + +public static final PropertyDescriptor ALLOW_TLS_INSECURE_CONNECTION = new PropertyDescriptor.Builder() --- End diff -- This probably should not be an option and the security context service should encapsulate such things. > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567587#comment-16567587 ] ASF GitHub Bot commented on NIFI-4914: -- Github user joewitt commented on the issue: https://github.com/apache/nifi/pull/2882 sorry @david-streamlio i def would like to get it in as well. i will try to help soon > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567472#comment-16567472 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2882 Any update on this? I don't want to miss another release window if possible... =) > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16540481#comment-16540481 ] ASF GitHub Bot commented on NIFI-4914: -- Github user alopresto commented on the issue: https://github.com/apache/nifi/pull/2702 Your local `master` is in sync with your repository (`origin`) but not the Apache GitHub repository (`upstream`). You need to do the following: ``` $ git checkout master $ git pull upstream master $ git checkout NIFI-4914 $ git rebase master ``` > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16540473#comment-16540473 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2702 I tried rebase-ing against master, but it had no effect. I think that is because my "master" is a fork of the 1.7.0 branch... Anyways, here is the output of the rebase commands: "Davids-MacBook-Pro:nifi david$ git rebase master Current branch NIFI-4914 is up to date. Davids-MacBook-Pro:nifi david$ git checkout master Switched to branch 'master' Your branch is up to date with 'origin/master'." All the pom's are still using version 1.7.0-SNAPSHOT. > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16473608#comment-16473608 ] ASF GitHub Bot commented on NIFI-4914: -- Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2614 @david-streamlio Ok, I think I figured out what happened. At some point, it looks like you accidentally did a pull on upstream master into your branch. The fact that you keep having over 200 commits even with rebasing against master very strongly suggests that. What I did to verify was I checked out your branch, pushed it to my fork and saw 210ish commits with a merge conflict into my master. So I locally rebased against master, did a forced push and it dropped it down to ~7 commits. So carefully follow the four steps I gave you: 1. git checkout master 2. git pull upstream master 3. git checkout NIFI-4914 4. git rebase master **Make sure** that `upstream` is changed to whatever you call `apache/nifi` on github.com. The do: `git push origin --force NIFI-4914` and it should all clear up to a few commits. > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16473605#comment-16473605 ] ASF GitHub Bot commented on NIFI-4914: -- Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2614 Huh, I just checked out your latest branch and it's clean as a whistle WRT rebasing and appears 100% up to date w/ the upstream master. So not sure what the heck is going on here. Try doing a `git push origin --force NIFI-4914`. That should forcefully knock out whatever is going wrong. > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16473298#comment-16473298 ] ASF GitHub Bot commented on NIFI-4914: -- Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2614 At that point, this should work now to clear things up hopefully: `git push origin --force NIFI-4914` > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16473290#comment-16473290 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2614 Davids-MacBook-Pro:nifi david$ git rebase --continue Applying: Changed artifact versions to 1.7.0-SNAPSHOT Applying: Fixed issues identified during code review Applying: Removed invalid characters left over from merge Using index info to reconstruct a base tree... M nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/pom.xml M nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java Falling back to patching base and 3-way merge... No changes -- Patch already applied. Davids-MacBook-Pro:nifi david$ git status On branch NIFI-4914 Your branch and 'origin/NIFI-4914' have diverged, and have 210 and 217 different commits each, respectively. (use "git pull" to merge the remote branch into yours) > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16473264#comment-16473264 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2614 Davids-MacBook-Pro:nifi david$ git checkout NIFI-4914 Switched to branch 'NIFI-4914' Your branch is up to date with 'origin/NIFI-4914'. Davids-MacBook-Pro:nifi david$ git rebase master First, rewinding head to replay your work on top of it... Applying: Added Apache Pulsar Processors and Controller Service Applying: Changed code to use new ExpressionLanguageScope Enum Applying: Changed artifact versions to 1.7.0-SNAPSHOT Applying: Added Apache Pulsar Processors and Controller Service Using index info to reconstruct a base tree... M nifi-nar-bundles/pom.xml .git/rebase-apply/patch:848: trailing whitespace. .git/rebase-apply/patch:854: trailing whitespace. .git/rebase-apply/patch:857: trailing whitespace. .git/rebase-apply/patch:859: space before tab in indent. .git/rebase-apply/patch:865: trailing whitespace. warning: squelched 161 whitespace errors warning: 166 lines add whitespace errors. Falling back to patching base and 3-way merge... Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/pom.xml CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/pom.xml Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestPublishPulsar_1_X.java CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestPublishPulsar_1_X.java Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsar_1_X.java CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsar_1_X.java Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsarRecord_1_X.java CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/TestConsumePulsarRecord_1_X.java Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/AbstractPulsarProcessorTest.java CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/test/java/org/apache/nifi/processors/pulsar/pubsub/AbstractPulsarProcessorTest.java Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsar_1_X.java CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsar_1_X.java Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarConsumerProcessor.java Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/pom.xml Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-nar/pom.xml CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-nar/pom.xml Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/pom.xml CONFLICT (add/add): Merge conflict in nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/pom.xml Auto-merging nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-nar/pom.xml CONFLICT
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16473261#comment-16473261 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2614 `Davids-MacBook-Pro:nifi david$ git checkout NIFI-4914 Switched to branch 'NIFI-4914' Your branch is up to date with 'origin/NIFI-4914'. Davids-MacBook-Pro:nifi david$ git rebase --continue No rebase in progress? ` > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16473258#comment-16473258 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2614 run the rebase against my 'master' branch or my 'NIFI-4914' branch? > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16473243#comment-16473243 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2614 Yea, I'm not sure what happened First I did the following 4 steps you recommended: As a rule of thumb, this is how you want to do this sort of update: git checkout master git pull upstream master git checkout YOUR_BRANCH git rebase master Then I checkout made the changes, ran mvn -Pcontrib-check clean install and had a clean install. When I tried to do a git push, I got the following error Davids-MacBook-Pro:nifi david$ git push origin To https://github.com/david-streamlio/nifi.git ! [rejected]NIFI-4914 -> NIFI-4914 (non-fast-forward) error: failed to push some refs to 'https://github.com/david-streamlio/nifi.git' hint: Updates were rejected because the tip of your current branch is behind hint: its remote counterpart. Integrate the remote changes (e.g. hint: 'git pull ...') before pushing again. So, I fixed the conflicts, committed them and pushed again. > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16431542#comment-16431542 ] ASF GitHub Bot commented on NIFI-4914: -- Github user MikeThomsen commented on the issue: https://github.com/apache/nifi/pull/2614 @david-streamlio you brought in a whole lot of commits from other people with that merge from upstream/master. I checked out your branch and did a rebase on it (`git rebase master`) and that seemed to clear it up. I would recommend doing that on your branch locally and verifying that that "merge commit" with the commit message `Merge remote-tracking branch 'upstream/master' into NIFI-4914` goes away. As a rule of thumb, this is how you want to do this sort of update: 1. git checkout master 2. git pull upstream master 3. git checkout YOUR_BRANCH 4. git rebase master Once you've done that, the last command will replay your commits on top of the most recent version of upstream/master. > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16428500#comment-16428500 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2553 Thanks for the update.I have merged my changes that came from your code review into the NIFI-4914-rebase PR as well. That PR has extends the base Pulsar processor capabilities to include RecordBased processing as well > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16404866#comment-16404866 ] Joseph Witt commented on NIFI-4914: --- fix version can be applied when merged. The linked PR is closed and there is no other linked PR but know work is on-going. This Jira needs to be updated > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-4914) Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, PublishPulsarRecord
[ https://issues.apache.org/jira/browse/NIFI-4914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16380659#comment-16380659 ] ASF GitHub Bot commented on NIFI-4914: -- Github user david-streamlio commented on the issue: https://github.com/apache/nifi/pull/2493 I created a separate JIRA for record-based processors.https://issues.apache.org/jira/browse/NIFI-4914, but would like to get these processors into the release as well. Primarily because Pulsar doesn't currently have schemas or a schema registry, so writing records to Pulsar doesn't help the downstream consumers much. Unless they use the ConsumePulsarRecord processor. > Implement record model processor for Pulsar, i.e. ConsumePulsarRecord, > PublishPulsarRecord > -- > > Key: NIFI-4914 > URL: https://issues.apache.org/jira/browse/NIFI-4914 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.6.0 >Reporter: David Kjerrumgaard >Priority: Minor > Fix For: 1.6.0 > > Original Estimate: 168h > Remaining Estimate: 168h > > Create record-based processors for Apache Pulsar -- This message was sent by Atlassian JIRA (v7.6.3#76005)