Hi,

Thank you for your explanation. I now understand the need for the checkpoint lock :)

Best,

On Tue, Jan 19, 2021 at 18:00, Piotr Nowojski <[email protected]> wrote:
> Hi,
>
> yes exactly :)
>
> > As a result, the source may save a wrong offset and lose records if the
> > job is recreated at that moment.
>
> This is just one of the possible race conditions that could happen. As
> offsets are probably 64-bit integers, I'm pretty sure corrupted
> writes/reads can also happen when only half of the bits (for example, the
> lower 32 bits) have been updated before checkpointing.
>
> Best,
> Piotrek
>
> On Tue, Jan 19, 2021 at 02:22, Kazunori Shinhira <[email protected]> wrote:
>
>> Hi Piotrek,
>>
>> Thank you for your reply.
>>
>> I understand that synchronization with the checkpoint lock is needed to
>> make state modification and checkpointing mutually exclusive.
>>
>> In my understanding, in a SourceFunction implementation for Kafka, for
>> example, the process of fetching records and updating the current offset
>> (which is the state of the Kafka SourceFunction) must be enclosed in a
>> synchronized block.
>>
>> If we don't use a synchronized block, `StreamOperator#snapshotState` can
>> be called concurrently with the state modification.
>>
>> As a result, the source may save a wrong offset and lose records if the
>> job is recreated at that moment.
>>
>> Is my understanding correct?
>>
>> Thank you for the information on the new Source interface.
>> I'll look into how to implement it.
>>
>> Best,
>>
>> On Mon, Jan 18, 2021 at 23:45, Piotr Nowojski <[email protected]> wrote:
>>
>>> Hi Kazunori,
>>>
>>> The checkpoint lock is acquired preemptively inside the
>>> SourceContext#collect call for the case where the source is stateless.
>>> However, this is not enough if you are implementing a stateful
>>> `SourceFunction`: if you modify state in your source function outside
>>> the scope of the checkpoint lock, those updates would happen concurrently
>>> with the `org.apache.flink.streaming.api.operators.StreamOperator#snapshotState`
>>> call and bad things would happen. `StreamOperator#snapshotState` is one
>>> example of the actions executed in the `StreamTaskActionExecutor`.
>>> Depending on the execution context (whether it runs in a source task or
>>> not), `StreamTaskActionExecutor` will or will not acquire the checkpoint
>>> lock.
>>>
>>> Also please note that `SourceFunction` is currently (as of Flink 1.12.0)
>>> being phased out in favor of the new
>>> `org.apache.flink.api.connector.source.Source` interface. Amongst other
>>> improvements, this newer interface has a single-threaded execution model
>>> without a dedicated thread running the source code, so there is no need
>>> for the checkpoint lock.
>>>
>>> Best,
>>> Piotrek
>>>
>>> On Mon, Jan 18, 2021 at 13:05, Kazunori Shinhira <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a question about implementing a SourceFunction.
>>>>
>>>> I'm trying to implement my own SourceFunction using Flink 1.11.3.
>>>>
>>>> The following javadoc says that a SourceFunction which implements
>>>> CheckpointedFunction should use a synchronized block so that
>>>> checkpointing and the emission of elements happen atomically.
>>>>
>>>> See
>>>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
>>>>
>>>> On the other hand, judging from the implementation of
>>>> StreamSourceContexts and StreamTaskActionExecutor, it looks like
>>>> SourceContext.collect and checkpointing are already mutually exclusive.
>>>>
>>>> I'm sorry if I misunderstood.
>>>>
>>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java#L106
>>>>
>>>> https://github.com/apache/flink/blob/release-1.11.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L91
>>>>
>>>> My question is: is the synchronized block in SourceFunction necessary?
>>>> And why is it necessary?
>>>>
>>>> Best regards,
>>>>
>>>> --
>>>> Kazunori Shinhira
>>>> Mail : [email protected]
>>>>
>>>
>>
>> --
>> Kazunori Shinhira
>> Mail : [email protected]
>>
>
--
Kazunori Shinhira
Mail : [email protected]
