Hi,

Thank you for your explanation.

I now understand the need for the checkpoint lock :)



Best,

On Tue, Jan 19, 2021 at 18:00 Piotr Nowojski <[email protected]> wrote:

> Hi,
>
> yes exactly :)
>
> > As a result, the source may save a wrong offset and lose records if the
> > job is recreated at that moment.
>
> This is just one of the possible race conditions. As offsets are probably
> 64-bit integers, I'm pretty sure corrupted writes/reads can also happen
> when only half of the bits (for example, the lower 32 bits) have been
> updated before checkpointing.
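The half-updated offset case can be made concrete. The Java Language Specification (section 17.7) allows a write to a non-volatile `long` to be performed as two separate 32-bit writes, so an unsynchronized reader may see old and new halves mixed. A minimal, deterministic sketch (the class and method names are hypothetical, and the tearing is simulated by combining halves rather than produced by real threads) of the two values a snapshot could read when an offset crosses the 32-bit boundary:

```java
// Deterministic illustration of a "torn" 64-bit offset read.
// No threads are used: each torn value is built by taking one 32-bit half
// from the old offset and the other half from the new offset.
public class TornOffset {

    // Upper 32 bits from one value, lower 32 bits from another.
    static long combine(long highHalfSource, long lowHalfSource) {
        long high = highHalfSource & 0xFFFFFFFF00000000L;
        long low = lowHalfSource & 0x00000000FFFFFFFFL;
        return high | low;
    }

    public static void main(String[] args) {
        long oldOffset = 0xFFFFFFFFL;   // 4294967295
        long newOffset = oldOffset + 1; // 4294967296, i.e. 0x1_00000000

        // Snapshot read after only the lower half was written.
        long lowHalfWrittenFirst = combine(oldOffset, newOffset);
        // Snapshot read after only the upper half was written.
        long highHalfWrittenFirst = combine(newOffset, oldOffset);

        System.out.println("old=" + oldOffset + " new=" + newOffset);
        System.out.println("torn (low half first)  = " + lowHalfWrittenFirst);
        System.out.println("torn (high half first) = " + highHalfWrittenFirst);
    }
}
```

The torn values are 0 and 8589934591, neither of which is an offset the source ever held; restoring from either would re-read or skip a huge range of records.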
>
> Best,
> Piotrek
>
> On Tue, 19 Jan 2021 at 02:22 Kazunori Shinhira <[email protected]>
> wrote:
>
>> Hi Piotrek,
>>
>>
>>
>> Thank you for your reply.
>>
>>
>> I understand that synchronization on the checkpoint lock is needed to make
>> state modification and checkpointing mutually exclusive.
>>
>> For example, in a SourceFunction implementation for Kafka, I believe the
>> code that fetches records and updates the current offset (the state of the
>> Kafka SourceFunction) must be enclosed in a synchronized block.
>>
>> If we don't use a synchronized block, `StreamOperator#snapshotState` may
>> be called concurrently with the state modification.
>>
>> As a result, the source may save a wrong offset and lose records if the job
>> is recreated at that moment.
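The lost-record scenario can be replayed deterministically. A hypothetical sketch in plain Java (no Flink classes): the "advance the offset, then emit" ordering, the class name, and the simplified failure model, in which output emitted after the checkpoint is rolled back and the source resumes from the checkpointed offset, are all assumptions for illustration:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical replay of one unlucky interleaving in a stateful source that
// advances its offset and then emits the record, with no lock around both steps.
public class LostRecordDemo {

    // Hypothetical input log; a record's offset is its index.
    static final List<String> LOG = List.of("a", "b", "c", "d");

    // Forces a checkpoint between the offset update and the emission of record
    // `checkpointAt`, then simulates a failure and a restore from that checkpoint.
    // Returns what downstream holds after recovery.
    static List<String> runWithCheckpointBetweenUpdateAndEmit(int checkpointAt) {
        if (checkpointAt < 0 || checkpointAt >= LOG.size()) {
            throw new IllegalArgumentException("checkpointAt must index a record in LOG");
        }
        int offset = 0;                          // state: next offset to read
        List<String> emitted = new ArrayList<>();
        int checkpointedOffset = -1;
        List<String> checkpointedOutput = new ArrayList<>();

        for (int i = 0; i < LOG.size(); i++) {
            offset = i + 1;                      // 1. update state
            if (i == checkpointAt) {             // 2. checkpoint sneaks in (no lock held)
                checkpointedOffset = offset;
                checkpointedOutput = new ArrayList<>(emitted);
            }
            emitted.add(LOG.get(i));             // 3. emit the record, after the checkpoint
        }

        // Failure and restore: downstream rolls back to the checkpointed output and
        // the source resumes reading from the checkpointed offset.
        List<String> recovered = new ArrayList<>(checkpointedOutput);
        for (int o = checkpointedOffset; o < LOG.size(); o++) {
            recovered.add(LOG.get(o));
        }
        return recovered;
    }

    public static void main(String[] args) {
        System.out.println(runWithCheckpointBetweenUpdateAndEmit(1)); // [a, c, d]: "b" is lost
    }
}
```

If the offset update and the emission were one atomic step with respect to the checkpoint, the checkpoint could only land between records and recovery would yield all of `a, b, c, d`.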
>>
>> Is my understanding correct?
>>
>>
>> Thank you for the information on the new Source interface.
>>
>> I’ll look into how to implement it.
>>
>>
>>
>> Best,
>>
>> On Mon, Jan 18, 2021 at 23:45 Piotr Nowojski <[email protected]> wrote:
>>
>>> Hi Kazunori,
>>>
>>> The checkpoint lock is acquired preemptively inside the
>>> SourceContext#collect call for the case where the source is stateless.
>>> However, this is not enough if you are implementing a stateful
>>> `SourceFunction`: if you modify state in your source function outside
>>> the checkpoint lock's scope, those updates could run concurrently with the
>>> `org.apache.flink.streaming.api.operators.StreamOperator#snapshotState`
>>> call, and bad things would happen. `StreamOperator#snapshotState` is one
>>> example of the actions executed in the `StreamTaskActionExecutor`.
>>> Depending on the execution context (whether or not it runs in a source
>>> task), `StreamTaskActionExecutor` may or may not acquire the checkpoint
>>> lock.
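A minimal, self-contained sketch of the pattern described above, written in plain Java so it runs without Flink. `LockedSource`, `run`, `snapshot` and `snapshotsAlwaysConsistent` are hypothetical names; in a real `SourceFunction` the lock would be obtained from `SourceContext#getCheckpointLock()` and the snapshot taken in `CheckpointedFunction#snapshotState`. Emission and the offset update share one critical section, and the snapshot takes the same lock, so every snapshot sees an offset that matches the records already emitted:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a stateful SourceFunction (plain Java, no Flink classes).
public class LockedSource {

    private final Object checkpointLock = new Object();
    // Stands in for the records handed downstream via collect().
    private final List<Long> emitted = new ArrayList<>();
    // Operator state: the offset of the next record to read.
    private long offset = 0;

    // Source thread: emission and offset update happen in one critical section.
    void run(int count) {
        for (int i = 0; i < count; i++) {
            synchronized (checkpointLock) {
                emitted.add(offset); // "collect" the record at the current offset
                offset++;            // update state before releasing the lock
            }
        }
    }

    // Snapshot under the same lock, so it never observes a half-finished step.
    long[] snapshot() {
        synchronized (checkpointLock) {
            return new long[] {offset, emitted.size()};
        }
    }

    // Runs the source on its own thread while taking snapshots from this thread.
    static boolean snapshotsAlwaysConsistent(int records, int snapshots) {
        LockedSource source = new LockedSource();
        Thread sourceThread = new Thread(() -> source.run(records));
        sourceThread.start();
        boolean consistent = true;
        for (int i = 0; i < snapshots; i++) {
            long[] s = source.snapshot();
            consistent &= s[0] == s[1]; // offset must equal the number of emitted records
        }
        try {
            sourceThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the source", e);
        }
        return consistent && source.snapshot()[0] == records;
    }

    public static void main(String[] args) {
        System.out.println(snapshotsAlwaysConsistent(100_000, 1_000)); // true
    }
}
```

Removing the `synchronized` blocks turns this into a data race (the `ArrayList` is not thread-safe and `offset` may be read mid-update); whether a given run then observes an inconsistency is timing-dependent, which is what makes such bugs hard to catch in tests.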
>>>
>>> Also please note that `SourceFunction` is currently (as of Flink 1.12.0)
>>> being phased out in favor of the new
>>> `org.apache.flink.api.connector.source.Source` interface. Among other
>>> improvements, this newer interface has a single-threaded execution model,
>>> without a dedicated thread to run the source code, so there is no need for
>>> the checkpoint lock.
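Why a single-threaded model removes the need for the lock can be shown with a toy model. This is a hypothetical sketch, not Flink's API: `ToyReader`, `runLoop` and the checkpoint trigger are invented for illustration, and the method names only loosely echo `SourceReader#pollNext` and `SourceReader#snapshotState`. One thread alternates between handling queued checkpoint requests and polling the reader, so the reader's state is never read and written at the same time:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of a single-threaded source task (not Flink's API).
public class SingleThreadedSourceTask {

    // Toy reader over a fixed log; its only state is the next offset to read.
    static final class ToyReader {
        private final List<String> log;
        private int offset = 0;

        ToyReader(List<String> log) {
            this.log = log;
        }

        // Emits at most one record; returns false once the log is exhausted.
        boolean pollNext(List<String> output) {
            if (offset >= log.size()) {
                return false;
            }
            output.add(log.get(offset));
            offset++;
            return true;
        }

        // The checkpoint id is unused in this toy; the state is just the offset.
        long snapshotState(long checkpointId) {
            return offset;
        }
    }

    // Runs the loop, triggering a checkpoint after every `checkpointEvery` records.
    static List<Long> runLoop(List<String> log, int checkpointEvery) {
        ToyReader reader = new ToyReader(log);
        List<String> output = new ArrayList<>();
        Queue<Long> pendingCheckpoints = new ArrayDeque<>();
        List<Long> snapshots = new ArrayList<>();
        long nextCheckpointId = 1;
        boolean more = true;
        while (more) {
            // Queued checkpoints run on this same thread, between two polls.
            while (!pendingCheckpoints.isEmpty()) {
                snapshots.add(reader.snapshotState(pendingCheckpoints.poll()));
            }
            more = reader.pollNext(output);
            if (more && output.size() % checkpointEvery == 0) {
                pendingCheckpoints.add(nextCheckpointId++); // simulated checkpoint trigger
            }
        }
        return snapshots;
    }

    public static void main(String[] args) {
        System.out.println(runLoop(List.of("a", "b", "c", "d", "e"), 2)); // [2, 4]
    }
}
```

Because a checkpoint can only be handled between two `pollNext` calls, each snapshot always equals the number of records emitted so far, with no `synchronized` block anywhere.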
>>>
>>> Best,
>>> Piotrek
>>>
>>> On Mon, 18 Jan 2021 at 13:05 Kazunori Shinhira <[email protected]>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>>
>>>> I have a question about how to implement a SourceFunction.
>>>>
>>>> I'm trying to implement my own SourceFunction using Flink 1.11.3.
>>>>
>>>> The following Javadoc says that a SourceFunction that implements
>>>> CheckpointedFunction should use a synchronized block to perform
>>>> checkpointing and element emission atomically.
>>>>
>>>>
>>>> See
>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>>>>
>>>>
>>>>
>>>> On the other hand, judging from the implementation of StreamSourceContexts
>>>> and StreamTaskActionExecutor, it looks like SourceContext.collect and
>>>> checkpointing are already mutually exclusive.
>>>>
>>>> I’m sorry if I misunderstood.
>>>>
>>>>
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java#L106
>>>>
>>>>
>>>> https://github.com/apache/flink/blob/release-1.11.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L91
>>>>
>>>>
>>>>
>>>> My question is: is the synchronized block in SourceFunction necessary,
>>>> and if so, why?
>>>>
>>>>
>>>>
>>>>
>>>> Best regards,
>>>>
>>>> --
>>>> Kazunori Shinhira
>>>> Mail : [email protected]
>>>>
>>>
>>
>> --
>> Kazunori Shinhira
>> Mail : [email protected]
>>
>

-- 
Kazunori Shinhira
Mail : [email protected]
