Hi David,

I'm coming in late to this discussion, but we had a very similar problem and I found a simple way to implement priming savepoints with mixed keyed/operator state. The trick is this:
* In your KeyedStateBootstrapFunction, also implement CheckpointedFunction.
* In initializeState() you can initialize the broadcast state primitive (the code skeleton below uses getUnionListState; same principle).
* Then, in processElement(), I process a tuple of state collections, one per state primitive, i.e. one event object per key.
* For the union list state I forge a special key, "broadcast", and only the 3rd tuple vector contains anything.
* (The upstream operator feeding into this bootstrap function makes sure that only one event with the "broadcast" key is generated.)

Feel free to peruse the code skeleton (Scala) below; I removed some stuff I'm not supposed to disclose:

    import org.apache.flink.api.common.functions.RuntimeContext
    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
    import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction

    // CSC, DAS, FFR, StateUpdate and ffrTI belong to the parts not shown here.

    /** Event type for state updates for savepoint generation. A 4-tuple of
      * 1) a vector of valid [[CSC]] entries
      * 2) a vector of valid [[DAS]] entries
      * 3) a vector of valid broadcast FFR entries
      * 4) a vector of timers for state cleanup
      */
    type DAFFS = (Vector[CSC], Vector[DAS], Vector[FFR], Vector[Long])

    /** [[StateUpdate]] type for [[DAFFS]] state along with the [[String]] key context. */
    type DAFFSHU = StateUpdate[String, DAFFS]

    class DAFFOperatorStateBootstrapFunction
        extends KeyedStateBootstrapFunction[String, DAFFSHU]
        with CheckpointedFunction {

      // Keyed state (scoped to the current key), created in open().
      @transient var cSC: ListState[CSC] = null
      @transient var dAS: ListState[DAS] = null

      // Operator-level union list state, created in initializeState().
      @transient var fFRState: ListState[FFR] = null

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        val rtc: RuntimeContext = getRuntimeContext
        // keyed state setup:
        // cSC = rtc.getListState(new ListStateDescriptor[CSC](...
        // dAS = rtc.getListState(new ListStateDescriptor[DAS](...
      }

      override def processElement(
          value: DAFFSHU,
          ctx: KeyedStateBootstrapFunction[String, DAFFSHU]#Context): Unit = {
        val daffs = value.state
        val ts = ctx.timerService()
        for (csc <- daffs._1) { cSC.add(csc) }
        for (das <- daffs._2) { dAS.add(das) }
        // Non-empty only for the single event with the forged "broadcast" key.
        for (ffr <- daffs._3) { fFRState.add(ffr) }
        for (timer <- daffs._4) { ts.registerEventTimeTimer(timer) }
      }

      // Nothing to do here: the union list state is filled in processElement().
      override def snapshotState(context: FunctionSnapshotContext): Unit = {}

      override def initializeState(context: FunctionInitializationContext): Unit = {
        val fFRStateDescriptor = new ListStateDescriptor[FFR]("ffr", ffrTI)
        fFRState = context.getOperatorStateStore.getUnionListState(fFRStateDescriptor)
      }
    }

Hope this helps …

Sincere greetings
Thias

From: David Artiga <david.art...@gmail.com>
Sent: Wednesday, March 22, 2023 11:31 AM
To: Hang Ruan <ruanhang1...@gmail.com>
Cc: user@flink.apache.org
Subject: Re: Bootstrapping multiple state within same operator

Not familiar with the implementation, but thinking of some options:
- composable transformations
- underlying MultiMap
- ...

On Wed, Mar 22, 2023 at 10:24 AM Hang Ruan <ruanhang1...@gmail.com> wrote:

Hi, David,

I also read the code of `SavepointWriter#withOperator`. The transformations are stored in a `Map` whose key is `OperatorID`. I can't come up with a way to register multiple transformations for one operator with the provided API. Maybe we need a new type of `XXXStateBootstrapFunction` that can change several states at once.

Best,
Hang

David Artiga <david.art...@gmail.com> wrote on Wednesday, March 22, 2023 at 15:22:

We are using the state processor API (https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/libs/state_processor_api/) to bootstrap the state of some operators.
It has been working fine until now, when we tried to bootstrap an operator that has both keyed state and broadcast state. It seems the API does not provide a convenient way to apply multiple transformations to the same uid. Is there a way to do that which we just missed? Any insights appreciated.

Cheers,
/David
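For anyone who wants to see the event shaping from Thias's reply without a Flink job, here is a minimal plain-Scala sketch. It is not Flink API code, and `DaffsEvents`, `Csc`, `Das`, `Ffr`, `toEvents`, `bootstrap` and `Bootstrapped` are hypothetical stand-ins (the real CSC/DAS/FFR types were not disclosed). `toEvents` plays the role of the upstream operator: one event per real key with an empty 3rd vector, plus exactly one event under the forged "broadcast" key that carries only the FFR entries. `bootstrap` mimics how processElement() routes the keyed vectors to per-key state and the 3rd vector to a single operator-level list.

```scala
object DaffsEvents {
  // Hypothetical stand-ins for the undisclosed CSC, DAS and FFR types.
  final case class Csc(id: String)
  final case class Das(id: String)
  final case class Ffr(id: String)

  // Same shape as DAFFS: keyed entries, keyed entries, broadcast entries, timers.
  type Daffs = (Vector[Csc], Vector[Das], Vector[Ffr], Vector[Long])

  // The forged key that carries the operator-level (broadcast) entries.
  val BroadcastKey: String = "broadcast"

  /** Plays the upstream operator: one event per real key with an empty 3rd
    * vector, plus exactly one BroadcastKey event carrying only the FFR entries. */
  def toEvents(
      csc: Map[String, Vector[Csc]],
      das: Map[String, Vector[Das]],
      timers: Map[String, Vector[Long]],
      broadcast: Vector[Ffr]
  ): Vector[(String, Daffs)] = {
    val keys = (csc.keySet ++ das.keySet ++ timers.keySet).toVector.sorted
    require(!keys.contains(BroadcastKey), s"real key collides with '$BroadcastKey'")
    val keyed: Vector[(String, Daffs)] = keys.map { k =>
      val d: Daffs = (
        csc.getOrElse(k, Vector.empty),
        das.getOrElse(k, Vector.empty),
        Vector.empty,
        timers.getOrElse(k, Vector.empty)
      )
      k -> d
    }
    val bcast: Daffs = (Vector.empty, Vector.empty, broadcast, Vector.empty)
    keyed :+ (BroadcastKey -> bcast)
  }

  /** Result of the simulated bootstrap: per-key state and one operator-level list. */
  final case class Bootstrapped(
      keyed: Map[String, (Vector[Csc], Vector[Das], Vector[Long])],
      unionList: Vector[Ffr]
  )

  /** Mimics processElement(): keyed vectors go to per-key state (keys whose
    * keyed vectors are all empty get no entry, since nothing would be added
    * for them), and the 3rd vector is appended to the single operator-level list. */
  def bootstrap(events: Vector[(String, Daffs)]): Bootstrapped =
    events.foldLeft(Bootstrapped(Map.empty, Vector.empty)) {
      case (acc, (key, (c, d, f, t))) =>
        val keyedAcc =
          if (c.isEmpty && d.isEmpty && t.isEmpty) acc.keyed
          else acc.keyed.updated(key, (c, d, t))
        Bootstrapped(keyedAcc, acc.unionList ++ f)
    }
}
```

Because only the "broadcast" event carries FFR entries, the operator-level list is written exactly once, no matter how many keys there are. With union list state, every parallel instance then receives the complete list when the savepoint is restored, even though only one bootstrap subtask wrote it.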