When your operator processes a value with a given key, the ValueState is implicitly scoped to that key. So, if all your payloads have a unique ID (and hence a unique key), then the ValueState will always initially be null. Only if 2 payloads share the same ID will the ValueState return something non-null: the second payload will see the first one.

Behavior-wise, think of it as Flink having an internal Map<KEY, VALUE>, and ValueState being a thin wrapper that automatically inserts the key for put/get operations.
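The analogy above can be sketched in plain Java. This is a toy model, not Flink's actual implementation; the class and method names are hypothetical, but the get/update/clear semantics mirror what ValueState does per key:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of keyed state: get/update/clear are scoped to whatever
// key the "runtime" has set as current before handing over an element.
public class KeyedValueStateSketch {
    static class KeyedValueState<K, V> {
        private final Map<K, V> backing = new HashMap<>();
        private K currentKey; // set by the runtime before each element

        void setCurrentKey(K key) { currentKey = key; }

        V value() { return backing.get(currentKey); } // null until updated for this key
        void update(V v) { backing.put(currentKey, v); }
        void clear() { backing.remove(currentKey); }
    }

    public static void main(String[] args) {
        KeyedValueState<String, String> state = new KeyedValueState<>();

        state.setCurrentKey("id-1");
        System.out.println(state.value()); // prints "null": first element for this key
        state.update("payload-1");

        state.setCurrentKey("id-2");
        System.out.println(state.value()); // prints "null": different key, fresh state

        state.setCurrentKey("id-1");
        System.out.println(state.value()); // prints "payload-1": same key seen before
    }
}
```

So with globally unique IDs, every element is the "first" for its key and value() is always null.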

On 12/07/2021 22:08, Marzi K wrote:
Hi,

1) The ValueState can only return a non-null value if a prior value with the same key (in your case, "x.id") has been received. Have you double-checked that this is the case?

I use the payloads in each operator to make POST requests. So the payloads are not null, and I keyBy using the id field of each payload. I am not sure what else could be causing the null values.

Also, in the Checkpoints tab of the Flink Dashboard, I see MB-sized amounts of data being persisted. But when I print out the valueState in the code, it shows as null, so I am wondering whether it is actually saving data or not.

Thanks again,
Marzi


On Jul 12, 2021, at 3:21 AM, Chesnay Schepler <ches...@apache.org> wrote:

1) The ValueState can only return a non-null value if a prior value with the same key (in your case, "x.id") has been received. Have you double-checked that this is the case?

2) Checkpointing does not alleviate the need to restart all operators; it alleviates having to reprocess all data. It is expected that the entire pipeline is restarted, not just the failed operator. In a nutshell, this happens because a) downstream operators would otherwise receive duplicate messages (all messages from the last checkpoint until the operator failure), and
b) upstream operators need to reproduce the data to be processed.
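For reference, both the checkpoint interval and how the whole job is restarted after a failure are configured on the execution environment. A minimal sketch (the interval and restart-strategy values here are arbitrary, not recommendations):

```java
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Snapshot all operator state every 10 s; on failure the whole job
        // rolls back to the latest completed checkpoint and replays from there.
        env.enableCheckpointing(10_000);

        // Restart the entire job (not a single operator) up to 3 times,
        // waiting 10 s between attempts.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10)));
    }
}
```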

On 11/07/2021 00:35, Marzi K wrote:
Hi All,

I have a simple POC project to understand Flink state management and integration with Kafka. Thanks in advance for helping me understand and resolve the following issues.

I have a FlinkKafkaConsumer that reads payloads from a separate FlinkKafkaProducer. The final 3 operators in my pipeline are keyed and stateful, to save the content of the passed payload at 3 different stages. To verify checkpointing and the correctness of the ValueStates in my code, I kill one TaskManager manually, and as expected, the job restarts on a second TaskManager.
But I have noticed the following:

(1) When I print out valueState.value(), it always shows as null, even though in the checkpointing dir I see some files and the _metadata file getting saved. I suspected it might be due to Kryo serialization, since JsonNode.class gets processed as a GenericType, so I changed my state object to a POJO, but I am still getting null.

(2) When the job restarts, a few of the payloads start again from the very beginning of the pipeline and keep repeating over and over. I noticed this happens because one of the intermediate operators (the S3 upload) starts failing after the restart on the second TaskManager. This raised the question: when one operator fails, shouldn't the failed message only retry the failed operator rather than start from the beginning of the pipeline? If so, does this further suggest that the operator checkpoints are not happening properly, and thus the message needs to start from the very beginning of the pipeline?

Including semi pseudo-code for reference:

_In Main class:_

//operators methodA and methodB don't have ValueState
DataStream<JsonNode> payloadStream =
    env.addSource(kafkaConsumer)
        .map((payload) -> methodA(payload))
        .map((payload) -> methodB(payload))
        .keyBy((payload) -> payload.get("x").get("id").asText());

DataStream<Tuple2<JsonNode, JsonNode>> responseStream =
    payloadStream
        .flatMap(new RichFlatMapA()).name("Post Payload1").uid("Post Payload1")
        .keyBy((jsonNodeTuple) -> jsonNodeTuple.f0.get("x").get("id").asText())
        .flatMap(new RichFlatMapB()).name("S3 upload").uid("S3 upload")
        .keyBy((jsonNodeTuple) -> jsonNodeTuple.f0.get("x").get("id").asText())
        .flatMap(new RichFlatMapC()).name("Post Payload2").uid("Post Payload2");
_In one of the operators where I’d like to make the payload JSON stateful:_

public class RichFlatMapA extends RichFlatMapFunction<JsonNode, Tuple2<JsonNode, JsonNode>> {

    private ValueState<JsonNode> payload;

    public void open(Configuration config) {
        payload = getRuntimeContext().getState(
            new ValueStateDescriptor<>("saved payload", JsonNode.class));
    }

    public void flatMap(JsonNode jsonNode, Collector<Tuple2<JsonNode, JsonNode>> collector) {
        JsonNode payload = this.payload.value(); //payload value is always null

        if (payload != null) {
            this.payload.clear();
        } else {
            this.payload.update(payload);
        }
        httpPost(jsonNode, collector);
    }
}

Thank you,
Marzi



