Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]

2024-01-15 Thread via GitHub


xishuaidelin commented on code in PR #23827:
URL: https://github.com/apache/flink/pull/23827#discussion_r1452864030


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala:
##
@@ -1916,4 +1916,44 @@ class AggregateITCase(aggMode: AggMode, miniBatch: 
MiniBatchMode, backend: State
   )
 assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
   }
+
+  @TestTemplate
+  def testGroupJsonObjectAggWithRetract(): Unit = {
+val data = new mutable.MutableList[(Long, String, Long)]
+data.+=((2L, "Hallo", 2L))

Review Comment:
   Hi lincoln, Thanks for your response. I have added the tests with nested agg 
and simplify the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]

2024-01-15 Thread via GitHub


xishuaidelin commented on code in PR #23827:
URL: https://github.com/apache/flink/pull/23827#discussion_r1452862528


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java:
##
@@ -128,6 +137,15 @@ public void merge(Accumulator acc, Iterable 
others) throws Exceptio
 assertKeyNotPresent(acc, key);
 acc.map.put(key, other.map.get(key));
 }
+for (final StringData key : other.retractMap.keys()) {

Review Comment:
   Hi xuyang, Thanks for your comments. if +U is eliminated by -U, It means two 
messages has same key and value. It is an expected behaviour. However, you 
reminded me that the implementation here needs to be changed. Retraction is 
supposed to be done first in merge function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]

2024-01-15 Thread via GitHub


xishuaidelin commented on code in PR #23827:
URL: https://github.com/apache/flink/pull/23827#discussion_r1451985731


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java:
##
@@ -128,6 +137,15 @@ public void merge(Accumulator acc, Iterable 
others) throws Exceptio
 assertKeyNotPresent(acc, key);
 acc.map.put(key, other.map.get(key));
 }
+for (final StringData key : other.retractMap.keys()) {

Review Comment:
   Hi xuyang, thanks for your comments. It considers both the key and the 
corresponding value in the comparison. Therefore, I don't foresee any issues 
arising during the merge stage. However, the issue you mentioned could 
potentially occur at the local stage, such as within the retract function. I 
would fix it in retract function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]

2024-01-14 Thread via GitHub


xishuaidelin commented on code in PR #23827:
URL: https://github.com/apache/flink/pull/23827#discussion_r1451985731


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java:
##
@@ -128,6 +137,15 @@ public void merge(Accumulator acc, Iterable 
others) throws Exceptio
 assertKeyNotPresent(acc, key);
 acc.map.put(key, other.map.get(key));
 }
+for (final StringData key : other.retractMap.keys()) {

Review Comment:
   Hi xuyang, thanks for your comments. It considers both the key and the 
corresponding value in the comparison. Therefore, I don't foresee any issues 
arising during the merge stage. However, the issue you mentioned could 
potentially occur at the local stage, such as within the retract function. I 
would fix it in retract function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]

2023-12-18 Thread via GitHub


lincoln-lil commented on code in PR #23827:
URL: https://github.com/apache/flink/pull/23827#discussion_r1430848529


##
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala:
##
@@ -1916,4 +1916,44 @@ class AggregateITCase(aggMode: AggMode, miniBatch: 
MiniBatchMode, backend: State
   )
 assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted)
   }
+
+  @TestTemplate
+  def testGroupJsonObjectAggWithRetract(): Unit = {
+val data = new mutable.MutableList[(Long, String, Long)]
+data.+=((2L, "Hallo", 2L))

Review Comment:
   This can be simplified, e.g., use a for loop



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]

2023-12-17 Thread via GitHub


xuyangzhong commented on code in PR #23827:
URL: https://github.com/apache/flink/pull/23827#discussion_r1429367900


##
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java:
##
@@ -128,6 +137,15 @@ public void merge(Accumulator acc, Iterable 
others) throws Exceptio
 assertKeyNotPresent(acc, key);
 acc.map.put(key, other.map.get(key));
 }
+for (final StringData key : other.retractMap.keys()) {

Review Comment:
   IIUC, for retract streams, if -U comes before +U, +U might also be 
eliminated?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]

2023-11-29 Thread via GitHub


xishuaidelin commented on PR #23827:
URL: https://github.com/apache/flink/pull/23827#issuecomment-1831595252

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]

2023-11-28 Thread via GitHub


flinkbot commented on PR #23827:
URL: https://github.com/apache/flink/pull/23827#issuecomment-1831347603

   
   ## CI report:
   
   * 14941540a4d1c8b4e6b278bc0f30dd14f3d27515 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org