Hi Vino,

Thanks for your reply !

The key of my input data is same value. So I think there is only one partition.

And Why sometimes I can get the value stored in the ValueState before update?
> > > > before update value : 3
> > > > after update value: 4

What’s more, How can I stored the previous value so that I can get the value 
when next element come in and invoke the onElement method?



Best  regards
Utopia
在 2019年12月18日 +0800 21:57,vino yang <yanghua1...@gmail.com>,写道:
> Hi Utopia,
>
> The behavior may be correct.
>
> First, the default value is null. It's the correct value. 
> `ValueStateDescriptor` has multiple constructors, some of them can let you 
> specify a default value. However, these constructors are deprecated. And the 
> doc does not recommend them.[1] For the other constructors which can not 
> specify default values, it would be null.
>
> Second, before the window, there is a `keyBy` operation. it will partition 
> your data. For each partition, the default value state is null.
>
> Best,
> Vino
>
> [1]: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/index.html?org/apache/flink/api/common/state/ValueStateDescriptor.html
>
> > Utopia <gejunwei...@gmail.com> 于2019年12月18日周三 下午7:20写道:
> > > Hi,
> > >
> > > I want to get the last value stored in ValueState when processing element 
> > > in Trigger.
> > >
> > > But as the log shows that sometimes I can get the value, sometimes not.
> > >
> > > Only one key in my data(SensorReading).
> > >
> > > ValueState:
> > > class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {
> > >
> > > private lazy val descriptor = new ValueStateDescriptor[Long]("desc", 
> > > classOf[Long])
> > > var value = 1
> > > override def onElement( r: SensorReading, timestamp: Long, window: 
> > > TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
> > >
> > >   println("before update value : " + 
> > > ctx.getPartitionedState(descriptor).value())
> > >
> > >    ctx.getPartitionedState(descriptor).update(value)
> > >
> > >    value += 1
> > >
> > >    println("after update value: " + 
> > > ctx.getPartitionedState(descriptor).value())
> > >
> > >    ctx.registerProcessingTimeTimer(window.maxTimestamp)
> > >    TriggerResult.CONTINUE
> > > }
> > >
> > > override def onEventTime(time: Long, window: TimeWindow, ctx: 
> > > Trigger.TriggerContext) = TriggerResult.CONTINUE
> > >
> > > override def onProcessingTime(time: Long, window: TimeWindow, ctx: 
> > > Trigger.TriggerContext) = TriggerResult.FIRE
> > >
> > > override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit 
> > > = {
> > >    ctx.deleteProcessingTimeTimer(window.maxTimestamp)
> > >  }
> > >
> > > override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): 
> > > Unit = {
> > >    val windowMaxTimestamp = window.maxTimestamp
> > >    if (windowMaxTimestamp > ctx.getCurrentProcessingTime) 
> > > ctx.registerProcessingTimeTimer(windowMaxTimestamp)
> > >  }
> > >
> > > override def canMerge: Boolean = true
> > >
> > > }
> > >
> > > Main process:
> > > object MyCustomWindows {
> > >
> > > def main(args: Array[String]): Unit = {
> > >
> > >    val env = StreamExecutionEnvironment.getExecutionEnvironment
> > >    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
> > >    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> > >    env.getConfig.setAutoWatermarkInterval(1000L)
> > >
> > >    val sensorData: DataStream[SensorReading] = env
> > >      .addSource(new SensorSource)
> > >      .assignTimestampsAndWatermarks(new SensorTimeAssigner)
> > >
> > >    val countsPerThirtySecs = sensorData
> > >      .keyBy(_.id)
> > >     .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
> > >      .trigger(new ProcessingTimeTrigger)
> > >      .process(new CountFunction)
> > >
> > >    env.execute()
> > >  }
> > > }
> > >
> > > Log results:
> > >
> > > > before update value : null
> > > > after update value: 1
> > > > before update value : null
> > > > after update value: 2
> > > > before update value : null
> > > > after update value: 3
> > > > before update value : 3
> > > > after update value: 4
> > > > before update value : null
> > > > after update value: 5
> > > > before update value : null
> > > > after update value: 6
> > > > before update value : null
> > > > after update value: 7
> > > > before update value : null
> > > > after update value: 8
> > > > before update value : null
> > > > after update value: 9
> > > > before update value : 9
> > > > after update value: 10
> > >
> > >
> > > Best  regards
> > > Utopia

Reply via email to