Hi

First of all, declaring a ListState field as transient is perfectly fine. You can find many examples of this usage in the Flink source code [1].
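
To illustrate why transient by itself should be harmless: the function object is Java-serialized when the job is shipped, a transient field is simply skipped, and the state handle is obtained again in open(). The state contents live in the state backend, not in that field. Below is a minimal plain-Java sketch of that mechanism. DemoFunction and TransientDemo are hypothetical stand-ins written for this mail, not Flink classes, and the List only plays the role of the state handle.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class TransientDemo {
    // Serializes the function and reads it back, similar to what happens
    // when a user function is shipped to a worker.
    static DemoFunction roundTrip(DemoFunction fn) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (DemoFunction) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        DemoFunction original = new DemoFunction("unmatchedProbes");
        original.open();

        DemoFunction copy = roundTrip(original);
        // The transient field was skipped, so it is null until open() runs again.
        System.out.println("before open(): " + copy.isHandleInitialized());
        copy.open();
        System.out.println("after open():  " + copy.isHandleInitialized());
    }
}

// Hypothetical stand-in for a user function; NOT a Flink class.
class DemoFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String name;              // serialized together with the function
    private transient List<String> handle;  // skipped by Java serialization

    DemoFunction(String name) {
        this.name = name;
    }

    // Plays the role of open(): (re)creates the handle on the worker side.
    void open() {
        handle = new ArrayList<>();
    }

    String getName() {
        return name;
    }

    boolean isHandleInitialized() {
        return handle != null;
    }
}
```

Since the handle is always re-created in open(), the transient keyword only affects how the function object is shipped, not what is stored in the state backend.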

Some things you could check:

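  1.  Is TTL enabled on your state?
  2.  Can you log the values when they are written, and then compare against those logs when get() returns an empty iterator, to confirm that the earlier write actually succeeded?
  3.  Which state backend are you using at the moment? If you switch to a different state backend, can the problem still be reproduced?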
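
For point 2, one possible way to make the comparison mechanical is to log writes and reads in a fixed format and then scan the collected logs (from before and after the restore) for keys whose last write was non-empty but which later read back empty. The sketch below is plain Java. StateAudit, the STATE_WRITE / STATE_READ line format, and findLostKeys are all hypothetical names made up for this mail, and it assumes keys contain no spaces.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical debugging helper; not part of Flink.
class StateAudit {
    // Line to log right after writing `size` elements for `key`.
    static String writeLine(String key, int size) {
        return "STATE_WRITE key=" + key + " size=" + size;
    }

    // Line to log right after reading `size` elements for `key`.
    static String readLine(String key, int size) {
        return "STATE_READ key=" + key + " size=" + size;
    }

    // Returns keys whose most recent write was non-empty but for which
    // a later read returned nothing. Lines in other formats are ignored.
    static List<String> findLostKeys(List<String> logLines) {
        Map<String, Integer> lastWritten = new HashMap<>();
        List<String> lost = new ArrayList<>();
        for (String line : logLines) {
            String[] parts = line.split(" ");
            if (parts.length != 3
                    || !parts[1].startsWith("key=")
                    || !parts[2].startsWith("size=")) {
                continue;
            }
            String key = parts[1].substring("key=".length());
            int size = Integer.parseInt(parts[2].substring("size=".length()));
            if (parts[0].equals("STATE_WRITE")) {
                lastWritten.put(key, size);
            } else if (parts[0].equals("STATE_READ")
                    && size == 0
                    && lastWritten.getOrDefault(key, 0) > 0
                    && !lost.contains(key)) {
                lost.add(key);
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                writeLine("a", 2),
                writeLine("b", 3),
                readLine("a", 2),
                readLine("b", 0));
        System.out.println(findLostKeys(lines)); // prints [b]
    }
}
```

Because the check runs over log lines rather than in-memory data, it still works when the write happened before the savepoint and the read happened after the restore.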

[1] 
https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158

Best regards
唐云
________________________________
From: Liu Rising <stockholm...@gmail.com>
Sent: Sunday, September 6, 2020 17:45
To: user-zh@flink.apache.org <user-zh@flink.apache.org>
Subject: Re: Some keyed state data cannot be read when restoring from a Savepoint/Checkpoint

Hi 唐云

Here is the code that defines and initializes the state:

public class FlinkKeyedProcessFunction
        extends KeyedProcessFunction<String, Tuple2<String, ObjectNode>, Tuple2<String, JsonNode>> {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);

  ...

    private final ParameterTool params;
    private transient ListState<ObjectNode> unmatchedProbesState;

    ...

    FlinkKeyedProcessFunction(ParameterTool params) {
        this.params = params;
    }

    @Override
    public void open(Configuration parameters) {

        ListStateDescriptor<ObjectNode> descriptor = new ListStateDescriptor<>(
                "unmatchedProbes", TypeInformation.of(ObjectNode.class)
        );
        unmatchedProbesState = getRuntimeContext().getListState(descriptor);

Here is the part that adds content to the state:
   ...

        List<ObjectNode> unmatchedProbes = mapMatching.getUnMatchedProbes(id);
        unmatchedProbesState.clear();

        if (unmatchedProbes.size() > 0) {
            try {
                unmatchedProbesState.addAll(unmatchedProbes);
            } catch (Exception e) {
                LOG.warn("Continue processing although failed to add unmatchedProbes to ListState. ID: " + id, e);
            }
        }

       ...

Here is the code that reads from the state:

                    for (ObjectNode unmatchedProbe : unmatchedProbesState.get()) {
                        LOG.info("Processing unmatched probe: " + unmatchedProbe);
                        matchedValues.addAll(mapMatching.matchLocation(id, unmatchedProbe));
                    }


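The earlier problem was that, when reading from the state, unmatchedProbesState.get() returned an empty result for some keys, even though values had actually been written for them.
After removing the transient modifier from the state declaration, the problem no longer occurs.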

Thanks.
Rising




