Hi Piotrek,
Thank you for your reply. I understood that synchronization with checkpoint lock is needed to make state modification and checkpointing exclusive. In my understanding, for example, in implementation of SourceFunction for Kafka, it is necessary to enclose the process of acquiring records and updating current offset, that is the state of kafka SourceFunction, within synchronized code. If we don't use synchronized block, `StreamOperator#snapshotState` will be called concurrently with state modification. As a result, Source may save wrong offset and lost record if job recreation occurs at that timing. Is my understanding correct ? Thank you for the information on the new Source interface. I’ll look into how to implement it. Best, 2021年1月18日(月) 23:45 Piotr Nowojski <[email protected]>: > Hi Kazunori, > > The checkpoint lock is acquired preemptively inside the > SourceContext#collect call for the cases if the source is state less. > However this is not enough if you are implementing a stateful > `SourceFunction`, since if you are modifying state in your source function > outside of the checkpoint lock scope, those updates would be happening > concurrently to the > `org.apache.flink.streaming.api.operators.StreamOperator#snapshotState` > call and bad things would happen. `StreamOperator#snapshotState` is one > example of actions that are being executed in the > `StreamTaskActionExecutor`. `StreamTaskActionExecutor` depending on the > execution context, if it's happening in a source task or not, will be or > will not be acquiring the checkpoint lock. > > Also please note that `SourceFunction` is currently (as of Flink 1.12.0) > being phased out, in favor of the new > `org.apache.flink.api.connector.source.Source` interface. Amongst other > improvements, this newer interface has a single thread execution model, > without a dedicated thread to run the source code, so there is no need for > the checkpoint lock. > > Best, > Piotrek > > pon., 18 sty 2021 o 13:05 Kazunori Shinhira <[email protected]> > napisał(a): > >> Hi, >> >> >> I have a question about implementing method of SourceFunction. >> >> I'm trying to implement my own SourceFunction using Flink 1.11.3. >> >> The following javadoc says that SourceFunction which implements >> CheckpointedFunction should use synchronized block to perform checkpointing >> and emission of elements atomically. >> >> >> See >> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction. >> html >> >> >> >> On the other hand, from the implementation of StreamSourceContexts and >> StreamTaskActionExecutor, it looks like that the SourceContext.collect and >> checkpointing processes are exclusive. >> >> I’m sorry if I misunderstood. >> >> >> >> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java#L106 >> >> >> https://github.com/apache/flink/blob/release-1.11.3/flink-streaming-https://github.com/apache/flink/blob/release-1.11.3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L91 >> >> >> >> My Question is that is the synchronized block in SourceFunction necessary? >> >> And why is it necessary? >> >> >> >> >> Best regards, >> >> -- >> Kazunori Shinhira >> Mail : [email protected] >> > -- Kazunori Shinhira Mail : [email protected]
