[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2020-05-17 Thread Hequn Cheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17109381#comment-17109381
 ] 

Hequn Cheng commented on FLINK-9528:


[~jark] (y)

> Incorrect results: Filter does not treat Upsert messages correctly.
> ---
>
> Key: FLINK-9528
> URL: https://issues.apache.org/jira/browse/FLINK-9528
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.3.3, 1.4.2, 1.5.0
>Reporter: Fabian Hueske
>Priority: Critical
> Fix For: 1.11.0
>
>
> Currently, Filters (i.e., Calcs with predicates) do not distinguish between 
> retraction and upsert mode. A Calc looks at each record (regardless of its update 
> semantics) and either discards it (predicate evaluates to false) or passes it on 
> (predicate evaluates to true).
> This works fine for messages with retraction semantics but is not correct for 
> upsert messages.
> The following test case (can be pasted into {{TableSinkITCase}}) shows the 
> problem:
> {code:scala}
>   @Test
>   def testUpsertsWithFilter(): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.getConfig.enableObjectReuse()
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     val t = StreamTestData.get3TupleDataStream(env)
>       .assignAscendingTimestamps(_._1.toLong)
>       .toTable(tEnv, 'id, 'num, 'text)
>     t.select('text.charLength() as 'len)
>       .groupBy('len)
>       .select('len, 'len.count as 'cnt)
>       // To reproduce the bug, enable this filter and assert against expectedWithFilter.
>       // .where('cnt < 7)
>       .writeToSink(new TestUpsertSink(Array("len"), false))
>     env.execute()
>     val results = RowCollector.getAndClearValues
>     val retracted = RowCollector.upsertResults(results, Array(0)).sorted
>     val expectedWithoutFilter = List(
>       "2,1", "5,1", "9,9", "10,7", "11,1", "14,1", "25,1").sorted
>     val expectedWithFilter = List(
>       "2,1", "5,1", "11,1", "14,1", "25,1").sorted
>     assertEquals(expectedWithoutFilter, retracted)
>     // assertEquals(expectedWithFilter, retracted)
>   }
> {code}
> When we add a filter on the aggregation result, we would expect that all rows 
> that do not fulfill the condition are removed from the result. However, the 
> filter only removes the upsert message such that the previous version remains 
> in the result.
> One solution could be to make a filter aware of the update semantics (retract 
> or upsert) and convert the upsert message into a delete message if the 
> predicate evaluates to false.
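The difference between the current behavior and the proposed fix can be sketched in plain Scala, outside of Flink. This is a minimal model under stated assumptions: `Msg`, `Upsert`, `Delete`, `statelessFilter`, `upsertAwareFilter` and `materialize` are hypothetical names, not Flink classes, and the "upsert sink" is just a map keyed on the unique key. Feeding the last two updates for `len = 10` through a `cnt < 7` predicate leaves the stale row behind with the stateless filter, while the upsert-aware variant removes it.

```scala
object UpsertAwareFilterSketch {
  // Hypothetical message model for an upsert stream keyed on `key`;
  // these are not Flink's internal classes.
  sealed trait Msg
  final case class Upsert(key: Int, cnt: Long) extends Msg
  final case class Delete(key: Int) extends Msg

  // Behavior described in the issue: a failing upsert is simply dropped.
  def statelessFilter(in: Seq[Msg], pred: Long => Boolean): Seq[Msg] =
    in.filter {
      case Upsert(_, cnt) => pred(cnt)
      case Delete(_)      => true
    }

  // Proposed behavior: a failing upsert becomes a delete for the same key.
  def upsertAwareFilter(in: Seq[Msg], pred: Long => Boolean): Seq[Msg] =
    in.map {
      case u @ Upsert(k, cnt) => if (pred(cnt)) u else Delete(k)
      case d: Delete          => d
    }

  // Applies messages to a keyed table the way an upsert sink would.
  def materialize(msgs: Seq[Msg]): Map[Int, Long] =
    msgs.foldLeft(Map.empty[Int, Long]) {
      case (table, Upsert(k, cnt)) => table.updated(k, cnt)
      case (table, Delete(k))      => table - k
    }

  def main(args: Array[String]): Unit = {
    // The last two updates for len = 10 (its final count is 7 in the test case).
    val stream: Seq[Msg] = Seq(Upsert(10, 6), Upsert(10, 7))
    val pred: Long => Boolean = _ < 7
    println(materialize(statelessFilter(stream, pred)))   // Map(10 -> 6): stale row remains
    println(materialize(upsertAwareFilter(stream, pred))) // Map(): row removed
  }
}
```

Note that the upsert-aware variant emits a delete for every failing upsert, even when the key was never emitted or was already deleted; those are the duplicate deletes discussed in the comments.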



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2020-04-08 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078413#comment-17078413
 ] 

Timo Walther commented on FLINK-9528:

I changed the component to Blink planner.



[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2020-04-08 Thread Jark Wu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078290#comment-17078290
 ] 

Jark Wu commented on FLINK-9528:


Hi [~twalthr], this is not resolved in the Blink planner yet, but we plan to 
fix it in the next releases. We already have some initial ideas about this problem. 



[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2020-04-08 Thread Fabian Hueske (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078248#comment-17078248
 ] 

Fabian Hueske commented on FLINK-9528:

Thanks for the update [~hequn8128]! 
I'll reopen the ticket then.



[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2020-04-08 Thread Hequn Cheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078244#comment-17078244
 ] 

Hequn Cheng commented on FLINK-9528:


Hi everyone, I have freed this ticket as I have switched my work from Table to 
PyFlink. Feel free to take it if you are interested. FYI: the bug still 
exists in the Blink planner. 



[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2020-04-08 Thread Fabian Hueske (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078225#comment-17078225
 ] 

Fabian Hueske commented on FLINK-9528:

I agree with [~twalthr]. 
I don't think we should close bug reports unless we know that the bug was fixed 
or is not relevant anymore (because a component was completely removed or ...).



[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2020-04-08 Thread Timo Walther (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078220#comment-17078220
 ] 

Timo Walther commented on FLINK-9528:

[~jark] is this issue solved in the Blink planner? It is actually quite serious 
and was therefore marked as critical.



[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2018-06-14 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16512534#comment-16512534
 ] 

Fabian Hueske commented on FLINK-9528:

I don't think we need state in all Filters working under AccMode.
 Consider the following plan:
{code:java}
A -> GroupAgg(groupBy x, count(*) AS y) -> Filter(y < 10) -\
                                                            Join(A.x = B.x)
B ---------------------------------------------------------/
{code}
The Filter could work in AccMode (because the aggregation and join work on the 
same key). However, adding state to the filter would be unnecessary because the 
Join would do the deduplication automatically. The messaging overhead 
wouldn't be high either, because the messages would not go over the network.

I'm quite sure this is true for all other internal AccMode connections except 
for the Sink case.
That's why I'm proposing to add the state to the sink instead of the filter as 
it seems to be the only case that would need the additional state.
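A rough sketch of why the Join in the plan above would absorb duplicate deletes without extra state in the Filter. It assumes a simplified join that keeps the latest left row per key; `LeftSideState`, `addRight`, `onLeft` and the string-encoded output are hypothetical, not the Flink join operator. A delete for a key the join does not hold produces no output.

```scala
import scala.collection.mutable

object JoinAbsorbsDeletesSketch {
  // Hypothetical changes arriving from the stateless Filter, keyed on x.
  sealed trait Msg
  final case class Upsert(x: Int, y: Long) extends Msg
  final case class Delete(x: Int) extends Msg

  // Simplified Join(A.x = B.x) that keeps the latest left row per key.
  // Output changes are encoded as strings: "+(...)" adds, "-(...)" retracts.
  final class LeftSideState {
    private val left = mutable.Map.empty[Int, Long]
    private val right = mutable.Map.empty[Int, String]

    def addRight(x: Int, b: String): Unit = right.update(x, b)

    def onLeft(msg: Msg): List[String] = msg match {
      case Upsert(x, y) =>
        // Retract the previous join result for x (if any), then add the new one.
        val retract = left.get(x).flatMap(old => right.get(x).map(b => s"-($x,$old,$b)")).toList
        left(x) = y
        retract ++ right.get(x).map(b => s"+($x,$y,$b)").toList
      case Delete(x) =>
        left.remove(x) match {
          case Some(old) => right.get(x).map(b => s"-($x,$old,$b)").toList
          case None      => Nil // duplicate delete: nothing held, nothing emitted
        }
    }
  }

  def main(args: Array[String]): Unit = {
    val join = new LeftSideState
    join.addRight(10, "b10")
    println(join.onLeft(Upsert(10, 6))) // List(+(10,6,b10))
    println(join.onLeft(Delete(10)))    // List(-(10,6,b10))
    println(join.onLeft(Delete(10)))    // List()
  }
}
```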



[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2018-06-12 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16510544#comment-16510544
 ] 

Hequn Cheng commented on FLINK-9528:


You are right, we should not add state to all filters. We should add state only when the 
filter is working under AccMode and with a UniqueKey (maybe we can give such a filter 
a name, say, *UpsertFilter*). 

Consider the case Source -> Agg -> UpsertFilter -> UpsertSink: we need to 
add state to the UpsertFilter so that it can send a delete message when 
necessary (i.e., when the predicate evaluates to false). To make sure that messages with the same 
unique key go to the same place, we need to apply a keyBy on the unique key before the UpsertFilter.
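A minimal sketch of the proposed *UpsertFilter*, assuming its input has already been keyed by the unique key so that one instance sees every message of a key. `UpsertFilter`, `process` and the `visible` key set are hypothetical names; in a real job the set would presumably be keyed state. The filter forwards a delete only if it previously forwarded an upsert for that key, so no duplicate deletes reach the sink.

```scala
import scala.collection.mutable

object StatefulUpsertFilterSketch {
  // Hypothetical upsert messages keyed on the unique key.
  sealed trait Msg
  final case class Upsert(key: Int, cnt: Long) extends Msg
  final case class Delete(key: Int) extends Msg

  // Assumes the input is already partitioned by `key` (keyBy).
  final class UpsertFilter(pred: Long => Boolean) {
    // Keys whose latest forwarded message was an upsert, i.e. rows the sink holds.
    private val visible = mutable.Set.empty[Int]

    def process(msg: Msg): Option[Msg] = msg match {
      case u @ Upsert(k, cnt) if pred(cnt) =>
        visible += k
        Some(u)
      case Upsert(k, _) => retract(k) // predicate evaluated to false
      case Delete(k)    => retract(k)
    }

    // Emit a delete only if the sink currently holds a row for k.
    private def retract(k: Int): Option[Msg] =
      if (visible.remove(k)) Some(Delete(k)) else None
  }

  def main(args: Array[String]): Unit = {
    val filter = new UpsertFilter(_ < 7)
    val in: Seq[Msg] = Seq(Upsert(10, 6), Upsert(10, 7), Upsert(10, 8))
    println(in.flatMap(m => filter.process(m).toList)) // List(Upsert(10,6), Delete(10))
  }
}
```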



[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2018-06-12 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16509320#comment-16509320
 ] 

Fabian Hueske commented on FLINK-9528:

I think we can provide efficient implementations for all internal operators 
(joins, aggregations) that ignore (and thereby filter) duplicate delete messages. 
There would be some overhead for sending duplicate delete messages, but since 
both sender and receiver need to be keyed on the unique attributes, there would 
not be a shuffle involved.

The only operators that we would need to "protect" from duplicate deletes are 
upsert sinks. I think that by adding state to the Filter operator, we would add 
unnecessary overhead if the following operator is not a sink. 



[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2018-06-11 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16508175#comment-16508175
 ] 

Hequn Cheng commented on FLINK-9528:


Hi [~fhueske], if we agree to add state, we can simply add it to the 
original filter instead of adding a separate, duplicate one. A stateful filter brings not only 
state cost but also shuffle cost, whereas a GroupByHaving operator can solve 
the problem in most cases. 
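The GroupByHaving idea can be illustrated with a hypothetical fused count-plus-HAVING operator (`CountWithHaving` and `onRow` are illustrative names, not a Flink operator). Because the aggregation already keeps the count per key, it can tell whether its previous output passed the HAVING predicate and emit a delete only in that case, without separate filter state or an extra shuffle.

```scala
import scala.collection.mutable

object GroupByHavingSketch {
  sealed trait Msg
  final case class Upsert(key: Int, cnt: Long) extends Msg
  final case class Delete(key: Int) extends Msg

  // Hypothetical operator for `GROUP BY key HAVING having(COUNT(*))`.
  // The count state exists anyway, so the previous output can be derived from it.
  final class CountWithHaving(having: Long => Boolean) {
    private val counts = mutable.Map.empty[Int, Long]

    def onRow(key: Int): Option[Msg] = {
      val oldCnt = counts.getOrElse(key, 0L)
      val newCnt = oldCnt + 1
      counts(key) = newCnt
      val wasEmitted = oldCnt > 0 && having(oldCnt)
      if (having(newCnt)) Some(Upsert(key, newCnt)) // new or updated result row
      else if (wasEmitted) Some(Delete(key))        // row just stopped qualifying
      else None                                     // row was already absent downstream
    }
  }

  def main(args: Array[String]): Unit = {
    val op = new CountWithHaving(_ < 7)
    // Eight input rows for len = 10: counts 1..6 are upserted, 7 triggers one delete, 8 emits nothing.
    val out = (1 to 8).flatMap(_ => op.onRow(10).toList)
    println(out.last) // Delete(10)
  }
}
```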





[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2018-06-07 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504489#comment-16504489
 ] 

Fabian Hueske commented on FLINK-9528:
--

True, highly selective filters would definitely be an issue. However, most 
operators can handle duplicate deletes quite well and do not need them to be 
filtered, because they already have the necessary state. Only sinks are 
affected by this. Why not add a duplicate filter for upsert sinks? This would 
keep the filter stateless and efficient.

The duplicate filter for the upsert sink would need a boolean flag per key, and 
the number of keys depends on the selectivity of the filter, i.e., the size of 
the state is proportional to the gains. Later we could improve it by using a 
probabilistic filter (proposed in FLINK-8601).
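A minimal sketch of such a duplicate filter placed in front of an upsert sink, assuming a simplified message model. The `Upsert`/`Delete` classes and `Dedup` are hypothetical, not Flink APIs. Here the per-key boolean flag is encoded as membership in a set of keys that the sink currently holds. A delete is forwarded only if the key is live, so deletes for keys that never passed the filter produce neither output nor state.

```scala
import scala.collection.mutable

// Hypothetical sketch, not a Flink API: a duplicate-delete filter placed directly
// in front of an upsert sink.
object DuplicateDeleteFilter {
  sealed trait Msg
  final case class Upsert(key: String, value: Long) extends Msg
  final case class Delete(key: String) extends Msg

  final class Dedup {
    // One flag per key, encoded as set membership: present = the sink currently holds the key.
    private val live = mutable.HashSet.empty[String]

    /** Returns Some(msg) if the message should reach the sink, None if it is a redundant delete. */
    def process(msg: Msg): Option[Msg] = msg match {
      case Upsert(k, _) =>
        live += k
        Some(msg)
      case Delete(k) =>
        // remove() returns false if the key was not live, i.e. the delete is a duplicate
        if (live.remove(k)) Some(msg) else None
    }

    def stateSize: Int = live.size
  }
}
```

The filter itself stays stateless. Only the operator next to the sink keeps state, and that state covers only keys whose rows are currently present in the sink.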

> Incorrect results: Filter does not treat Upsert messages correctly.
> ---
>
> Key: FLINK-9528
> URL: https://issues.apache.org/jira/browse/FLINK-9528
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.3, 1.5.0, 1.4.2
>Reporter: Fabian Hueske
>Assignee: Hequn Cheng
>Priority: Critical
>


[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2018-06-07 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16504471#comment-16504471
 ] 

Hequn Cheng commented on FLINK-9528:


Hi [~fhueske]. Consider the anti-spamming scenario: users probably just want 
the top 1% of the data from the result of a group by, so the remaining 99% of 
the data will all become delete messages after the {{Filter}}. This would be a 
disaster for the storage, especially when most of the incoming keys are new 
ones and cannot be swallowed by a cache.

Adding a GroupByHaving operator is a good approach, but it only covers the case 
where the filter can be pushed down into the group by. If a filter cannot be 
pushed down, we can turn the upsert stream into a retract stream to ensure 
correctness (or throw an exception and inform the user to use a 
{{RetractTableSink}}). What do you think?
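A rough sketch of the retract-stream alternative, under a simplified model with hypothetical `Insert`/`Retract` classes (not Flink's internal types). The converter materializes the latest value per key and turns each upsert into a retraction of the old row plus an insert of the new one. A stateless filter is then correct because a retraction carries the same values as the row it cancels, so both are kept or dropped together.

```scala
import scala.collection.mutable

// Hypothetical sketch: turn an upsert stream into a retract stream by
// materializing the latest value per key, then filter it statelessly.
// Insert/Retract are illustrative classes, not Flink's internal types.
object UpsertToRetract {
  sealed trait Change
  final case class Insert(key: String, cnt: Long) extends Change
  final case class Retract(key: String, cnt: Long) extends Change

  final class Converter {
    private val last = mutable.HashMap.empty[String, Long] // latest value per key

    // Each upsert becomes a retraction of the previous row (if any) plus an insert.
    def process(key: String, cnt: Long): Seq[Change] = {
      val retraction: Seq[Change] = last.get(key).map(old => Retract(key, old)).toList
      last.update(key, cnt)
      retraction :+ Insert(key, cnt)
    }
  }

  // A retraction carries the same values as the row it cancels, so a stateless
  // filter keeps or drops both together.
  def filter(in: Seq[Change], p: Long => Boolean): Seq[Change] =
    in.filter {
      case Insert(_, cnt)  => p(cnt)
      case Retract(_, cnt) => p(cnt)
    }

  // A retract sink keeps a multiset of rows: inserts add a copy, retractions remove one.
  def materialize(in: Seq[Change]): Map[(String, Long), Int] =
    in.foldLeft(Map.empty[(String, Long), Int]) {
      case (acc, Insert(k, c)) =>
        acc.updated((k, c), acc.getOrElse((k, c), 0) + 1)
      case (acc, Retract(k, c)) =>
        val n = acc.getOrElse((k, c), 0) - 1
        if (n <= 0) acc - ((k, c)) else acc.updated((k, c), n)
    }
}
```

Feeding seven upserts for the same key through `Converter` yields 13 changes: one insert, then a retract/insert pair for each update. This shows the extra state and the roughly doubled message volume of this approach.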



[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2018-06-06 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16503155#comment-16503155
 ] 

Fabian Hueske commented on FLINK-9528:
--

Hi [~hequn8128], can you explain in more detail what you mean by:

{quote}However, it may bring some side effects: for instance, users cannot 
actually filter out the data they don't need, although the redundant messages 
won't affect the correctness.{quote}

I'm not sure about adding state or a special communication mode. From my point 
of view, we should not add additional storage overhead (state) or special code 
paths if the result is correct already. It is true that we would send more 
records than necessary, but they can be swallowed by the next operator. A sink 
that does not want to emit duplicate delete messages could also add state to 
remove duplicates (or a simple in-memory cache to reduce them).
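For the "simple in-memory cache" variant, here is a sketch using a bounded LRU set built on `java.util.LinkedHashMap` in access order. `LruSet` and `DeleteReducer` are hypothetical names. Unlike exact per-key state, the cache only suppresses a duplicate delete while the key is still cached. It therefore reduces duplicates on a best-effort basis with fixed memory.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical sketch: a bounded, best-effort cache of recently deleted keys in
// front of an upsert sink. LruSet and DeleteReducer are illustrative names.
object RecentDeleteCache {
  final class LruSet(maxEntries: Int) {
    // accessOrder = true orders entries from least to most recently accessed,
    // so removeEldestEntry evicts the least recently used key.
    private val entries =
      new JLinkedHashMap[String, java.lang.Boolean](16, 0.75f, true) {
        override def removeEldestEntry(eldest: JMap.Entry[String, java.lang.Boolean]): Boolean =
          this.size() > maxEntries
      }

    def contains(k: String): Boolean = entries.get(k) != null // get() also refreshes recency
    def add(k: String): Unit = { entries.put(k, java.lang.Boolean.TRUE); () }
    def remove(k: String): Unit = { entries.remove(k); () }
  }

  final class DeleteReducer(maxEntries: Int) {
    private val recentlyDeleted = new LruSet(maxEntries)

    // An upsert makes the key live again, so a later delete must be forwarded.
    def onUpsert(key: String): Unit = recentlyDeleted.remove(key)

    /** Returns true if a delete for `key` should be forwarded to the sink. */
    def onDelete(key: String): Boolean =
      if (recentlyDeleted.contains(key)) false
      else { recentlyDeleted.add(key); true }
  }
}
```

With `maxEntries = 2`, a repeated delete for `a` is suppressed. However, once deletes for `b` and `c` evict `a`, the next delete for `a` is forwarded again. This is the limitation raised in the anti-spamming comment: when most keys are new, a small cache swallows few duplicates.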

As an optimization, we can also think about adding a GroupByHaving operator 
that does the filtering internally. However, we definitely need a general 
solution for Filters with Upsert inputs.
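Below is a sketch of what a GroupByHaving operator could look like for `COUNT(*) ... HAVING`, assuming an insert-only input. `CountHaving` and the message classes are hypothetical. The aggregate already stores the count per key, so it can check whether the previously emitted row satisfied the predicate. It then emits a delete only on the transition, with no extra state and no duplicate deletes.

```scala
import scala.collection.mutable

// Hypothetical sketch of a GroupByHaving operator (not an existing Flink operator):
// COUNT(*) per key with the HAVING predicate evaluated inside the aggregation,
// assuming an insert-only input stream.
object GroupByHavingSketch {
  sealed trait Out
  final case class Upsert(key: String, cnt: Long) extends Out
  final case class Delete(key: String) extends Out

  final class CountHaving(having: Long => Boolean) {
    private val counts = mutable.HashMap.empty[String, Long] // state the aggregate needs anyway

    def process(key: String): Option[Out] = {
      val old = counts.get(key)
      val cnt = old.getOrElse(0L) + 1L
      counts.update(key, cnt)
      if (having(cnt)) Some(Upsert(key, cnt))
      else if (old.exists(having)) Some(Delete(key)) // the row leaves the result: one delete
      else None                                      // the row was not visible: emit nothing
    }
  }
}
```

For the predicate `_ < 3` and the keys `a, a, a, a, b`, this emits `Upsert(a,1)`, `Upsert(a,2)`, `Delete(a)`, `Upsert(b,1)`. The fourth `a` produces no output.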




[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2018-06-05 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16502103#comment-16502103
 ] 

Hequn Cheng commented on FLINK-9528:


You are right. The duplicate delete messages in upsert mode won't affect the 
correctness. However, it may bring some side effects: for instance, users 
cannot actually filter out the data they don't need, although the redundant 
messages won't affect the correctness.

To avoid the duplicate delete messages, we can keep state in the Filter, so 
that the Filter sends only one delete message when the predicate result for a 
key changes from true to false. But this brings state or shuffle cost.
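A minimal sketch of this stateful Filter, assuming a simplified upsert model with hypothetical `Upsert`/`Delete` classes (not Flink's runtime types). The filter keeps the set of keys whose latest upsert passed the predicate. It emits a delete only on a true-to-false transition. In a parallel job this state would have to be partitioned by key, which is where the extra shuffle cost would come from.

```scala
import scala.collection.mutable

// Hypothetical sketch: a stateful filter on an upsert stream that emits a delete
// only when a key's predicate result changes from true to false.
object StatefulUpsertFilter {
  sealed trait Msg
  final case class Upsert(key: String, cnt: Long) extends Msg
  final case class Delete(key: String) extends Msg

  final class Filter(p: Long => Boolean) {
    private val passing = mutable.HashSet.empty[String] // keys whose last upsert passed

    def process(msg: Msg): Option[Msg] = msg match {
      case Upsert(k, cnt) if p(cnt) =>
        passing += k
        Some(msg)
      case Upsert(k, _) =>
        if (passing.remove(k)) Some(Delete(k)) else None // true -> false transition only
      case Delete(k) =>
        if (passing.remove(k)) Some(msg) else None       // nothing visible downstream to delete
    }
  }
}
```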

I was wondering whether the upstream aggregate node could send a bundle 
message instead. The bundle message contains a retract message (retract-old) 
and an update message (insert-new). The {{Filter}} turns the bundle message 
into a normal message when the insert-new part is filtered out, and the 
{{UpsertTableSink}} can throw away the useless retract-old part when it 
receives the bundle.
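One reading of the bundle idea, sketched with hypothetical types (`BundleMsg` is not a Flink class). The aggregate emits the old row (retract-old) and the new row (insert-new) together, and the Filter stays stateless. It forwards the bundle if insert-new passes. If insert-new fails but retract-old passed, it collapses the bundle into a plain delete, and otherwise it drops the bundle. Checking retract-old against the predicate is an assumption of this sketch, used here to avoid duplicate deletes. The upsert sink ignores retract-old.

```scala
// Hypothetical sketch of the "bundle" proposal; BundleMsg is not a Flink class.
// The aggregate emits the previous row (retract-old) together with the new row
// (insert-new) for the same key.
object BundleSketch {
  sealed trait Msg
  final case class BundleMsg(key: String, retractOld: Option[Long], insertNew: Long) extends Msg
  final case class Delete(key: String) extends Msg

  // Stateless filter: forward the bundle if insert-new passes; otherwise collapse it
  // into a plain delete, but only if the old row passed and is therefore visible.
  def filter(b: BundleMsg, p: Long => Boolean): Option[Msg] =
    if (p(b.insertNew)) Some(b)
    else if (b.retractOld.exists(p)) Some(Delete(b.key))
    else None

  // Upsert sink: for a bundle only insert-new matters, so retract-old is thrown away.
  def applyToSink(table: Map[String, Long], m: Msg): Map[String, Long] = m match {
    case BundleMsg(k, _, v) => table.updated(k, v)
    case Delete(k)          => table - k
  }
}
```

For a key whose count grows from 1 to 7 with the predicate `cnt < 7`, the filter emits exactly one delete, and the sink ends up without the row.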





[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2018-06-05 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16501716#comment-16501716
 ] 

Fabian Hueske commented on FLINK-9528:
--

It is correct that a Filter might emit duplicate delete messages, but these 
should not affect the correctness (it is not possible to remove something that 
is not there). I think we can implement our internal operators such that they 
tolerate such deletes. However, I agree that sinks might need to be aware that 
we forward duplicate delete messages.

If we don't do it in the Filter, we would need a retraction-to-upsert converter 
before the {{UpsertTableSink}}, which is even more expensive, since we need to 
materialize the table again and also handle twice the amount of messages 
(delete + insert).




[jira] [Commented] (FLINK-9528) Incorrect results: Filter does not treat Upsert messages correctly.

2018-06-05 Thread Hequn Cheng (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9528?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16501601#comment-16501601
 ] 

Hequn Cheng commented on FLINK-9528:


Hi [~fhueske], thanks for reporting this. What about setting 
{{needsUpdatesAsRetraction}} to true for the {{Filter}}? If we let the filter 
convert the upsert message into a delete message by itself, we need to keep 
state in the {{Filter}} so that it does not send redundant delete messages, 
which is a little expensive. Redundant delete messages would cause wrong 
results in some cases.
