That said, I think we can improve our handling of the consumer (Kafka
client) and session (NiFi transactional logic) and solve the problem.
It is related to our back-pressure/consumer handling, so we can fix
that.

Thanks
Joe

On Thu, Feb 9, 2017 at 1:38 PM, Bryan Bende <[email protected]> wrote:
> No data loss, but you may process the same message twice in NiFi.
>
> The ordering of operations is:
>
> 1) poll Kafka
> 2) write received data to flow file
> 3) commit NiFi session so data in flow file cannot be lost
> 4) commit offsets to Kafka
>
> Doing it this way achieves at-least-once processing, which means you
> can't ever lose data, but you can process data twice.
>
> If we committed the offsets before committing the flow file you would
> never get duplicates, but you could lose a message if a crash happened
> between committing the offset and committing the NiFi session
> (at-most-once processing).
>
> So the error is happening on #4 and NiFi has already produced a flow
> file with the message, but then Kafka says it can't update the offset,
> and then another consumer will likely pull that same message again and
> produce another flow file with the same message.
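
The commit ordering described above can be sketched as a toy simulation.
This is only an illustration under stated assumptions: it uses no real
Kafka or NiFi API, the function `run` and all of its parameters are
hypothetical, and a "crash" is modelled as a single failure between the
two commits.

```python
# Toy simulation; no real Kafka or NiFi APIs are used. All names are hypothetical.

def run(log, commit_offset_first, crash_between_commits_at=None):
    """Consume `log` one message at a time and return the delivered messages.

    crash_between_commits_at: offset at which the consumer fails once,
    after the first commit but before the second.
    """
    delivered = []        # stands in for committed NiFi flow files
    committed_offset = 0  # stands in for the offset stored in Kafka
    crashed = False
    while committed_offset < len(log):
        offset = committed_offset  # (re)start from the last committed offset
        message = log[offset]
        crash_now = (offset == crash_between_commits_at) and not crashed
        if commit_offset_first:
            committed_offset = offset + 1  # commit offset to Kafka first
            if crash_now:
                crashed = True
                continue                   # session never committed: message lost
            delivered.append(message)
        else:
            delivered.append(message)      # commit the session first
            if crash_now:
                crashed = True
                continue                   # offset never committed: message re-read
            committed_offset = offset + 1
    return delivered


at_least_once = run(["a", "b", "c"], commit_offset_first=False, crash_between_commits_at=1)
at_most_once = run(["a", "b", "c"], commit_offset_first=True, crash_between_commits_at=1)
```

With the session committed first, the failure at offset 1 yields a
duplicate of "b"; with the offset committed first, "b" is dropped.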
>
>
> On Thu, Feb 9, 2017 at 1:19 PM, Nick Carenza
> <[email protected]> wrote:
>> That makes perfect sense. To be clear, is there any potential to lose
>> messages in this scenario?
>>
>> On Thu, Feb 9, 2017 at 7:16 AM, Joe Witt <[email protected]> wrote:
>>>
>>> Yeah, this is probably a good case for using the pause concept
>>> in Kafka consumers.
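
A rough sketch of the pause idea, assuming the consumer keeps calling
poll() while the downstream queue is full but fetches nothing during that
time. `FakeConsumer` and `consume` are hypothetical stand-ins written for
this illustration, not a real client API, and the timing rule simply
mirrors the error text quoted further down (a gap between poll() calls
longer than session.timeout.ms removes the consumer from the group).

```python
# Toy model only: FakeConsumer is hypothetical and mimics just the idea of
# pause/resume on a Kafka consumer; it is not a real client API.

class FakeConsumer:
    def __init__(self, records, session_timeout_ms):
        self.records = list(records)
        self.session_timeout_ms = session_timeout_ms
        self.paused = False
        self.last_poll_ms = 0
        self.in_group = True

    def pause(self):
        self.paused = True

    def resume(self):
        self.paused = False

    def poll(self, now_ms):
        # A gap between polls longer than the session timeout drops us
        # from the group (modelled here as a simple flag).
        if now_ms - self.last_poll_ms > self.session_timeout_ms:
            self.in_group = False
        self.last_poll_ms = now_ms
        if self.paused or not self.records:
            return []
        return [self.records.pop(0)]


def consume(consumer, backpressure_by_tick, tick_ms, keep_polling_when_blocked):
    """Drive the consumer once per tick; return (records, still in group?)."""
    received = []
    for tick, blocked in enumerate(backpressure_by_tick, start=1):
        now_ms = tick * tick_ms
        if blocked:
            if not keep_polling_when_blocked:
                continue      # processor not scheduled: no poll at all
            consumer.pause()  # keep polling, but fetch nothing
        else:
            consumer.resume()
        received.extend(consumer.poll(now_ms))
    return received, consumer.in_group


blocked_ticks = [False, True, True, True, False]  # three ticks of back-pressure
with_pause = consume(FakeConsumer(["m1", "m2"], 10_000), blocked_ticks, 5_000, True)
without_pause = consume(FakeConsumer(["m1", "m2"], 10_000), blocked_ticks, 5_000, False)
```

In this toy run the paused consumer is still in the group after the
back-pressure clears, while the one that stopped polling is not, which is
the situation that leads to the failed offset commit.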
>>>
>>> On Thu, Feb 9, 2017 at 9:49 AM, Bryan Bende <[email protected]> wrote:
>>> > I believe you are running into this issue:
>>> >
>>> > https://issues.apache.org/jira/browse/NIFI-3189
>>> >
>>> > When back-pressure happens on the queue coming out of ConsumeKafka,
>>> > it can last longer than session.timeout.ms, and when the
>>> > processor resumes executing it receives this error on the first
>>> > execution. We should be able to implement some type of keep-alive so
>>> > that even when the processor is not executing, there is a background
>>> > thread, or some way of keeping the connections alive.
>>> >
>>> > I believe any user-defined properties in the processor get passed to
>>> > the Kafka consumer, so you could add "session.timeout.ms" and set a
>>> > much higher value as a possible workaround.
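
To make the workaround concrete, here is a small sketch of the arithmetic
involved. Only the property names and the two 10000 values come from this
thread; the 60-second back-pressure gap, the raised value of 120000 and
the helper `survives_gap` are assumptions chosen for illustration. The
broker may also bound the allowed session timeout (for example through
group.max.session.timeout.ms), so a very large value may be rejected.

```python
# Assumed values for illustration; only the property names and the two
# 10000 settings come from the thread.
consumer_properties = {
    "max.poll.records": 10000,    # value reported in the thread
    "session.timeout.ms": 10000,  # value reported in the thread
}


def survives_gap(properties, gap_between_polls_ms):
    """True if a gap between poll() calls stays within the session timeout."""
    return gap_between_polls_ms <= properties["session.timeout.ms"]


backpressure_gap_ms = 60_000  # hypothetical: the queue stayed full for one minute

# Hypothetical higher value, set as a user-defined property on the processor.
raised = {**consumer_properties, "session.timeout.ms": 120_000}

ok_with_default = survives_gap(consumer_properties, backpressure_gap_ms)
ok_with_raised = survives_gap(raised, backpressure_gap_ms)
```

The raised timeout only helps while back-pressure clears within that
window; a longer stall would hit the same error again.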
>>> >
>>> > Thanks,
>>> >
>>> > Bryan
>>> >
>>> > On Thu, Feb 9, 2017 at 8:42 AM, Koji Kawamura <[email protected]>
>>> > wrote:
>>> >> Hello Nick,
>>> >>
>>> >> First, I assume "had a queue back up" means a queue was under
>>> >> back-pressure. Sorry if you meant something different.
>>> >>
>>> >> I tried to reproduce it with the following flow:
>>> >> ConsumeKafka_0_10
>>> >>   -- success: Back Pressure Object Threshold = 10
>>> >>     -- UpdateAttribute (Stopped)
>>> >>
>>> >> Then I used ./bin/kafka-console-producer.sh to send 11 messages.
>>> >> As a result, when NiFi received the 10th message, back-pressure was
>>> >> enabled on the success relationship.
>>> >> When I published the 11th message, NiFi didn't do anything.
>>> >> This is expected behavior: because the downstream connection is
>>> >> back-pressured, the processor won't be scheduled.
>>> >>
>>> >> After I started UpdateAttribute and the queued flow files went
>>> >> through, ConsumeKafka was executed again and received the 11th
>>> >> message.
>>> >>
>>> >> Also, I checked the ConsumerLease and ConsumeKafka_0_10 source code;
>>> >> those warning and error messages are logged because NiFi received a
>>> >> KafkaException when it tried to commit offsets to Kafka.
>>> >>
>>> >> Was there anything in the Kafka server logs? I suspect something
>>> >> happened on the Kafka server side.
>>> >>
>>> >> Thanks,
>>> >> Koji
>>> >>
>>> >> On Thu, Feb 9, 2017 at 11:54 AM, Nick Carenza
>>> >> <[email protected]> wrote:
>>> >>> Hey team, I have a ConsumeKafka_0_10 running which normally operates
>>> >>> without problems. I had a queue back up due to a downstream processor
>>> >>> and I started getting these bulletins.
>>> >>>
>>> >>> 01:16:01 UTC WARNING a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
>>> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Duplicates
>>> >>> are likely as we were able to commit the process session but received
>>> >>> an exception from Kafka while committing offsets.
>>> >>>
>>> >>> 01:16:01 UTC ERROR a46d13dd-3231-1bff-1a99-1eaf5f37e1d2
>>> >>> ConsumeKafka_0_10[id=a46d13dd-3231-1bff-1a99-1eaf5f37e1d2] Exception
>>> >>> while interacting with Kafka so will close the lease
>>> >>> org.apache.nifi.processors.kafka.pubsub.ConsumerPool$SimpleConsumerLease@87d2ac1
>>> >>> due to org.apache.kafka.clients.consumer.CommitFailedException: Commit
>>> >>> cannot be completed since the group has already rebalanced and
>>> >>> assigned the partitions to another member. This means that the time
>>> >>> between subsequent calls to poll() was longer than the configured
>>> >>> session.timeout.ms, which typically implies that the poll loop is
>>> >>> spending too much time message processing. You can address this
>>> >>> either by increasing the session timeout or by reducing the maximum
>>> >>> size of batches returned in poll() with max.poll.records.
>>> >>>
>>> >>> My max.poll.records is set to 10000 on my consumer and
>>> >>> session.timeout.ms is the default 10000 on the server.
>>> >>>
>>> >>> Since there is no such thing as coincidences, I believe this has to do
>>> >>> with it not being able to push received messages to the downstream
>>> >>> queue.
>>> >>>
>>> >>> If my flow is backed up, I expect the ConsumeKafka processor not to
>>> >>> throw errors but to continue to heartbeat with the Kafka server and
>>> >>> resume consuming once it can commit to the downstream queue.
>>> >>>
>>> >>> Might I have the server or consumer misconfigured to handle this
>>> >>> scenario, or should the consumer not be throwing this error?
>>> >>>
>>> >>> Thanks,
>>> >>> - Nick
>>
>>
