One option to consider is using a CoProcessFunction instead of a BroadcastProcessFunction, and calling .broadcast() on the input stream that you want every parallel instance to receive. Then you could follow the pattern you laid out in your sample code (i.e., initialize state in the initializeState function, update myState in processElement2, and do your business logic in processElement1). You would still need some way to initialize your state with cached values, but you would have needed that anyway with the code sample you shared.
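To make the suggestion above concrete, here is a Flink-free model in plain Java. It is a sketch, not Flink code. `processElement1` and `processElement2` mirror the two methods of a CoProcessFunction, and `BroadcastModel.broadcast` mimics what `.broadcast()` does: every element of that input reaches every parallel instance, and each instance updates its own private copy. The class names, the `String` rule type, and seeding through the constructor are hypothetical. In a real job the seeding would happen in `initializeState()` or `open()`, and a plain field like this map is not checkpointed unless you also keep it in Flink state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of ONE parallel instance of a two-input function.
class TwoInputInstance {
    // This instance's private copy of the lookup data.
    private final Map<String, String> rules = new HashMap<>();

    // Stand-in for initializeState()/open(): seed the copy with cached values,
    // so lookups already work before the first broadcast element arrives.
    TwoInputInstance(Map<String, String> initialValues) {
        rules.putAll(initialValues);
    }

    // Input 2 (the stream .broadcast() was called on): update the local copy.
    void processElement2(String ruleId, String rule) {
        rules.put(ruleId, rule);
    }

    // Input 1 (e.g. transactions): the business logic looks up the local copy.
    String processElement1(String ruleId) {
        return rules.get(ruleId);
    }
}

// Hypothetical model of .broadcast(): every element of input 2 reaches every instance.
class BroadcastModel {
    private final List<TwoInputInstance> instances = new ArrayList<>();

    BroadcastModel(int parallelism, Map<String, String> initialValues) {
        for (int i = 0; i < parallelism; i++) {
            instances.add(new TwoInputInstance(initialValues));
        }
    }

    // Fan out one broadcast element to all parallel instances.
    void broadcast(String ruleId, String rule) {
        for (TwoInputInstance instance : instances) {
            instance.processElement2(ruleId, rule);
        }
    }

    // Input 1 is partitioned normally, so each element reaches one instance only.
    String lookupOn(int subtask, String ruleId) {
        return instances.get(subtask).processElement1(ruleId);
    }
}
```

Because every instance sees the same sequence of broadcast updates, all copies stay identical as long as the update logic is deterministic.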

From: Nick Bendtner <buggi...@gmail.com>
Date: Tuesday, January 26, 2021 at 12:31 PM
To: Guowei Ma <guowei....@gmail.com>
Cc: user <user@flink.apache.org>
Subject: Re: Initializing broadcast state

Thanks a lot Guowei, that makes sense. I will go with the caching approach. Can you point me to an example that shows the most efficient way to cache elements?
Thanks a ton for your help.

Best,
Nick

On Mon, Jan 25, 2021 at 10:38 PM Guowei Ma <guowei....@gmail.com> wrote:
Hi Nick,
I don't think you can update `myState` in `processBroadcastElement`: updating keyed state requires a current key, and there is no key in `processBroadcastElement`.
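A simplified, Flink-free model of that point: keyed state is addressed by the current key, which the runtime sets before each `processElement` call on a keyed stream, while a broadcast element carries no key. `KeyedMapStateModel` is hypothetical and only illustrates the addressing; in this model an access without a current key throws, but it does not claim to reproduce how Flink's real state backends fail. In a `KeyedBroadcastProcessFunction`, the supported way to touch keyed state from `processBroadcastElement` is `ctx.applyToKeyedState(...)`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of keyed MapState: every access is scoped to the current key.
class KeyedMapStateModel<K, UK, UV> {
    private final Map<K, Map<UK, UV>> perKey = new HashMap<>();
    private K currentKey; // null means "no keyed context"

    // What the runtime does before processElement on a keyed stream.
    void setCurrentKey(K key) {
        currentKey = key;
    }

    // A broadcast element has no key, so no current key is available.
    void clearCurrentKey() {
        currentKey = null;
    }

    void put(UK userKey, UV value) {
        requireKey();
        perKey.computeIfAbsent(currentKey, k -> new HashMap<>()).put(userKey, value);
    }

    UV get(UK userKey) {
        requireKey();
        Map<UK, UV> values = perKey.get(currentKey);
        return values == null ? null : values.get(userKey);
    }

    // In this model, state access without a current key is rejected.
    private void requireKey() {
        if (currentKey == null) {
            throw new IllegalStateException("no current key: keyed state is not addressable here");
        }
    }
}
```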
Best,
Guowei


On Tue, Jan 26, 2021 at 6:31 AM Nick Bendtner <buggi...@gmail.com> wrote:
Hi Guowei,
I am not using a keyed broadcast function; I use [1]. My question is: can non-broadcast state, for instance value state or map state, be updated whenever I get a broadcast event in processBroadcastElement? This way the state updates are consistent, since each instance of the task gets the same broadcast element.

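```java
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// MyFunction is a placeholder name; InputType, BroadcastedStateType,
// OutputType and MyState are the application's own types.
public class MyFunction
        extends BroadcastProcessFunction<InputType, BroadcastedStateType, OutputType>
        implements CheckpointedFunction {

    private MapState<String, MyState> myState;

    @Override
    public void processElement(InputType value, ReadOnlyContext ctx,
            Collector<OutputType> out) throws Exception {
        // Iterate over the map state.
        for (Map.Entry<String, MyState> entry : myState.entries()) {
            // Business logic
        }
        // Do things
    }

    @Override
    public void processBroadcastElement(BroadcastedStateType value, Context ctx,
            Collector<OutputType> out) throws Exception {
        // Update map state that is not broadcast state; the same update
        // happens in every sub-operator.
        myState.put(value.ID(), value.state()); // update the map state with the value from the broadcast
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Called when it's time to save state.
        myState.clear();
        // Update myState with the current application state.
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Called when things start up, possibly recovering from an error.
        MapStateDescriptor<String, MyState> descriptor =
                new MapStateDescriptor<>("state", Types.STRING, Types.POJO(MyState.class));

        // Keyed state is only available on a keyed stream, and is only
        // accessible while a current key is set.
        myState = context.getKeyedStateStore().getMapState(descriptor);

        if (context.isRestored()) {
            // Restore application state from myState.
        }
    }
}
```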

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.html


Best,
Nick.

On Sun, Jan 24, 2021 at 11:54 PM Guowei Ma <guowei....@gmail.com> wrote:
Hi Nick,
Normally you cannot iterate over all keyed state, but with `BroadcastState` and `applyToKeyedState` you can.
For example, before the broadcast-side elements arrive, you could cache the non-broadcast elements in keyed state. Once the broadcast elements arrive, use `applyToKeyedState`[1] to iterate over all the elements you "cached" in keyed state and apply your business logic.
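The caching pattern described above, as a Flink-free model in plain Java. Elements that arrive before the broadcast side are buffered per key; when the broadcast element arrives, every key's buffer is visited and drained. In a real `KeyedBroadcastProcessFunction` the per-key buffer would be keyed state (for example a `ListState`), the rule check in `processElement` would read `ctx.getBroadcastState(...)`, and `forEachKey` corresponds to `ctx.applyToKeyedState(descriptor, (key, state) -> ...)`. The class name, the single `"rule"` entry, the `String` types, and the `apply` helper are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical model of "cache until the broadcast side arrives, then drain".
class CacheThenDrain {
    private final Map<String, List<String>> bufferedPerKey = new HashMap<>(); // keyed-state stand-in
    private final Map<String, String> broadcastState = new HashMap<>();       // broadcast-state stand-in
    final List<String> output = new ArrayList<>();                            // collector stand-in

    // processElement: handle immediately if the rule is known, otherwise cache it.
    void processElement(String key, String value) {
        String rule = broadcastState.get("rule");
        if (rule != null) {
            output.add(apply(rule, key, value));
        } else {
            bufferedPerKey.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
    }

    // processBroadcastElement: store the rule, then drain the cache of every key.
    void processBroadcastElement(String rule) {
        broadcastState.put("rule", rule);
        forEachKey((key, values) -> {
            for (String v : values) {
                output.add(apply(rule, key, v));
            }
            values.clear(); // like clearing the ListState for that key
        });
    }

    // Stand-in for applyToKeyedState: visit every key that has cached state.
    private void forEachKey(BiConsumer<String, List<String>> fn) {
        for (Map.Entry<String, List<String>> e : bufferedPerKey.entrySet()) {
            fn.accept(e.getKey(), e.getValue());
        }
    }

    // Placeholder business logic.
    private static String apply(String rule, String key, String value) {
        return rule + ":" + key + ":" + value;
    }

    int cachedCount() {
        int n = 0;
        for (List<String> values : bufferedPerKey.values()) {
            n += values.size();
        }
        return n;
    }
}
```

Elements that arrive after the broadcast element skip the cache entirely, so the buffer only ever holds the backlog from before the rules were known.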

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/stream/state/broadcast_state.html

Best,
Guowei


On Mon, Jan 25, 2021 at 12:59 PM Nick Bendtner <buggi...@gmail.com> wrote:
Thanks Guowei. Another question: what is the use of broadcast state when I can update a map state or value state inside the processBroadcastElement method and use that state to do a lookup in the processElement method, like in this example?
https://stackoverflow.com/questions/58307154/initialize-the-content-of-a-mapstate


Best,
Nick
On Sun, Jan 24, 2021 at 9:23 PM Guowei Ma <guowei....@gmail.com> wrote:
Hi Nick,
You might need to handle it yourself if you have to process an element only after you get the broadcast state.
For example, you could "cache" the element in state and handle it when the elements from the broadcast side arrive. In particular, if you are using `KeyedBroadcastProcessFunction`, you could use `applyToKeyedState` to access the elements you cached before.

Best,
Guowei


On Mon, Jan 25, 2021 at 10:24 AM Nick Bendtner <buggi...@gmail.com> wrote:
Hi guys,
What is the way to initialize broadcast state (say, with default values) before the first element shows up in the broadcast stream? I do a lookup on the broadcast state to process transactions that come from another stream. The problem is that the broadcast state is empty until the first element shows up.


Best,
Nick.
