That makes perfect sense. To be clear, is there any potential to lose messages in this scenario?

On Thu, Feb 9, 2017 at 7:16 AM, Joe Witt <[email protected]> wrote:
> Yeah, this is probably a good case/cause for use of the pause concept
> in Kafka consumers.
>
> On Thu, Feb 9, 2017 at 9:49 AM, Bryan Bende <[email protected]> wrote:
> > I believe you are running into this issue:
> >
> > https://issues.apache.org/jira/browse/NIFI-3189
> >
> > When back-pressure happens on the queue coming out of ConsumeKafka,
> > it can last longer than session.timeout.ms, and when the processor
> > resumes executing it receives this error on the first execution. We
> > should be able to implement some type of keep-alive so that even
> > when the processor is not executing, there is a background thread or
> > some other way of keeping the connections alive.
> >
> > I believe any user-defined properties in the processor get passed to
> > the Kafka consumer, so you could add "session.timeout.ms" with a
> > much higher value as a possible workaround.
> >
> > Thanks,
> >
> > Bryan
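
As an illustration of Bryan's workaround, this is roughly what the same
override looks like in plain Kafka 0.10 consumer code. The broker address,
group id, and topic below are placeholder values; the "session.timeout.ms"
key is the same one you would add as a user-defined property on
ConsumeKafka_0_10, which passes it through to the consumer it creates.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class LongSessionConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("group.id", "example-group");           // placeholder group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Raise the session timeout well above the 10000 ms default so a
        // back-pressure stall does not trigger a group rebalance. The broker
        // caps this via group.max.session.timeout.ms, which may also need
        // raising.
        props.put("session.timeout.ms", "60000");
        // Smaller batches shorten the gap between poll() calls, the other
        // fix the CommitFailedException message suggests.
        props.put("max.poll.records", "1000");

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("example-topic")); // placeholder
        consumer.close();
    }
}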
> > On Thu, Feb 9, 2017 at 8:42 AM, Koji Kawamura <[email protected]> wrote:
> >> Hello Nick,
> >>
> >> First, I assume "had a queue back up" means a queue with
> >> back-pressure applied. Sorry if that was a different meaning.
> >>
> >> I was trying to reproduce with the following flow:
> >> ConsumeKafka_0_10
> >> -- success: Back Pressure Object Threshold = 10
> >> -- UpdateAttribute (Stopped)
> >>
> >> Then I used ./bin/kafka-console-producer.sh to send 11 messages.
> >> The result was, when NiFi received the 10th message, back-pressure
> >> was enabled on the success relationship.
> >> When I published the 11th message, NiFi didn't do anything.
> >> This is expected behavior: because the downstream connection is
> >> back-pressured, the processor won't be scheduled.
> >>
> >> After I started UpdateAttribute and the queued flow files went
> >> through, ConsumeKafka was executed again and received the 11th
> >> message.
> >>
> >> Also, I checked the ConsumerLease and ConsumeKafka_0_10 source code;
> >> those warning and error messages are logged because NiFi received a
> >> KafkaException when it tried to commit offsets to Kafka.
> >>
> >> Was there anything in the Kafka server logs? I suspect something
> >> happened on the Kafka server side.
> >>
> >> Thanks,
> >> Koji
> >>
> >> On Thu, Feb 9, 2017 at 11:54 AM, Nick Carenza
> >> <[email protected]> wrote:
> >>> Hey team, I have a ConsumeKafka_0_10 running which normally
> >>> operates without problems. I had a queue back up due to a
> >>> downstream processor and I started getting these bulletins.
> >>>
> >>> 01:16:01 UTC WARNING a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2]
> >>> Duplicates are likely as we were able to commit the process session
> >>> but received an exception from Kafka while committing offsets.
> >>>
> >>> 01:16:01 UTC ERROR a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2]
> >>> Exception while interacting with Kafka so will close the lease
> >>> org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@87d2ac1
> >>> due to org.apache.kafka.clients.consumer.CommitFailedException:
> >>> Commit cannot be completed since the group has already rebalanced
> >>> and assigned the partitions to another member. This means that the
> >>> time between subsequent calls to poll() was longer than the
> >>> configured session.timeout.ms, which typically implies that the
> >>> poll loop is spending too much time message processing. You can
> >>> address this either by increasing the session timeout or by
> >>> reducing the maximum size of batches returned in poll() with
> >>> max.poll.records.
> >>>
> >>> My max.poll.records is set to 10000 on my consumer, and
> >>> session.timeout.ms is the default 10000 on the server.
> >>>
> >>> Since there is no such thing as coincidences, I believe this has to
> >>> do with it not being able to push received messages to the
> >>> downstream queue.
> >>>
> >>> If my flow is backed up, I expect the ConsumeKafka processor not to
> >>> throw errors but to continue to heartbeat with the Kafka server and
> >>> resume consuming once it can commit to the downstream queue.
> >>>
> >>> Might I have the server or consumer misconfigured to handle this
> >>> scenario, or should the consumer not be throwing this error?
> >>>
> >>> Thanks,
> >>> - Nick
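
For what the keep-alive Joe and Bryan describe might look like, here is a
rough sketch against the Kafka 0.10 consumer API. This is not NiFi's actual
NIFI-3189 implementation; downstreamBackPressured() and deliverDownstream()
are hypothetical stand-ins for NiFi's queue check and session hand-off.

import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PausingConsumerLoop {
    private final KafkaConsumer<byte[], byte[]> consumer;

    public PausingConsumerLoop(KafkaConsumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    public void runOnce() {
        Set<TopicPartition> assigned = consumer.assignment();
        if (downstreamBackPressured()) {
            consumer.pause(assigned);  // stop fetching but stay in the group
            consumer.poll(100);        // returns no records; keeps the session alive
        } else {
            consumer.resume(assigned); // safe even if nothing is paused
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            deliverDownstream(records);
        }
    }

    private boolean downstreamBackPressured() {
        return false; // hypothetical: would check the downstream queue's state
    }

    private void deliverDownstream(ConsumerRecords<byte[], byte[]> records) {
        // hypothetical: hand the records to the downstream NiFi queue
    }
}

In the 0.10.0 consumer, heartbeats are only sent from inside poll(), so
polling with every partition paused keeps the group membership alive without
fetching anything; that is exactly the gap a back-pressured ConsumeKafka
falls into today.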
