That sounds like a great hack :D I'll give it a try for sure. Thank you!

/David
On Fri, Mar 24, 2023 at 5:25 PM Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

> Hi David,
>
> … coming in late into this discussion
>
> We had a very similar problem and I found a simple way to implement
> priming savepoints with mixed keyed/operator state.
>
> The trick is this:
>
> - In your KeyedStateBootstrapFunction, also implement CheckpointedFunction.
> - In initializeState(), initialize the broadcast state primitive (the code
>   skeleton below uses getUnionListState, same principle).
> - Then, in processElement(), process one event object per key; each event
>   carries a tuple with one state collection per state primitive.
> - For the union list state I forge a special key, “broadcast”; for that
>   event only the third vector of the tuple contains anything.
> - (The upstream operator feeding into this bootstrap function makes sure
>   only one event with the “broadcast” key is generated.)
>
> Peruse the code skeleton (Scala) if you like (I removed some stuff I’m not
> supposed to disclose):
>
> import org.apache.flink.api.common.functions.RuntimeContext
> import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
> import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction
> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
>
> /** Event type for state updates for savepoint generation. A 4-tuple of
>   * 1) a vector of valid [[CSC]] entries
>   * 2) a vector of valid [[DAS]] entries
>   * 3) a vector of valid broadcast [[FFR]] entries
>   * 4) a vector of timers for state cleanup */
> type DAFFS = (Vector[CSC], Vector[DAS], Vector[FFR], Vector[Long])
>
> /** [[StateUpdate]] type for [[DAFFS]] state along with the [[String]] key context. */
> type DAFFSHU = StateUpdate[String, DAFFS]
>
> class DAFFOperatorStateBootstrapFunction
>     extends KeyedStateBootstrapFunction[String, DAFFSHU]
>     with CheckpointedFunction {
>
>   override def open(parameters: Configuration): Unit = {
>     super.open(parameters)
>     val rtc: RuntimeContext = getRuntimeContext
>     // keyed state setup:
>     // cSC = rtc.getListState(new ListStateDescriptor[CSC](...
>     // dAS = rtc.getListState(new ListStateDescriptor[DAS](...
>   }
>
>   // Keyed state handles, assigned in open()
>   @transient var cSC: ListState[CSC] = null
>   @transient var dAS: ListState[DAS] = null
>
>   // Operator (union list) state handle, assigned in initializeState()
>   @transient var fFRState: ListState[FFR] = null
>
>   override def processElement(
>       value: DAFFSHU,
>       ctx: KeyedStateBootstrapFunction[String, DAFFSHU]#Context): Unit = {
>     val daffs = value.state
>     val ts = ctx.timerService()
>
>     // One tuple slot per state primitive; for the single event with the
>     // forged "broadcast" key only daffs._3 is non-empty.
>     for (csc <- daffs._1) cSC.add(csc)
>     for (das <- daffs._2) dAS.add(das)
>     for (ffr <- daffs._3) fFRState.add(ffr)
>     for (timer <- daffs._4) ts.registerEventTimeTimer(timer)
>   }
>
>   // Nothing to do here: the union list state is managed operator state,
>   // so Flink includes it in the snapshot.
>   override def snapshotState(context: FunctionSnapshotContext): Unit = {}
>
>   override def initializeState(context: FunctionInitializationContext): Unit = {
>     val fFRStateDescriptor = new ListStateDescriptor[FFR]("ffr", ffrTI)
>     fFRState = context.getOperatorStateStore.getUnionListState(fFRStateDescriptor)
>   }
> }
>
> Hope this helps …
>
> Sincere greetings
>
> Thias
>
> *From:* David Artiga <david.art...@gmail.com>
> *Sent:* Wednesday, March 22, 2023 11:31 AM
> *To:* Hang Ruan <ruanhang1...@gmail.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Bootstrapping multiple state within same operator
>
> I'm not familiar with the implementation, but some options come to mind:
>
> - composable transformations
> - an underlying MultiMap
> - ...
>
> On Wed, Mar 22, 2023 at 10:24 AM Hang Ruan <ruanhang1...@gmail.com> wrote:
>
> Hi David,
>
> I also read the code of `SavepointWriter#withOperator`. The
> transformations are stored in a `Map` whose key is `OperatorID`. I can't
> come up with a way to register multiple transformations for one
> operator with the provided API.
>
> Maybe we need a new type of `XXXStateBootstrapFunction` that can
> bootstrap several states at once.
>
> Best,
> Hang
>
> On Wed, Mar 22, 2023 at 15:22, David Artiga <david.art...@gmail.com> wrote:
>
> We are using the State Processor API
> <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/libs/state_processor_api/>
> to bootstrap the state of some operators. It has been working fine until now,
> when we tried to bootstrap an operator that has both keyed state and
> broadcast state. It seems the API does not provide a convenient way to
> apply multiple transformations to the same *uid*...
>
> Is there a way to do that and we just missed it? Any insights appreciated.
>
> Cheers,
> /David
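As a rough illustration of the last bullet in Thias's trick (the upstream operator emits exactly one event under the forged “broadcast” key), here is a plain-Scala sketch of how the input events for DAFFOperatorStateBootstrapFunction could be assembled. It uses no Flink API. CSC, DAS, FFR and StateUpdate are hypothetical stand-ins for the undisclosed types (only the `state` field of StateUpdate appears in the thread), and PrimingEvents.build is a hypothetical helper. In a real job, these events would roughly become the stream passed to OperatorTransformation.bootstrapWith(...), keyed by the event key and transformed with the bootstrap function.

```scala
// Hypothetical stand-ins for the undisclosed record types from the thread.
final case class CSC(key: String, payload: String)
final case class DAS(key: String, payload: String)
final case class FFR(payload: String)

// Hypothetical wrapper mirroring StateUpdate[K, S]; only `state` appears in the thread.
final case class StateUpdate[K, S](key: K, state: S)

object PrimingEvents {
  type DAFFS = (Vector[CSC], Vector[DAS], Vector[FFR], Vector[Long])
  type DAFFSHU = StateUpdate[String, DAFFS]

  // Forged key reserved for the union list (broadcast-like) state.
  val BroadcastKey: String = "broadcast"

  /** Builds one event per real key (third vector always empty) plus exactly
    * one event under BroadcastKey that carries all FFR entries. */
  def build(
      cscs: Seq[CSC],
      dases: Seq[DAS],
      ffrs: Seq[FFR],
      timers: Map[String, Seq[Long]]): Vector[DAFFSHU] = {
    val cscByKey = cscs.groupBy(_.key)
    val dasByKey = dases.groupBy(_.key)
    val keys = (cscByKey.keySet ++ dasByKey.keySet ++ timers.keySet).toVector.sorted
    // Assumption: a real key must never collide with the forged key.
    require(!keys.contains(BroadcastKey), s"key '$BroadcastKey' is reserved")

    val keyed: Vector[DAFFSHU] = keys.map { k =>
      val state: DAFFS = (
        cscByKey.getOrElse(k, Seq.empty).toVector,
        dasByKey.getOrElse(k, Seq.empty).toVector,
        Vector.empty[FFR],
        timers.getOrElse(k, Seq.empty).toVector)
      StateUpdate(k, state)
    }

    // The single event that primes the union list state.
    val broadcast: DAFFSHU = StateUpdate(
      BroadcastKey,
      (Vector.empty[CSC], Vector.empty[DAS], ffrs.toVector, Vector.empty[Long]))

    keyed :+ broadcast
  }
}
```

Because every keyed event leaves the third vector empty and only the “broadcast” event carries FFR entries, the union list state holds each FFR exactly once across all subtasks; on restore, union redistribution hands that full list to every subtask.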