Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread vino yang
Hi Utopia,

IMO, your analysis is correct.

Best,
Vino



Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread Utopia
Hi Vino,

Maybe it is due to the type of window. I am using
ProcessingTimeSessionWindows, and keyed state in a trigger is scoped to both
the window and the key. When the window changes, the ValueState is a
different one.
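
As a rough illustration of this scoping, here is a toy model in plain Scala. It is not Flink code: `Window`, `read`, and `write` are hypothetical stand-ins that only mimic a store looked up by (key, window), and the timestamps are made up.

```scala
import scala.collection.mutable

object ScopedStateDemo {
  // Hypothetical stand-in for a session window covering [start, end)
  final case class Window(start: Long, end: Long)

  // Toy state store addressed by (key, window)
  private val store = mutable.Map.empty[(String, Window), Long]

  def read(key: String, w: Window): Option[Long] = store.get((key, w))
  def write(key: String, w: Window, v: Long): Unit = store((key, w)) = v

  def main(args: Array[String]): Unit = {
    val gap = 1000L
    val first = Window(0L, 0L + gap)
    write("sensor_1", first, 1L)

    // An element arriving at t = 5 extends the session, so the window's end moves
    val merged = Window(0L, 5L + gap)
    println(read("sensor_1", merged)) // None: same key, but a different window
    println(read("sensor_1", first))  // Some(1)
  }
}
```

In this model the value is only found again when the lookup uses exactly the same window, which would match the occasional non-null "before update" lines in the log if two elements ended up in an identical window.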

Best  regards
Utopia

Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread Utopia
Hi Vino,

Thanks for your reply!

All of my input records have the same key, so I think there is only one
partition.

And why can I sometimes get the value stored in the ValueState before the
update?

before update value : 3
after update value: 4

What's more, how can I store the previous value so that I can read it when
the next element comes in and invokes the onElement method?
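
One possible direction, sketched under assumptions and not run against this job: keep the value in a mergeable state and merge it in onMerge, in the same spirit as Flink's built-in CountTrigger. The names `CountingSessionTrigger` and `SumCounts` are hypothetical, the sketch counts elements instead of storing an arbitrary value, and it needs the Flink streaming dependency on the classpath.

```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Hypothetical reduce function: adds partial counts when window state is merged
class SumCounts extends ReduceFunction[java.lang.Long] {
  override def reduce(a: java.lang.Long, b: java.lang.Long): java.lang.Long =
    java.lang.Long.valueOf(a.longValue() + b.longValue())
}

// Hypothetical trigger: per-key, per-window count that survives session merges
class CountingSessionTrigger[T] extends Trigger[T, TimeWindow] {

  private val countDesc = new ReducingStateDescriptor[java.lang.Long](
    "count", new SumCounts, classOf[java.lang.Long])

  override def onElement(element: T, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    val count = ctx.getPartitionedState(countDesc)
    println("before update value : " + count.get()) // null until the first add
    count.add(java.lang.Long.valueOf(1L))
    println("after update value: " + count.get())
    ctx.registerProcessingTimeTimer(window.maxTimestamp)
    TriggerResult.CONTINUE
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.FIRE

  override def canMerge: Boolean = true

  override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = {
    // Carry the state of the merged windows over to the resulting window
    ctx.mergePartitionedState(countDesc)
    if (window.maxTimestamp > ctx.getCurrentProcessingTime)
      ctx.registerProcessingTimeTimer(window.maxTimestamp)
  }

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    ctx.deleteProcessingTimeTimer(window.maxTimestamp)
    ctx.getPartitionedState(countDesc).clear()
  }
}
```

The design choice here is that `mergePartitionedState` only accepts mergeable state such as ReducingState, so a plain ValueState cannot be carried across a session merge this way.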



Best  regards
Utopia


Re: Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread vino yang
Hi Utopia,

The behavior may be correct.

First, the default value is null, which is the correct value.
`ValueStateDescriptor` has multiple constructors, and some of them let you
specify a default value. However, these constructors are deprecated, and
the documentation does not recommend them.[1] With the other constructors,
which cannot specify a default value, the default is null.
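
A minimal sketch of handling the null default on the caller side instead of through a deprecated constructor. `FakeValueState` and `valueOrElse` are hypothetical stand-ins written in plain Scala so the snippet runs without Flink; only the idea of wrapping the possibly-null result carries over.

```scala
object NullDefaultDemo {
  // Hypothetical stand-in for a value state that returns null until updated
  final class FakeValueState {
    private var current: java.lang.Long = null
    def value(): java.lang.Long = current
    def update(v: Long): Unit = current = java.lang.Long.valueOf(v)
  }

  // Map "no value yet" (null) to a default chosen by the caller
  def valueOrElse(state: FakeValueState, default: Long): Long =
    Option(state.value()).map(_.longValue()).getOrElse(default)

  def main(args: Array[String]): Unit = {
    val state = new FakeValueState
    println(valueOrElse(state, 0L)) // 0, nothing stored yet
    state.update(3L)
    println(valueOrElse(state, 0L)) // 3
  }
}
```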

Second, there is a `keyBy` operation before the window, which partitions
your data. For each partition, the default state value is null.

Best,
Vino

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/index.html?org/apache/flink/api/common/state/ValueStateDescriptor.html



Cannot get value from ValueState in ProcessingTimeTrigger

2019-12-18 Thread Utopia
Hi,

I want to get the last value stored in a ValueState when processing an
element in a Trigger.

But as the log shows, sometimes I can get the value and sometimes I cannot.

There is only one key in my data (SensorReading).

ValueState:

class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {

  private lazy val descriptor = new ValueStateDescriptor[Long]("desc", classOf[Long])

  var value = 1

  override def onElement(r: SensorReading, timestamp: Long, window: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {

    println("before update value : " + ctx.getPartitionedState(descriptor).value())

    ctx.getPartitionedState(descriptor).update(value)

    value += 1

    println("after update value: " + ctx.getPartitionedState(descriptor).value())

    ctx.registerProcessingTimeTimer(window.maxTimestamp)
    TriggerResult.CONTINUE
  }

  override def onEventTime(time: Long, window: TimeWindow,
                           ctx: Trigger.TriggerContext) = TriggerResult.CONTINUE

  override def onProcessingTime(time: Long, window: TimeWindow,
                                ctx: Trigger.TriggerContext) = TriggerResult.FIRE

  override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
    ctx.deleteProcessingTimeTimer(window.maxTimestamp)
  }

  override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = {
    val windowMaxTimestamp = window.maxTimestamp
    if (windowMaxTimestamp > ctx.getCurrentProcessingTime)
      ctx.registerProcessingTimeTimer(windowMaxTimestamp)
  }

  override def canMerge: Boolean = true

}

Main process:

object MyCustomWindows {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(1000L)

    val sensorData: DataStream[SensorReading] = env
      .addSource(new SensorSource)
      .assignTimestampsAndWatermarks(new SensorTimeAssigner)

    val countsPerThirtySecs = sensorData
      .keyBy(_.id)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
      .trigger(new ProcessingTimeTrigger)
      .process(new CountFunction)

    env.execute()
  }
}

Log results:

before update value : null
after update value: 1
before update value : null
after update value: 2
before update value : null
after update value: 3
before update value : 3
after update value: 4
before update value : null
after update value: 5
before update value : null
after update value: 6
before update value : null
after update value: 7
before update value : null
after update value: 8
before update value : null
after update value: 9
before update value : 9
after update value: 10


Best  regards
Utopia