Re: Cannot get value from ValueState in ProcessingTimeTrigger
Hi Utopia,

IMO, your analysis is correct.

Best,
Vino

On Thu, Dec 19, 2019 at 12:44 AM, Utopia wrote:
> Hi Vino,
>
> Maybe it is due to the type of window. What I used is
> ProcessingTimeSessionWindows, while keyedState is scoped to *window and
> key*. Window changes so that the ValueState is different.
Re: Cannot get value from ValueState in ProcessingTimeTrigger
Hi Vino,

Maybe it is due to the type of window. I am using ProcessingTimeSessionWindows, and keyed state is scoped to both the window and the key. Since the window changes, the ValueState is a different one.

Best regards
Utopia

On Dec 18, 2019, 22:30 +0800, Utopia wrote:
> Hi Vino,
>
> Thanks for your reply !
>
> The key of my input data is same value. So I think there is only one
> partition.
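The scoping Utopia describes can be illustrated with a toy model in plain Scala. It is a simplification, not Flink's actual window operator: it assumes only that trigger state is looked up under the pair (key, window) and that a session window grows by taking the union of overlapping windows. `WindowScopedStateModel`, `read`, `write` and `merge` are hypothetical names.

```scala
import scala.collection.mutable

object WindowScopedStateModel {
  // Toy model (not Flink's implementation): trigger state lives under the
  // pair (key, window), so the same key with a different window sees nothing.
  final case class Window(start: Long, end: Long)

  private val store = mutable.Map.empty[(String, Window), Long]

  def read(key: String, w: Window): Option[Long] = store.get((key, w))
  def write(key: String, w: Window, v: Long): Unit = store((key, w)) = v

  // Session-window merge: a new element's window [t, t + gap) is unioned
  // with an overlapping existing session, giving a possibly larger window.
  def merge(a: Window, b: Window): Window =
    Window(math.min(a.start, b.start), math.max(a.end, b.end))

  def main(args: Array[String]): Unit = {
    val gap = 1000L
    val w1 = Window(0L, gap)                       // first element at t = 0
    write("sensor_1", w1, 1L)

    val w2 = merge(w1, Window(500L, 500L + gap))   // second element at t = 500
    println(w2)                                    // Window(0,1500)
    println(read("sensor_1", w2))                  // None: new window, new namespace

    val w3 = merge(w1, Window(0L, gap))            // element that leaves the window unchanged
    println(read("sensor_1", w3))                  // Some(1): same namespace as before
  }
}
```

In this model a value written under the old session window becomes invisible once the session is extended, and is found again only when the merged window comes out unchanged. That would be consistent with the mostly-null reads in the log, although the thread does not pin down the exact mechanism.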
Re: Cannot get value from ValueState in ProcessingTimeTrigger
Hi Vino,

Thanks for your reply!

All of my input records have the same key, so I think there is only one partition.

Then why can I sometimes read the value stored in the ValueState before the update?

> before update value : 3
> after update value: 4

What's more, how can I store the previous value so that I can read it when the next element comes in and invokes the onElement method?

Best regards
Utopia

On Dec 18, 2019, 21:57 +0800, vino yang wrote:
> Hi Utopia,
>
> The behavior may be correct.
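On carrying a value across window merges: Flink's `Trigger.OnMergeContext` offers `mergePartitionedState(descriptor)`, but it accepts only mergeable state (for example a `ReducingState`), not `ValueState`. Flink's built-in `CountTrigger` keeps its count in a `ReducingState` and calls this method from `onMerge`. The toy model below, in plain Scala with hypothetical names (`MergedStateModel`, `add`, `mergeInto`, `read`), sketches the idea under that assumption: when windows merge, their values are combined with a reduce function under the resulting window.

```scala
import scala.collection.mutable

object MergedStateModel {
  // Toy model of merging per-window state when session windows merge, in the
  // spirit of mergePartitionedState on a ReducingState. Not Flink's code.
  final case class Window(start: Long, end: Long)

  // Reduce function used to combine values (here: keep the maximum).
  val reduce: (Long, Long) => Long = (a, b) => math.max(a, b)

  private val store = mutable.Map.empty[Window, Long]

  // Like ReducingState.add: combine the new value with what is already stored.
  def add(w: Window, v: Long): Unit =
    store(w) = store.get(w).map(reduce(_, v)).getOrElse(v)

  // Move the values of the merged windows into the result window.
  def mergeInto(result: Window, merged: Seq[Window]): Unit =
    merged.flatMap(w => store.remove(w)).foreach(add(result, _))

  def read(w: Window): Option[Long] = store.get(w)

  def main(args: Array[String]): Unit = {
    val w1 = Window(0L, 1000L)
    add(w1, 1L)
    val w2 = Window(0L, 1500L)   // session extended by a later element
    mergeInto(w2, Seq(w1))
    println(read(w2))            // Some(1): previous value carried over
    add(w2, 2L)
    println(read(w2))            // Some(2)
    println(read(w1))            // None: old window's entry was moved
  }
}
```

Because merged values may be combined in any order, an order-insensitive reduce function such as max or sum is the safer choice; "keep the most recent value" is not directly expressible this way.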
Re: Cannot get value from ValueState in ProcessingTimeTrigger
Hi Utopia,

The behavior may be correct.

First, the default value is null, and that is the expected value. `ValueStateDescriptor` has multiple constructors, and some of them let you specify a default value. However, those constructors are deprecated, and the docs do not recommend them.[1] With the other constructors, which cannot specify a default value, the default is null.

Second, there is a `keyBy` operation before the window. It partitions your data, and in each partition the value state starts out as null.

Best,
Vino

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/index.html?org/apache/flink/api/common/state/ValueStateDescriptor.html

On Wed, Dec 18, 2019 at 7:20 PM, Utopia wrote:
> Hi,
>
> I want to get the last value stored in ValueState when processing element
> in Trigger.
>
> But as the log shows that sometimes I can get the value, sometimes not.
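Since a state without a default returns null until something has been stored for the current key (and, inside a trigger, the current window), one alternative to the deprecated default-value constructors is to substitute the default at read time. A minimal plain-Scala sketch; `valueOrDefault` is a hypothetical helper and `raw` stands for the result of `state.value()`. It assumes the state holds the boxed `java.lang.Long`: with a Scala `Long` type parameter, a null may be silently unboxed to 0 depending on how the result is used.

```scala
object ValueOrDefaultDemo {
  // Hypothetical helper: treat a null read from ValueState as "nothing stored
  // yet" and fall back to a caller-supplied default.
  def valueOrDefault(raw: java.lang.Long, default: Long): Long =
    Option(raw).map(_.longValue).getOrElse(default)

  def main(args: Array[String]): Unit = {
    val unset: java.lang.Long = null                 // what value() returns before any update
    val stored: java.lang.Long = java.lang.Long.valueOf(3L)
    println(valueOrDefault(unset, 0L))               // 0
    println(valueOrDefault(stored, 0L))              // 3
  }
}
```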
Cannot get value from ValueState in ProcessingTimeTrigger
Hi,

I want to read the last value stored in a ValueState when processing an element in a Trigger.

But as the log shows, sometimes I can get the value and sometimes not.

There is only one key in my data (SensorReading).

ValueState:

    class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {

      private lazy val descriptor = new ValueStateDescriptor[Long]("desc", classOf[Long])

      var value = 1

      override def onElement(r: SensorReading, timestamp: Long, window: TimeWindow,
                             ctx: Trigger.TriggerContext): TriggerResult = {
        println("before update value : " + ctx.getPartitionedState(descriptor).value())
        ctx.getPartitionedState(descriptor).update(value)
        value += 1
        println("after update value: " + ctx.getPartitionedState(descriptor).value())
        ctx.registerProcessingTimeTimer(window.maxTimestamp)
        TriggerResult.CONTINUE
      }

      override def onEventTime(time: Long, window: TimeWindow,
                               ctx: Trigger.TriggerContext) = TriggerResult.CONTINUE

      override def onProcessingTime(time: Long, window: TimeWindow,
                                    ctx: Trigger.TriggerContext) = TriggerResult.FIRE

      override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
        ctx.deleteProcessingTimeTimer(window.maxTimestamp)
      }

      override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = {
        val windowMaxTimestamp = window.maxTimestamp
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime)
          ctx.registerProcessingTimeTimer(windowMaxTimestamp)
      }

      override def canMerge: Boolean = true
    }

Main process:

    object MyCustomWindows {

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.getConfig.setAutoWatermarkInterval(1000L)

        val sensorData: DataStream[SensorReading] = env
          .addSource(new SensorSource)
          .assignTimestampsAndWatermarks(new SensorTimeAssigner)

        val countsPerThirtySecs = sensorData
          .keyBy(_.id)
          .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
          .trigger(new ProcessingTimeTrigger)
          .process(new CountFunction)

        env.execute()
      }
    }

Log results:

    before update value : null
    after update value: 1
    before update value : null
    after update value: 2
    before update value : null
    after update value: 3
    before update value : 3
    after update value: 4
    before update value : null
    after update value: 5
    before update value : null
    after update value: 6
    before update value : null
    after update value: 7
    before update value : null
    after update value: 8
    before update value : null
    after update value: 9
    before update value : 9
    after update value: 10

Best regards
Utopia