Hi,

In short, [1] determines whether the job triggers checkpoints at all, and [2] 
determines which operators take action when a checkpoint is triggered.
If you use ExampleCountSource, flink-streaming-java must be a dependency in 
pom.xml, and the classes ListState, ListStateDescriptor, 
FunctionInitializationContext, FunctionSnapshotContext, CheckpointedFunction, 
and SourceFunction must be imported.


By the way, I'm not sure whether this mail will be displayed well because it's 
the first time for me to write such a formatted one. If not, please let me know.




------------------------------------------------------------------------------------
Detailed reply for question 1:


CheckpointedFunction is not necessary to trigger or complete a checkpoint. 


A job triggers a checkpoint when all its tasks are running and checkpointing 
is enabled using the code in [1], such as env.enableCheckpointing(xxx). Your 
job in the first mail didn't trigger a checkpoint because the source was no 
longer running at the time of the first checkpoint, not because checkpointing 
was disabled.


However, for some functions and operators, checkpoints make no sense. 


Take the word count demo as an example:


source → flatMap → keyBy → sum → print


Assume the data:


aaa bbb aaa
bbb ccc
aaa
bbb
aaa ccc ddd


And assume the job failed because of some error after processing the first 
3 lines.


aaa bbb aaa
bbb ccc
aaa
-- job fail
-- job recover
bbb
aaa ccc ddd


When the source operator and the sum operator recover from a failure, they 
need a checkpoint.
The source operator needs to know where to resume (the 4th line), because some 
data was already processed before the failure. The sum operator needs to know 
the count of every word before the failure (aaa:3, bbb:2, ccc:1), so that new 
sentences are counted correctly when they arrive.


However, the flatMap operator doesn't need a checkpoint at all. Whenever a 
sentence comes, split it. This operator requires nothing from a checkpoint to 
recover. 
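
To make this concrete, here is a minimal plain-Java sketch of the scenario 
above. It is only a model, not Flink code: the class WordCountRecoverySketch, 
the Snapshot type, and all method names are made up for illustration. The 
"source" state is the index of the next line to read, the "sum" state is the 
map of counts, and the split step keeps no state at all.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of source -> flatMap -> sum. None of this is Flink API.
class WordCountRecoverySketch {

    static final List<String> LINES = Arrays.asList(
            "aaa bbb aaa", "bbb ccc", "aaa", "bbb", "aaa ccc ddd");

    // Hypothetical snapshot: the source's read position plus the sum operator's counts.
    static final class Snapshot {
        final int nextLine;
        final Map<String, Integer> counts;

        Snapshot(int nextLine, Map<String, Integer> counts) {
            this.nextLine = nextLine;
            this.counts = new TreeMap<>(counts); // copy, so later updates don't leak in
        }
    }

    // Stateless "flatMap": the output depends only on the current input line.
    static List<String> split(String line) {
        return new ArrayList<>(Arrays.asList(line.split(" ")));
    }

    // Processes lines [from, to) and updates the running counts in place.
    static void process(int from, int to, Map<String, Integer> counts) {
        for (int i = from; i < to; i++) {
            for (String word : split(LINES.get(i))) {
                counts.merge(word, 1, Integer::sum);
            }
        }
    }

    // Runs the first three lines, "fails", restores from the snapshot, and finishes.
    static Map<String, Integer> runWithFailureAfterThirdLine() {
        Map<String, Integer> counts = new TreeMap<>();
        process(0, 3, counts);
        Snapshot snapshot = new Snapshot(3, counts); // checkpoint taken here

        // Failure: in-memory state is gone. Recovery rebuilds it from the snapshot.
        Map<String, Integer> restored = new TreeMap<>(snapshot.counts);
        process(snapshot.nextLine, LINES.size(), restored);
        return restored;
    }

    public static void main(String[] args) {
        System.out.println(runWithFailureAfterThirdLine());
    }
}
```

With the snapshot, the recovered run ends with the same counts as a run that 
never failed (aaa:4, bbb:3, ccc:2, ddd:1). Without the restored counts, the 
sum step would start again from zero, and without the restored position, the 
source would not know to resume at the 4th line. The split step needs neither.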


The CheckpointedFunction interface used in [2] is how such stateful functions 
are distinguished from all the other functions (it's not the only way, but it 
is the most flexible one). See [3] and [4] for more information.


------------------------------------------------------------------------------------
Detailed reply for question 2:


Here's my sample code for ExampleCountSource.java


import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class ExampleCountSource implements SourceFunction<Long>, CheckpointedFunction {
    private long count = 0L;
    private volatile boolean isRunning = true;

    private transient ListState<Long> checkpointedCount;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning && count < 1000) {
            // this synchronized block ensures that state checkpointing,
            // internal state updates and emission of elements are an atomic operation
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(count);
                count++;
            }
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.checkpointedCount = context
                .getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("count", Long.class));

        if (context.isRestored()) {
            for (Long count : this.checkpointedCount.get()) {
                this.count = count;
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.checkpointedCount.clear();
        this.checkpointedCount.add(count);
    }
}




[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#java
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html
[3] https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/stateful-stream-processing.html
[4] https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html


Regards,
Smile
