Hello Nicolaus,

Unfortunately, I don't really have control over the custom state solution,
since it is managed by an existing system that cannot easily be modified.

What I finally did for the "data state" in my CoFlatMapFunction is to use a
list-style operator state that stores the partitioned state with one list
element per key, combined with a union redistribution scheme on
restore/redistribution.
I'm not sure yet whether this is really efficient (I need to run more tests),
but every parallel operator task then receives the whole custom state, from
which it can retrieve the partitioned state for its assigned keys, alongside
the other keyed state (control state):

    // Control state partitioned by userId (keyed state)
    private ValueState<Control> controlState;
    // Data state partitioned by userId (operator state)
    private ListState<Tuple2<String, byte[]>> dataState;
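
To make the two code paths of this approach explicit, here is a plain-Java model of it (no Flink dependency). It is a sketch under assumptions, not the actual implementation: all class and method names are hypothetical, `Element` stands in for `Tuple2<String, byte[]>`, `customSnapshot` stands in for the custom system's `getSnapshot(userId)`, and `ownsKey` stands in for the key-to-subtask assignment. In the real CoFlatMapFunction, `snapshot(...)` would presumably run inside `snapshotState` followed by `dataState.update(...)`, and `restore(...)` inside `initializeState` over `dataState.get()` when `context.isRestored()` is true, with `dataState` obtained through `context.getOperatorStateStore().getUnionListState(...)`. For `ownsKey`, comparing `KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism)` with the subtask index is one option, but that class belongs to the Flink runtime rather than the public user API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

/**
 * Plain-Java model (no Flink dependency) of the union list "data state":
 * one (userId, snapshot bytes) element per key handled by this subtask.
 */
class UnionDataState {
    /** Stand-in for a Tuple2<String, byte[]> list element. */
    static final class Element {
        final String userId;
        final byte[] snapshot;

        Element(String userId, byte[] snapshot) {
            this.userId = userId;
            this.snapshot = snapshot;
        }
    }

    // Keys this subtask is responsible for (seen in flatMap2 or restored).
    private final Set<String> keys = new HashSet<>();

    /** flatMap2 path: only remember the key, no per-event snapshot. */
    void onEvent(String userId) {
        keys.add(userId);
    }

    /** snapshotState path: one custom snapshot per key, once per checkpoint. */
    List<Element> snapshot(Function<String, byte[]> customSnapshot) {
        List<Element> elements = new ArrayList<>();
        for (String userId : keys) {
            elements.add(new Element(userId, customSnapshot.apply(userId)));
        }
        return elements;
    }

    /**
     * initializeState path: with union redistribution every subtask receives the
     * elements of ALL subtasks, so keep only the keys this subtask now owns.
     */
    List<Element> restore(Iterable<Element> union, Predicate<String> ownsKey) {
        List<Element> mine = new ArrayList<>();
        for (Element e : union) {
            if (ownsKey.test(e.userId)) {
                keys.add(e.userId);
                mine.add(e); // to be handed back to the custom system
            }
        }
        return mine;
    }
}
```

Because `restore` also re-registers the owned keys, a key that receives no new events after a rescale is still written out at the next checkpoint.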

To avoid "state explosion", I also added a custom TTL-based cleanup
mechanism for this operator state that removes list elements which have not
been used for some time.
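
Flink's built-in state TTL (`StateTtlConfig`) applies to keyed state only, which is presumably why a custom mechanism is needed for this operator state. A cleanup of that kind can be sketched in plain Java as below; it is a hypothetical illustration (made-up names, no Flink API): each key keeps its snapshot plus a last-use timestamp, and keys idle for longer than the TTL are dropped, for instance right before the list is written in `snapshotState`. Timestamps are passed in explicitly to keep the sketch deterministic; in a job they might come from processing time.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * Hypothetical TTL bookkeeping for the list-style data state (plain Java, no Flink).
 * Each userId keeps its snapshot bytes plus the time it was last used.
 */
class TtlDataState {
    private static final class TimedSnapshot {
        final byte[] snapshot;
        long lastAccessMillis;

        TimedSnapshot(byte[] snapshot, long lastAccessMillis) {
            this.snapshot = snapshot;
            this.lastAccessMillis = lastAccessMillis;
        }
    }

    private final long ttlMillis;
    private final Map<String, TimedSnapshot> entries = new HashMap<>();

    TtlDataState(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Stores (or overwrites) a key's snapshot and marks the key as used now. */
    void put(String userId, byte[] snapshot, long nowMillis) {
        entries.put(userId, new TimedSnapshot(snapshot, nowMillis));
    }

    /** Returns a key's snapshot, or null if absent; reading also counts as a use. */
    byte[] get(String userId, long nowMillis) {
        TimedSnapshot e = entries.get(userId);
        if (e == null) {
            return null;
        }
        e.lastAccessMillis = nowMillis;
        return e.snapshot;
    }

    /** Removes keys unused for longer than the TTL and returns how many were removed. */
    int evictExpired(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, TimedSnapshot>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue().lastAccessMillis > ttlMillis) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return entries.size();
    }
}
```

If the cleanup should keep working across restores, the last-use timestamp would presumably have to be checkpointed as well, e.g. stored in each list element next to the key and the bytes.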
However, I am still interested in any better solution, if Flink offers one.

Thank you for your help.

Best Regards,
Marc


On Tue, Oct 12, 2021 at 9:02 AM Nicolaus Weidner <
nicolaus.weid...@ververica.com> wrote:

> Hi Marc,
>
> thanks for clarifying, I had misunderstood some parts.
> Unfortunately, I don't think there is a way to update keyed state (for
> multiple keys even) outside of a keyed context.
>
> I will ask if someone else has an idea, but allow me to ask one
> counter-question first: Did you actually run tests to verify that using the
> custom state solution is more efficient than using Flink's keyed state
> regularly (in the end, you would even have to include the state
> synchronization in the performance test)? Efficient stateful stream
> processing is one of the key features of Flink, and you are essentially
> trying to override a specific piece of it with custom logic.
>
> Best regards,
> Nico
>
> On Wed, Oct 6, 2021 at 5:50 PM Marc LEGER <maleger...@gmail.com> wrote:
>
>> Hello Nicolaus,
>>
>> Thank you for your quick feedback, and sorry if I was not clear enough.
>> Actually, in the documented example, the state that is updated in the
>> snapshotState method is operator state, not keyed state:
>>
>>     public void initializeState(FunctionInitializationContext context) throws Exception {
>>         [...]
>>         countPerPartition = context.getOperatorStateStore().getOperatorState(
>>             new ListStateDescriptor<>("perPartitionCount", Long.class));
>>         [...]
>>     }
>>
>>     public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>         [...]
>>         countPerPartition.add(localCount);
>>     }
>>
>> It seems that this method is then called only once per operator parallel
>> task, not once per key.
>> On my side, I have two keyed states with the same key (e.g., userId) in a
>> CoFlatMapFunction:
>>
>>     // Control state partitioned by userId
>>     private ValueState<Control> controlState;
>>     // Data state partitioned by userId, coming from the ser/deserialization
>>     // of a custom system having a partitioned state
>>     private ValueState<byte[]> dataState;
>>
>> and I would like to do something like this to update dataState in a keyed
>> context for every key at every checkpoint:
>>
>>     public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>         // Not a keyed context here: there is no current key!
>>         dataState.update(customSystem.getSnapshot(context.getKey()));
>>     }
>>
>> instead of saving dataState in the flatMap2 function for every received
>> event:
>>
>>     public void flatMap1(Control control, Collector<ProcessedEvent> out) throws Exception {
>>         controlState.update(control);
>>     }
>>
>>     public void flatMap2(Event event, Collector<ProcessedEvent> out) throws Exception {
>>         // Perform some event transformations based on controlState
>>         ProcessedEvent result = customSystem.process(controlState.value(), event);
>>         // Save the internal custom system state after processing:
>>         // this can be costly at high event throughput
>>         dataState.update(customSystem.getSnapshot(controlState.value().getUserId()));
>>         // Output the processed event
>>         out.collect(result);
>>     }
>>
>>
>> So basically, I want to be able to synchronize the partitioned state of
>> my custom system with the checkpoints done by Flink.
>>
>>
>> Best Regards,
>> Marc
>>
>> On Wed, Oct 6, 2021 at 12:11 PM Nicolaus Weidner <
>> nicolaus.weid...@ververica.com> wrote:
>>
>>> Hi Marc,
>>>
>>> I think you can just use keyed state in a
>>> CheckpointedFunction. FunctionInitializationContext gives you access to
>>> both keyed state and operator state (your stream needs to be keyed, of
>>> course). So you could just update your local custom state on regular
>>> invocations and update keyed state on calls to snapshotState.
>>> Check out the example in [1] where both types of state are used.
>>>
>>> Does that help? Not sure if I understood the problem correctly.
>>>
>>> Best regards,
>>> Nico
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.java#L74-L110
>>>
>>> On Tue, Oct 5, 2021 at 3:28 PM Marc LEGER <maleger...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> Is there any method in a RichFunction that Flink calls, within a keyed
>>>> context, each time a checkpoint is triggered?
>>>>
>>>> It seems that the CheckpointedFunction interface provides such a
>>>> feature (the snapshotState method), but only for operator state, and it
>>>> is called in a non-keyed context.
>>>>
>>>> Specifically, I am implementing a CoFlatMapFunction with:
>>>> - a keyed state (state1) for a "control" stream (stream1) that is not
>>>> updated often,
>>>> - a keyed state (state2) for a "data" stream (stream2) with high
>>>> throughput, relying on a custom solution for internal state snapshots
>>>> that can have a performance impact.
>>>>
>>>> Consequently, for efficiency reasons, I would rather not update state2
>>>> for every event received on stream2, but only when Flink triggers a
>>>> checkpoint.
>>>>
>>>> Best Regards,
>>>> Marc