Hi David,
… coming late to this discussion.

We had a very similar problem and I found a simple way to implement priming 
savepoints with mixed keyed/operator state.
The trick is this:

  *   In your KeyedStateBootstrapFunction, also implement CheckpointedFunction
  *   In initializeState() you can initialize the broadcast state primitive (the 
code skeleton below uses getUnionListState, but the principle is the same)
  *   Then in processElement() I process a tuple of state collections, one 
collection per state primitive, i.e. one event object per key
  *   For the union list state I forge a special key “broadcast”, for which only 
the 3rd tuple vector contains anything
     *   (the upstream operator feeding into this bootstrap function makes sure 
that only one event with the “broadcast” key is generated)
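
The event layout described in the list above can be sketched in plain Scala, without any Flink dependency. This is only an illustration of the idea: the case classes CSC, DAS, FFR and StateUpdate are hypothetical stand-ins (the real types are not shown in this mail), and buildEvents and BroadcastKey are names made up for this sketch.

```scala
// Hypothetical stand-ins: the real CSC, DAS, FFR and StateUpdate types are not disclosed here.
final case class CSC(id: String)
final case class DAS(id: String)
final case class FFR(rule: String)
final case class StateUpdate[K, S](key: K, state: S)

object BootstrapEvents {
  type DAFFS = (Vector[CSC], Vector[DAS], Vector[FFR], Vector[Long])
  type DAFFSHU = StateUpdate[String, DAFFS]

  // Forged key whose single event carries the operator (union list) state.
  val BroadcastKey = "broadcast"

  /** Builds one event per real key (keyed state and timers only)
    * plus exactly one event under the forged key (3rd vector only). */
  def buildEvents(
      keyed: Map[String, (Vector[CSC], Vector[DAS], Vector[Long])],
      ffrs: Vector[FFR]): Vector[DAFFSHU] = {
    // Assumption of this sketch: the forged key must never collide with a real key.
    require(!keyed.contains(BroadcastKey), "forged key collides with a real key")

    val perKey: Vector[DAFFSHU] = keyed.toVector.map { case (k, (csc, das, timers)) =>
      StateUpdate(k, (csc, das, Vector.empty[FFR], timers))
    }
    val broadcast: DAFFSHU =
      StateUpdate(BroadcastKey, (Vector.empty[CSC], Vector.empty[DAS], ffrs, Vector.empty[Long]))

    perKey :+ broadcast
  }
}
```

The resulting events would then be keyed by their key field and fed into the bootstrap function, so that the keyed vectors land in keyed state and the single “broadcast” event fills the union list state.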

Peruse the code skeleton (scala) if you like (I removed some stuff I’m not 
supposed to disclose):


/** Event type for state updates for savepoint generation. A 4-tuple of
  * 1) a vector of valid [[CSC]] entries,
  * 2) a vector of valid [[DAS]] entries,
  * 3) a vector of valid broadcast [[FFR]] entries,
  * 4) a vector of timers for state cleanup. */
type DAFFS = (Vector[CSC], Vector[DAS], Vector[FFR], Vector[Long])

/** [[StateUpdate]] type for [[DAFFS]] state along with the [[String]] key context. */
type DAFFSHU = StateUpdate[String, DAFFS]

class DAFFOperatorStateBootstrapFunction
  extends KeyedStateBootstrapFunction[String, DAFFSHU]
    with CheckpointedFunction {

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val rtc: RuntimeContext = getRuntimeContext
    //keyed state setup:
    // cSC = rtc.getListState(new ListStateDescriptor[CSC](...
    // dAS = rtc.getListState(new ListStateDescriptor[DAS](...
  }

  override def processElement(
      value: DAFFSHU,
      ctx: KeyedStateBootstrapFunction[String, DAFFSHU]#Context): Unit = {

    val daffs = value.state
    val ts = ctx.timerService()

    for (csc <- daffs._1) {
      cSC.add(csc)
    }
    for (das <- daffs._2) {
      dAS.add(das)
    }
    for (ffr <- daffs._3) {
      fFRState.add(ffr)
    }
    for (timer <- daffs._4) {
      ts.registerEventTimeTimer(timer)
    }

  }

  // Operator (union list) state; assigned in initializeState().
  @transient var fFRState: ListState[FFR] = _

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // Nothing to do: state obtained from the operator state store is snapshotted by Flink.
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val fFRStateDescriptor = new ListStateDescriptor[FFR]("ffr", ffrTI)
    fFRState = context.getOperatorStateStore.getUnionListState(fFRStateDescriptor)
  }
}

Hope this helps …

Sincere greetings

Thias


From: David Artiga <david.art...@gmail.com>
Sent: Wednesday, March 22, 2023 11:31 AM
To: Hang Ruan <ruanhang1...@gmail.com>
Cc: user@flink.apache.org
Subject: Re: Bootstrapping multiple state within same operator

I am not familiar with the implementation, but I can think of some options:

- composable transformations
- underlying MultiMap
- ...

On Wed, Mar 22, 2023 at 10:24 AM Hang Ruan <ruanhang1...@gmail.com> wrote:
Hi, David,
I also read the code of `SavepointWriter#withOperator`. The transformations 
are stored in a `Map` whose key is `OperatorID`. I could not come up with a way 
to register multiple transformations for one operator with the provided API.

Maybe we need a new type of `XXXStateBootstrapFunction` to change multiple 
states at once.

Best,
Hang

David Artiga <david.art...@gmail.com> wrote on Wed, Mar 22, 2023 at 15:22:
We are using the State Processor API 
<https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/libs/state_processor_api/> 
to bootstrap the state of some operators. It has been working fine until now, 
when we tried to bootstrap an operator that has both a keyed state and a 
broadcast state. It seems the API does not provide a convenient method to apply 
multiple transformations on the same uid...

Is there a way to do that and we just missed it? Any insights appreciated.

Cheers,
/David