Nice! Thanks for the confirmation :-) Am Mo., 29. Apr. 2019 um 13:21 Uhr schrieb Avi Levi <[email protected] >:
> Thanks! Works like a charm :) > > On Mon, Apr 29, 2019 at 12:11 PM Fabian Hueske <[email protected]> wrote: > >> *This Message originated outside your organization.* >> ------------------------------ >> Hi Avi, >> >> I'm not sure if you cannot emit data from the keyed state when you >> receive a broadcasted message. >> The Context parameter of the processBroadcastElement() method in the >> KeyedBroadcastProcessFunction has the applyToKeyedState() method. >> The method takes a KeyedStateFunction that is applied to each key of a >> state, but does not provide a Collector to emit data. >> Maybe you can pass the collector to the KeyedStateFunction and emit >> records while it iterates over the key space. >> >> Best, Fabian >> >> Am Fr., 26. Apr. 2019 um 17:35 Uhr schrieb Avi Levi < >> [email protected]>: >> >>> Hi Timo, >>> I defiantly did. but broadcasting a command and trying to address the >>> persisted state (I mean the state of the data stream and not the >>> broadcasted one) you get the exception that I wrote >>> (java.lang.NullPointerException: No key set. This method should not be >>> called outside of a keyed context). e.g doing something like >>> >>> override def processBroadcastElement(value: BroadcastRequest, ctx: >>> KeyedBroadcastProcessFunction[String, Request, BroadcastRequest, >>> Response]#Context, out: Collector[Response]): Unit = { >>> value match { >>> case Command(StateCmd.Fetch, _) => >>> if (state.value() != null) { >>> ouout.collecy(state.value()) >>> } >>> >>> will yield that exception >>> >>> BR >>> Avi >>> >>> On Fri, Apr 26, 2019 at 11:55 AM Timo Walther <[email protected]> >>> wrote: >>> >>>> This Message originated outside your organization. >>>> >>>> Hi Avi, >>>> >>>> did you have a look at the .connect() and .broadcast() API >>>> functionalities? They allow you to broadcast a control stream to all >>>> operators. Maybe this example [1] or other examples in this repository >>>> can help you. >>>> >>>> Regards, >>>> Timo >>>> >>>> [1] >>>> >>>> https://github.com/ververica/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/solutions/datastream_java/broadcast/TaxiQuerySolution.java >>>> >>>> Am 26.04.19 um 07:57 schrieb Avi Levi: >>>> > Hi, >>>> > We have a keyed pipeline with persisted state. >>>> > Is there a way to broadcast a command and collect all values that >>>> > persisted in the state ? >>>> > >>>> > The end result can be for example sending a fetch command to all >>>> > operators and emitting the results to some sink >>>> > >>>> > why do we need it ? from time to time we might want to check if we >>>> are >>>> > missing keys what are the additional keys or simply emit the current >>>> > state to a table and to query it. >>>> > >>>> > I tried simply broadcasting a command and addressing the persisted >>>> > state but that resulted with: >>>> > java.lang.NullPointerException: No key set. This method should not be >>>> > called outside of a keyed context. >>>> > >>>> > is there a good way to achieve that ? >>>> > >>>> > Cheers >>>> > Avi >>>> >>>>
