When your operator processes a value for a given key, the ValueState is
implicitly scoped to that key.
So, if all your payloads have a unique ID (and hence a unique key), then
the ValueState will initially always be null.
Only if 2 payloads have the same ID will the ValueState return something
non-null: the second payload will see the first one.
Behavior-wise, think of it as Flink keeping an internal Map<KEY, VALUE>,
with ValueState being a thin wrapper that automatically inserts the
current key for put/get operations.
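To make that analogy concrete, here is a plain-Java toy model (illustration only, not Flink's actual implementation; names like ToyValueState and setCurrentKey are made up for the sketch):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of keyed ValueState: one value slot per key, where
// "currentKey" stands in for the key Flink sets before each element.
class ToyValueState<K, V> {
    private final Map<K, V> store = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) { this.currentKey = key; }

    // null on the first access for a given key
    V value() { return store.get(currentKey); }

    void update(V v) { store.put(currentKey, v); }
}

public class ValueStateDemo {
    public static void main(String[] args) {
        ToyValueState<String, String> state = new ToyValueState<>();

        state.setCurrentKey("id-1");
        System.out.println(state.value()); // null: first element with this key
        state.update("payload-1");

        state.setCurrentKey("id-2");
        System.out.println(state.value()); // null: different key, fresh state

        state.setCurrentKey("id-1");
        System.out.println(state.value()); // payload-1: same key seen again
    }
}
```

With unique IDs, every element lands on the first branch (null) and never sees a stored value.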
On 12/07/2021 22:08, Marzi K wrote:
Hi,
1) The ValueState can only return a non-null value if a prior value
with the same key (in your case, "x.id") has been received. Have you
double-checked that this is the case?
I use the payloads in each operator to make post requests. So the
payloads are not null and I keyBy using the id field in each payload.
Not sure what else could be causing the null values.
Also, in the checkpointing tab on the Flink Dashboard, I see MB-scale
sizes for the data persisted. But when I print out the valueState in the
code, it shows as null.
So I'm wondering whether it's actually saving data or whether it's null.
Thanks again,
Marzi
On Jul 12, 2021, at 3:21 AM, Chesnay Schepler <ches...@apache.org> wrote:
1) The ValueState can only return a non-null value if a prior value
with the same key (in your case, "x.id") has been received. Have you
double-checked that this is the case?
2) Checkpointing does not alleviate the need to restart all
operators; it alleviates having to reprocess all data.
It is expected that the entire pipeline is restarted, not just the
failed operator. In a nutshell, this happens because
a) downstream operators would otherwise receive duplicate messages
(all messages since the last checkpoint up to the operator failure), and
b) upstream operators need to reproduce the data to process.
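A plain-Java toy sketch of that recovery model (illustration only, not Flink APIs): on restart, the source rewinds to the last checkpointed offset, so everything read after that offset is replayed, including elements that were already processed before the failure.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of checkpoint-based recovery: restart resumes from the last
// checkpointed offset, not from the point of failure.
public class ReplayDemo {
    static List<String> replayFrom(List<String> source, int checkpointedOffset) {
        List<String> replayed = new ArrayList<>();
        for (int i = checkpointedOffset; i < source.size(); i++) {
            replayed.add(source.get(i));
        }
        return replayed;
    }

    public static void main(String[] args) {
        List<String> source = List.of("a", "b", "c", "d", "e");
        // The checkpoint covered "a" and "b"; the job then failed after "d".
        // Recovery rewinds the whole pipeline to the checkpoint:
        System.out.println(replayFrom(source, 2)); // [c, d, e] -- "c", "d" again
    }
}
```

This is why restarting only the failed operator would not work: its downstream consumers would either miss data or see it twice, with no consistent point to resume from.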
On 11/07/2021 00:35, Marzi K wrote:
Hi All,
I have a simple POC project to understand Flink state management and
integration with Kafka. Thanks in advance for helping me understand
and resolve the following issues.
I have a FlinkKafkaConsumer that reads payloads from a separate
FlinkKafkaProducer.
The final 3 operators in my pipeline are keyed and stateful to save
the content of passed payload in 3 different stages.
To verify checkpointing and correctness of ValueStates in my code, I
kill one taskManager manually, and as expected, the job restarts on
a second TM.
But I have noticed the following:
(1) When I print out the valueState.value, it always shows as null
even though in the checkpointing dir, I see some files and _metaData
getting saved.
I suspected that maybe it's due to Kryo serialization being used because
JsonNode.class is processed as a generic type. So I changed my state
object to a POJO, but I'm still getting null.
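(Aside, as a fragment rather than a complete job: Flink's ExecutionConfig has a switch that makes any Kryo fallback fail fast at submission time, which is one way to confirm whether a type such as JsonNode is being treated as a generic type.)

```java
// Fragment for the job setup in main(); requires
// org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Throws at job submission if any type would be handled as a generic
// type via Kryo, instead of silently serializing it with Kryo.
env.getConfig().disableGenericTypes();
```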
(2) When the job restarts, a few of the payloads keep starting from
the very beginning of the pipeline and keep repeating over and over.
I noticed this is happening because one of the intermediate
operators starts failing (s3 upload) after restart on second TM. So
this raised the question:
When one operator fails, shouldn’t the failed message only retry the
failed operator and not start from the beginning of the pipeline? If
so, does this further prove that the operator checkpoints are not
happening properly and thus the message needs to start from the very
beginning of the pipeline?
Including semi-pseudo code for reference:
_In Main class:_
//operators methodA and methodB don't have ValueState
DataStream<JsonNode> payloadStream =
    env.addSource(kafkaConsumer)
        .map((payload) -> methodA(payload))
        .map((payload) -> methodB(payload))
        .keyBy((payload) -> payload.get("x").get("id").asText());

DataStream<Tuple2<JsonNode, JsonNode>> responseStream =
    payloadStream.flatMap(new RichFlatMapA()).name("Post Payload1").uid("Post Payload1")
        .keyBy((jsonNodeTuple) -> jsonNodeTuple.f0.get("x").get("id").asText())
        .flatMap(new RichFlatMapB()).name("S3 upload").uid("S3 upload")
        .keyBy((jsonNodeTuple) -> jsonNodeTuple.f0.get("x").get("id").asText())
        .flatMap(new RichFlatMapC()).name("Post Payload2").uid("Post Payload2");
_In one of the operators where I’d like to make the payload JSON stateful:_
public class RichFlatMapA extends RichFlatMapFunction<JsonNode,
        Tuple2<JsonNode, JsonNode>> {

    private ValueState<JsonNode> payload;

    public void open(Configuration config) {
        payload = getRuntimeContext().getState(
            new ValueStateDescriptor<>("saved payload", JsonNode.class));
    }

    public void flatMap(JsonNode jsonNode,
            Collector<Tuple2<JsonNode, JsonNode>> collector) {
        JsonNode previous = this.payload.value(); // always null in my runs
        if (previous != null) {
            this.payload.clear();
        } else {
            this.payload.update(jsonNode); // save the incoming payload for this key
        }
        httpPost(jsonNode, collector);
    }
}
Thank you,
Marzi