Re: DataStream API min max aggregation on other fields

2019-12-19 Thread Lu Weizheng
Yes, the unpredictable non-key and non-aggregated fields confused me. As 
Biao said, it is caused by the purged window state.
So when I use max or min, I should rely only on the aggregated field. The other 
fields are undefined, and I should take care not to use them.

Thank you guys for your replies!

From: Biao Liu 
Sent: December 19, 2019 18:10
To: vino yang 
Cc: Lu Weizheng ; user@flink.apache.org 

Subject: Re: DataStream API min max aggregation on other fields

Hi Lu,

@vino yang I think what he means is that the "max" semantics between windowed 
and non-windowed streams are different: it changes the non-aggregated fields 
unpredictably.

That's really an interesting question.

I took a look at the relevant implementation. From the code's perspective, 
"max" always keeps the non-aggregated fields from the first arrived record, 
which should be (0, 0, x) in this case. However, when the window is purged, 
the state (which holds the non-aggregated fields of the first arrived record 
and the maximum field) is cleared. That means the "first arrived record" is 
reset every time a window is purged. That's why the second column increases 
unpredictably.
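A minimal pure-Scala model may make this concrete. This is not Flink code, and the two-record windows below are only a hypothetical grouping of the (0, i, i) records from the example further down the thread:

```scala
object WindowMaxModel extends App {
  // Model of windowed max(2): each window starts with fresh state, so the
  // non-aggregated second field restarts from the window's first record.
  def windowMax(window: Seq[(Int, Int, Int)]): (Int, Int, Int) = {
    val first = window.head
    (first._1, first._2, window.map(_._3).max)
  }

  // Hypothetical windows over the (0, i, i) records:
  val windows = Seq(
    Seq((0, 1, 1), (0, 2, 2)),
    Seq((0, 3, 3), (0, 4, 4))
  )
  println(windows.map(windowMax)) // List((0,1,2), (0,3,4)), like the observed output
}
```

Because `window.head` changes with every new window, the second field jumps in exactly the pattern seen in the windowed example.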

The semantics here are quite confusing to me.

Thanks,
Biao /'bɪ.aʊ/



On Thu, 19 Dec 2019 at 17:50, vino yang <yanghua1...@gmail.com> wrote:
Hi weizheng,

IMHO, I do not know what is unclear to you. Is the result incorrect? Can you 
share the result you expected based on your understanding?

The "keyBy" specifies the grouping field, and min/max perform the aggregation 
on the field at the position you specified.

Best,
Vino

Lu Weizheng <luweizhen...@hotmail.com> wrote on Thu, 19 Dec 2019 at 5:00 PM:
Hi all,

On a KeyedStream, when I use maxBy or minBy, I get the element holding the max 
or min value, so the other fields are taken from that element. This is quite 
clear. However, when I use max or min, what does Flink do with the other fields?


val tupleStream = senv.fromElements(
  (0, 0, 0), (0, 1, 1), (0, 2, 2),
  (1, 0, 6), (1, 1, 7), (1, 2, 8)
)
//  (0,0,0)
//  (0,0,1)
//  (0,0,2)
//  (1,0,6)
//  (1,0,7)
//  (1,0,8)
val maxByStream = tupleStream.keyBy(0).max(2).print()

In this case, the second field keeps the first element's value, 0.
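For what it's worth, here is a pure-Scala sketch (not Flink code) of the difference between max and maxBy on a non-windowed keyed stream, under the assumption that both behave as rolling reductions emitting one result per input:

```scala
object RollingMaxModel extends App {
  // max(2): keep the first record's other fields, update only field 2.
  def maxAgg(acc: (Int, Int, Int), in: (Int, Int, Int)): (Int, Int, Int) =
    (acc._1, acc._2, math.max(acc._3, in._3))

  // maxBy(2): the whole record holding the largest field 2 wins.
  def maxByAgg(acc: (Int, Int, Int), in: (Int, Int, Int)): (Int, Int, Int) =
    if (in._3 > acc._3) in else acc

  val records = Seq((0, 0, 0), (0, 1, 1), (0, 2, 2))

  // scanLeft emits one running result per record, like a rolling aggregation.
  println(records.tail.scanLeft(records.head)(maxAgg))   // List((0,0,0), (0,0,1), (0,0,2))
  println(records.tail.scanLeft(records.head)(maxByAgg)) // List((0,0,0), (0,1,1), (0,2,2))
}
```

The first fold reproduces the (0,0,x) output above; the second shows why maxBy keeps all fields consistent with one real input record.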


class IntTupleSource extends RichSourceFunction[(Int, Int, Int)]{

  var isRunning: Boolean = true
  var i = 0

  val rand = new Random()

  override def run(srcCtx: SourceContext[(Int, Int, Int)]): Unit = {

while (isRunning) {

  // emit each record into the SourceContext
  srcCtx.collect((0, i, i))
  i += 1
  Thread.sleep(1000)
}
  }

  override def cancel(): Unit = {
isRunning = false
  }
}

//(0,0,0)
//(0,1,2)
//(0,3,4)
//(0,5,6)
//(0,7,8)
//(0,9,10)

val maxWindowStream = senv.addSource(new IntTupleSource)
  .keyBy(0)
  .timeWindow(Time.milliseconds(2000))
  .max(2).print()


In this case, the result is not so clear...

So, for max and min, can the two operators not guarantee the values of the 
other fields?
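If the other fields matter, one possible workaround (my assumption, not something confirmed in this thread) is maxBy, which selects a whole record, so every field comes from one actual input. A pure-Scala model of that per-window selection:

```scala
object WindowMaxByModel extends App {
  // Model of windowed maxBy(2): the entire record with the largest field 2
  // is selected, so all fields belong to one real input record.
  def windowMaxBy(window: Seq[(Int, Int, Int)]): (Int, Int, Int) =
    window.maxBy(_._3)

  println(windowMaxBy(Seq((0, 1, 1), (0, 2, 2)))) // (0,2,2)
}
```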

Thank you so much if anyone can reply.

Weizheng

