That said, I think we can improve our handling of the consumer (Kafka client) and the session (NiFi transactional logic) and solve the problem. It is related to our back-pressure/consumer handling, so we can fix that.
Thanks
Joe

On Thu, Feb 9, 2017 at 1:38 PM, Bryan Bende <[email protected]> wrote:
> No data loss, but you may process the same message twice in NiFi.
>
> The ordering of operations is:
>
> 1) poll Kafka
> 2) write received data to flow file
> 3) commit NiFi session so data in flow file cannot be lost
> 4) commit offsets to Kafka
>
> Doing it this way achieves at-least-once processing, which means you
> can never lose data, but you can process data twice.
>
> If we committed the offsets before committing the flow file you would
> never get duplicates, but you could lose a message if a crash happened
> between committing the offset and committing the NiFi session
> (at-most-once processing).
>
> So the error is happening on #4: NiFi has already produced a flow
> file with the message, but then Kafka says it can't update the offset,
> and another consumer will likely pull that same message again and
> produce another flow file with the same message.
>
>
> On Thu, Feb 9, 2017 at 1:19 PM, Nick Carenza
> <[email protected]> wrote:
>> That makes perfect sense. To be clear, is there any potential to lose
>> messages in this scenario?
>>
>> On Thu, Feb 9, 2017 at 7:16 AM, Joe Witt <[email protected]> wrote:
>>>
>>> Yeah, this is probably a good case/cause for use of the pause concept
>>> in Kafka consumers.
>>>
>>> On Thu, Feb 9, 2017 at 9:49 AM, Bryan Bende <[email protected]> wrote:
>>> > I believe you are running into this issue:
>>> >
>>> > https://issues.apache.org/jira/browse/NIFI-3189
>>> >
>>> > When back-pressure happens on the queue coming out of ConsumeKafka,
>>> > this can last for longer than session.timeout.ms, and when the
>>> > processor resumes executing it receives this error on the first
>>> > execution. We should be able to implement some type of keep-alive so
>>> > that even when the processor is not executing, there is a background
>>> > thread, or some other way of keeping the connections alive.
>>> >
>>> > I believe any user-defined properties in the processor get passed to
>>> > the Kafka consumer, so I believe you could add "session.timeout.ms"
>>> > with a much higher value as a possible workaround.
>>> >
>>> > Thanks,
>>> >
>>> > Bryan
>>> >
>>> > On Thu, Feb 9, 2017 at 8:42 AM, Koji Kawamura <[email protected]>
>>> > wrote:
>>> >> Hello Nick,
>>> >>
>>> >> First, I assume "had a queue back up" means a queue being
>>> >> back-pressured. Sorry if that was a different meaning.
>>> >>
>>> >> I was trying to reproduce it with the following flow:
>>> >> ConsumeKafka_0_10
>>> >> -- success: Back Pressure Object Threshold = 10
>>> >> -- UpdateAttribute (Stopped)
>>> >>
>>> >> Then I used ./bin/kafka-console-producer.sh to send 11 messages.
>>> >> The result was: when NiFi received the 10th message, back-pressure
>>> >> was enabled on the success relationship.
>>> >> When I published the 11th message, NiFi didn't do anything.
>>> >> This is expected behavior: because the downstream connection is
>>> >> back-pressured, the processor won't be scheduled.
>>> >>
>>> >> After I started UpdateAttribute and the queued flow files went
>>> >> through, ConsumeKafka was executed again and received the 11th
>>> >> message.
>>> >>
>>> >> Also, I checked the ConsumerLease and ConsumeKafka_0_10 source code;
>>> >> those warning and error messages are logged because NiFi received a
>>> >> KafkaException when it tried to commit offsets to Kafka.
>>> >>
>>> >> Was there anything in the Kafka server logs? I suspect something
>>> >> happened on the Kafka server side.
>>> >>
>>> >> Thanks,
>>> >> Koji
>>> >>
>>> >> On Thu, Feb 9, 2017 at 11:54 AM, Nick Carenza
>>> >> <[email protected]> wrote:
>>> >>> Hey team, I have a ConsumeKafka_0_10 running which normally operates
>>> >>> without problems. I had a queue back up due to a downstream
>>> >>> processor and I started getting these bulletins.
>>> >>>
>>> >>> 01:16:01 UTC WARNING a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
>>> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Duplicates
>>> >>> are likely as we were able to commit the process session but received
>>> >>> an exception from Kafka while committing offsets.
>>> >>>
>>> >>> 01:16:01 UTC ERROR a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
>>> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Exception
>>> >>> while interacting with Kafka so will close the lease
>>> >>> org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@87d2ac1
>>> >>> due to org.apache.kafka.clients.consumer.CommitFailedException: Commit
>>> >>> cannot be completed since the group has already rebalanced and
>>> >>> assigned the partitions to another member. This means that the time
>>> >>> between subsequent calls to poll() was longer than the configured
>>> >>> session.timeout.ms, which typically implies that the poll loop is
>>> >>> spending too much time message processing. You can address this
>>> >>> either by increasing the session timeout or by reducing the maximum
>>> >>> size of batches returned in poll() with max.poll.records.
>>> >>>
>>> >>> My max.poll.records is set to 10000 on my consumer and
>>> >>> session.timeout.ms is the default 10000 on the server.
>>> >>>
>>> >>> Since there is no such thing as coincidences, I believe this has to
>>> >>> do with it not being able to push received messages to the
>>> >>> downstream queue.
>>> >>>
>>> >>> If my flow is backed up, I would expect the ConsumeKafka processor
>>> >>> not to throw errors but to continue to heartbeat with the Kafka
>>> >>> server and resume consuming once it can commit to the downstream
>>> >>> queue.
>>> >>>
>>> >>> Might I have the server or consumer misconfigured to handle this
>>> >>> scenario, or should the consumer not be throwing this error?
>>> >>>
>>> >>> Thanks,
>>> >>> - Nick
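[Editor's note] Bryan's 1)-4) ordering and why it yields at-least-once delivery can be illustrated with a small toy simulation. This is plain Java with no Kafka or NiFi code involved; the class, fields, and message names (`m1`, `m2`) are all made up for illustration. A failure between committing the NiFi session (step 3) and committing the Kafka offset (step 4) leaves the offset behind, so the next consumer re-reads the same record and emits a duplicate flow file, but nothing is ever lost:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the ConsumeKafka ordering:
//   1) poll  2) write flow file  3) commit NiFi session  4) commit Kafka offset
public class AtLeastOnceDemo {

    static final List<String> topic = List.of("m1", "m2"); // records on the partition
    static int committedOffset = 0;                        // offset stored in Kafka
    static final List<String> flowFiles = new ArrayList<>(); // durable NiFi session commits

    // One processor execution: consume a single record; optionally fail on step 4,
    // as happens when the group has rebalanced (CommitFailedException).
    static void consumeOnce(boolean offsetCommitFails) {
        if (committedOffset >= topic.size()) return;
        String record = topic.get(committedOffset);  // 1) poll from last committed offset
        String flowFile = record;                    // 2) write the record to a flow file
        flowFiles.add(flowFile);                     // 3) commit the NiFi session (durable)
        if (offsetCommitFails) {
            return;                                  // 4) fails: offset NOT advanced
        }
        committedOffset++;                           // 4) commit the offset to Kafka
    }

    public static void main(String[] args) {
        consumeOnce(true);   // session committed, but offset commit fails
        consumeOnce(false);  // another consumer polls the same record again
        consumeOnce(false);
        System.out.println(flowFiles); // [m1, m1, m2] -> one duplicate, nothing lost
    }
}
```

Reversing steps 3 and 4 in this model would drop `m1` entirely on the same failure, which is the at-most-once trade-off Bryan describes.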
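[Editor's note] Bryan's suggested workaround, sketched as a config fragment: since user-defined (dynamic) properties on ConsumeKafka_0_10 are passed through to the underlying Kafka consumer, adding one named `session.timeout.ms` raises the timeout the rebalance is tripping on. The value below is illustrative only, not a recommendation:

```properties
# Dynamic (user-defined) property added on the ConsumeKafka_0_10 processor;
# passed through to the Kafka consumer configuration.
# 60000 ms is an example value, well above the 10000 ms default that the
# back-pressure pause was exceeding. The broker caps acceptable values via
# group.max.session.timeout.ms, so check that setting before raising this one.
session.timeout.ms=60000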
