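Hi all,
In a Flink job with parallelism greater than 1, do the subtasks of an operator share a single instance of the operator's Java class (assuming each operator corresponds to one Java class), or does each subtask get its own instance?
I ran a simple test: I wrote a flatmap class that logs the result of its toString() method in both the constructor and the open() method. The logged values all differ. Does that prove that each subtask initializes its own instance of the class?
I would appreciate it if someone could explain how an operator is initialized while a job is running.
The relevant test code is below: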
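// Flink 1.10.2, parallelism 3
import java.io.IOException;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
// Imports for Person and JSONUtil omitted.

@Slf4j
public class PersonFlatMap extends RichFlatMapFunction<Tuple2<String, String>, Person> {
    private transient ValueState<Integer> state;

    public PersonFlatMap() {
        // Logs "创建实例" ("instance created") along with this.toString().
        log.info(String.format("PersonFlatMap【%s】: 创建实例", this.toString()));
    }

    @Override
    public void open(Configuration parameters) throws IOException {
        // irrelevant code omitted...
        // Logs "初始化状态" ("state initialized") along with this.toString().
        log.info(String.format("PersonFlatMap【%s】:初始化状态!", this.toString()));
    }

    @Override
    public void flatMap(Tuple2<String, String> t, Collector<Person> collector) throws Exception {
        Person p = JSONUtil.toObject(t.f1, Person.class);
        collector.collect(p);
        if (state.value() == null) {
            state.update(0);
        }
        state.update(state.value() + 1);
        log.info("state: " + state.value());
    }
}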
// Test log output (创建实例 = "instance created", 初始化状态 = "state initialized")
...
flink-10.2 - [2020-11-16 13:41:54.360] - INFO [main] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@ba8d91c】: 创建实例
// irrelevant log lines omitted...
flink-10.2 - [2020-11-16 13:42:00.326] - INFO [Flat Map -> Sink: Print to Std. Out (1/3)] org.apache.flink.runtime.state.heap.HeapKeyedStateBackend - Initializing heap keyed state backend with stream factory.
flink-10.2 - [2020-11-16 13:42:00.351] - INFO [Flat Map -> Sink: Print to Std. Out (1/3)] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c9d895d】:初始化状态!
flink-10.2 - [2020-11-16 13:42:00.354] - INFO [Flat Map -> Sink: Print to Std. Out (3/3)] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@5c489c40】:初始化状态!
flink-10.2 - [2020-11-16 13:42:00.356] - INFO [Flat Map -> Sink: Print to Std. Out (2/3)] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@1cd4f5c9】:初始化状态!
...
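The constructor is logged only once, on the [main] thread, while open() is logged once per subtask with a different hash each time. My working assumption, which I would like confirmed, is that the function object is built once on the client, serialized, and then deserialized separately by every subtask. The plain-Java sketch below involves no Flink at all; SubtaskInstanceDemo and DemoFunction are hypothetical names I made up to illustrate the idea. It shows that Java deserialization produces distinct objects without calling the class's constructor again, which would match the log pattern above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SubtaskInstanceDemo {

    /** Hypothetical stand-in for a user function such as PersonFlatMap. */
    static class DemoFunction implements Serializable {
        private static final long serialVersionUID = 1L;

        // Counts constructor calls; static fields are not part of the serialized form.
        static int constructorCalls = 0;

        DemoFunction() {
            constructorCalls++;
        }
    }

    /** Serializes the function once, as I assume the client does when building the job. */
    static byte[] serialize(Serializable fn) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        return bytes.toByteArray();
    }

    /** Deserializes a fresh copy, as I assume each parallel subtask does. */
    static DemoFunction deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (DemoFunction) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        DemoFunction original = new DemoFunction();
        byte[] shipped = serialize(original);
        for (int subtask = 1; subtask <= 3; subtask++) {
            DemoFunction copy = deserialize(shipped);
            // Each copy is a different object from the one constructed in main.
            System.out.println("subtask " + subtask + " has its own instance: " + (copy != original));
        }
        // The constructor ran only for the original object, not for the copies.
        System.out.println("constructor calls: " + DemoFunction.constructorCalls);
    }
}
```

If this is what Flink does, it would also explain why instance fields set in the constructor must be serializable, while fields such as state are marked transient and set up in open().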