Hello Nicolaus,

Thank you for your quick feedback, and sorry if I was not clear enough.
In the documented example, the state that is updated in the
snapshotState method is an operator state, not a keyed state:

  public void initializeState(FunctionInitializationContext context) throws Exception {
    [...]
    countPerPartition = context.getOperatorStateStore().getOperatorState(
        new ListStateDescriptor<>("perPartitionCount", Long.class));
    [...]
  }

  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    [...]
    countPerPartition.add(localCount);
  }


It seems that the method is then called only once per parallel operator
task, not once per key.
On my side, I have two keyed states with the same key (e.g., userId) in a
CoFlatMapFunction:




  // Control state partitioned by userId
  private ValueState<Control> controlState;

  // Data state partitioned by userId, coming from the ser/deserialization
  // of a custom system having a partitioned state
  private ValueState<byte[]> dataState;

and I would like to do something like this to update dataState in a keyed
context, for every key at every checkpoint:



  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Not a keyed context here!
    dataState.update(customSystem.getSnapshot(context.getKey()));
  }

instead of saving dataState in the flatMap2 function for every received
event:


  public void flatMap1(Control control, Collector<Control> out) throws Exception {
    controlState.update(control);
  }

  public void flatMap2(Event event, Collector<ProcessedEvent> out) throws Exception {
    // Perform some event transformations based on controlState
    ProcessedEvent result = customSystem.process(controlState.value(), event);
    // Save internal custom system state after processing:
    // can be costly if high event throughput
    dataState.update(customSystem.getSnapshot(controlState.value().getUserId()));
    // Output the processed event
    out.collect(result);
  }


So basically, I want to be able to synchronize the partitioned state of my
custom system with the checkpoints done by Flink.
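
To make the intent concrete outside of Flink, here is a minimal plain-Java
sketch of the pattern I am after: remember which keys were touched, and take
the costly snapshot only once per touched key when a checkpoint happens. All
names here (DirtyKeySnapshotter, markDirty, snapshotDirtyKeys, saved) are
hypothetical, the map only stands in for keyed state, and the function passed
to the constructor stands in for customSystem.getSnapshot(key). It uses no
Flink API and does not by itself solve the missing keyed context in
snapshotState.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical helper, not a Flink class: tracks keys touched since the
// last checkpoint and snapshots each of them once at checkpoint time.
final class DirtyKeySnapshotter<K> {
    // Keys whose custom-system state changed since the last checkpoint.
    private final Set<K> dirtyKeys = new LinkedHashSet<>();
    // Stand-in for the keyed state (dataState in my example).
    private final Map<K, byte[]> savedState = new HashMap<>();
    // Stand-in for customSystem.getSnapshot(key); assumed to be costly.
    private final Function<K, byte[]> snapshotFn;

    DirtyKeySnapshotter(Function<K, byte[]> snapshotFn) {
        this.snapshotFn = snapshotFn;
    }

    // Cheap call made for every event: only records the key.
    void markDirty(K key) {
        dirtyKeys.add(key);
    }

    // Called once per checkpoint: snapshots every dirty key exactly once
    // and returns how many keys were written.
    int snapshotDirtyKeys() {
        int flushed = dirtyKeys.size();
        for (K key : dirtyKeys) {
            savedState.put(key, snapshotFn.apply(key));
        }
        dirtyKeys.clear();
        return flushed;
    }

    // Last snapshot stored for the key, or null if none was taken yet.
    byte[] saved(K key) {
        return savedState.get(key);
    }
}
```

In this sketch flatMap2 would only call markDirty(userId), and the checkpoint
hook would call snapshotDirtyKeys(), so the snapshot cost scales with the
number of touched keys per checkpoint rather than with the event rate.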


Best Regards,
Marc

On Wed, Oct 6, 2021 at 12:11, Nicolaus Weidner <
nicolaus.weid...@ververica.com> wrote:

> Hi Marc,
>
> I think you can just use keyed state in a
> CheckpointedFunction. FunctionInitializationContext gives you access to
> both keyed state and operator state (your stream needs to be keyed, of
> course). So you could just update your local custom state on regular
> invocations and update keyed state on calls to snapshotState.
> Check out the example in [1] where both types of state are used.
>
> Does that help? Not sure if I understood the problem correctly.
>
> Best regards,
> Nico
>
> [1]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java#L74-L110
>
> On Tue, Oct 5, 2021 at 3:28 PM Marc LEGER <maleger...@gmail.com> wrote:
>
>> Hello,
>>
>> Is there a method in a RichFunction that Flink calls with a keyed
>> context each time a checkpoint is triggered?
>>
>> It seems that the CheckpointedFunction interface provides such a feature
>> (snapshotState method) but only in case of operator state and it is called
>> in a non-keyed context.
>>
>> Indeed, I am implementing a CoFlatMapFunction with:
>> - a keyed state (state1) for a "control" stream (stream1) which is not
>> often updated,
>> - a keyed state (state2) for a "data" stream (stream2) with a high
>> throughput and relying on a custom solution for internal state snapshot
>> with some potential performance impact.
>>
>> Consequently, I don't want to trigger a state2 update for every event
>> received in stream2 for efficiency reasons but rather update state2 based
>> on checkpoints triggered by Flink.
>>
>> Best Regards,
>> Marc
>>
>>
