Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-05-21 Thread via GitHub


greyp9 commented on PR #8463:
URL: https://github.com/apache/nifi/pull/8463#issuecomment-2123083750

   > @greyp9 It looks like the latest changes are failing on integration tests due to property name changes.
   
   Thanks; pushed an update.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-05-21 Thread via GitHub


exceptionfactory commented on PR #8463:
URL: https://github.com/apache/nifi/pull/8463#issuecomment-2122889110

   @greyp9 It looks like the latest changes are failing on integration tests 
due to property name changes.





Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-05-17 Thread via GitHub


greyp9 commented on code in PR #8463:
URL: https://github.com/apache/nifi/pull/8463#discussion_r1605446151


##
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/config/DeliveryGuarantee.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors.producer.config;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+
+public class DeliveryGuarantee {
+
+    public static final String ACKS_CONFIG = "acks";
+
+    public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
+            "FlowFile will be routed to failure unless the message is replicated to the appropriate "
+                    + "number of Kafka Nodes according to the Topic configuration");
+    public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery",
+            "FlowFile will be routed to success if the message is received by a single Kafka node, "
+                    + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> "
+                    + "but can result in data loss if a Kafka node crashes");
+    public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort",
+            "FlowFile will be routed to success after successfully sending the content to a Kafka node, "
+                    + "without waiting for any acknowledgment from the node at all. This provides the best performance but may result in data loss.");
+
+    public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
+            .name(ACKS_CONFIG)

Review Comment:
   This label is brought in from the Kafka source.  Added a link comment.
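The `acks` label matches the Kafka producer configuration key of the same name. The three allowable values above (`all`, `1`, `0`) are the values that key accepts. Below is a minimal sketch of passing the selected value through to the client properties. `DeliveryGuaranteeSketch` and `applyTo` are hypothetical names, not NiFi or Kafka API.

```java
import java.util.Properties;

// Hypothetical enum mirroring the three AllowableValues in DeliveryGuarantee; not NiFi API.
enum DeliveryGuaranteeSketch {
    REPLICATED("all"),   // leader waits for the full set of in-sync replicas
    ONE_NODE("1"),       // leader acknowledges after writing to its own log
    BEST_EFFORT("0");    // producer does not wait for any acknowledgment

    // Kafka producer configuration key; the same string as DeliveryGuarantee.ACKS_CONFIG
    static final String ACKS_CONFIG = "acks";

    private final String acks;

    DeliveryGuaranteeSketch(final String acks) {
        this.acks = acks;
    }

    // Copies the selected guarantee into the properties that would be handed to the Kafka client.
    Properties applyTo(final Properties producerProperties) {
        producerProperties.setProperty(ACKS_CONFIG, acks);
        return producerProperties;
    }
}
```

Keeping the allowable value equal to the raw client value means no translation table is needed between the NiFi property and the client configuration.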






Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-05-02 Thread via GitHub


greyp9 commented on PR #8463:
URL: https://github.com/apache/nifi/pull/8463#issuecomment-2091285872

   > I see what you mean. I'm suggesting even needing to swap out the controller service seems problematic.
   
   The jump from 2.6 to 3.0 involved *slight* changes in the NiFi component code (IIRC, compatible API parameter types and additional method arguments). It took less than an hour to remediate.
   
   It is a good point.  Aside from practical considerations (Maven dependencies 
declared at compile time), it is hard to anticipate the scope of future changes 
to the client library API.  It'll be interesting to get additional perspectives 
here.
   
   > But in any case...I don't feel that strongly. Fine as is, just sharing my thoughts.
   
   And thanks for that!  :)
   
   





Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-05-02 Thread via GitHub


joewitt commented on PR #8463:
URL: https://github.com/apache/nifi/pull/8463#issuecomment-2091227065

   I see what you mean.  I'm suggesting even needing to swap out the controller 
service seems problematic.
   
   I like the controller service model and I recall why this direction was taken. But we should be able to call it Kafka and not Kafka3 even there. If a later client changes behavior, we can name that one by its specific version.
   
   But in any case...I don't feel that strongly. Fine as is, just sharing my thoughts.





Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-05-02 Thread via GitHub


joewitt commented on PR #8463:
URL: https://github.com/apache/nifi/pull/8463#issuecomment-2090978880

   @greyp9 Yeah, that is fair. Dropping the 2.x stuff is certainly fair in a different PR.
   
   More relevant for this PR, though, would be the naming for the Kafka3 components and breaking the cycle of users having to change processors with each Kafka change. We had to do this during the 0.x/1.x/2.x transitions because the client behaviors just weren't stable enough. But I suspect now they are (just my view; others may have different info).





Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-05-02 Thread via GitHub


greyp9 commented on PR #8463:
URL: https://github.com/apache/nifi/pull/8463#issuecomment-2090939791

   > Can we just drop the old Kafka processors and go all in on the latest release? At this point the client libraries have stabilized greatly and us supporting the older stuff feels far less important and worthwhile now.
   
   It was difficult to tease out all of the business logic from the existing processor set. Some logic is there because of actual user need, and some is there because of the way Kafka and NiFi evolved alongside each other. The happy path seems to work well for the new components, but I'm not sure about all of the possible runtime permutations.
   
   In my mind, we’re only talking about the 2.6 processor set; the previous 
implementations aren’t available in the development executable. It does seem 
like a good idea to stop investing in enhancements to the 2.6 processors. The 
2.6 work in this PR is only scaffolding, intended to better understand runtime 
processor behavior so it can be reproduced in the new components.
   
   The concern for me is that if we drop the 2.6 set as part of this PR, we’ll 
make things harder for users during the transition period, in case some 
important edge case is missed. It would also complicate this PR. I’d like to 
pitch the idea of a transition period where both sets of components exist, to 
facilitate transition, and to provide a simple fallback if needed.  Longer 
term, it makes a lot of sense for 2.6 to go the way of 2.0.





Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-05-01 Thread via GitHub


joewitt commented on PR #8463:
URL: https://github.com/apache/nifi/pull/8463#issuecomment-2089326292

   Can we just drop the old Kafka processors and go all in on the latest 
release?  At this point the client libraries have stabilized greatly and us 
supporting the older stuff feels far less important and worthwhile now.





Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-04-08 Thread via GitHub


greyp9 commented on code in PR #8463:
URL: https://github.com/apache/nifi/pull/8463#discussion_r1556223354


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kafka.processors.common.KafkaUtils;
+import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
+import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy;
+import org.apache.nifi.kafka.processors.consumer.bundle.ByteRecordBundler;
+import org.apache.nifi.kafka.processors.consumer.convert.FlowFileStreamKafkaMessageConverter;
+import org.apache.nifi.kafka.processors.consumer.convert.KafkaMessageConverter;
+import org.apache.nifi.kafka.processors.consumer.convert.RecordStreamKafkaMessageConverter;
+import org.apache.nifi.kafka.processors.consumer.convert.WrapperRecordStreamKafkaMessageConverter;
+import org.apache.nifi.kafka.service.api.KafkaConnectionService;
+import org.apache.nifi.kafka.service.api.common.PartitionState;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.kafka.service.api.consumer.ConsumerConfiguration;
+import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
+import org.apache.nifi.kafka.service.api.consumer.PollingContext;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.apache.nifi.kafka.shared.property.KeyFormat;
+import org.apache.nifi.kafka.shared.property.OutputStrategy;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.util.StringUtils;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"kafka", "consumer", "record"})
+public class ConsumeKafka extends AbstractProcessor implements VerifiableProcessor {
+
+    static final AllowableValue TOPIC_NAME = new AllowableValue("names", "names", "Topic is a full topic name or comma separated list of names");
+    static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regex using the Java Pattern syntax");
+
+    static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder()
+            .name("Kafka Connection Service")
+            .displayName("Kafka Connection Service")
+            .description("Provides connections to Kafka Broker for publishing Kafka Records")
+            .identifiesControllerService(KafkaConnectionService.class)
+            .expressionLanguageSupported(NONE)
+            .required(true)
+            .build();
+
+  
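The two topic formats above (`names` and `pattern`) imply two different ways of interpreting the same property value. Below is a minimal sketch of that interpretation. `TopicSpecSketch` and its methods are hypothetical. The sketch also assumes that a topic counts as matching only when the whole name matches the regex.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper illustrating the TOPIC_NAME and TOPIC_PATTERN formats; not NiFi API.
final class TopicSpecSketch {
    private TopicSpecSketch() {
    }

    // "names": a single topic or a comma-separated list; whitespace is trimmed and empty entries are dropped.
    static List<String> parseNames(final String value) {
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toList());
    }

    // "pattern": java.util.regex syntax; assumed here to require a match of the whole topic name.
    static boolean matchesPattern(final String regex, final String topic) {
        return Pattern.compile(regex).matcher(topic).matches();
    }
}
```

With whole-name matching, a pattern such as `orders` does not pick up `orders-eu`. A user who wants a prefix match has to write `orders.*` explicitly.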

Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-03-25 Thread via GitHub


greyp9 commented on code in PR #8463:
URL: https://github.com/apache/nifi/pull/8463#discussion_r1538322756


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##
@@ -0,0 +1,464 @@

Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-03-25 Thread via GitHub


greyp9 commented on code in PR #8463:
URL: https://github.com/apache/nifi/pull/8463#discussion_r1538320629


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/txn/KafkaTransactionalProducerWrapper.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.producer.txn;
+
+import org.apache.kafka.clients.producer.Producer;
+
+public class KafkaTransactionalProducerWrapper extends KafkaProducerWrapper {
+
+    public KafkaTransactionalProducerWrapper(final Producer<byte[], byte[]> producer) {
+        super(producer);
+    }
+
+    @Override
+    public void init() {
+        producer.initTransactions();
+        producer.beginTransaction();
+    }
+
+    @Override
+    public void commit() {
+        try {
+            producer.commitTransaction();
+            logger.trace("committed");

Review Comment:
   Yep; thanks.  In general, there will be instances of these that were useful 
in the moment, but no longer helpful.
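The transactional wrapper follows a begin/commit lifecycle. Below is a dependency-free sketch of that lifecycle. `TxnProducerSketch` is a hypothetical stand-in that only reuses the method names of Kafka's `Producer` (`initTransactions`, `beginTransaction`, `commitTransaction`, `abortTransaction`). The abort-on-failure branch is an assumption, since the diff above is truncated before the `catch`. In the real client, some errors (for example, a fenced producer) cannot be recovered by aborting, and the only option is to close the producer. So treat this sketch as covering only the simple paths.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the transactional methods of Kafka's Producer.
interface TxnProducerSketch {
    void initTransactions();
    void beginTransaction();
    void commitTransaction();
    void abortTransaction();
}

// Mirrors the shape of KafkaTransactionalProducerWrapper: begin on init, commit on success,
// and (assumed, not shown in the diff) abort when the commit throws.
final class TransactionalWrapperSketch {
    private final TxnProducerSketch producer;

    TransactionalWrapperSketch(final TxnProducerSketch producer) {
        this.producer = producer;
    }

    void init() {
        producer.initTransactions();
        producer.beginTransaction();
    }

    // Returns true if the transaction committed, false if it was aborted.
    boolean commit() {
        try {
            producer.commitTransaction();
            return true;
        } catch (final RuntimeException e) {
            producer.abortTransaction();
            return false;
        }
    }
}

// Test double that records the order of calls and can be told to fail on commit.
final class RecordingProducerSketch implements TxnProducerSketch {
    final List<String> calls = new ArrayList<>();
    boolean failCommit;

    public void initTransactions() { calls.add("init"); }
    public void beginTransaction() { calls.add("begin"); }
    public void commitTransaction() {
        calls.add("commit");
        if (failCommit) {
            throw new IllegalStateException("simulated commit failure");
        }
    }
    public void abortTransaction() { calls.add("abort"); }
}
```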






Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-03-25 Thread via GitHub


greyp9 commented on code in PR #8463:
URL: https://github.com/apache/nifi/pull/8463#discussion_r1538319116


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/txn/KafkaProducerWrapper.java:
##
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.producer.txn;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.nifi.kafka.service.api.producer.PublishContext;
+import org.apache.nifi.kafka.service.api.record.KafkaRecord;
+import org.apache.nifi.kafka.service.producer.ProducerCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Abstract away the configured transactionality of the PublishKafka producer.
+ */
+public abstract class KafkaProducerWrapper {
+    protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+    protected final Producer<byte[], byte[]> producer;
+
+    public KafkaProducerWrapper(final Producer<byte[], byte[]> producer) {
+        this.producer = producer;
+    }
+
+    /**
+     * Transaction-enabled publish to Kafka involves the use of special Kafka client library APIs.
+     */
+    public abstract void init();
+
+    public void send(final Iterator<KafkaRecord> kafkaRecords, final PublishContext publishContext, final ProducerCallback callback) {
+        while (kafkaRecords.hasNext()) {
+            final KafkaRecord kafkaRecord = kafkaRecords.next();
+            producer.send(toProducerRecord(kafkaRecord, publishContext), callback);
+            callback.send();
+        }
+        logger.trace("send():inFlight");

Review Comment:
   Maybe not.
   
   Are there reasonable project guidelines for how to log things that should be 
disabled by default, but useful in troubleshooting particular scenarios?  Not 
necessarily suggesting this is one; more of a general question.
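One common approach is to log such messages at the finest level and build them lazily. That way they cost almost nothing unless someone troubleshooting enables that level. The PR code uses SLF4J, where `logger.isTraceEnabled()` guards and `{}` placeholders serve this purpose. So that this sketch runs without dependencies, it swaps in `java.util.logging` instead: its `Supplier<String>` overloads invoke the supplier only when the level is loggable. `LazyTraceSketch` is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

final class LazyTraceSketch {
    private static final Logger LOGGER = Logger.getLogger(LazyTraceSketch.class.getName());

    // Counts how often the (potentially expensive) message string is actually built.
    static final AtomicInteger messagesBuilt = new AtomicInteger();

    static void send(final int inFlight) {
        // FINEST is the java.util.logging counterpart of SLF4J TRACE; the Supplier runs only
        // when FINEST is loggable for this logger, so by default the string is never built.
        LOGGER.finest(() -> {
            messagesBuilt.incrementAndGet();
            return "send(): inFlight=" + inFlight;
        });
    }
}
```

In a NiFi deployment, the equivalent switch would be a `<logger name="..." level="TRACE"/>` entry for the relevant package in `conf/logback.xml`.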






Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-03-25 Thread via GitHub


greyp9 commented on code in PR #8463:
URL: https://github.com/apache/nifi/pull/8463#discussion_r1538315024


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/ProducerCallback.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.producer;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kafka.service.api.producer.FlowFileResult;
+import org.apache.nifi.kafka.service.api.producer.ProducerRecordMetadata;
+import org.apache.nifi.kafka.shared.util.Notifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
+
+public class ProducerCallback implements Callback {
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private final AtomicLong sentCount;
+    private final AtomicLong acknowledgedCount;
+    private final AtomicLong failedCount;
+    private final FlowFile flowFile;
+    private final List<ProducerRecordMetadata> metadatas;
+    private final List<Exception> exceptions;
+    private final Notifier notifier;
+
+    public List<Exception> getExceptions() {
+        return exceptions;
+    }
+
+    public boolean isFailure() {
+        return !exceptions.isEmpty();
+    }
+
+    public ProducerCallback(final FlowFile flowFile) {
+        this.sentCount = new AtomicLong(0L);
+        this.acknowledgedCount = new AtomicLong(0L);
+        this.failedCount = new AtomicLong(0L);
+        this.flowFile = flowFile;
+        this.metadatas = new ArrayList<>();
+        this.exceptions = new ArrayList<>();
+        this.notifier = new Notifier();
+    }
+
+    public long send() {
+        return sentCount.incrementAndGet();
+    }
+
+    @Override
+    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+
+        // the source `FlowFile` and the associated `RecordMetadata` need to somehow be associated...

Review Comment:
   I was originally hoping that this sort of API would not be needed, but there 
were problems when I added transactionality.  Maybe a reasonable method-level 
comment could be used instead to capture this nuance?
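The association described in that comment can be kept simple. Create one callback instance per FlowFile and pass it to every `producer.send` for that FlowFile's records, as the diff does. Every completion then belongs to the FlowFile that created the callback. Below is a simplified sketch of that bookkeeping. `PerFlowFileCallbackSketch` is hypothetical, and a `String` stands in for both the FlowFile and Kafka's `RecordMetadata`. Kafka generally runs callbacks on the producer's I/O thread, so the sketch collects results in thread-safe lists.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified per-FlowFile callback; Strings stand in for FlowFile and RecordMetadata.
final class PerFlowFileCallbackSketch {
    private final String flowFileId;
    private final AtomicLong sentCount = new AtomicLong();
    private final AtomicLong acknowledgedCount = new AtomicLong();
    private final AtomicLong failedCount = new AtomicLong();
    // Completions arrive on a different thread than send(), so use thread-safe collections.
    private final List<String> metadatas = new CopyOnWriteArrayList<>();
    private final List<Exception> exceptions = new CopyOnWriteArrayList<>();

    PerFlowFileCallbackSketch(final String flowFileId) {
        this.flowFileId = flowFileId;
    }

    // Called by the sending thread once per record handed to the producer.
    long send() {
        return sentCount.incrementAndGet();
    }

    // As in Kafka's Callback.onCompletion, a null exception means the record was acknowledged.
    void onCompletion(final String metadata, final Exception exception) {
        if (exception == null) {
            metadatas.add(flowFileId + "@" + metadata);
            acknowledgedCount.incrementAndGet();
        } else {
            exceptions.add(exception);
            failedCount.incrementAndGet();
        }
    }

    // The FlowFile can be routed once every sent record has either been acknowledged or failed.
    boolean isComplete() {
        return acknowledgedCount.get() + failedCount.get() == sentCount.get();
    }

    boolean isFailure() {
        return !exceptions.isEmpty();
    }

    List<String> getMetadatas() {
        return metadatas;
    }
}
```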
   






Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-03-25 Thread via GitHub


greyp9 commented on code in PR #8463:
URL: https://github.com/apache/nifi/pull/8463#discussion_r1538306358


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java:
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service.producer;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.kafka.service.api.common.PartitionState;
+import org.apache.nifi.kafka.service.api.common.ServiceConfiguration;
+import org.apache.nifi.kafka.service.api.producer.FlowFileResult;
+import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
+import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
+import org.apache.nifi.kafka.service.api.producer.PublishContext;
+import org.apache.nifi.kafka.service.api.producer.RecordSummary;
+import org.apache.nifi.kafka.service.api.record.KafkaRecord;
+import org.apache.nifi.kafka.service.producer.txn.KafkaNonTransactionalProducerWrapper;
+import org.apache.nifi.kafka.service.producer.txn.KafkaProducerWrapper;
+import org.apache.nifi.kafka.service.producer.txn.KafkaTransactionalProducerWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+public class Kafka3ProducerService implements KafkaProducerService {
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    private final Producer<byte[], byte[]> producer;
+    private final List<ProducerCallback> callbacks;
+
+    private final ServiceConfiguration serviceConfiguration;
+
+    private final KafkaProducerWrapper wrapper;
+
+    public Kafka3ProducerService(final Properties properties,
+                                 final ServiceConfiguration serviceConfiguration,
+                                 final ProducerConfiguration producerConfiguration) {
+        final ByteArraySerializer serializer = new ByteArraySerializer();
+        this.producer = new KafkaProducer<>(properties, serializer, serializer);
+        this.callbacks = new ArrayList<>();
+
+        this.serviceConfiguration = serviceConfiguration;
+
+        this.wrapper = producerConfiguration.getUseTransactions()
+                ? new KafkaTransactionalProducerWrapper(producer)
+                : new KafkaNonTransactionalProducerWrapper(producer);
+    }
+
+    @Override
+    public void close() {
+        producer.close();
+    }
+
+    @Override
+    public void init() {
+        wrapper.init();
+        logger.trace("init()");

Review Comment:
   This particular log statement helped me work out a problem with the controller service lifecycle, but it is no longer needed; I will remove it.
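The constructor above chooses `KafkaTransactionalProducerWrapper` or `KafkaNonTransactionalProducerWrapper` from `getUseTransactions()`, and `init()` delegates to that wrapper. The following is a minimal sketch of that delegation, assuming the transactional path maps onto the Kafka producer's `initTransactions()`, `beginTransaction()`, `commitTransaction()` and `abortTransaction()` calls. The `TxnProducer` interface, the recording fake, and the `Wrapper` method names (`init`, `begin`, `complete`) are hypothetical stand-ins so the example runs without a broker; they are not the PR's actual classes.

```java
import java.util.ArrayList;
import java.util.List;

public class WrapperLifecycleSketch {

    // Hypothetical stand-in for the transaction methods of Kafka's Producer interface,
    // so the sketch runs without a broker or the kafka-clients dependency.
    interface TxnProducer {
        void initTransactions();
        void beginTransaction();
        void send(String value);
        void commitTransaction();
        void abortTransaction();
    }

    // Fake producer that records each call in order.
    static final class RecordingProducer implements TxnProducer {
        final List<String> calls = new ArrayList<>();
        public void initTransactions() { calls.add("initTransactions"); }
        public void beginTransaction() { calls.add("beginTransaction"); }
        public void send(String value) { calls.add("send:" + value); }
        public void commitTransaction() { calls.add("commitTransaction"); }
        public void abortTransaction() { calls.add("abortTransaction"); }
    }

    // Hypothetical wrapper contract; the method names are illustrative only.
    interface Wrapper {
        void init();
        void begin();
        void complete(boolean success);
    }

    static final class TransactionalWrapper implements Wrapper {
        private final TxnProducer producer;
        TransactionalWrapper(TxnProducer producer) { this.producer = producer; }
        public void init() { producer.initTransactions(); }   // once per producer
        public void begin() { producer.beginTransaction(); }  // once per publish
        public void complete(boolean success) {
            if (success) {
                producer.commitTransaction();
            } else {
                producer.abortTransaction();
            }
        }
    }

    // Without transactions there is nothing to set up, commit, or roll back.
    static final class NonTransactionalWrapper implements Wrapper {
        public void init() { }
        public void begin() { }
        public void complete(boolean success) { }
    }

    // Mirrors the ternary in the constructor: the flag alone decides which wrapper is used.
    static Wrapper create(TxnProducer producer, boolean useTransactions) {
        return useTransactions ? new TransactionalWrapper(producer) : new NonTransactionalWrapper();
    }

    // Runs one init/begin/send/complete cycle and returns the calls the producer saw.
    static List<String> publishOnce(boolean useTransactions, boolean success) {
        RecordingProducer producer = new RecordingProducer();
        Wrapper wrapper = create(producer, useTransactions);
        wrapper.init();
        wrapper.begin();
        producer.send("record");
        wrapper.complete(success);
        return producer.calls;
    }

    public static void main(String[] args) {
        System.out.println(publishOnce(true, true));
        System.out.println(publishOnce(false, true));
    }
}
```

Keeping the transactional decision inside the wrapper means the service code can call the same lifecycle methods either way. It also shows why a trace log in `init()` is useful while debugging: it confirms when `initTransactions()` would run relative to the controller service lifecycle.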






Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-03-25 Thread via GitHub


greyp9 commented on code in PR #8463:
URL: https://github.com/apache/nifi/pull/8463#discussion_r1538303445


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/readme.txt:
##
@@ -0,0 +1,2 @@
+Integration tests derived from examples provided in:
+https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html

Review Comment:
   I found myself taking notes during implementation.
   
   Where appropriate, I'll make sure any documentation like this is in Markdown format.






Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-03-25 Thread via GitHub


greyp9 commented on code in PR #8463:
URL: https://github.com/apache/nifi/pull/8463#discussion_r1538303639


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/README.md:
##
@@ -0,0 +1,14 @@
+
+
+### Kafka3ProducerService
+- As the `send()` API maps cleanly to the context of a single `FlowFile`, additional APIs are useful to handle the ability to publish multiple FlowFiles in the context of a single publish to Kafka.

Review Comment:
   That makes sense; I will make that change.
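The README note says `send()` maps to a single `FlowFile`, while publishing several FlowFiles in one Kafka publish needs additional APIs. The following is a minimal sketch of the bookkeeping that implies. Each record is tagged with the FlowFile it came from, all records are sent in one pass, and outcomes are summarized per FlowFile so each one can be routed on its own result. `TaggedRecord`, `FlowFileOutcome`, `publishAll`, and the `send` predicate are hypothetical names, not the PR's `PublishContext`/`FlowFileResult` API.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class BatchPublishSketch {

    // Hypothetical record: one Kafka message tagged with the FlowFile it was read from.
    static final class TaggedRecord {
        final String flowFileId;
        final String value;
        TaggedRecord(String flowFileId, String value) {
            this.flowFileId = flowFileId;
            this.value = value;
        }
    }

    // Hypothetical per-FlowFile outcome: acknowledged and failed record counts.
    static final class FlowFileOutcome {
        int acked;
        int failed;
        boolean success() { return failed == 0; }
    }

    // Publishes records from several FlowFiles in one pass. 'send' stands in for a
    // Kafka send and returns true when the record is acknowledged.
    static Map<String, FlowFileOutcome> publishAll(List<TaggedRecord> records, Predicate<TaggedRecord> send) {
        Map<String, FlowFileOutcome> outcomes = new LinkedHashMap<>();  // keeps FlowFile order
        for (TaggedRecord record : records) {
            FlowFileOutcome outcome = outcomes.computeIfAbsent(record.flowFileId, id -> new FlowFileOutcome());
            if (send.test(record)) {
                outcome.acked++;
            } else {
                outcome.failed++;
            }
        }
        return outcomes;
    }

    public static void main(String[] args) {
        List<TaggedRecord> records = Arrays.asList(
                new TaggedRecord("ff-1", "a"),
                new TaggedRecord("ff-1", "b"),
                new TaggedRecord("ff-2", "bad"));
        Map<String, FlowFileOutcome> outcomes = publishAll(records, r -> !"bad".equals(r.value));
        outcomes.forEach((id, o) ->
                System.out.println(id + " acked=" + o.acked + " failed=" + o.failed + " success=" + o.success()));
    }
}
```

With transactions enabled, a single failure would typically abort the whole batch, so every FlowFile in it would be treated as failed. This sketch only covers the per-FlowFile counting, not that rollback rule.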






Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-03-25 Thread via GitHub


greyp9 commented on code in PR #8463:
URL: https://github.com/apache/nifi/pull/8463#discussion_r1538301385


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java:
##
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.security.plain.PlainLoginModule;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.VerifiableControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kafka.service.api.KafkaConnectionService;
+import org.apache.nifi.kafka.service.api.common.ServiceConfiguration;
+import org.apache.nifi.kafka.service.api.consumer.ConsumerConfiguration;
+import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
+import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
+import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
+import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
+import org.apache.nifi.kafka.service.producer.Kafka3ProducerService;
+import org.apache.nifi.kafka.shared.property.SaslMechanism;
+import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
+import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
+import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_LOCATION;
+import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_PASSWORD;
+import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_TYPE;
+import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEY_PASSWORD;
+import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_LOCATION;
+import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_PASSWORD;
+import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_TYPE;
+
+public class Kafka3ConnectionService extends AbstractControllerService implements KafkaConnectionService, VerifiableControllerService {
+
+public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
+.name("bootstrap.servers")
+.displayName("Bootstrap Servers")
+.description("Comma-separated list of Kafka Bootstrap Servers in the format host:port. Mapped to Kafka bootstrap.servers")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+
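`BOOTSTRAP_SERVERS` is described as a comma-separated list of `host:port` entries, and the quoted descriptor adds `StandardValidators.NON_BLANK_VALIDATOR`. If a stricter format check were wanted, the following is a minimal sketch. It assumes entries are trimmed and each must end in a numeric port from 1 to 65535. `BootstrapServersCheck` and `invalidEntries` are hypothetical helpers, not a NiFi `Validator` implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class BootstrapServersCheck {

    // Returns the entries that are not well-formed host:port pairs; an empty list means the value passed.
    static List<String> invalidEntries(String value) {
        List<String> invalid = new ArrayList<>();
        // limit -1 keeps trailing empty entries, so "a:9092," is reported rather than silently accepted
        for (String raw : value.split(",", -1)) {
            String entry = raw.trim();
            int colon = entry.lastIndexOf(':');  // last colon, so a bracketed IPv6 host keeps its own colons
            if (colon <= 0 || colon == entry.length() - 1) {
                invalid.add(entry);  // no colon, empty host, or empty port
                continue;
            }
            String port = entry.substring(colon + 1);
            // at most five ASCII digits, checked before parsing so parseInt cannot overflow
            boolean digits = port.length() <= 5 && port.chars().allMatch(c -> c >= '0' && c <= '9');
            if (!digits || Integer.parseInt(port) < 1 || Integer.parseInt(port) > 65535) {
                invalid.add(entry);
            }
        }
        return invalid;
    }

    public static void main(String[] args) {
        System.out.println(invalidEntries("broker1:9092, broker2:9093"));
        System.out.println(invalidEntries("broker1:9092,broker2,broker3:70000"));
    }
}
```

Returning the offending entries, rather than a boolean, would let a validation message name exactly which server entry is malformed.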

Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-03-15 Thread via GitHub


exceptionfactory commented on code in PR #8463:
URL: https://github.com/apache/nifi/pull/8463#discussion_r1526461916


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/README.md:
##
@@ -0,0 +1,14 @@
+
+
+### Kafka3ProducerService
+- As the `send()` API maps cleanly to the context of a single `FlowFile`, additional APIs are useful to handle the ability to publish multiple FlowFiles in the context of a single publish to Kafka.

Review Comment:
   Is there a reason for this separate README, as opposed to including these comments in the class itself?



##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/readme.txt:
##
@@ -0,0 +1,2 @@
+Integration tests derived from examples provided in:
+https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html

Review Comment:
   Is this file necessary? I recommend changing it to Markdown for consistency if you prefer to keep it.



##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaWrapperRecordIT.java:
##
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.io.IOUtils;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.header.Header;
+import org.apache.nifi.kafka.shared.property.PublishStrategy;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.MethodOrderer;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestMethodOrder;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@TestMethodOrder(MethodOrderer.MethodName.class)
+public class PublishKafkaWrapperRecordIT extends AbstractPublishKafkaIT {
+private static final String TEST_RESOURCE = "org/apache/nifi/kafka/processors/publish/ffwrapper.json";
+
+private static final String KEY_ATTRIBUTE_KEY = "keyAttribute";
+private static final String KEY_ATTRIBUTE_VALUE = "keyAttributeValue";
+
+private static final int TEST_RECORD_COUNT = 3;
+
+@Test
+public void test_1_KafkaTestContainerProduceOneFlowFile() throws InitializationException, IOException {
+final TestRunner runner = TestRunners.newTestRunner(PublishKafka.class);
+runner.setValidateExpressionUsage(false);
+runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner));
+addRecordReaderService(runner);
+addRecordWriterService(runner);
+addRecordKeyWriterService(runner);
+
+runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName());
+runner.setProperty(PublishKafka.KEY, KEY_ATTRIBUTE_KEY);
+runner.setProperty(PublishKafka.MESSAGE_KEY_FIELD, "address");
+runner.setProperty(PublishKafka.ATTRIBUTE_NAME_REGEX, "a.*");
+runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.name());
+
+final Map<String, String> attributes = new HashMap<>();
+attributes.put(KEY_ATTRIBUTE_KEY, KEY_ATTRIBUTE_VALUE);
+attributes.put("a1", "valueA1");
+attributes.put("b1", "valueB1");
+final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull(
+getClass().getClassLoader().getResource(TEST_RESOURCE)));
+
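The test sets `ATTRIBUTE_NAME_REGEX` to `a.*` and supplies the attributes `keyAttribute`, `a1`, and `b1`. Assuming attribute names are matched against the whole pattern (as `Matcher.matches()` does), only `a1` would be eligible to become a Kafka header. `keyAttribute` begins with `k` and `b1` with `b`, so neither matches. The following is a small sketch of that filtering step. `selectHeaderAttributes` is a hypothetical helper, not the PR's `AttributesHeadersFactory`. Real Kafka header values are byte arrays, so an actual implementation would also need to encode each value, for example as UTF-8.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

public class AttributeHeaderFilterSketch {

    // Keeps only the attributes whose full name matches the pattern (Matcher.matches(), not find()).
    static Map<String, String> selectHeaderAttributes(Map<String, String> attributes, Pattern namePattern) {
        Map<String, String> selected = new TreeMap<>();  // sorted for stable output
        for (Map.Entry<String, String> entry : attributes.entrySet()) {
            if (namePattern.matcher(entry.getKey()).matches()) {
                selected.put(entry.getKey(), entry.getValue());
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // Same attributes as the integration test
        Map<String, String> attributes = new TreeMap<>();
        attributes.put("keyAttribute", "keyAttributeValue");
        attributes.put("a1", "valueA1");
        attributes.put("b1", "valueB1");
        System.out.println(selectHeaderAttributes(attributes, Pattern.compile("a.*")));  // prints {a1=valueA1}
    }
}
```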

Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-03-12 Thread via GitHub


greyp9 commented on code in PR #8463:
URL: https://github.com/apache/nifi/pull/8463#discussion_r1522080555


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kafka.processors.common.KafkaUtils;
+import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
+import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy;
+import org.apache.nifi.kafka.processors.consumer.bundle.ByteRecordBundler;
+import org.apache.nifi.kafka.processors.consumer.convert.FlowFileStreamKafkaMessageConverter;
+import org.apache.nifi.kafka.processors.consumer.convert.KafkaMessageConverter;
+import org.apache.nifi.kafka.processors.consumer.convert.RecordStreamKafkaMessageConverter;
+import org.apache.nifi.kafka.processors.consumer.convert.WrapperRecordStreamKafkaMessageConverter;
+import org.apache.nifi.kafka.service.api.KafkaConnectionService;
+import org.apache.nifi.kafka.service.api.common.PartitionState;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.kafka.service.api.consumer.ConsumerConfiguration;
+import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
+import org.apache.nifi.kafka.service.api.consumer.PollingContext;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.apache.nifi.kafka.shared.property.KeyFormat;
+import org.apache.nifi.kafka.shared.property.OutputStrategy;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.util.StringUtils;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"kafka", "consumer", "record"})
+public class ConsumeKafka extends AbstractProcessor implements VerifiableProcessor {
+
+static final AllowableValue TOPIC_NAME = new AllowableValue("names", "names", "Topic is a full topic name or comma separated list of names");
+static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regex using the Java Pattern syntax");
+
+static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder()
+.name("Kafka Connection Service")
+.displayName("Kafka Connection Service")
+.description("Provides connections to Kafka Broker for publishing Kafka Records")
+.identifiesControllerService(KafkaConnectionService.class)
+.expressionLanguageSupported(NONE)
+.required(true)
+.build();
+
+  
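`TOPIC_NAME` treats the configured value as a full topic name or a comma-separated list of names, while `TOPIC_PATTERN` treats it as a Java `Pattern`. The following is a minimal sketch of how the two modes could resolve against a list of existing topics. It assumes full-name matching for patterns. The `resolve` helper and the in-memory topic list are hypothetical. A real consumer would more likely pass a `Collection<String>` or a `Pattern` to one of the `KafkaConsumer.subscribe(...)` overloads and let the client track matching topics.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicResolutionSketch {

    // "names" and "pattern" mirror the TOPIC_NAME / TOPIC_PATTERN allowable values in ConsumeKafka.
    static List<String> resolve(String topicFormat, String topicValue, List<String> existingTopics) {
        if ("names".equals(topicFormat)) {
            // Comma-separated list of full topic names; whitespace around commas is ignored.
            // Names are taken as given, without checking that the topics exist.
            return Arrays.stream(topicValue.split(","))
                    .map(String::trim)
                    .filter(name -> !name.isEmpty())
                    .collect(Collectors.toList());
        }
        if ("pattern".equals(topicFormat)) {
            // Java regex applied to each existing topic name; the whole name must match
            Pattern pattern = Pattern.compile(topicValue);
            return existingTopics.stream()
                    .filter(topic -> pattern.matcher(topic).matches())
                    .collect(Collectors.toList());
        }
        throw new IllegalArgumentException("Unsupported topic format: " + topicFormat);
    }

    public static void main(String[] args) {
        List<String> existing = Arrays.asList("orders", "orders-dlq", "payments");
        System.out.println(resolve("names", "orders, payments", existing));  // [orders, payments]
        System.out.println(resolve("pattern", "orders.*", existing));        // [orders, orders-dlq]
    }
}
```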

Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-03-12 Thread via GitHub


greyp9 commented on code in PR #8463:
URL: https://github.com/apache/nifi/pull/8463#discussion_r1522076394


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java:
##
@@ -0,0 +1,402 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.service;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.ListTopicsResult;
+import org.apache.kafka.clients.admin.TopicListing;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.security.plain.PlainLoginModule;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.VerifiableControllerService;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kafka.service.api.KafkaConnectionService;
+import org.apache.nifi.kafka.service.api.common.ServiceConfiguration;
+import org.apache.nifi.kafka.service.api.consumer.ConsumerConfiguration;
+import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
+import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
+import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
+import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService;
+import org.apache.nifi.kafka.service.producer.Kafka3ProducerService;
+import org.apache.nifi.kafka.shared.property.SaslMechanism;
+import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED;
+import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL;
+import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_LOCATION;
+import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_PASSWORD;
+import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_TYPE;
+import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEY_PASSWORD;
+import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_LOCATION;
+import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_PASSWORD;
+import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_TYPE;
+
+public class Kafka3ConnectionService extends AbstractControllerService implements KafkaConnectionService, VerifiableControllerService {
+
+public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
+.name("bootstrap.servers")
+.displayName("Bootstrap Servers")
+.description("Comma-separated list of Kafka Bootstrap Servers in the format host:port. Mapped to Kafka bootstrap.servers")
+.required(true)
+.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+

Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-03-12 Thread via GitHub


greyp9 commented on code in PR #8463:
URL: https://github.com/apache/nifi/pull/8463#discussion_r1522075657


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java:
##
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kafka.processors.producer.common.PublishKafkaUtil;
+import org.apache.nifi.kafka.processors.producer.config.DeliveryGuarantee;
+import org.apache.nifi.kafka.processors.producer.convert.DelimitedStreamKafkaRecordConverter;
+import org.apache.nifi.kafka.processors.producer.convert.FlowFileStreamKafkaRecordConverter;
+import org.apache.nifi.kafka.processors.producer.convert.KafkaRecordConverter;
+import org.apache.nifi.kafka.processors.producer.convert.RecordStreamKafkaRecordConverter;
+import org.apache.nifi.kafka.processors.producer.convert.RecordWrapperStreamKafkaRecordConverter;
+import org.apache.nifi.kafka.processors.producer.header.AttributesHeadersFactory;
+import org.apache.nifi.kafka.processors.producer.header.HeadersFactory;
+import org.apache.nifi.kafka.processors.producer.key.AttributeKeyFactory;
+import org.apache.nifi.kafka.processors.producer.key.KeyFactory;
+import org.apache.nifi.kafka.processors.producer.key.MessageKeyFactory;
+import org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy;
+import org.apache.nifi.kafka.service.api.KafkaConnectionService;
+import org.apache.nifi.kafka.service.api.common.PartitionState;
+import org.apache.nifi.kafka.service.api.producer.FlowFileResult;
+import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
+import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
+import org.apache.nifi.kafka.service.api.producer.PublishContext;
+import org.apache.nifi.kafka.service.api.producer.RecordSummary;
+import org.apache.nifi.kafka.service.api.record.KafkaRecord;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.component.KafkaPublishComponent;
+import org.apache.nifi.kafka.shared.property.FailureStrategy;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.apache.nifi.kafka.shared.property.PublishStrategy;
+import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import java.io.BufferedInputStream;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+
+@Tags({"kafka", "producer", "record"})
+public class PublishKafka extends AbstractProcessor implements KafkaPublishComponent, VerifiableProcessor {

Review Comment:
   I'll verify the counter and attribute for message count.




Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-03-12 Thread via GitHub


greyp9 commented on code in PR #8463:
URL: https://github.com/apache/nifi/pull/8463#discussion_r1522074061


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kafka.processors.common.KafkaUtils;
+import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
+import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy;
+import org.apache.nifi.kafka.processors.consumer.bundle.ByteRecordBundler;
+import org.apache.nifi.kafka.processors.consumer.convert.FlowFileStreamKafkaMessageConverter;
+import org.apache.nifi.kafka.processors.consumer.convert.KafkaMessageConverter;
+import org.apache.nifi.kafka.processors.consumer.convert.RecordStreamKafkaMessageConverter;
+import org.apache.nifi.kafka.processors.consumer.convert.WrapperRecordStreamKafkaMessageConverter;
+import org.apache.nifi.kafka.service.api.KafkaConnectionService;
+import org.apache.nifi.kafka.service.api.common.PartitionState;
+import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset;
+import org.apache.nifi.kafka.service.api.consumer.ConsumerConfiguration;
+import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService;
+import org.apache.nifi.kafka.service.api.consumer.PollingContext;
+import org.apache.nifi.kafka.service.api.record.ByteRecord;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.apache.nifi.kafka.shared.property.KeyFormat;
+import org.apache.nifi.kafka.shared.property.OutputStrategy;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.util.StringUtils;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@Tags({"kafka", "consumer", "record"})
+public class ConsumeKafka extends AbstractProcessor implements VerifiableProcessor {

Review Comment:
   Thanks. I need to check the annotations to make sure they are all propagated where appropriate.






Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]

2024-03-11 Thread via GitHub


taz1988 commented on code in PR #8463:
URL: https://github.com/apache/nifi/pull/8463#discussion_r1517541500


##
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java:
##
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.kafka.processors;
+
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kafka.processors.producer.common.PublishKafkaUtil;
+import org.apache.nifi.kafka.processors.producer.config.DeliveryGuarantee;
+import org.apache.nifi.kafka.processors.producer.convert.DelimitedStreamKafkaRecordConverter;
+import org.apache.nifi.kafka.processors.producer.convert.FlowFileStreamKafkaRecordConverter;
+import org.apache.nifi.kafka.processors.producer.convert.KafkaRecordConverter;
+import org.apache.nifi.kafka.processors.producer.convert.RecordStreamKafkaRecordConverter;
+import org.apache.nifi.kafka.processors.producer.convert.RecordWrapperStreamKafkaRecordConverter;
+import org.apache.nifi.kafka.processors.producer.header.AttributesHeadersFactory;
+import org.apache.nifi.kafka.processors.producer.header.HeadersFactory;
+import org.apache.nifi.kafka.processors.producer.key.AttributeKeyFactory;
+import org.apache.nifi.kafka.processors.producer.key.KeyFactory;
+import org.apache.nifi.kafka.processors.producer.key.MessageKeyFactory;
+import org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy;
+import org.apache.nifi.kafka.service.api.KafkaConnectionService;
+import org.apache.nifi.kafka.service.api.common.PartitionState;
+import org.apache.nifi.kafka.service.api.producer.FlowFileResult;
+import org.apache.nifi.kafka.service.api.producer.KafkaProducerService;
+import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration;
+import org.apache.nifi.kafka.service.api.producer.PublishContext;
+import org.apache.nifi.kafka.service.api.producer.RecordSummary;
+import org.apache.nifi.kafka.service.api.record.KafkaRecord;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.component.KafkaPublishComponent;
+import org.apache.nifi.kafka.shared.property.FailureStrategy;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.apache.nifi.kafka.shared.property.PublishStrategy;
+import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.VerifiableProcessor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+import java.io.BufferedInputStream;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+
+@Tags({"kafka", "producer", "record"})
+public class PublishKafka extends AbstractProcessor implements KafkaPublishComponent, VerifiableProcessor {

Review Comment:
   The InputRequirement, CapabilityDescription, and WritesAttribute annotations are missing.
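A sketch of where the three class-level annotations would go follows. To keep it compilable without nifi-api, it declares local stand-ins that mirror what we understand to be the element shapes of NiFi's `@InputRequirement` (with its nested `Requirement` enum), `@CapabilityDescription` and `@WritesAttribute`. In the processor itself these would be imported from `org.apache.nifi.annotation.behavior` and `org.apache.nifi.annotation.documentation` instead. `PublishAnnotationsSketch` and `PublishKafkaSketch` are hypothetical. The description text and the `msg.count` attribute name are placeholders, not the values this PR uses.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class PublishAnnotationsSketch {

    // Local stand-ins mirroring the element shapes of the NiFi annotations,
    // declared here only so the sketch compiles without nifi-api.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface InputRequirement {
        Requirement value();

        enum Requirement { INPUT_REQUIRED, INPUT_ALLOWED, INPUT_FORBIDDEN }
    }

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface CapabilityDescription {
        String value();
    }

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface WritesAttribute {
        String attribute();
        String description() default "";
    }

    // Hypothetical publisher showing where the three annotations go; texts are placeholders.
    @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
    @CapabilityDescription("Sends the contents of a FlowFile to Apache Kafka (placeholder text)")
    @WritesAttribute(attribute = "msg.count", description = "Number of messages sent for this FlowFile (placeholder)")
    static class PublishKafkaSketch { }

    public static void main(String[] args) {
        Class<?> type = PublishKafkaSketch.class;
        // Read the annotations back reflectively, as a documentation generator would.
        System.out.println(type.getAnnotation(InputRequirement.class).value());
        System.out.println(type.getAnnotation(CapabilityDescription.class).value());
        System.out.println(type.getAnnotation(WritesAttribute.class).attribute());
    }
}
```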



##