Hi 首先,transient ListState 这种用法绝对是没有问题的,你可以在Flink源码中找到大量的使用例子 [1]
可以排查的思路 1. 你的state是否开启了TTL呢 2. 能否在写入的时候,进行打印,然后在get会空iterator时进行对比,确认之前是写入成功的。 3. 目前使用的state backend是什么,更换一种state backend,问题还能复现么 [1] https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158 祝好 唐云 ________________________________ From: Liu Rising <stockholm...@gmail.com> Sent: Sunday, September 6, 2020 17:45 To: user-zh@flink.apache.org <user-zh@flink.apache.org> Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取 Hi 唐云 以下是state定义以及初始化的code public class FlinkKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, ObjectNode>, Tuple2<String, JsonNode>> { private static final Logger LOG = LoggerFactory.getLogger(FlinkKeyedProcessFunction.class); ... private final ParameterTool params; private transient ListState<ObjectNode> unmatchedProbesState; ... FlinkKeyedProcessFunction(ParameterTool params) { this.params = params; } @Override public void open(Configuration parameters) { ListStateDescriptor<ObjectNode> descriptor = new ListStateDescriptor<>( "unmatchedProbes", TypeInformation.of(ObjectNode.class) ); unmatchedProbesState = getRuntimeContext().getListState(descriptor); 以下是往state里add内容的部分 ... List<ObjectNode> unmatchedProbes = mapMatching.getUnMatchedProbes(id); unmatchedProbesState.clear(); if (unmatchedProbes.size() > 0) { try { unmatchedProbesState.addAll(unmatchedProbes); } catch (Exception e) { LOG.warn("Continue processing although failed to add unmatchedProbes to ListState. ID: " + id, e); } } ... 以下是从state读取的code for (ObjectNode unmatchedProbe : unmatchedProbesState.get()) { LOG.info("Processing unmatched probe: " + unmatchedProbe); matchedValues.addAll(mapMatching.matchLocation(id, unmatchedProbe)); } 之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值) 去掉定义state那里的transient之后,上述问题不再出现。 谢谢。 Rising -- Sent from: http://apache-flink.147419.n8.nabble.com/