That sounds like a great hack :D
I'll give it a try for sure. Thank you!
/David

On Fri, Mar 24, 2023 at 5:25 PM Schwalbe Matthias <
matthias.schwa...@viseca.ch> wrote:

> Hi David,
>
> … coming late to this discussion
>
>
>
> We had a very similar problem and I found a simple way to implement
> priming savepoints with mixed keyed/operator state.
>
> The trick is this:
>
>    - In your KeyedStateBootstrapFunction also implement
>    CheckpointedFunction
>    - In initializeState() you can initialize the broadcast state primitive
>    (the code skeleton below uses getUnionListState; same principle)
>    - Then in processElement() I process a tuple of state collections,
>    one collection per state primitive, i.e. one event object per key
>    - For the union list state I forge a special key “broadcast”, for which
>    only the 3rd tuple vector contains anything
>       - (the upstream operator feeding into this bootstrap function makes
>       sure only one event with the “broadcast” key is generated)
>
>
>
> Peruse the code skeleton (Scala) if you like (I removed some stuff I’m not
> supposed to disclose):
>
>
>
>
>
> /** Event type for state updates for savepoint generation. A 4-tuple of
>   * 1) vector of valid [[CSC]] entries
>   * 2) a vector of valid [[DAS]] entries
>   * 3) a vector of valid broadcast FFR.
>   * 4) a vector of timers for state cleanup */
> type DAFFS = (Vector[CSC], Vector[DAS], Vector[FFR], Vector[Long])
>
> /** [[StateUpdate]] type for [[DAFFS]] state along with the [[String]] key context. */
> type DAFFSHU = StateUpdate[String, DAFFS]
>
>
>
> class DAFFOperatorStateBootstrapFunction
>   extends KeyedStateBootstrapFunction[String, DAFFSHU]
>     with CheckpointedFunction {
>
>   override def open(parameters: Configuration): Unit = {
>     super.open(parameters)
>     val rtc: RuntimeContext = getRuntimeContext
>     //keyed state setup:
>     // cSC = rtc.getListState(new ListStateDescriptor[CSC](...
>     // dAS = rtc.getListState(new ListStateDescriptor[DAS](...
>   }
>
>   override def processElement(
>       value: DAFFSHU,
>       ctx: KeyedStateBootstrapFunction[String, DAFFSHU]#Context): Unit = {
>
>     val daffs = value.state
>     val ts = ctx.timerService()
>
>     for (csc <- daffs._1) {
>       cSC.add(csc)
>     }
>     for (das <- daffs._2) {
>       dAS.add(das)
>     }
>     for (ffr <- daffs._3) {
>       fFRState.add(ffr)
>     }
>     for (timer <- daffs._4) {
>       ts.registerEventTimeTimer(timer)
>     }
>
>     val stop = 0
>   }
>
>   @transient var fFRState: ListState[FFR] = null
>
>   override def snapshotState(context: FunctionSnapshotContext): Unit = {
>   }
>
>   override def initializeState(context: FunctionInitializationContext): Unit = {
>     val fFRStateDescriptor = new ListStateDescriptor[FFR]("ffr", ffrTI)
>     fFRState = context.getOperatorStateStore.getUnionListState(fFRStateDescriptor)
>   }
> }
>
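The upstream side described in the bullet list (one event per real key, plus exactly one event under the forged "broadcast" key whose 3rd vector alone is populated) could be sketched roughly as follows. This is plain Scala with no Flink dependency. The case classes `CSC`, `DAS`, `FFR`, the `key` field of `StateUpdate`, and the `BootstrapEvents` object with its helpers are all hypothetical stand-ins, since the real definitions were removed from the skeleton.

```scala
// Hypothetical stand-ins for the domain types elided from the skeleton.
final case class CSC(id: String)
final case class DAS(id: String)
final case class FFR(rule: String)

// Assumed shape: the skeleton only shows that a StateUpdate exposes `.state`;
// the `key` field is an assumption made for this sketch.
final case class StateUpdate[K, S](key: K, state: S)

object BootstrapEvents {
  type DAFFS = (Vector[CSC], Vector[DAS], Vector[FFR], Vector[Long])
  type DAFFSHU = StateUpdate[String, DAFFS]

  // Forged key under which the operator (union list) state travels.
  val BroadcastKey = "broadcast"

  /** One event per real key; the 3rd vector (operator state) stays empty. */
  def keyedEvents(
      csc: Map[String, Vector[CSC]],
      das: Map[String, Vector[DAS]],
      timers: Map[String, Vector[Long]]): Vector[DAFFSHU] = {
    val keys = (csc.keySet ++ das.keySet ++ timers.keySet).toVector.sorted
    // A real key equal to the forged one would mix keyed and operator state.
    require(!keys.contains(BroadcastKey), s"key '$BroadcastKey' is reserved")
    keys.map { k =>
      val state: DAFFS = (
        csc.getOrElse(k, Vector.empty[CSC]),
        das.getOrElse(k, Vector.empty[DAS]),
        Vector.empty[FFR],
        timers.getOrElse(k, Vector.empty[Long]))
      StateUpdate(k, state)
    }
  }

  /** Exactly one event under the forged key; only the 3rd vector is filled. */
  def broadcastEvent(ffr: Vector[FFR]): DAFFSHU = {
    val state: DAFFS = (Vector.empty[CSC], Vector.empty[DAS], ffr, Vector.empty[Long])
    StateUpdate(BroadcastKey, state)
  }

  /** All keyed events followed by the single broadcast event. */
  def allEvents(
      csc: Map[String, Vector[CSC]],
      das: Map[String, Vector[DAS]],
      timers: Map[String, Vector[Long]],
      ffr: Vector[FFR]): Vector[DAFFSHU] =
    keyedEvents(csc, das, timers) :+ broadcastEvent(ffr)
}
```

In a real job the resulting collection would be turned into the input stream of the bootstrap function and keyed by `key`. The `require` guard is a design choice of this sketch, not something the original message mentions.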
>
>
> Hope this helps …
>
>
>
> Sincere greetings
>
>
>
> Thias
>
>
>
>
>
> *From:* David Artiga <david.art...@gmail.com>
> *Sent:* Wednesday, March 22, 2023 11:31 AM
> *To:* Hang Ruan <ruanhang1...@gmail.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: Bootstrapping multiple state within same operator
>
>
>
> I'm not familiar with the implementation, but I can think of some options:
>
> - composable transformations
> - underlying MultiMap
> - ...
>
>
>
> On Wed, Mar 22, 2023 at 10:24 AM Hang Ruan <ruanhang1...@gmail.com> wrote:
>
> Hi, David,
>
> I also read the code of `SavepointWriter#withOperator`. The
> transformations are stored in a `Map` whose key is `OperatorID`. I can't
> come up with a way to register multiple transformations for one
> operator with the provided API.
>
>
>
> Maybe we need a new type of `XXXStateBootstrapFunction` to change multiple
> states at once.
>
>
>
> Best,
>
> Hang
>
>
>
> On Wed, Mar 22, 2023 at 15:22, David Artiga <david.art...@gmail.com> wrote:
>
> We are using the State Processor API
> <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/libs/state_processor_api/>
> to bootstrap the state of some operators. It had been working fine until now,
> when we tried to bootstrap an operator that has both keyed state and
> broadcast state. It seems the API does not provide a convenient method to
> apply multiple transformations on the same *uid*...
>
>
>
> Is there a way to do that and we just missed it? Any insights appreciated.
>
>
>
> Cheers,
>
> /David
>
