Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
greyp9 commented on PR #8463: URL: https://github.com/apache/nifi/pull/8463#issuecomment-2123083750 > @greyp9 It looks like the latest changes are failing on integration tests due to property name changes. Thanks; pushed an update. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
exceptionfactory commented on PR #8463: URL: https://github.com/apache/nifi/pull/8463#issuecomment-2122889110 @greyp9 It looks like the latest changes are failing on integration tests due to property name changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
greyp9 commented on code in PR #8463: URL: https://github.com/apache/nifi/pull/8463#discussion_r1605446151 ## nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/producer/config/DeliveryGuarantee.java: ## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.processors.producer.config; + +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; + +public class DeliveryGuarantee { + +public static final String ACKS_CONFIG = "acks"; + +public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", +"FlowFile will be routed to failure unless the message is replicated to the appropriate " ++ "number of Kafka Nodes according to the Topic configuration"); +public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", +"FlowFile will be routed to success if the message is received by a single Kafka node, " ++ "whether or not it is replicated. This is faster than " ++ "but can result in data loss if a Kafka node crashes"); +public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", +"FlowFile will be routed to success after successfully sending the content to a Kafka node, " ++ "without waiting for any acknowledgment from the node at all. This provides the best performance but may result in data loss."); + +public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() +.name(ACKS_CONFIG) Review Comment: This label is brought in from the Kafka source. Added a link comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
greyp9 commented on PR #8463: URL: https://github.com/apache/nifi/pull/8463#issuecomment-2091285872 > I see what you mean. I'm suggesting even needing to swap out the controller service seems problematic. The jump from 2.6 to 3.0 involved * slight * changes in the NiFi component code (IIRC compatible API parameter types and additional method arguments). Less than an hour to remediate. It is a good point. Aside from practical considerations (Maven dependencies declared at compile time), it is hard to anticipate the scope of future changes to the client library API. It'll be interesting to get additional perspectives here. > But in any case...I dont feel that strongly. Fine as is just sharing my thoughts. And thanks for that! :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
joewitt commented on PR #8463: URL: https://github.com/apache/nifi/pull/8463#issuecomment-2091227065 I see what you mean. I'm suggesting even needing to swap out the controller service seems problematic. I like the controller service model and I recall why this direction was taken. But we should be able to call it Kafka and not Kafka3 even there. IF a later client changes behavior we can call that by specific version. But in any case...I dont feel that strongly. Fine as is just sharing my thoughts. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
joewitt commented on PR #8463: URL: https://github.com/apache/nifi/pull/8463#issuecomment-2090978880 @greyp9 Yeah that is fair. Dropping the 2.x stuff is certainly fair in a diff PR. More relevant for this PR though would be the naming for the Kafka3 components and breaking the cycle of users having to change processors with Kafka changes. We had to do this during the 0x/1x/2x transitions as the client behaviors just weren't stable enough. But I suspect now they are (just my view -others may have different info). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
greyp9 commented on PR #8463: URL: https://github.com/apache/nifi/pull/8463#issuecomment-2090939791 > Can we just drop the old Kafka processors and go all in on the latest release? At this point the client libraries have stabilized greatly and us supporting the older stuff feels far less important and worthwhile now. It was difficult to tease out all of the business logic from the existing processor set. Some logic is there because of actual user need, and some is there because of the way Kafka and NiFi evolved alongside each other. It seems like the happy path works well for the new components, but not sure about all of the possible runtime permutations. In my mind, we’re only talking about the 2.6 processor set; the previous implementations aren’t available in the development executable. It does seem like a good idea to stop investing in enhancements to the 2.6 processors. The 2.6 work in this PR is only scaffolding, intended to better understand runtime processor behavior so it can be reproduced in the new components. The concern for me is that if we drop the 2.6 set as part of this PR, we’ll make things harder for users during the transition period, in case some important edge case is missed. It would also complicate this PR. I’d like to pitch the idea of a transition period where both sets of components exist, to facilitate transition, and to provide a simple fallback if needed. Longer term, it makes a lot of sense for 2.6 to go the way of 2.0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
joewitt commented on PR #8463: URL: https://github.com/apache/nifi/pull/8463#issuecomment-2089326292 Can we just drop the old Kafka processors and go all in on the latest release? At this point the client libraries have stabilized greatly and us supporting the older stuff feels far less important and worthwhile now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
greyp9 commented on code in PR #8463: URL: https://github.com/apache/nifi/pull/8463#discussion_r1556223354 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java: ## @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.processors; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.kafka.processors.common.KafkaUtils; +import org.apache.nifi.kafka.processors.consumer.OffsetTracker; +import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy; +import org.apache.nifi.kafka.processors.consumer.bundle.ByteRecordBundler; +import org.apache.nifi.kafka.processors.consumer.convert.FlowFileStreamKafkaMessageConverter; +import org.apache.nifi.kafka.processors.consumer.convert.KafkaMessageConverter; +import org.apache.nifi.kafka.processors.consumer.convert.RecordStreamKafkaMessageConverter; +import org.apache.nifi.kafka.processors.consumer.convert.WrapperRecordStreamKafkaMessageConverter; +import org.apache.nifi.kafka.service.api.KafkaConnectionService; +import org.apache.nifi.kafka.service.api.common.PartitionState; +import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset; +import org.apache.nifi.kafka.service.api.consumer.ConsumerConfiguration; +import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService; +import org.apache.nifi.kafka.service.api.consumer.PollingContext; +import org.apache.nifi.kafka.service.api.record.ByteRecord; +import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute; +import org.apache.nifi.kafka.shared.property.KeyEncoding; +import org.apache.nifi.kafka.shared.property.KeyFormat; +import org.apache.nifi.kafka.shared.property.OutputStrategy; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.util.StringUtils; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import static org.apache.nifi.expression.ExpressionLanguageScope.NONE; + +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"kafka", "consumer", "record"}) +public class ConsumeKafka extends AbstractProcessor implements VerifiableProcessor { + +static final AllowableValue TOPIC_NAME = new AllowableValue("names", "names", "Topic is a full topic name or comma separated list of names"); +static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regex using the Java Pattern syntax"); + +static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder() +.name("Kafka Connection Service") +.displayName("Kafka Connection Service") +.description("Provides connections to Kafka Broker for publishing Kafka Records") +.identifiesControllerService(KafkaConnectionService.class) +.expressionLanguageSupported(NONE) +.required(true) +.build(); + +
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
greyp9 commented on code in PR #8463: URL: https://github.com/apache/nifi/pull/8463#discussion_r1538322756 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java: ## @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.processors; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.kafka.processors.common.KafkaUtils; +import org.apache.nifi.kafka.processors.consumer.OffsetTracker; +import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy; +import org.apache.nifi.kafka.processors.consumer.bundle.ByteRecordBundler; +import org.apache.nifi.kafka.processors.consumer.convert.FlowFileStreamKafkaMessageConverter; +import org.apache.nifi.kafka.processors.consumer.convert.KafkaMessageConverter; +import org.apache.nifi.kafka.processors.consumer.convert.RecordStreamKafkaMessageConverter; +import org.apache.nifi.kafka.processors.consumer.convert.WrapperRecordStreamKafkaMessageConverter; +import org.apache.nifi.kafka.service.api.KafkaConnectionService; +import org.apache.nifi.kafka.service.api.common.PartitionState; +import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset; +import org.apache.nifi.kafka.service.api.consumer.ConsumerConfiguration; +import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService; +import org.apache.nifi.kafka.service.api.consumer.PollingContext; +import org.apache.nifi.kafka.service.api.record.ByteRecord; +import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute; +import org.apache.nifi.kafka.shared.property.KeyEncoding; +import org.apache.nifi.kafka.shared.property.KeyFormat; +import org.apache.nifi.kafka.shared.property.OutputStrategy; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.util.StringUtils; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import static org.apache.nifi.expression.ExpressionLanguageScope.NONE; + +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"kafka", "consumer", "record"}) +public class ConsumeKafka extends AbstractProcessor implements VerifiableProcessor { + +static final AllowableValue TOPIC_NAME = new AllowableValue("names", "names", "Topic is a full topic name or comma separated list of names"); +static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regex using the Java Pattern syntax"); + +static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder() +.name("Kafka Connection Service") +.displayName("Kafka Connection Service") +.description("Provides connections to Kafka Broker for publishing Kafka Records") +.identifiesControllerService(KafkaConnectionService.class) +.expressionLanguageSupported(NONE) +.required(true) +.build(); + +
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
greyp9 commented on code in PR #8463: URL: https://github.com/apache/nifi/pull/8463#discussion_r1538320629 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/txn/KafkaTransactionalProducerWrapper.java: ## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.service.producer.txn; + +import org.apache.kafka.clients.producer.Producer; + +public class KafkaTransactionalProducerWrapper extends KafkaProducerWrapper { + +public KafkaTransactionalProducerWrapper(final Producer producer) { +super(producer); +} + +@Override +public void init() { +producer.initTransactions(); +producer.beginTransaction(); +} + +@Override +public void commit() { +try { +producer.commitTransaction(); +logger.trace("committed"); Review Comment: Yep; thanks. In general, there will be instances of these that were useful in the moment, but no longer helpful. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
greyp9 commented on code in PR #8463: URL: https://github.com/apache/nifi/pull/8463#discussion_r1538319116 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/txn/KafkaProducerWrapper.java: ## @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.service.producer.txn; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.nifi.kafka.service.api.producer.PublishContext; +import org.apache.nifi.kafka.service.api.record.KafkaRecord; +import org.apache.nifi.kafka.service.producer.ProducerCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * Abstract away the configured transactionality of the PublishKafka producer. + */ +public abstract class KafkaProducerWrapper { +protected final Logger logger = LoggerFactory.getLogger(getClass()); + +protected final Producer producer; + +public KafkaProducerWrapper(final Producer producer) { +this.producer = producer; +} + +/** + * Transaction-enabled publish to Kafka involves the use of special Kafka client library APIs. + */ +public abstract void init(); + +public void send(final Iterator kafkaRecords, final PublishContext publishContext, final ProducerCallback callback) { +while (kafkaRecords.hasNext()) { +final KafkaRecord kafkaRecord = kafkaRecords.next(); +producer.send(toProducerRecord(kafkaRecord, publishContext), callback); +callback.send(); +} +logger.trace("send():inFlight"); Review Comment: Maybe not. Are there reasonable project guidelines for how to log things that should be disabled by default, but useful in troubleshooting particular scenarios? Not necessarily suggesting this is one; more of a general question. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
greyp9 commented on code in PR #8463: URL: https://github.com/apache/nifi/pull/8463#discussion_r1538315024 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/ProducerCallback.java: ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.service.producer; + +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.kafka.service.api.producer.FlowFileResult; +import org.apache.nifi.kafka.service.api.producer.ProducerRecordMetadata; +import org.apache.nifi.kafka.shared.util.Notifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +public class ProducerCallback implements Callback { +private final Logger logger = LoggerFactory.getLogger(getClass()); + +private final AtomicLong sentCount; +private final AtomicLong acknowledgedCount; +private final AtomicLong failedCount; +private final FlowFile flowFile; +private final List metadatas; +private final List exceptions; +private final Notifier notifier; + +public List getExceptions() { +return exceptions; +} + +public boolean isFailure() { +return !exceptions.isEmpty(); +} + +public ProducerCallback(final FlowFile flowFile) { +this.sentCount = new AtomicLong(0L); +this.acknowledgedCount = new AtomicLong(0L); +this.failedCount = new AtomicLong(0L); +this.flowFile = flowFile; +this.metadatas = new ArrayList<>(); +this.exceptions = new ArrayList<>(); +this.notifier = new Notifier(); +} + +public long send() { +return sentCount.incrementAndGet(); +} + +@Override +public void onCompletion(final RecordMetadata metadata, final Exception exception) { + +// the source `FlowFile` and the associated `RecordMetadata` need to somehow be associated... Review Comment: I was originally hoping that this sort of API would not be needed, but there were problems when I added transactionality. Maybe a reasonable method-level comment could be used instead to capture this nuance? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
greyp9 commented on code in PR #8463: URL: https://github.com/apache/nifi/pull/8463#discussion_r1538306358 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/producer/Kafka3ProducerService.java: ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.service.producer; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.nifi.kafka.service.api.common.PartitionState; +import org.apache.nifi.kafka.service.api.common.ServiceConfiguration; +import org.apache.nifi.kafka.service.api.producer.FlowFileResult; +import org.apache.nifi.kafka.service.api.producer.KafkaProducerService; +import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration; +import org.apache.nifi.kafka.service.api.producer.PublishContext; +import org.apache.nifi.kafka.service.api.producer.RecordSummary; +import org.apache.nifi.kafka.service.api.record.KafkaRecord; +import org.apache.nifi.kafka.service.producer.txn.KafkaNonTransactionalProducerWrapper; +import org.apache.nifi.kafka.service.producer.txn.KafkaProducerWrapper; +import org.apache.nifi.kafka.service.producer.txn.KafkaTransactionalProducerWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.stream.Collectors; + +public class Kafka3ProducerService implements KafkaProducerService { +private final Logger logger = LoggerFactory.getLogger(getClass()); + +private final Producer producer; +private final List callbacks; + +private final ServiceConfiguration serviceConfiguration; + +private final KafkaProducerWrapper wrapper; + +public Kafka3ProducerService(final Properties properties, + final ServiceConfiguration serviceConfiguration, + final ProducerConfiguration producerConfiguration) { +final ByteArraySerializer serializer = new ByteArraySerializer(); +this.producer = new KafkaProducer<>(properties, serializer, serializer); +this.callbacks = new ArrayList<>(); + +this.serviceConfiguration = serviceConfiguration; + +this.wrapper = producerConfiguration.getUseTransactions() +? new KafkaTransactionalProducerWrapper(producer) +: new KafkaNonTransactionalProducerWrapper(producer); +} + +@Override +public void close() { +producer.close(); +} + +@Override +public void init() { +wrapper.init(); +logger.trace("init()"); Review Comment: This particular one helped me work out a problem with controller service lifecycle, but now uneeded; I will remove it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
greyp9 commented on code in PR #8463: URL: https://github.com/apache/nifi/pull/8463#discussion_r1538303445 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/readme.txt: ## @@ -0,0 +1,2 @@ +Integration tests derived from examples provided in: +https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html Review Comment: I found myself noting things during implementation. Where appropriate, I'll make sure any documentation like this is in Markdown format. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
greyp9 commented on code in PR #8463: URL: https://github.com/apache/nifi/pull/8463#discussion_r1538303639 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/README.md: ## @@ -0,0 +1,14 @@ + + +### Kafka3ProducerService +- As the `send()` API maps cleanly to the context of a single `FlowFile`, additional APIs are useful to handle the ability to publish multiple FlowFiles in the context of a single publish to Kafka. Review Comment: That makes sense; I will make that change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
greyp9 commented on code in PR #8463: URL: https://github.com/apache/nifi/pull/8463#discussion_r1538301385 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java: ## @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.service; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.TopicListing; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.security.plain.PlainLoginModule; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.VerifiableControllerService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.kafka.service.api.KafkaConnectionService; +import org.apache.nifi.kafka.service.api.common.ServiceConfiguration; +import org.apache.nifi.kafka.service.api.consumer.ConsumerConfiguration; +import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService; +import org.apache.nifi.kafka.service.api.producer.KafkaProducerService; +import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration; +import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService; +import org.apache.nifi.kafka.service.producer.Kafka3ProducerService; +import org.apache.nifi.kafka.shared.property.SaslMechanism; +import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider; +import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider; +import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED; +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL; +import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_LOCATION; +import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_PASSWORD; +import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_TYPE; +import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEY_PASSWORD; +import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_LOCATION; +import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_PASSWORD; +import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_TYPE; + +public class Kafka3ConnectionService extends AbstractControllerService implements KafkaConnectionService, VerifiableControllerService { + +public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder() +.name("bootstrap.servers") +.displayName("Bootstrap Servers") +.description("Comma-separated list of Kafka Bootstrap Servers in the format host:port. Mapped to Kafka bootstrap.servers") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
exceptionfactory commented on code in PR #8463: URL: https://github.com/apache/nifi/pull/8463#discussion_r1526461916 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/README.md: ## @@ -0,0 +1,14 @@ + + +### Kafka3ProducerService +- As the `send()` API maps cleanly to the context of a single `FlowFile`, additional APIs are useful to handle the ability to publish multiple FlowFiles in the context of a single publish to Kafka. Review Comment: Is there a reason for this separate readme as opposed to including these comments in the class itself? ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/publish/additional/readme.txt: ## @@ -0,0 +1,2 @@ +Integration tests derived from examples provided in: +https://github.com/apache/nifi/blob/main/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html Review Comment: Is this file necessary? Recommend changing to Markdown for consistency if you prefer to keep it. ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/PublishKafkaWrapperRecordIT.java: ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.processors; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.commons.io.IOUtils; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.header.Header; +import org.apache.nifi.kafka.shared.property.PublishStrategy; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@TestMethodOrder(MethodOrderer.MethodName.class) +public class PublishKafkaWrapperRecordIT extends AbstractPublishKafkaIT { +private static final String TEST_RESOURCE = "org/apache/nifi/kafka/processors/publish/ffwrapper.json"; + +private static final String KEY_ATTRIBUTE_KEY = "keyAttribute"; +private static final String KEY_ATTRIBUTE_VALUE = "keyAttributeValue"; + +private static final int TEST_RECORD_COUNT = 3; + +@Test +public void test_1_KafkaTestContainerProduceOneFlowFile() throws InitializationException, IOException { +final TestRunner runner = TestRunners.newTestRunner(PublishKafka.class); +runner.setValidateExpressionUsage(false); +runner.setProperty(PublishKafka.CONNECTION_SERVICE, addKafkaConnectionService(runner)); +addRecordReaderService(runner); +addRecordWriterService(runner); +addRecordKeyWriterService(runner); + +runner.setProperty(PublishKafka.TOPIC_NAME, getClass().getName()); +runner.setProperty(PublishKafka.KEY, KEY_ATTRIBUTE_KEY); +runner.setProperty(PublishKafka.MESSAGE_KEY_FIELD, "address"); +runner.setProperty(PublishKafka.ATTRIBUTE_NAME_REGEX, "a.*"); +runner.setProperty(PublishKafka.PUBLISH_STRATEGY, PublishStrategy.USE_WRAPPER.name()); + +final Map attributes = new HashMap<>(); +attributes.put(KEY_ATTRIBUTE_KEY, KEY_ATTRIBUTE_VALUE); +attributes.put("a1", "valueA1"); +attributes.put("b1", "valueB1"); +final byte[] bytesFlowFile = IOUtils.toByteArray(Objects.requireNonNull( +getClass().getClassLoader().getResource(TEST_RESOURCE))); +
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
greyp9 commented on code in PR #8463: URL: https://github.com/apache/nifi/pull/8463#discussion_r1522080555 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java: ## @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.processors; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.kafka.processors.common.KafkaUtils; +import org.apache.nifi.kafka.processors.consumer.OffsetTracker; +import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy; +import org.apache.nifi.kafka.processors.consumer.bundle.ByteRecordBundler; +import org.apache.nifi.kafka.processors.consumer.convert.FlowFileStreamKafkaMessageConverter; +import org.apache.nifi.kafka.processors.consumer.convert.KafkaMessageConverter; +import org.apache.nifi.kafka.processors.consumer.convert.RecordStreamKafkaMessageConverter; +import org.apache.nifi.kafka.processors.consumer.convert.WrapperRecordStreamKafkaMessageConverter; +import org.apache.nifi.kafka.service.api.KafkaConnectionService; +import org.apache.nifi.kafka.service.api.common.PartitionState; +import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset; +import org.apache.nifi.kafka.service.api.consumer.ConsumerConfiguration; +import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService; +import org.apache.nifi.kafka.service.api.consumer.PollingContext; +import org.apache.nifi.kafka.service.api.record.ByteRecord; +import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute; +import org.apache.nifi.kafka.shared.property.KeyEncoding; +import org.apache.nifi.kafka.shared.property.KeyFormat; +import org.apache.nifi.kafka.shared.property.OutputStrategy; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.util.StringUtils; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import static org.apache.nifi.expression.ExpressionLanguageScope.NONE; + +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"kafka", "consumer", "record"}) +public class ConsumeKafka extends AbstractProcessor implements VerifiableProcessor { + +static final AllowableValue TOPIC_NAME = new AllowableValue("names", "names", "Topic is a full topic name or comma separated list of names"); +static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regex using the Java Pattern syntax"); + +static final PropertyDescriptor CONNECTION_SERVICE = new PropertyDescriptor.Builder() +.name("Kafka Connection Service") +.displayName("Kafka Connection Service") +.description("Provides connections to Kafka Broker for publishing Kafka Records") +.identifiesControllerService(KafkaConnectionService.class) +.expressionLanguageSupported(NONE) +.required(true) +.build(); + +
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
greyp9 commented on code in PR #8463: URL: https://github.com/apache/nifi/pull/8463#discussion_r1522076394 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-3-service/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java: ## @@ -0,0 +1,402 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.service; + +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.ListTopicsResult; +import org.apache.kafka.clients.admin.TopicListing; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.security.plain.PlainLoginModule; +import org.apache.nifi.annotation.lifecycle.OnDisabled; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.context.PropertyContext; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.controller.VerifiableControllerService; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.kafka.service.api.KafkaConnectionService; +import org.apache.nifi.kafka.service.api.common.ServiceConfiguration; +import org.apache.nifi.kafka.service.api.consumer.ConsumerConfiguration; +import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService; +import org.apache.nifi.kafka.service.api.producer.KafkaProducerService; +import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration; +import org.apache.nifi.kafka.service.consumer.Kafka3ConsumerService; +import org.apache.nifi.kafka.service.producer.Kafka3ProducerService; +import org.apache.nifi.kafka.shared.property.SaslMechanism; +import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider; +import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider; +import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.ssl.SSLContextService; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.FAILED; +import static org.apache.nifi.components.ConfigVerificationResult.Outcome.SUCCESSFUL; +import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_LOCATION; +import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_PASSWORD; +import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEYSTORE_TYPE; +import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_KEY_PASSWORD; +import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_LOCATION; +import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_PASSWORD; +import static org.apache.nifi.kafka.shared.property.KafkaClientProperty.SSL_TRUSTSTORE_TYPE; + +public class Kafka3ConnectionService extends AbstractControllerService implements KafkaConnectionService, VerifiableControllerService { + +public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder() +.name("bootstrap.servers") +.displayName("Bootstrap Servers") +.description("Comma-separated list of Kafka Bootstrap Servers in the format host:port. Mapped to Kafka bootstrap.servers") +.required(true) +.addValidator(StandardValidators.NON_BLANK_VALIDATOR) +
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
greyp9 commented on code in PR #8463: URL: https://github.com/apache/nifi/pull/8463#discussion_r1522075657 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java: ## @@ -0,0 +1,527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.processors; + +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.kafka.processors.producer.common.PublishKafkaUtil; +import org.apache.nifi.kafka.processors.producer.config.DeliveryGuarantee; +import org.apache.nifi.kafka.processors.producer.convert.DelimitedStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.FlowFileStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.KafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.RecordStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.RecordWrapperStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.header.AttributesHeadersFactory; +import org.apache.nifi.kafka.processors.producer.header.HeadersFactory; +import org.apache.nifi.kafka.processors.producer.key.AttributeKeyFactory; +import org.apache.nifi.kafka.processors.producer.key.KeyFactory; +import org.apache.nifi.kafka.processors.producer.key.MessageKeyFactory; +import org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy; +import org.apache.nifi.kafka.service.api.KafkaConnectionService; +import org.apache.nifi.kafka.service.api.common.PartitionState; +import org.apache.nifi.kafka.service.api.producer.FlowFileResult; +import org.apache.nifi.kafka.service.api.producer.KafkaProducerService; +import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration; +import org.apache.nifi.kafka.service.api.producer.PublishContext; +import org.apache.nifi.kafka.service.api.producer.RecordSummary; +import org.apache.nifi.kafka.service.api.record.KafkaRecord; +import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute; +import org.apache.nifi.kafka.shared.component.KafkaPublishComponent; +import org.apache.nifi.kafka.shared.property.FailureStrategy; +import org.apache.nifi.kafka.shared.property.KeyEncoding; +import org.apache.nifi.kafka.shared.property.PublishStrategy; +import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; + +import java.io.BufferedInputStream; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Supplier; +import java.util.regex.Pattern; + +@Tags({"kafka", "producer", "record"}) +public class PublishKafka extends AbstractProcessor implements KafkaPublishComponent, VerifiableProcessor { Review Comment: I'll verify the counter and attribute for message count. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
greyp9 commented on code in PR #8463: URL: https://github.com/apache/nifi/pull/8463#discussion_r1522074061 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java: ## @@ -0,0 +1,464 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.processors; + +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.Validator; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.kafka.processors.common.KafkaUtils; +import org.apache.nifi.kafka.processors.consumer.OffsetTracker; +import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy; +import org.apache.nifi.kafka.processors.consumer.bundle.ByteRecordBundler; +import org.apache.nifi.kafka.processors.consumer.convert.FlowFileStreamKafkaMessageConverter; +import org.apache.nifi.kafka.processors.consumer.convert.KafkaMessageConverter; +import org.apache.nifi.kafka.processors.consumer.convert.RecordStreamKafkaMessageConverter; +import org.apache.nifi.kafka.processors.consumer.convert.WrapperRecordStreamKafkaMessageConverter; +import org.apache.nifi.kafka.service.api.KafkaConnectionService; +import org.apache.nifi.kafka.service.api.common.PartitionState; +import org.apache.nifi.kafka.service.api.consumer.AutoOffsetReset; +import org.apache.nifi.kafka.service.api.consumer.ConsumerConfiguration; +import org.apache.nifi.kafka.service.api.consumer.KafkaConsumerService; +import org.apache.nifi.kafka.service.api.consumer.PollingContext; +import org.apache.nifi.kafka.service.api.record.ByteRecord; +import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute; +import org.apache.nifi.kafka.shared.property.KeyEncoding; +import org.apache.nifi.kafka.shared.property.KeyFormat; +import org.apache.nifi.kafka.shared.property.OutputStrategy; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.util.StringUtils; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import static org.apache.nifi.expression.ExpressionLanguageScope.NONE; + +@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) +@Tags({"kafka", "consumer", "record"}) +public class ConsumeKafka extends AbstractProcessor implements VerifiableProcessor { Review Comment: Thanks. Need to check annotations to make sure those are all propagated where appropriate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@nifi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] NIFI-11259 - Kafka processor refactor [nifi]
taz1988 commented on code in PR #8463: URL: https://github.com/apache/nifi/pull/8463#discussion_r1517541500 ## nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/PublishKafka.java: ## @@ -0,0 +1,527 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.kafka.processors; + +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.ConfigVerificationResult; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.kafka.processors.producer.common.PublishKafkaUtil; +import org.apache.nifi.kafka.processors.producer.config.DeliveryGuarantee; +import org.apache.nifi.kafka.processors.producer.convert.DelimitedStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.FlowFileStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.KafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.RecordStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.convert.RecordWrapperStreamKafkaRecordConverter; +import org.apache.nifi.kafka.processors.producer.header.AttributesHeadersFactory; +import org.apache.nifi.kafka.processors.producer.header.HeadersFactory; +import org.apache.nifi.kafka.processors.producer.key.AttributeKeyFactory; +import org.apache.nifi.kafka.processors.producer.key.KeyFactory; +import org.apache.nifi.kafka.processors.producer.key.MessageKeyFactory; +import org.apache.nifi.kafka.processors.producer.wrapper.RecordMetadataStrategy; +import org.apache.nifi.kafka.service.api.KafkaConnectionService; +import org.apache.nifi.kafka.service.api.common.PartitionState; +import org.apache.nifi.kafka.service.api.producer.FlowFileResult; +import org.apache.nifi.kafka.service.api.producer.KafkaProducerService; +import org.apache.nifi.kafka.service.api.producer.ProducerConfiguration; +import org.apache.nifi.kafka.service.api.producer.PublishContext; +import org.apache.nifi.kafka.service.api.producer.RecordSummary; +import org.apache.nifi.kafka.service.api.record.KafkaRecord; +import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute; +import org.apache.nifi.kafka.shared.component.KafkaPublishComponent; +import org.apache.nifi.kafka.shared.property.FailureStrategy; +import org.apache.nifi.kafka.shared.property.KeyEncoding; +import org.apache.nifi.kafka.shared.property.PublishStrategy; +import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.VerifiableProcessor; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriterFactory; + +import java.io.BufferedInputStream; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Supplier; +import java.util.regex.Pattern; + +@Tags({"kafka", "producer", "record"}) +public class PublishKafka extends AbstractProcessor implements KafkaPublishComponent, VerifiableProcessor { Review Comment: InputRequirement, CapabilityDescription, WritesAttribute annotations are missing ##