Re: Question on ConsumeKafka_2_6

2022-05-29 Thread Knowles Atchison Jr
Mark, That configuration looks fine, but I cannot recreate this problem. I spun up a kafka broker operating on PLAINTEXT and created two ConsumeKafka_2_6 (NiFi 1.16.2) processors, identical configuration except for group id. One is group1, other is group2. Both are listening for test2.* via patter

Re: Question on ConsumeKafka_2_6

2022-05-27 Thread Mark Bean
I started this thread hoping it was just a configuration issue. But I may have to open a JIRA on this. If someone sees an issue with the following configuration and can explain why messages are not being consumed, please let me know. This processor has received no messages from Kafka in over an hou

Re: Question on ConsumeKafka_2_6

2022-05-27 Thread Mark Bean
I added 'metadata.max.age.ms = 1000' to the ConsumeKafka_2_6 processor. It started receiving faster than the ConsumeKafka_2_6 (which has not received any messages after about 15 minutes.) Here's my observations (all with different ConsumeKafka_2_6 processors configured with unique GroupID) - Topic

Re: Question on ConsumeKafka_2_6

2022-05-27 Thread Knowles Atchison Jr
1. Correct, this setting is per consumer. You can add dynamic configuration items to the ConsumeKafka processor and they will get passed down to the underlying Kafka Consumer. 2. Yes, when a new topic is created that matches the pattern the ConsumerKafka has been configured with, it will get picke

Re: Question on ConsumeKafka_2_6

2022-05-27 Thread Mark Payne
Mark, Any user defined property on the processor will be passed to the Kafka client as a client property. Thanks Mark > On May 27, 2022, at 9:27 AM, Mark Bean wrote: > > I do not see this property (metadata.max.age.ms) anywhere in the > ConsumeKafka processor. If I want to adjust this value

Re: Question on ConsumeKafka_2_6

2022-05-27 Thread Mark Bean
I do not see this property (metadata.max.age.ms) anywhere in the ConsumeKafka processor. If I want to adjust this value to something less than 5 minutes, that will mean modifying the processor. Or is there a mechanism already built-in to the processor that I can use? And, just to be clear, this pro

Re: Question on ConsumeKafka_2_6

2022-05-26 Thread Knowles Atchison Jr
Hi Mark, When a consumer first connects to a kafka cluster, there is a metadata handshake that transmits things like consumer group assignments and offsets. In application code, I find the first poll establishes this information and does not actually retrieve messages, depending on timeout used. C

Question on ConsumeKafka_2_6

2022-05-25 Thread Mark Bean
I have two ConsumeKafka_2_6 processors (NiFi 1.16.1). The first uses Topic Name Format of "names". The second uses Topic Name Format of "pattern". The names format is able to sync with Kafka relatively quickly and begins receiving messages within just a couple seconds. However, the pattern format t