Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]
xishuaidelin commented on code in PR #23827: URL: https://github.com/apache/flink/pull/23827#discussion_r1452864030 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala: ## @@ -1916,4 +1916,44 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State ) assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) } + + @TestTemplate + def testGroupJsonObjectAggWithRetract(): Unit = { +val data = new mutable.MutableList[(Long, String, Long)] +data.+=((2L, "Hallo", 2L)) Review Comment: Hi lincoln, Thanks for your response. I have added the tests with nested agg and simplify the code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]
xishuaidelin commented on code in PR #23827: URL: https://github.com/apache/flink/pull/23827#discussion_r1452862528 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java: ## @@ -128,6 +137,15 @@ public void merge(Accumulator acc, Iterable others) throws Exceptio assertKeyNotPresent(acc, key); acc.map.put(key, other.map.get(key)); } +for (final StringData key : other.retractMap.keys()) { Review Comment: Hi xuyang, Thanks for your comments. if +U is eliminated by -U, It means two messages has same key and value. It is an expected behaviour. However, you reminded me that the implementation here needs to be changed. Retraction is supposed to be done first in merge function. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]
xishuaidelin commented on code in PR #23827: URL: https://github.com/apache/flink/pull/23827#discussion_r1451985731 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java: ## @@ -128,6 +137,15 @@ public void merge(Accumulator acc, Iterable others) throws Exceptio assertKeyNotPresent(acc, key); acc.map.put(key, other.map.get(key)); } +for (final StringData key : other.retractMap.keys()) { Review Comment: Hi xuyang, thanks for your comments. It considers both the key and the corresponding value in the comparison. Therefore, I don't foresee any issues arising during the merge stage. However, the issue you mentioned could potentially occur at the local stage, such as within the retract function. I would fix it in retract function. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]
xishuaidelin commented on code in PR #23827: URL: https://github.com/apache/flink/pull/23827#discussion_r1451985731 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java: ## @@ -128,6 +137,15 @@ public void merge(Accumulator acc, Iterable others) throws Exceptio assertKeyNotPresent(acc, key); acc.map.put(key, other.map.get(key)); } +for (final StringData key : other.retractMap.keys()) { Review Comment: Hi xuyang, thanks for your comments. It considers both the key and the corresponding value in the comparison. Therefore, I don't foresee any issues arising during the merge stage. However, the issue you mentioned could potentially occur at the local stage, such as within the retract function. I would fix it in retract function. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]
lincoln-lil commented on code in PR #23827: URL: https://github.com/apache/flink/pull/23827#discussion_r1430848529 ## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala: ## @@ -1916,4 +1916,44 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State ) assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) } + + @TestTemplate + def testGroupJsonObjectAggWithRetract(): Unit = { +val data = new mutable.MutableList[(Long, String, Long)] +data.+=((2L, "Hallo", 2L)) Review Comment: This can be simplified, e.g., use a for loop -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]
xuyangzhong commented on code in PR #23827: URL: https://github.com/apache/flink/pull/23827#discussion_r1429367900 ## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/JsonObjectAggFunction.java: ## @@ -128,6 +137,15 @@ public void merge(Accumulator acc, Iterable others) throws Exceptio assertKeyNotPresent(acc, key); acc.map.put(key, other.map.get(key)); } +for (final StringData key : other.retractMap.keys()) { Review Comment: IIUC, for retract streams, if -U comes before +U, +U might also be eliminated? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]
xishuaidelin commented on PR #23827: URL: https://github.com/apache/flink/pull/23827#issuecomment-1831595252 @flinkbot run azure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33689][table-runtime] Fix JsonObjectAggFunction can't retract records when enabling LocalGlobal. [flink]
flinkbot commented on PR #23827: URL: https://github.com/apache/flink/pull/23827#issuecomment-1831347603 ## CI report: * 14941540a4d1c8b4e6b278bc0f30dd14f3d27515 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org