Thanks for the feedback, and glad I am on the right track.

> Outstanding transactions should be automatically aborted on restart by
> Flink.

Let me make sure I understand this:

1. A Flink pipe is cancelled and leaves dangling Kafka transactions.
2. A new Flink pipe (not restored from a checkpoint or savepoint) is started.
It is essentially the same pipe as in 1, but it does not restore state. Would
the dangling Kafka transactions be aborted?

If yes, how does it work? As in, how does the new pipe know which
transactions to abort? Does it ask Kafka for pending transactions and work out
which ones belong to the first pipe (maybe because they share some id derived
from the name of the pipe, or something else)?
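
To make my guess concrete, here is a minimal sketch of how I imagine it could work. It does not talk to Kafka at all: `ToyCoordinator` is a made-up stand-in for the transaction coordinator, and the `prefix-subtask-slot` naming scheme is purely my assumption, not Flink's actual id format. The one Kafka behaviour it relies on is that calling `initTransactions()` with a given `transactional.id` aborts any transaction left open by an earlier producer that used the same id.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy stand-in for Kafka's transaction coordinator (hypothetical, in-memory only). */
final class ToyCoordinator {
    // transactional id -> whether a transaction is currently open for it
    private final Map<String, Boolean> openTxn = new HashMap<>();

    void beginTransaction(String transactionalId) {
        openTxn.put(transactionalId, true);
    }

    /** Mimics the effect of initTransactions(): any open transaction for this id is aborted. */
    boolean initTransactions(String transactionalId) {
        Boolean wasOpen = openTxn.put(transactionalId, false);
        return Boolean.TRUE.equals(wasOpen); // true if a dangling transaction was aborted
    }

    boolean hasOpenTransaction(String transactionalId) {
        return openTxn.getOrDefault(transactionalId, false);
    }
}

final class TransactionalIdPool {
    /** Assumed naming scheme: the same prefix and parallelism always yield the same ids. */
    static List<String> idsFor(String prefix, int parallelism, int poolSize) {
        List<String> ids = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            for (int slot = 0; slot < poolSize; slot++) {
                ids.add(prefix + "-" + subtask + "-" + slot);
            }
        }
        return ids;
    }

    /** On a fresh start, re-initialise every id in the pool; returns the ids that had a dangling transaction. */
    static List<String> abortLingering(ToyCoordinator coordinator, List<String> pool) {
        List<String> aborted = new ArrayList<>();
        for (String id : pool) {
            if (coordinator.initTransactions(id)) {
                aborted.add(id);
            }
        }
        return aborted;
    }
}

final class AbortOnRestartDemo {
    public static void main(String[] args) {
        ToyCoordinator coordinator = new ToyCoordinator();

        // Pipe 1 opens two transactions and is then cancelled without committing.
        List<String> pool = TransactionalIdPool.idsFor("my-pipe-sink", 2, 3);
        coordinator.beginTransaction(pool.get(1));
        coordinator.beginTransaction(pool.get(4));

        // Pipe 2 starts fresh (no checkpoint) but derives the same ids from the same prefix.
        List<String> samePool = TransactionalIdPool.idsFor("my-pipe-sink", 2, 3);
        System.out.println(TransactionalIdPool.abortLingering(coordinator, samePool));
    }
}
```

If that is roughly the mechanism, the new pipe never has to ask Kafka for pending transactions. It only has to be able to regenerate the same ids, which would also mean a pipe with a different prefix leaves the old transactions alone until they time out.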

Thanks again,

Vishal



On Fri, Apr 16, 2021 at 1:37 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Vishal,
>
> I think you pretty much nailed it.
>
> Outstanding transactions should be automatically aborted on restart by
> Flink. Flink (re)uses a pool of transaction ids, such that all possible
> transactions by Flink are canceled on restart.
>
> I guess the biggest downside of using a large transaction timeout is that
> other clients might leak transactions for a longer period of time or that
> they may linger for a longer time if you stop an application entirely (for
> example for an upgrade).
>
> On Fri, Apr 16, 2021 at 4:08 PM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> Hello folks
>>
>> So AFAIK data loss with exactly-once will happen if
>>
>>    - a transaction is started on Kafka.
>>    - the pre-commit is done (Kafka is prepared for the commit).
>>    - the commit fails (Kafka went down, a network issue, or whatever), so
>>      Kafka has an uncommitted transaction.
>>    - the pipe is down for, say, n minutes and the Kafka transaction
>>      timeout is m minutes, where m < n.
>>    - the pipe restarts, tries to commit a transaction that has already
>>      been aborted, and fails; thus data loss.
>>
>> Thus it is imperative that transaction.max.timeout.ms on Kafka is set to
>> a high value (like n hours), which should be greater than the SLA for
>> downtime of the pipe. As in, we have to ensure that the pipe is restarted
>> before the transaction.timeout.ms enforced by the broker expires.
>>
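
A minimal sketch of the timeout relationship described in the paragraph above, assuming the usual split between the two settings: `transaction.timeout.ms` is requested by the producer, and `transaction.max.timeout.ms` is the broker-side cap it may not exceed. The class, the helper names, and the concrete durations are hypothetical and only there for illustration.

```java
import java.time.Duration;
import java.util.Properties;

final class TxnTimeoutSketch {
    /** Producer-side properties carrying the requested transaction timeout. */
    static Properties producerProps(Duration txnTimeout) {
        Properties props = new Properties();
        props.setProperty("transaction.timeout.ms", Long.toString(txnTimeout.toMillis()));
        return props;
    }

    /**
     * The producer timeout must fit under the broker cap (transaction.max.timeout.ms)
     * and must outlast the longest downtime we are willing to tolerate (the "m > n" rule).
     */
    static boolean isSafe(Duration producerTimeout, Duration brokerMax, Duration downtimeSla) {
        boolean acceptedByBroker = producerTimeout.compareTo(brokerMax) <= 0;
        boolean outlastsDowntime = producerTimeout.compareTo(downtimeSla) > 0;
        return acceptedByBroker && outlastsDowntime;
    }

    public static void main(String[] args) {
        Duration brokerMax = Duration.ofHours(24);   // assumed broker setting
        Duration downtimeSla = Duration.ofHours(4);  // assumed worst-case downtime of the pipe
        Duration producerTimeout = Duration.ofHours(6);

        System.out.println(producerProps(producerTimeout));
        System.out.println("safe: " + isSafe(producerTimeout, brokerMax, downtimeSla));
    }
}
```

The resulting `Properties` would be what gets handed to the Kafka sink as producer configuration; the broker cap itself has to be changed in the broker configuration, not from the job.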
>> The impulse is to make transaction.max.timeout.ms high (24 hours). The
>> only implication is what happens if we start a brand new pipeline on the
>> same topics while they still have unresolved transactions, mostly because of
>> the extended timeout of a previous pipe. I would assume we are delayed then,
>> given that Kafka will stall subsequent transactions from being visible to
>> the consumer because of this one outstanding transaction?
>>
>> And if that is the case, then understandably we have to abort those
>> dangling transactions before the 24 hour timeout. While there is probably a
>> way to do that, does Flink help? As in, can we set a property that will abort
>> a transaction on Kafka, because we need it to, given the above?
>>
>> Again, I might have totally misunderstood the whole mechanics, and if so,
>> apologies; I would appreciate some clarification.
>>
>>
>> Thanks.
>>
>>
>>
>>
>>
