[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183233581 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java --- @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import 
org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; + +@CapabilityDescription("Consumes messages from Apache Pulsar specifically built against the Pulsar 1.x Consumer API. " ++ "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_0. Please note that, at this time, " ++ "the Processor assumes that all records that are retrieved from a given partition have the same schema. If any " ++ "of the Pulsar messages that are pulled but cannot be parsed or written with the configured Record Reader or " ++ "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the " ++ "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual " ++ "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the " ++ "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.") +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@WritesAttributes({ +@WritesAttribute(attribute = "record.count", description = "The number of records received") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
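For readers of this thread, the routing that the CapabilityDescription above promises can be sketched in plain Java. This is only an illustration under assumptions, not the processor's implementation: RecordRoutingSketch, Routed, route and parseKeyValue are hypothetical names, the "parser" stands in for the configured Record Reader, and plain strings and byte arrays stand in for NiFi records and FlowFiles.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the routing described in the CapabilityDescription.
// None of these names come from the NiFi or Pulsar APIs.
class RecordRoutingSketch {

    /** Outcome of one batch: parsed records for 'success', raw payloads for 'parse.failure'. */
    static final class Routed {
        final List<String> success = new ArrayList<>();
        final List<byte[]> parseFailure = new ArrayList<>();

        /** Mirrors the 'record.count' attribute: number of records in the success batch. */
        int recordCount() {
            return success.size();
        }
    }

    /** Parses each payload; anything the parser rejects is kept separately, untouched. */
    static Routed route(List<byte[]> payloads, Function<byte[], String> parser) {
        Routed routed = new Routed();
        for (byte[] payload : payloads) {
            try {
                routed.success.add(parser.apply(payload));
            } catch (RuntimeException e) {
                routed.parseFailure.add(payload);
            }
        }
        return routed;
    }

    /** Toy "record reader" (an assumption): accepts only key=value text. */
    static String parseKeyValue(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        if (!text.contains("=")) {
            throw new IllegalArgumentException("not a key=value record: " + text);
        }
        return text;
    }

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<byte[]> payloads = Arrays.asList(bytes("a=1"), bytes("garbage"), bytes("b=2"));
        Routed routed = route(payloads, RecordRoutingSketch::parseKeyValue);
        System.out.println("record.count=" + routed.recordCount()
                + ", parse.failure=" + routed.parseFailure.size());
    }
}
```

The sketch keeps each unparseable payload on its own, which matches the description's "separate FlowFile" wording; it does not model the same-schema grouping rule.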
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234943 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; + + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.StreamUtils; 
+import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; + +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"}) +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. " ++ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class}) +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor { + +private static final List<PropertyDescriptor> PROPERTIES; +private static final Set<Relationship> RELATIONSHIPS; + +static { +final List<PropertyDescriptor> properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(RECORD_READER); +properties.add(RECORD_WRITER); +properties.add(TOPIC); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(BATCHING_ENABLED); +
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183235109 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; + + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.StreamUtils; 
+import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; + +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"}) +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. " ++ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class}) +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor { + +private static final List<PropertyDescriptor> PROPERTIES; +private static final Set<Relationship> RELATIONSHIPS; + +static { +final List<PropertyDescriptor> properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(RECORD_READER); +properties.add(RECORD_WRITER); +properties.add(TOPIC); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(BATCHING_ENABLED); +
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234762 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor { + +private static final List<PropertyDescriptor> PROPERTIES; +private static final Set<Relationship> RELATIONSHIPS; + +static { +final List<PropertyDescriptor> properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(TOPIC); +properties.add(SUBSCRIPTION); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(ACK_TIMEOUT); +properties.add(PRIORITY_LEVEL); +properties.add(RECEIVER_QUEUE_SIZE); +properties.add(SUBSCRIPTION_TYPE); +properties.add(MAX_WAIT_TIME); + 
+PROPERTIES = Collections.unmodifiableList(properties); + +final Set<Relationship> relationships = new HashSet<>(); +relationships.add(REL_SUCCESS); +RELATIONSHIPS = Collections.unmodifiableSet(relationships); +} + +@Override +public Set<Relationship> getRelationships() { +return RELATIONSHIPS; +} + +@Override +protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { +return PROPERTIES; +} + +@Override +public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + +try { +if (context.getProperty(ASYNC_ENABLED).asBoolean()) { +// Launch consumers +consumeAsync(context, session); + +// Handle completed consumers +handleAsync(context, session); + +} else { +consume(context, session); +} +} catch (PulsarClientException e) { +getLogger().error("Unable to consume from Pulsar Topic ", e); +context.yield(); +throw new ProcessException(e); +} + +} + +private void handleAsync(ProcessContext context, ProcessSession session) { + +try { +Future<Message> done = consumerService.take(); +Message msg = done.get(); + +if
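The "launch consumers, then handle completed consumers" flow in the quoted onTrigger can be sketched with the JDK alone. This is a minimal sketch under an assumption: the diff does not show how consumerService is declared, so here it is taken to behave like java.util.concurrent.ExecutorCompletionService, whose take() returns the Future of the next finished task. AsyncConsumeSketch and consumeOne are hypothetical names, and a String stands in for the Pulsar Message.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration of the submit/take pattern; not the PR's code.
class AsyncConsumeSketch {

    /** Submits one receive task, then blocks until some submitted task has completed. */
    static String consumeOne(ExecutorService pool, Callable<String> receiver) {
        // Assumption: consumerService in the diff is a CompletionService-like object.
        CompletionService<String> consumerService = new ExecutorCompletionService<>(pool);

        // "Launch consumers": hand the blocking receive call to the pool.
        consumerService.submit(receiver);

        try {
            // "Handle completed consumers": take() waits for the next finished task.
            Future<String> done = consumerService.take();
            return done.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a message", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("receive task failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println(consumeOne(pool, () -> "hello from the topic"));
        } finally {
            pool.shutdown();
        }
    }
}
```

Note that take() blocks until a task finishes. CompletionService also offers poll(long, TimeUnit), which returns null when nothing completes in time; whether that fits the processor better is a design question for the PR, not something this sketch settles.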
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183209203 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection to 
prevent " ++ "overload on broker. (default: 5000) It should be configured with higher value only in case " ++ "of it requires to produce/subscribe on thousands of topics") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("5000") +.build(); + +public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder() +.name("Maximum connects per Pulsar broker") +.description("Sets the max number of connection that the client library will open to a single broker.\n" + +"By default, the connection pool will use a single connection for all the producers and consumers. " + +"Increasing this parameter may improve throughput when using many producers over a high latency connection") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("1") +.build(); + +public static final PropertyDescriptor IO_THREADS = new PropertyDescriptor.Builder() +.name("I/O Threads") +.description("The number of threads to be used for handling connections to brokers (default: 1 thread)") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +
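The optional properties quoted above all follow one pattern: not required, validated as a positive integer, with a documented default (5000 lookup requests, 1 connection per broker). A plain-Java sketch of that resolution logic follows. It is an illustration only: PositiveIntPropertySketch and resolve are hypothetical names, no NiFi classes are used, and "positive" is assumed to mean strictly greater than zero.

```java
// Hypothetical helper mirroring "optional, positive integer, with a default".
// It does not use or reproduce NiFi's PropertyDescriptor or StandardValidators.
class PositiveIntPropertySketch {

    /** Returns the default when nothing is configured, else the validated value. */
    static int resolve(String configured, int defaultValue) {
        if (configured == null || configured.trim().isEmpty()) {
            return defaultValue;
        }
        final int value;
        try {
            value = Integer.parseInt(configured.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("not an integer: " + configured, e);
        }
        // Assumption: zero and negative values are invalid.
        if (value <= 0) {
            throw new IllegalArgumentException("must be greater than zero: " + configured);
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println("lookup requests: " + resolve(null, 5000));
        System.out.println("connections per broker: " + resolve("4", 1));
    }
}
```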
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234572 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java --- @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import 
org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; + +@CapabilityDescription("Consumes messages from Apache Pulsar specifically built against the Pulsar 1.x Consumer API. " ++ "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_0. Please note that, at this time, " ++ "the Processor assumes that all records that are retrieved from a given partition have the same schema. If any " ++ "of the Pulsar messages that are pulled but cannot be parsed or written with the configured Record Reader or " ++ "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the " ++ "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual " ++ "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the " ++ "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.") +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@WritesAttributes({ +@WritesAttribute(attribute = "record.count", description = "The number of records received") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234826 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor { + +private static final List<PropertyDescriptor> PROPERTIES; +private static final Set<Relationship> RELATIONSHIPS; + +static { +final List<PropertyDescriptor> properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(TOPIC); +properties.add(SUBSCRIPTION); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(ACK_TIMEOUT); +properties.add(PRIORITY_LEVEL); +properties.add(RECEIVER_QUEUE_SIZE); +properties.add(SUBSCRIPTION_TYPE); +properties.add(MAX_WAIT_TIME); + 
+PROPERTIES = Collections.unmodifiableList(properties); + +final Set<Relationship> relationships = new HashSet<>(); +relationships.add(REL_SUCCESS); +RELATIONSHIPS = Collections.unmodifiableSet(relationships); +} + +@Override +public Set<Relationship> getRelationships() { +return RELATIONSHIPS; +} + +@Override +protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { +return PROPERTIES; +} + +@Override +public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + +try { +if (context.getProperty(ASYNC_ENABLED).asBoolean()) { +// Launch consumers +consumeAsync(context, session); + +// Handle completed consumers +handleAsync(context, session); + +} else { +consume(context, session); +} +} catch (PulsarClientException e) { +getLogger().error("Unable to consume from Pulsar Topic ", e); +context.yield(); +throw new ProcessException(e); +} + +} + +private void handleAsync(ProcessContext context, ProcessSession session) { + +try { +Future<Message> done = consumerService.take(); +Message msg = done.get(); + +if
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183235065 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; + + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.StreamUtils; 
+import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; + +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"}) +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. " ++ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class}) +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(RECORD_READER); +properties.add(RECORD_WRITER); +properties.add(TOPIC); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(BATCHING_ENABLED); +
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183209029 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-nar/src/main/resources/META-INF/NOTICE --- @@ -0,0 +1,612 @@ +nifi-druid-controller-service-api-nar --- End diff -- This needs to be changed to the Pulsar client service. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183208823 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/PoolableResource.java --- @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.pool; + +/** + * Service interface for any object that can be pooled for re-use., which + * defines methods for closing the object, effectively marking it no longer + * usable. + * + * @author david --- End diff -- Please remove the @author tag. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183208994 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/pool/ResourceFactory.java --- @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar.pool; + +import java.util.Properties; + +/** + * Factory pattern interface for @PoolableResource objects. Concrete implementations + * of this interface will be responsible for the creation of @PoolableResource objects + * based on the Properties passed in. + * + * @author david --- End diff -- Please remove the @author tag. ---
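For illustration only, the two Javadoc blocks quoted above could look roughly like this once the author tag is dropped. This is a sketch, not the PR's code: the quoted diffs do not show the interface bodies, so the method names (close, isClosed, create), the generic parameter, and the DemoResource and PoolSketch classes are all assumptions. The {@link PoolableResource} form is standard Javadoc for referring to a type; a bare @PoolableResource, as in the quoted comment, is not a Javadoc tag.

```java
import java.util.Properties;

/**
 * Service interface for any object that can be pooled for re-use. It defines
 * methods for closing the object, effectively marking it no longer usable.
 */
interface PoolableResource {
    // Hypothetical method names; the quoted diff stops before the interface body.
    void close();

    boolean isClosed();
}

/**
 * Factory pattern interface for {@link PoolableResource} objects. Concrete
 * implementations are responsible for creating {@link PoolableResource}
 * objects based on the Properties passed in.
 */
interface ResourceFactory<R extends PoolableResource> {
    // Hypothetical signature, inferred only from the Javadoc text.
    R create(Properties props);
}

/** Toy resource used only to exercise the sketch. */
final class DemoResource implements PoolableResource {
    final String name;
    private boolean closed;

    DemoResource(String name) {
        this.name = name;
    }

    @Override
    public void close() {
        closed = true;
    }

    @Override
    public boolean isClosed() {
        return closed;
    }
}

class PoolSketch {
    /** Builds a DemoResource through a factory configured from Properties. */
    static DemoResource make(String name) {
        Properties props = new Properties();
        props.setProperty("name", name);
        ResourceFactory<DemoResource> factory = p -> new DemoResource(p.getProperty("name"));
        return factory.create(props);
    }

    public static void main(String[] args) {
        DemoResource resource = make("demo");
        System.out.println(resource.name + " closed=" + resource.isClosed());
        resource.close();
        System.out.println(resource.name + " closed=" + resource.isClosed());
    }
}
```

Authorship is already recorded in version control, which is the usual reason a project asks for the tag to be dropped.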
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183210297 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/AbstractPulsarProducerProcessor.java --- @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar; + +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.nifi.pulsar.PulsarProducer; +import org.apache.nifi.pulsar.cache.LRUCache; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerConfiguration; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode; + +public abstract class AbstractPulsarProducerProcessor extends AbstractPulsarProcessor { + +public static final String MSG_COUNT = "msg.count"; +public static final String TOPIC_NAME = "topic.name"; + +static final AllowableValue COMPRESSION_TYPE_NONE = new AllowableValue("NONE", "None", "No compression"); +static final AllowableValue COMPRESSION_TYPE_LZ4 = new AllowableValue("LZ4", "LZ4", "Compress with LZ4 algorithm."); +static final AllowableValue 
COMPRESSION_TYPE_ZLIB = new AllowableValue("ZLIB", "ZLIB", "Compress with ZLib algorithm"); + +static final AllowableValue MESSAGE_ROUTING_MODE_CUSTOM_PARTITION = new AllowableValue("CustomPartition", "Custom Partition", "Route messages to a custom partition"); +static final AllowableValue MESSAGE_ROUTING_MODE_ROUND_ROBIN_PARTITION = new AllowableValue("RoundRobinPartition", "Round Robin Partition", "Route messages to all " + + "partitions in a round robin manner"); +static final AllowableValue MESSAGE_ROUTING_MODE_SINGLE_PARTITION = new AllowableValue("SinglePartition", "Single Partition", "Route messages to a single partition"); + +public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() +.name("topic") +.displayName("Topic Name") +.description("The name of the Pulsar Topic.") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) +.build(); + +public static final PropertyDescriptor ASYNC_ENABLED = new PropertyDescriptor.Builder() +.name("Async Enabled") +.description("Control whether the messages will be sent asyncronously or not. Messages sent" ++ " syncronously will be acknowledged immediately before processing the next
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234732 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java --- @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import 
org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; + +@CapabilityDescription("Consumes messages from Apache Pulsar specifically built against the Pulsar 1.x Consumer API. " ++ "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_0. Please note that, at this time, " ++ "the Processor assumes that all records that are retrieved from a given partition have the same schema. If any " ++ "of the Pulsar messages that are pulled but cannot be parsed or written with the configured Record Reader or " ++ "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the " ++ "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual " ++ "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the " ++ "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.") +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@WritesAttributes({ +@WritesAttribute(attribute = "record.count", description = "The number of records received") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234849 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(TOPIC); +properties.add(SUBSCRIPTION); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(ACK_TIMEOUT); +properties.add(PRIORITY_LEVEL); +properties.add(RECEIVER_QUEUE_SIZE); +properties.add(SUBSCRIPTION_TYPE); +properties.add(MAX_WAIT_TIME); + 
+PROPERTIES = Collections.unmodifiableList(properties); + +final Set relationships = new HashSet<>(); +relationships.add(REL_SUCCESS); +RELATIONSHIPS = Collections.unmodifiableSet(relationships); +} + +@Override +public Set getRelationships() { +return RELATIONSHIPS; +} + +@Override +protected List getSupportedPropertyDescriptors() { +return PROPERTIES; +} + +@Override +public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + +try { +if (context.getProperty(ASYNC_ENABLED).asBoolean()) { +// Launch consumers +consumeAsync(context, session); + +// Handle completed consumers +handleAsync(context, session); + +} else { +consume(context, session); +} +} catch (PulsarClientException e) { +getLogger().error("Unable to consume from Pulsar Topic ", e); +context.yield(); +throw new ProcessException(e); +} + +} + +private void handleAsync(ProcessContext context, ProcessSession session) { + +try { +Future done = consumerService.take(); +Message msg = done.get(); + +if
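A side note on the quoted handleAsync, which calls consumerService.take(). The quoted diff does not show how consumerService is declared. If it is a java.util.concurrent.CompletionService (an assumption), then take() blocks until some submitted task completes, while poll(timeout, unit) returns null once the timeout elapses. The standalone sketch below shows the bounded-wait variant. The class and method names are hypothetical, and it is not presented as the change requested in this review.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CompletionPollSketch {

    /**
     * Returns the result of the next completed task, or null if no task
     * finished within waitMillis. Unlike take(), this never waits forever.
     */
    static String nextCompleted(ExecutorCompletionService<String> service, long waitMillis)
            throws InterruptedException, ExecutionException {
        Future<String> done = service.poll(waitMillis, TimeUnit.MILLISECONDS);
        return done == null ? null : done.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            ExecutorCompletionService<String> service = new ExecutorCompletionService<>(pool);

            // Nothing has been submitted yet: poll times out and yields null,
            // where take() would block the calling thread.
            System.out.println(nextCompleted(service, 50));

            // Stand-in for an asynchronous receive; the String payload is made up.
            service.submit(() -> "message-1");
            System.out.println(nextCompleted(service, 1000));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In a processor's onTrigger, a bounded wait of this kind lets the method return when no message is ready instead of holding the thread.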
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183235123 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; + + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.StreamUtils; 
+import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; + +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"}) +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. " ++ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class}) +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(RECORD_READER); +properties.add(RECORD_WRITER); +properties.add(TOPIC); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(BATCHING_ENABLED); +
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234534 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java --- @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import 
org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; + +@CapabilityDescription("Consumes messages from Apache Pulsar specifically built against the Pulsar 1.x Consumer API. " ++ "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_0. Please note that, at this time, " ++ "the Processor assumes that all records that are retrieved from a given partition have the same schema. If any " ++ "of the Pulsar messages that are pulled but cannot be parsed or written with the configured Record Reader or " ++ "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the " ++ "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual " ++ "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the " ++ "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.") +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@WritesAttributes({ +@WritesAttribute(attribute = "record.count", description = "The number of records received") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183235163 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsar_1_X.java --- @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor; +import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Apache", "Pulsar", "Put", "Send", "Message", "PubSub"}) +@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Pulsar using the Pulsar 1.X Producer API." ++ "The messages to send may be individual FlowFiles or may be delimited, using a " ++ "user-specified delimiter, such as a new-line. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsar_1_X.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. 
This attribute is added only to " ++ "FlowFiles that are routed to success.") +public class PublishPulsar_1_X extends AbstractPulsarProducerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(TOPIC); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(BATCHING_ENABLED); +properties.add(BATCHING_MAX_MESSAGES); +properties.add(BATCH_INTERVAL); +properties.add(BLOCK_IF_QUEUE_FULL); +properties.add(COMPRESSION_TYPE); +properties.add(MESSAGE_ROUTING_MODE); +properties.add(PENDING_MAX_MESSAGES); + +PROPERTIES = Collections.unmodifiableList(properties); + +final Set relationships = new HashSet<>(); +relationships.add(REL_SUCCESS); +relationships.add(REL_FAILURE); +RELATIONSHIPS = Collections.unmodifiableSet(relationships); +} + +@Override +public Set getRelationships() { +return RELATIONSHIPS; +} + +@Override +protected List getSupportedPropertyDescriptors() { +return PROPERTIES; +} + +@Override +public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + +FlowFile flowFile = session.get(); + +if (flowFile == null) { +return; +} + +final ComponentLog logger = getLogger(); +final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); + +if (StringUtils.isBlank(topic)) { +
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234539 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsarRecord_1_X.java --- @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import 
org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; + +@CapabilityDescription("Consumes messages from Apache Pulsar specifically built against the Pulsar 1.x Consumer API. " ++ "The complementary NiFi processor for sending messages is PublishPulsarRecord_1_0. Please note that, at this time, " ++ "the Processor assumes that all records that are retrieved from a given partition have the same schema. If any " ++ "of the Pulsar messages that are pulled but cannot be parsed or written with the configured Record Reader or " ++ "Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the " ++ "'parse.failure' relationship. Otherwise, each FlowFile is sent to the 'success' relationship and may contain many individual " ++ "messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the " ++ "FlowFile. No two Pulsar messages will be placed into the same FlowFile if they have different schemas.") +@Tags({"Pulsar", "Get", "Record", "csv", "avro", "json", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@WritesAttributes({ +@WritesAttribute(attribute = "record.count", description = "The number of records received") +}) +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183208835

    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java ---
    @@ -0,0 +1,54 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.controller.ControllerService;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +
    +
    +@Tags({"Pulsar"})
    --- End diff --

    Could use others like "client" and "pool."

---
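For illustration only, the suggestion above could look roughly like the sketch below. To keep it compilable without NiFi on the classpath, `Tags` here is a local stand-in for `org.apache.nifi.annotation.documentation.Tags`, and `PulsarClientPoolSketch` and `TagsSketch` are hypothetical names that do not appear in the pull request.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Local stand-in for NiFi's @Tags annotation (assumption: used only so that
// this sketch compiles on its own; the real one lives in nifi-api).
@Retention(RetentionPolicy.RUNTIME)
@interface Tags {
    String[] value();
}

// Hypothetical placeholder for the controller service interface under review,
// carrying the extra "client" and "pool" tags the comment proposes.
@Tags({"Pulsar", "client", "pool"})
interface PulsarClientPoolSketch {
}

final class TagsSketch {

    // Returns the tags declared on a type, or an empty list if it has none.
    static List<String> tagsOf(Class<?> type) {
        Tags tags = type.getAnnotation(Tags.class);
        return tags == null ? Collections.<String>emptyList() : Arrays.asList(tags.value());
    }

    public static void main(String[] args) {
        System.out.println(tagsOf(PulsarClientPoolSketch.class));
    }
}
```

In the actual bundle the only change would be the longer value list on the existing `@Tags` line; the reflection helper is there just to make the sketch checkable.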
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183234861

    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/InFlightMessageMonitor.java ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.CountDownLatch;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +/**
    + * Helper class to monitor the asynchronous submission of a large number
    + * of records to Apache Pulsar.
    + *
    + * @author david
    --- End diff --

    Please remove.

---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234779 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(TOPIC); +properties.add(SUBSCRIPTION); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(ACK_TIMEOUT); +properties.add(PRIORITY_LEVEL); +properties.add(RECEIVER_QUEUE_SIZE); +properties.add(SUBSCRIPTION_TYPE); +properties.add(MAX_WAIT_TIME); + 
+PROPERTIES = Collections.unmodifiableList(properties); + +final Set relationships = new HashSet<>(); +relationships.add(REL_SUCCESS); +RELATIONSHIPS = Collections.unmodifiableSet(relationships); +} + +@Override +public Set getRelationships() { +return RELATIONSHIPS; +} + +@Override +protected List getSupportedPropertyDescriptors() { +return PROPERTIES; +} + +@Override +public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + +try { +if (context.getProperty(ASYNC_ENABLED).asBoolean()) { +// Launch consumers +consumeAsync(context, session); + +// Handle completed consumers +handleAsync(context, session); + +} else { +consume(context, session); +} +} catch (PulsarClientException e) { +getLogger().error("Unable to consume from Pulsar Topic ", e); +context.yield(); +throw new ProcessException(e); +} + +} + +private void handleAsync(ProcessContext context, ProcessSession session) { + +try { +Future done = consumerService.take(); +Message msg = done.get(); + +if
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183209099

    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java ---
    @@ -0,0 +1,298 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.pulsar;
    +
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.concurrent.TimeUnit;
    +
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.lifecycle.OnEnabled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.controller.AbstractControllerService;
    +import org.apache.nifi.controller.ConfigurationContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory;
    +import org.apache.nifi.pulsar.pool.PulsarProducerFactory;
    +import org.apache.nifi.pulsar.pool.ResourcePool;
    +import org.apache.nifi.pulsar.pool.ResourcePoolImpl;
    +import org.apache.nifi.reporting.InitializationException;
    +import org.apache.nifi.ssl.SSLContextService;
    +import org.apache.pulsar.client.api.ClientConfiguration;
    +import org.apache.pulsar.client.api.PulsarClient;
    +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
    +import org.apache.pulsar.client.impl.auth.AuthenticationTls;
    +
    +@Tags({ "Pulsar"})
    +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.")
    +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool {
    +
    +    public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor
    +            .Builder().name("PULSAR_SERVICE_URL")
    +            .displayName("Pulsar Service URL")
    +            .description("URL for the Pulsar cluster, e.g localhost:6650")
    +            .required(true)
    +            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder()
    +            .name("Maximum concurrent lookup-requests")
    +            .description("Number of concurrent lookup-requests allowed on each broker-connection to prevent "
    +                    + "overload on broker. (default: 5000) It should be configured with higher value only in case "
    +                    + "of it requires to produce/subscribe on thousands of topics")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .defaultValue("5000")
    +            .build();
    +
    +    public static final PropertyDescriptor CONNECTIONS_PER_BROKER = new PropertyDescriptor.Builder()
    +            .name("Maximum connects per Pulsar broker")
    +            .description("Sets the max number of connection that the client library will open to a single broker.\n" +
    +                    "By default, the connection pool will use a single connection for all the producers and consumers. " +
    +                    "Increasing this parameter may improve throughput when using many producers over a high latency connection")
    +            .required(false)
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    --- End diff --

    > Consider adding expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY). In fact, you might want to consider that for all of these properties so you can make it more customizable.

---
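To make the intent of that suggestion concrete, here is a rough, library-free sketch of what variable-registry scope buys the user: the configured value may be a `${name}` reference that is resolved against a set of named variables before the positive-integer check runs. This is not NiFi's implementation. `VariableRegistrySketch`, `resolve`, `connectionsPerBroker`, and the variable name `pulsar.connections` are all hypothetical, and leaving unknown references untouched is a simplification chosen for this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for expression evaluation against a variable registry.
final class VariableRegistrySketch {

    private static final Pattern REFERENCE = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces each ${name} with its registry value. Unknown names are left
    // unchanged (a simplification made for this sketch only).
    static String resolve(String raw, Map<String, String> registry) {
        Matcher matcher = REFERENCE.matcher(raw);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String value = registry.getOrDefault(matcher.group(1), matcher.group());
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    // Resolves the configured text first, then applies a positive-integer check
    // comparable in spirit to the validator used on the property in the diff.
    static int connectionsPerBroker(String configured, Map<String, String> registry) {
        int value = Integer.parseInt(resolve(configured, registry).trim());
        if (value <= 0) {
            throw new IllegalArgumentException("connections per broker must be positive: " + value);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> registry = new HashMap<>();
        registry.put("pulsar.connections", "4");
        System.out.println(connectionsPerBroker("${pulsar.connections}", registry));
    }
}
```

In the controller service itself, the change would be the builder call named in the comment, with the value then read through the property's `evaluateAttributeExpressions()` method instead of being parsed directly.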
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183235146 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; + + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.StreamUtils; 
+import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; + +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"}) +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. " ++ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class}) +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(RECORD_READER); +properties.add(RECORD_WRITER); +properties.add(TOPIC); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(BATCHING_ENABLED); +
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2614#discussion_r183235215

    --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/RecordBasedConst.java ---
    @@ -0,0 +1,46 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.pulsar.pubsub;
    +
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.serialization.RecordReaderFactory;
    +import org.apache.nifi.serialization.RecordSetWriterFactory;
    +
    +public final class RecordBasedConst {
    +
    +    public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
    +            .name("record-reader")
    +            .displayName("Record Reader")
    +            .description("The Record Reader to use for incoming FlowFiles")
    +            .identifiesControllerService(RecordReaderFactory.class)
    +            .expressionLanguageSupported(false)
    +            .required(true)
    +            .build();
    +
    +    public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
    +            .name("record-writer")
    +            .displayName("Record Writer")
    +            .description("The Record Writer to use in order to serialize the data before sending to Pulsar")
    +            .identifiesControllerService(RecordSetWriterFactory.class)
    +            .expressionLanguageSupported(false)
    +            .required(true)
    +            .build();
    +
    +    private RecordBasedConst() {
    --- End diff --

    Not needed.

---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183235033 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/PublishPulsarRecord_1_X.java --- @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_READER; +import static org.apache.nifi.processors.pulsar.pubsub.RecordBasedConst.RECORD_WRITER; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.atomic.AtomicLong; + + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processors.pulsar.AbstractPulsarProducerProcessor; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; +import org.apache.nifi.stream.io.StreamUtils; 
+import org.apache.nifi.util.StringUtils; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; + +@Tags({"Apache", "Pulsar", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "1.0"}) +@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Pulsar using the Pulsar 1.x client API. " ++ "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. " ++ "The complementary NiFi processor for fetching messages is ConsumePulsarRecord_1_0.") +@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) +@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Pulsar for this FlowFile. This attribute is added only to " ++ "FlowFiles that are routed to success.") +@SeeAlso({PublishPulsar_1_X.class, ConsumePulsar_1_X.class, ConsumePulsarRecord_1_X.class}) +public class PublishPulsarRecord_1_X extends AbstractPulsarProducerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(RECORD_READER); +properties.add(RECORD_WRITER); +properties.add(TOPIC); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(BATCHING_ENABLED); +
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183234838 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-processors/src/main/java/org/apache/nifi/processors/pulsar/pubsub/ConsumePulsar_1_X.java --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.pulsar.pubsub; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processors.pulsar.AbstractPulsarConsumerProcessor; +import org.apache.nifi.pulsar.PulsarClientPool; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; + +@Tags({"Pulsar", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume"}) +@CapabilityDescription("Consumes messages from Apache Pulsar " ++ "The complementary NiFi processor for sending messages is PublishPulsar.") +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +public class ConsumePulsar_1_X extends AbstractPulsarConsumerProcessor { + +private static final List PROPERTIES; +private static final Set RELATIONSHIPS; + +static { +final List properties = new ArrayList<>(); +properties.add(PULSAR_CLIENT_SERVICE); +properties.add(TOPIC); +properties.add(SUBSCRIPTION); +properties.add(ASYNC_ENABLED); +properties.add(MAX_ASYNC_REQUESTS); +properties.add(ACK_TIMEOUT); +properties.add(PRIORITY_LEVEL); +properties.add(RECEIVER_QUEUE_SIZE); +properties.add(SUBSCRIPTION_TYPE); +properties.add(MAX_WAIT_TIME); + 
+PROPERTIES = Collections.unmodifiableList(properties); + +final Set relationships = new HashSet<>(); +relationships.add(REL_SUCCESS); +RELATIONSHIPS = Collections.unmodifiableSet(relationships); +} + +@Override +public Set getRelationships() { +return RELATIONSHIPS; +} + +@Override +protected List getSupportedPropertyDescriptors() { +return PROPERTIES; +} + +@Override +public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + +try { +if (context.getProperty(ASYNC_ENABLED).asBoolean()) { +// Launch consumers +consumeAsync(context, session); + +// Handle completed consumers +handleAsync(context, session); + +} else { +consume(context, session); +} +} catch (PulsarClientException e) { +getLogger().error("Unable to consume from Pulsar Topic ", e); +context.yield(); +throw new ProcessException(e); +} + +} + +private void handleAsync(ProcessContext context, ProcessSession session) { + +try { +Future done = consumerService.take(); +Message msg = done.get(); + +if
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183209046 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) --- End diff -- Should be fleshed out with more tags. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235251

--- Diff: nifi-nar-bundles/pom.xml ---
@@ -93,6 +93,7 @@
 <module>nifi-spark-bundle</module>
 <module>nifi-atlas-bundle</module>
 <module>nifi-druid-bundle</module>
+<module>nifi-pulsar-bundle</module>
--- End diff --

You need to add a dependency declaration for the NAR in `nifi-assembly/pom.xml`. Just putting that note here since there's no really good place to track that in the review.
---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183208828 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service-api/src/main/java/org/apache/nifi/pulsar/PulsarClientPool.java --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.pulsar; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.pulsar.pool.ResourcePool; + + +@Tags({"Pulsar"}) +@CapabilityDescription("Provides the ability to create Pulsar Producer / Consumer instances on demand, based on the configuration." + + "properties defined") +/** + * Service definition for apache Pulsar Client ControllerService + * responsible for maintaining a pool of @PulsarProducer and + * @PulsarConsumer objects. + * + * Since both of these objects can be reused, in a manner similar + * to database connections, and the cost to create these objects is + * relatively high. The PulsarClientPool keeps these objects in pools + * for re-use. + * + * @author david --- End diff -- Please remove. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183209058 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) --- End diff -- Consider adding `expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)`. ---
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2614#discussion_r183209071 --- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/src/main/java/org/apache/nifi/pulsar/StandardPulsarClientPool.java --- @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.pulsar; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.pulsar.pool.PulsarConsumerFactory; +import org.apache.nifi.pulsar.pool.PulsarProducerFactory; +import org.apache.nifi.pulsar.pool.ResourcePool; +import org.apache.nifi.pulsar.pool.ResourcePoolImpl; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.pulsar.client.api.ClientConfiguration; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.impl.auth.AuthenticationTls; + +@Tags({ "Pulsar"}) +@CapabilityDescription("Standard ControllerService implementation of PulsarClientService.") +public class StandardPulsarClientPool extends AbstractControllerService implements PulsarClientPool { + +public static final PropertyDescriptor PULSAR_SERVICE_URL = new PropertyDescriptor +.Builder().name("PULSAR_SERVICE_URL") +.displayName("Pulsar Service URL") +.description("URL for the Pulsar cluster, e.g localhost:6650") +.required(true) +.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) +.build(); + +public static final PropertyDescriptor CONCURRENT_LOOKUP_REQUESTS = new PropertyDescriptor.Builder() +.name("Maximum concurrent lookup-requests") +.description("Number of concurrent lookup-requests allowed on each broker-connection to 
prevent " ++ "overload on broker. (default: 5000) It should be configured with higher value only in case " ++ "of it requires to produce/subscribe on thousands of topics") +.required(false) +.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) +.defaultValue("5000") --- End diff -- Are you sure this is a sane default? Your description suggests that 5,000 is the upper end of where most users would want to be. ---
[GitHub] nifi pull request #2638: NIFI-5082: Added support for custom Oracle timestam...
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2638 ---
[jira] [Commented] (NIFI-5082) SQL processors do not handle Avro conversion of Oracle timestamps correctly
[ https://issues.apache.org/jira/browse/NIFI-5082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447423#comment-16447423 ] ASF GitHub Bot commented on NIFI-5082: -- Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2638 > SQL processors do not handle Avro conversion of Oracle timestamps correctly > --- > > Key: NIFI-5082 > URL: https://issues.apache.org/jira/browse/NIFI-5082 > Project: Apache NiFi > Issue Type: Bug >Reporter: Matt Burgess >Assignee: Matt Burgess >Priority: Major > > In JdbcCommon (used by such processors as ExecuteSQL and QueryDatabaseTable), > if a ResultSet column is not a CLOB or BLOB, its value is retrieved using > getObject(), then further processing is done based on the SQL type and/or the > Java class of the value. > However, in Oracle when getObject() is called on a Timestamp column, it > returns an Oracle-specific TIMESTAMP class which does not inherit from > java.sql.Timestamp or java.sql.Date. Thus the processing "falls through" and > its value is attempted to be inserted as a string, which violates the Avro > schema (which correctly recognized it as a long of timestamp logical type). > At least for Oracle, the right way to process a Timestamp column is to call > getTimestamp() rather than getObject(), the former returns a > java.sql.Timestamp object which would correctly be processed by the current > code. I would hope that all drivers would support this but we would want to > test on (at least) MySQL, Oracle, and PostgreSQL. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
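The fall-through described in the issue can be illustrated with a minimal, self-contained sketch. It does not reproduce JdbcCommon: `VendorTimestamp`, `timestampValue()` and `toAvroValue` are hypothetical names standing in for the Oracle-specific class, the driver-side conversion that `getTimestamp()` performs, and the type dispatch respectively.

```java
import java.sql.Timestamp;

/** Hypothetical stand-in for a vendor timestamp class that does NOT extend java.sql.Timestamp. */
final class VendorTimestamp {
    private final long epochMillis;

    VendorTimestamp(long epochMillis) {
        this.epochMillis = epochMillis;
    }

    /** Assumed conversion, playing the role of what ResultSet.getTimestamp() would hand back. */
    Timestamp timestampValue() {
        return new Timestamp(epochMillis);
    }

    @Override
    public String toString() {
        return "VendorTimestamp[" + epochMillis + "]";
    }
}

final class TimestampFallThroughSketch {
    /** Simplified dispatch: only java.sql.Timestamp becomes a long, everything else becomes a string. */
    static Object toAvroValue(Object value) {
        if (value instanceof Timestamp) {
            return ((Timestamp) value).getTime(); // long, fits a timestamp logical type
        }
        return value.toString(); // fall-through: a String where the schema expects a long
    }

    public static void main(String[] args) {
        VendorTimestamp raw = new VendorTimestamp(1524424793000L);
        // getObject()-style value: the vendor class falls through to a String.
        System.out.println(toAvroValue(raw).getClass().getSimpleName());
        // getTimestamp()-style value: a real java.sql.Timestamp is converted to a Long.
        System.out.println(toAvroValue(raw.timestampValue()).getClass().getSimpleName());
    }
}
```

Under these assumptions, the first call yields a `String` and the second a `Long`, which is the mismatch against the Avro schema that the issue describes.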
[jira] [Commented] (NIFI-5082) SQL processors do not handle Avro conversion of Oracle timestamps correctly
[ https://issues.apache.org/jira/browse/NIFI-5082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447422#comment-16447422 ]

ASF subversion and git services commented on NIFI-5082:
---

Commit ba32879ec81cdfa3c44734ab8b210b13fa0581ac in nifi's branch refs/heads/master from [~ca9mbu]
[ https://git-wip-us.apache.org/repos/asf?p=nifi.git;h=ba32879 ]

NIFI-5082: Added support for custom Oracle timestamp types to Avro conversion

This closes #2638

Signed-off-by: Mike Thomsen

> SQL processors do not handle Avro conversion of Oracle timestamps correctly
> ---
>
> Key: NIFI-5082
> URL: https://issues.apache.org/jira/browse/NIFI-5082
> Project: Apache NiFi
> Issue Type: Bug
> Reporter: Matt Burgess
> Assignee: Matt Burgess
> Priority: Major
>
> In JdbcCommon (used by such processors as ExecuteSQL and QueryDatabaseTable),
> if a ResultSet column is not a CLOB or BLOB, its value is retrieved using
> getObject(), then further processing is done based on the SQL type and/or the
> Java class of the value.
> However, in Oracle when getObject() is called on a Timestamp column, it
> returns an Oracle-specific TIMESTAMP class which does not inherit from
> java.sql.Timestamp or java.sql.Date. Thus the processing "falls through" and
> its value is attempted to be inserted as a string, which violates the Avro
> schema (which correctly recognized it as a long of timestamp logical type).
> At least for Oracle, the right way to process a Timestamp column is to call
> getTimestamp() rather than getObject(), the former returns a
> java.sql.Timestamp object which would correctly be processed by the current
> code. I would hope that all drivers would support this but we would want to
> test on (at least) MySQL, Oracle, and PostgreSQL.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (NIFI-5106) Add provenance reporting to GetSolr
Johannes Peter created NIFI-5106: Summary: Add provenance reporting to GetSolr Key: NIFI-5106 URL: https://issues.apache.org/jira/browse/NIFI-5106 Project: Apache NiFi Issue Type: Improvement Reporter: Johannes Peter Assignee: Johannes Peter -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5106) Add provenance reporting to GetSolr
[ https://issues.apache.org/jira/browse/NIFI-5106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447329#comment-16447329 ] ASF GitHub Bot commented on NIFI-5106: -- Github user JohannesDaniel commented on the issue: https://github.com/apache/nifi/pull/2650 @MikeThomsen Hi, just saw that GetSolr does not create receive provenance events when data is retrieved and written into flowfiles. Added that. > Add provenance reporting to GetSolr > --- > > Key: NIFI-5106 > URL: https://issues.apache.org/jira/browse/NIFI-5106 > Project: Apache NiFi > Issue Type: Improvement >Reporter: Johannes Peter >Assignee: Johannes Peter >Priority: Minor > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi issue #2650: NIFI-5106 Add provenance reporting to GetSolr
Github user JohannesDaniel commented on the issue: https://github.com/apache/nifi/pull/2650 @MikeThomsen Hi, just saw that GetSolr does not create receive provenance events when data is retrieved and written into flowfiles. Added that. ---
[GitHub] nifi pull request #2638: NIFI-5082: Added support for custom Oracle timestam...
Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2638#discussion_r183256566 --- Diff: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java --- @@ -350,6 +350,8 @@ private static RecordFieldType getFieldType(final int sqlType) { return RecordFieldType.TIME; case Types.TIMESTAMP: case Types.TIMESTAMP_WITH_TIMEZONE: +case -101: // Oracle's TIMESTAMP WITH TIME ZONE --- End diff -- @mattyb149 @markap14 Given how proprietary Oracle can be, I don't see how this could hurt because it's going to be difficult to say what they'll do in the future and how Calcite might respond to that. ---
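For readers following the diff, here is a minimal sketch of how a vendor-specific type code can share a branch with the standard `java.sql.Types` constants. The `FieldType` enum, the constant name `ORACLE_TIMESTAMP_WITH_TIME_ZONE` and the `STRING` default are assumptions made for illustration; only the value `-101` and its meaning come from the diff comment above.

```java
import java.sql.Types;

final class SqlTypeMappingSketch {
    /** Hypothetical subset of record field types, for illustration only. */
    enum FieldType { TIME, TIMESTAMP, STRING }

    /** Vendor code quoted in the diff: Oracle's TIMESTAMP WITH TIME ZONE. The constant name is an assumption. */
    static final int ORACLE_TIMESTAMP_WITH_TIME_ZONE = -101;

    static FieldType getFieldType(final int sqlType) {
        switch (sqlType) {
            case Types.TIME:
                return FieldType.TIME;
            case Types.TIMESTAMP:
            case Types.TIMESTAMP_WITH_TIMEZONE:
            case ORACLE_TIMESTAMP_WITH_TIME_ZONE:
                // Standard and vendor-specific timestamp codes share one branch.
                return FieldType.TIMESTAMP;
            default:
                // Assumed fallback for this sketch; the real method handles many more types.
                return FieldType.STRING;
        }
    }

    public static void main(String[] args) {
        System.out.println(getFieldType(Types.TIMESTAMP));
        System.out.println(getFieldType(-101));
        System.out.println(getFieldType(Types.VARCHAR));
    }
}
```

Naming the literal keeps the `switch` readable and gives any further vendor codes a single place to live, at the cost of one more constant to maintain.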
[jira] [Commented] (NIFI-5082) SQL processors do not handle Avro conversion of Oracle timestamps correctly
[ https://issues.apache.org/jira/browse/NIFI-5082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447421#comment-16447421 ] ASF GitHub Bot commented on NIFI-5082: -- Github user MikeThomsen commented on a diff in the pull request: https://github.com/apache/nifi/pull/2638#discussion_r183256566 --- Diff: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/ResultSetRecordSet.java --- @@ -350,6 +350,8 @@ private static RecordFieldType getFieldType(final int sqlType) { return RecordFieldType.TIME; case Types.TIMESTAMP: case Types.TIMESTAMP_WITH_TIMEZONE: +case -101: // Oracle's TIMESTAMP WITH TIME ZONE --- End diff -- @mattyb149 @markap14 Given how proprietary Oracle can be, I don't see how this could hurt because it's going to be difficult to say what they'll do in the future and how Calcite might respond to that. > SQL processors do not handle Avro conversion of Oracle timestamps correctly > --- > > Key: NIFI-5082 > URL: https://issues.apache.org/jira/browse/NIFI-5082 > Project: Apache NiFi > Issue Type: Bug >Reporter: Matt Burgess >Assignee: Matt Burgess >Priority: Major > > In JdbcCommon (used by such processors as ExecuteSQL and QueryDatabaseTable), > if a ResultSet column is not a CLOB or BLOB, its value is retrieved using > getObject(), then further processing is done based on the SQL type and/or the > Java class of the value. > However, in Oracle when getObject() is called on a Timestamp column, it > returns an Oracle-specific TIMESTAMP class which does not inherit from > java.sql.Timestamp or java.sql.Date. Thus the processing "falls through" and > its value is attempted to be inserted as a string, which violates the Avro > schema (which correctly recognized it as a long of timestamp logical type). 
> At least for Oracle, the right way to process a Timestamp column is to call > getTimestamp() rather than getObject(), the former returns a > java.sql.Timestamp object which would correctly be processed by the current > code. I would hope that all drivers would support this but we would want to > test on (at least) MySQL, Oracle, and PostgreSQL. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #2650: NIFI-5106 Add provenance reporting to GetSolr
GitHub user JohannesDaniel opened a pull request:

https://github.com/apache/nifi/pull/2650

NIFI-5106 Add provenance reporting to GetSolr

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JohannesDaniel/nifi NIFI-5106-provenanceGetSolr

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2650.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2650

commit 43f326e2b78cd5a0d40d1647b3a28b640fe8e784
Author: JohannesDaniel
Date: 2018-04-22T19:06:33Z

Added provenance reporting
---
[jira] [Commented] (NIFI-5106) Add provenance reporting to GetSolr
[ https://issues.apache.org/jira/browse/NIFI-5106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447327#comment-16447327 ]

ASF GitHub Bot commented on NIFI-5106:
--

GitHub user JohannesDaniel opened a pull request:

https://github.com/apache/nifi/pull/2650

NIFI-5106 Add provenance reporting to GetSolr

Thank you for submitting a contribution to Apache NiFi.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

### For all changes:
- [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- [ ] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- [ ] Has your PR been rebased against the latest commit within the target branch (typically master)?
- [ ] Is your initial contribution a single, squashed commit?

### For code changes:
- [ ] Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- [ ] Have you written or updated unit tests to verify your changes?
- [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
- [ ] If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- [ ] If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- [ ] If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

### For documentation related changes:
- [ ] Have you ensured that format looks appropriate for the output in which it is rendered?

### Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/JohannesDaniel/nifi NIFI-5106-provenanceGetSolr

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/nifi/pull/2650.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #2650

commit 43f326e2b78cd5a0d40d1647b3a28b640fe8e784
Author: JohannesDaniel
Date: 2018-04-22T19:06:33Z

Added provenance reporting

> Add provenance reporting to GetSolr
> ---
>
> Key: NIFI-5106
> URL: https://issues.apache.org/jira/browse/NIFI-5106
> Project: Apache NiFi
> Issue Type: Improvement
> Reporter: Johannes Peter
> Assignee: Johannes Peter
> Priority: Minor
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (NIFI-5106) Add provenance reporting to GetSolr
[ https://issues.apache.org/jira/browse/NIFI-5106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447346#comment-16447346 ] ASF GitHub Bot commented on NIFI-5106: -- Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2650 > Add provenance reporting to GetSolr > --- > > Key: NIFI-5106 > URL: https://issues.apache.org/jira/browse/NIFI-5106 > Project: Apache NiFi > Issue Type: Improvement >Reporter: Johannes Peter >Assignee: Johannes Peter >Priority: Minor > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #2650: NIFI-5106 Add provenance reporting to GetSolr
Github user asfgit closed the pull request at: https://github.com/apache/nifi/pull/2650 ---
[GitHub] nifi-minifi pull request #122: MINIFI-451 Updating Apache parent pom to vers...
Github user asfgit closed the pull request at: https://github.com/apache/nifi-minifi/pull/122 ---
[GitHub] nifi-minifi pull request #117: MINIFI-424 Adding the ability to evaluate boo...
Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi-minifi/pull/117#discussion_r183258883 --- Diff: minifi-bootstrap/src/main/java/org/apache/nifi/minifi/bootstrap/RunMiNiFi.java --- @@ -1176,11 +1191,18 @@ public boolean accept(final File dir, final String filename) { @SuppressWarnings({"rawtypes", "unchecked"}) public void start() throws IOException, InterruptedException { +Properties bootstrapProperties = getBootstrapProperties(); + +final String confDir = bootstrapProperties.getProperty(CONF_DIR_KEY); +final File configFile = new File(bootstrapProperties.getProperty(MINIFI_CONFIG_FILE_KEY)); +final Properties transformationConfigProperties = getConfigTransformationProperties(bootstrapProperties); + +for(String name: transformationConfigProperties.stringPropertyNames()) { +defaultLogger.error("config property: name:[" + name + "] value:[" + transformationConfigProperties.getProperty(name) + "] "); --- End diff -- Most definitely better as `info` ---
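As a rough illustration of the suggestion, the sketch below logs each configuration property at an informational level instead of an error level. It uses `java.util.logging` purely as a stand-in, since the logging API behind `defaultLogger` is not shown in the diff; the class name `ConfigLoggingSketch`, the method `logProperties` and the sample property are hypothetical.

```java
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;

final class ConfigLoggingSketch {
    /** Logs every property at INFO: routine configuration output is not an error condition. */
    static void logProperties(Logger logger, Properties props) {
        for (String name : props.stringPropertyNames()) {
            logger.log(Level.INFO, "config property: name:[{0}] value:[{1}]",
                    new Object[] {name, props.getProperty(name)});
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("example.key", "example-value"); // hypothetical property for the demo
        logProperties(Logger.getLogger("bootstrap-sketch"), props);
    }
}
```

Keeping routine output at the informational level means error-level logs stay reserved for conditions that need attention.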
[jira] [Commented] (NIFI-5105) Update AWS SDK (Spring 2018)
[ https://issues.apache.org/jira/browse/NIFI-5105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447269#comment-16447269 ] Otto Fowler commented on NIFI-5105: --- I think this is a good idea. In NIFI-5022 I had to update. > Update AWS SDK (Spring 2018) > > > Key: NIFI-5105 > URL: https://issues.apache.org/jira/browse/NIFI-5105 > Project: Apache NiFi > Issue Type: Improvement >Affects Versions: 1.6.0 >Reporter: James Wing >Priority: Minor > > Update the AWS SDK version used by nifi-aws-bundle to a recent SDK, with > support for newer AWS features, regions, etc. As part of the upgrade, we > should specify the individual SDK sub-component maven coordinates we actually > use, rather than the entire SDK, to reduce the size of binary distributions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] nifi pull request #2614: Added Apache Pulsar Processors and Controller Servi...
Github user MikeThomsen commented on a diff in the pull request:

https://github.com/apache/nifi/pull/2614#discussion_r183235809

--- Diff: nifi-nar-bundles/nifi-pulsar-bundle/nifi-pulsar-client-service/pom.xml ---
@@ -0,0 +1,67 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-pulsar-bundle</artifactId>
+        <version>1.7.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-pulsar-client-service</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-pulsar-client-service-api</artifactId>
+            <version>1.7.0-SNAPSHOT</version>
--- End diff --

This should be marked as provided as well.
---
[GitHub] nifi issue #2587: NIFI-4185 Add XML Record Reader
Github user JohannesDaniel commented on the issue: https://github.com/apache/nifi/pull/2587 @markap14 - Added EL for record format property - Removed record tag validation ---
[GitHub] nifi issue #2614: Added Apache Pulsar Processors and Controller Service
Github user MikeThomsen commented on the issue:

https://github.com/apache/nifi/pull/2614

When I added it to nifi-assembly, I got this error on startup:

```
java.util.ServiceConfigurationError: org.apache.nifi.processor.Processor: Provider org.apache.nifi.processors.pulsar.pubsub.ConsumePulsarRecord_1_X could not be instantiated
    at java.util.ServiceLoader.fail(ServiceLoader.java:232)
    at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at org.apache.nifi.nar.ExtensionManager.loadExtensions(ExtensionManager.java:148)
    at org.apache.nifi.nar.ExtensionManager.discoverExtensions(ExtensionManager.java:123)
    at org.apache.nifi.web.server.JettyServer.start(JettyServer.java:771)
    at org.apache.nifi.NiFi.<init>(NiFi.java:157)
    at org.apache.nifi.NiFi.<init>(NiFi.java:71)
    at org.apache.nifi.NiFi.main(NiFi.java:292)
Caused by: java.lang.NoClassDefFoundError: org/apache/nifi/serialization/MalformedRecordException
    at java.lang.Class.getDeclaredConstructors0(Native Method)
    at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671)
    at java.lang.Class.getConstructor0(Class.java:3075)
    at java.lang.Class.newInstance(Class.java:412)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
    ... 8 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.apache.nifi.serialization.MalformedRecordException
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 13 common frames omitted
```

Your declaration for `nifi-record` looks right, so we'll have to dig deeper once you get these changes done.
---
[jira] [Commented] (NIFI-4185) Add XML record reader & writer services
[ https://issues.apache.org/jira/browse/NIFI-4185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16447249#comment-16447249 ] ASF GitHub Bot commented on NIFI-4185: -- Github user JohannesDaniel commented on the issue: https://github.com/apache/nifi/pull/2587 @markap14 - Added EL for record format property - Removed record tag validation > Add XML record reader & writer services > --- > > Key: NIFI-4185 > URL: https://issues.apache.org/jira/browse/NIFI-4185 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions >Affects Versions: 1.3.0 >Reporter: Andy LoPresto >Assignee: Johannes Peter >Priority: Major > Labels: json, records, xml > > With the addition of the {{RecordReader}} and {{RecordSetWriter}} paradigm, > XML conversion has not yet been targeted. This will replace the previous > ticket for XML to JSON conversion. -- This message was sent by Atlassian JIRA (v7.6.3#76005)