Hi 唐云

Below is the code that defines and initializes the state:

public class FlinkKeyedProcessFunction
        extends KeyedProcessFunction<String, Tuple2<String, ObjectNode>, Tuple2<String, JsonNode>> {

    private static final Logger LOG =
            LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);

  ...

    private final ParameterTool params;
    private transient ListState<ObjectNode> unmatchedProbesState;

    ...

    FlinkKeyedProcessFunction(ParameterTool params) {
        this.params = params;
    }

    @Override
    public void open(Configuration parameters) {

        ListStateDescriptor<ObjectNode> descriptor = new ListStateDescriptor<>(
                "unmatchedProbes", TypeInformation.of(ObjectNode.class)
        );
        unmatchedProbesState = getRuntimeContext().getListState(descriptor);

Below is the part that adds entries to the state:
   ...

        List<ObjectNode> unmatchedProbes = mapMatching.getUnMatchedProbes(id);
        unmatchedProbesState.clear();

        if (unmatchedProbes.size() > 0) {
            try {
                unmatchedProbesState.addAll(unmatchedProbes);
            } catch (Exception e) {
                LOG.warn("Continue processing although failed to add "
                        + "unmatchedProbes to ListState. ID: " + id, e);
            }
        }

       ...

Below is the code that reads from the state:

                    for (ObjectNode unmatchedProbe : unmatchedProbesState.get()) {
                        LOG.info("Processing unmatched probe: " + unmatchedProbe);
                        matchedValues.addAll(mapMatching.matchLocation(id, unmatchedProbe));
                    }
                    }


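The earlier problem was that, when reading from the state, unmatchedProbesState.get() returned no values for some keys (even though values had actually been stored for them).
After I removed the transient modifier from the state field declaration, the problem no longer occurred.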
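
For reference, here is a minimal plain-Java sketch (no Flink dependency) of what the transient keyword does under Java serialization: the field is skipped when the object is serialized and is null after deserialization, until something like open() rebuilds it. The names TransientDemo and Worker are hypothetical stand-ins, not Flink classes, and this sketch only shows the language-level behavior; it is not meant as an explanation of why removing transient changed what I observed above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class TransientDemo {

    // Hypothetical stand-in for a user function; not a Flink class.
    static class Worker implements Serializable {
        private static final long serialVersionUID = 1L;

        final String name;              // written by Java serialization
        transient List<String> buffer;  // skipped by Java serialization

        Worker(String name) {
            this.name = name;
        }

        // Plays the role of open(): rebuilds the transient field.
        void open() {
            buffer = new ArrayList<>();
        }
    }

    // Serializes the worker to bytes and reads it back as a new object.
    static Worker roundTrip(Worker w) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(w);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Worker) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Worker original = new Worker("probe-matcher");
        original.open();
        original.buffer.add("p1");

        Worker copy = roundTrip(original);
        System.out.println(copy.name);            // non-transient field survives
        System.out.println(copy.buffer == null);  // transient field was not serialized

        copy.open();                              // re-initialize after deserialization
        System.out.println(copy.buffer.isEmpty());
    }
}
```

In this sketch the copy keeps its name but loses the buffer contents, so anything held in a transient field has to be recreated after deserialization.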

Thanks.
Rising




--
Sent from: http://apache-flink.147419.n8.nabble.com/
