[GitHub] [flink] flinkbot commented on pull request #22702: [FLINK-32159] AbstractColumnReader throws NPE

2023-06-02 Thread via GitHub


flinkbot commented on PR #22702:
URL: https://github.com/apache/flink/pull/22702#issuecomment-1574604531

   
   ## CI report:
   
   * 840fae1396f01bc55a231f2dacd9597cd7e31c25 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32159) Hudi Source throws NPE

2023-06-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32159:
---
Labels: pull-request-available  (was: )

> Hudi Source throws NPE
> --
>
> Key: FLINK-32159
> URL: https://issues.apache.org/jira/browse/FLINK-32159
> Project: Flink
>  Issue Type: Bug
>  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>Affects Versions: 1.15.0, 1.16.0, 1.17.0, 1.18.0
>Reporter: Bo Cui
>Priority: Major
>  Labels: pull-request-available
> Attachments: image-2023-05-23-14-45-29-151.png
>
>
> spark/hive write hudi, and flink read hudi and job failed. because 
> !image-2023-05-23-14-45-29-151.png!
>  
> The null judgment logic should be added to AbstractColumnReader#readToVector
> https://github.com/apache/flink/blob/119b8c584dc865ee8a40a5c6410dddf8b36bac5a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java#LL155C19-L155C20



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] cuibo01 opened a new pull request, #22702: [FLINK-32159] AbstractColumnReader throws NPE

2023-06-02 Thread via GitHub


cuibo01 opened a new pull request, #22702:
URL: https://github.com/apache/flink/pull/22702

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follows the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32243) Bump okhttp version to 4.11.0

2023-06-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32243:
---
Labels: pull-request-available  (was: )

> Bump okhttp version to 4.11.0
> -
>
> Key: FLINK-32243
> URL: https://issues.apache.org/jira/browse/FLINK-32243
> Project: Flink
>  Issue Type: Improvement
>  Components: Kubernetes Operator
>Reporter: ConradJam
>Priority: Major
>  Labels: pull-request-available
>
> Bump kubernetes operator okhttp  version to 4.11.0



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] czy006 opened a new pull request, #610: [FLINK-32243] Bump okhttp version to 4.11.0

2023-06-02 Thread via GitHub


czy006 opened a new pull request, #610:
URL: https://github.com/apache/flink-kubernetes-operator/pull/610

   Bump okhttp version to 4.11.0


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] xccui commented on a diff in pull request #609: [FLINK-32171] Add PostStart hook to flink k8s operator helm

2023-06-02 Thread via GitHub


xccui commented on code in PR #609:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/609#discussion_r1214659153


##
helm/flink-kubernetes-operator/values.yaml:
##
@@ -183,3 +183,7 @@ operatorHealth:
   startupProbe:
 failureThreshold: 30
 periodSeconds: 10
+
+# Set exec command for the postStart hook of the main container
+postStart:
+  execCommand: []

Review Comment:
   Sounds good. I'll make an update



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32171) Add PostStart hook to flink k8s operator helm

2023-06-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32171:
---
Labels: pull-request-available  (was: )

> Add PostStart hook to flink k8s operator helm
> -
>
> Key: FLINK-32171
> URL: https://issues.apache.org/jira/browse/FLINK-32171
> Project: Flink
>  Issue Type: New Feature
>  Components: Kubernetes Operator
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
>  Labels: pull-request-available
> Fix For: kubernetes-operator-1.6.0, kubernetes-operator-1.5.1
>
>
> I feel it will be convenient to add a PostStart hook optional config to flink 
> k8s operator helm (e.g. when users need to download some Flink plugins).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #609: [FLINK-32171] Add PostStart hook to flink k8s operator helm

2023-06-02 Thread via GitHub


gyfora commented on code in PR #609:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/609#discussion_r1214506467


##
helm/flink-kubernetes-operator/values.yaml:
##
@@ -183,3 +183,7 @@ operatorHealth:
   startupProbe:
 failureThreshold: 30
 periodSeconds: 10
+
+# Set exec command for the postStart hook of the main container
+postStart:
+  execCommand: []

Review Comment:
   I think it would be more intuitive to provide the entire yaml content of the 
`postStart` section instead of `execCommand` which would require extra 
documentation and would limit the usage.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-jdbc] matriv commented on pull request #29: [FLINK-31551] Add support for CrateDB

2023-06-02 Thread via GitHub


matriv commented on PR #29:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/29#issuecomment-1573891656

   Thank you all involved in the PR and for all the iterations!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-connector-jdbc] snuyanzin commented on pull request #29: [FLINK-31551] Add support for CrateDB

2023-06-02 Thread via GitHub


snuyanzin commented on PR #29:
URL: 
https://github.com/apache/flink-connector-jdbc/pull/29#issuecomment-1573890436

   thanks for the contribution @matriv 
   thanks for the review @libenchao , @eskabetxe 
   
   i'm about to merge it this weekend


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (FLINK-32245) NonDeterministicTests #testTemporalFunctionsInBatchMode failure masked due to incorrect test initialization

2023-06-02 Thread lincoln lee (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728734#comment-17728734
 ] 

lincoln lee commented on FLINK-32245:
-

fixed in master: bfe49b2973d4ffc8f7404a376cab1e419b53406a

> NonDeterministicTests #testTemporalFunctionsInBatchMode failure masked due to 
> incorrect test initialization
> ---
>
> Key: FLINK-32245
> URL: https://issues.apache.org/jira/browse/FLINK-32245
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> The test case NonDeterministicTests #testTemporalFunctionsInBatchMode has 
> been consistently failing due to incorrect test initialization.
>  
> However, this failure has been masked because the test class name ends with 
> "Tests", causing the CI to skip the test case, which has been further 
> validated by searching through the historical logs of the CI.
> This issue needs to be addressed, and the test case should be executed to 
> ensure proper testing. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-32245) NonDeterministicTests #testTemporalFunctionsInBatchMode failure masked due to incorrect test initialization

2023-06-02 Thread lincoln lee (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

lincoln lee closed FLINK-32245.
---
Resolution: Fixed

> NonDeterministicTests #testTemporalFunctionsInBatchMode failure masked due to 
> incorrect test initialization
> ---
>
> Key: FLINK-32245
> URL: https://issues.apache.org/jira/browse/FLINK-32245
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> The test case NonDeterministicTests #testTemporalFunctionsInBatchMode has 
> been consistently failing due to incorrect test initialization.
>  
> However, this failure has been masked because the test class name ends with 
> "Tests", causing the CI to skip the test case, which has been further 
> validated by searching through the historical logs of the CI.
> This issue needs to be addressed, and the test case should be executed to 
> ensure proper testing. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] lincoln-lil merged pull request #22701: [FLINK-32245][table-planner] Rename NonDeterministicTests to NonDeterministicTest and fix incorrect test initialization

2023-06-02 Thread via GitHub


lincoln-lil merged PR #22701:
URL: https://github.com/apache/flink/pull/22701


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] ottomata commented on pull request #604: [FLINK-32041] - Allow operator to manage leases when using watchNamespaces

2023-06-02 Thread via GitHub


ottomata commented on PR #604:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/604#issuecomment-1573642377

   Okay, thank you!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] luoyuxia commented on a diff in pull request #22593: [FLINK-32053][table-planner] Introduce StateMetadata to ExecNode to support configure operator-level state TTL via CompiledPlan

2023-06-02 Thread via GitHub


luoyuxia commented on code in PR #22593:
URL: https://github.com/apache/flink/pull/22593#discussion_r1214112329


##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/StateMetadata.java:
##
@@ -84,22 +84,23 @@ public StateMetadata(
 stateIndex,
 TimeUtils.parseDuration(
 Preconditions.checkNotNull(stateTtl, "state ttl should 
not be null")),
-Preconditions.checkNotNull(stateName, "state name should not 
be null"));
+stateName);
 }
 
-public StateMetadata(int stateIndex, @Nonnull Duration stateTtl, @Nonnull 
String stateName) {
+public StateMetadata(int stateIndex, Duration stateTtl, String stateName) {
 Preconditions.checkArgument(stateIndex >= 0, "state index should start 
from 0");
 this.stateIndex = stateIndex;
-this.stateTtl = stateTtl;
-this.stateName = stateName;
+this.stateTtl = Preconditions.checkNotNull(stateTtl, "state ttl should 
not be null");
+this.stateName = Preconditions.checkNotNull(stateName, "state name 
should not be null");
 }
 
 public int getStateIndex() {
 return stateIndex;
 }
 
-public long getStateTtl() {
-return stateTtl.toMillis();
+@JsonGetter(value = "ttl")
+public String getStateTtl() {
+return TimeUtils.formatWithHighestUnit(stateTtl);
 }
 
 public String getStateName() {

Review Comment:
   Can this method removed?



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/TransformationsTest.java:
##
@@ -256,12 +276,17 @@ private static void checkUidModification(
 // Helper methods
 // 

 
-private static CompiledPlan minimalPlan(TableEnvironment env) {
+private static CompiledPlan planFromFlink1_18(TableEnvironment env) {

Review Comment:
   rename to `planFromCurrentFlink`?
   As release moves on, the name `planFromFlink1_18` may be out of date.



##
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##
@@ -332,7 +319,7 @@ protected Transformation createJoinTransformation(
 }
 }
 
-private Transformation createSyncLookupJoinWithState(
+protected Transformation createSyncLookupJoinWithState(

Review Comment:
   Is there any strong reason to make it proected and make 
`createSyncLookupJoinWithState`? 
   For me, it's not so much like a base method to be overrided by children 
classes at least for `BatchExecLookupJoin` since there won't be  
LookupJoinWithState in  `BatchExecLookupJoin` .
   
   If we do want to make it a   base method ,  I think we should throw an 
exception in default implementation to avoid the method be called by mistake 
but without specific implemtation.



##
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.java:
##
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.jsonplan;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.utils.JsonPlanTestBase;
+import org.apache.flink.table.planner.utils.JsonTestUtils;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Tests for configuring operator-level state TTL via {@link
+ * org.apache.flink.table.api.CompiledPlan}.
+ */
+public class ConfigureOperatorLevelStateTtlJsonITCase extends JsonPlanTestBase 
{
+
+@Test
+public void testDifferentStateTtlForDifferentOneInputOperator() throws 
Exception {
+String dataId =

[jira] [Commented] (FLINK-30629) ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable

2023-06-02 Thread Liu (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-30629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728714#comment-17728714
 ] 

Liu commented on FLINK-30629:
-

[~Sergey Nuyanzin] Thanks. From the log, we can see the logs in time order:
 # The dispatcher shuts down for that the client's heartbeat timeout.
 # The client begins to report its heartbeat.

The reason is that the client will report its heartbeat after calling the 
method waitUntilJobInitializationFinished. In this method, we try to get the 
job's status by waiting 

exponentially and it may take a while. There are two ways to fix the test:
 # Increase the client's timeout from 500 ms to 1 second or more.
 # In the method waitUntilJobInitializationFinished, try to get the job's 
status more frequently.

What do you think? cc [~xtsong] 

> ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat is unstable
> -
>
> Key: FLINK-30629
> URL: https://issues.apache.org/jira/browse/FLINK-30629
> Project: Flink
>  Issue Type: Bug
>  Components: Client / Job Submission
>Affects Versions: 1.17.0, 1.18.0
>Reporter: Xintong Song
>Assignee: Weijie Guo
>Priority: Critical
>  Labels: pull-request-available, test-stability
> Fix For: 1.17.0
>
> Attachments: ClientHeartbeatTestLog.txt, 
> logs-cron_azure-test_cron_azure_core-1685497478.zip
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=44690=logs=77a9d8e1-d610-59b3-fc2a-4766541e0e33=125e07e7-8de0-5c6c-a541-a567415af3ef=10819
> {code:java}
> Jan 11 04:32:39 [ERROR] Tests run: 3, Failures: 0, Errors: 1, Skipped: 0, 
> Time elapsed: 21.02 s <<< FAILURE! - in 
> org.apache.flink.client.ClientHeartbeatTest
> Jan 11 04:32:39 [ERROR] 
> org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat
>   Time elapsed: 9.157 s  <<< ERROR!
> Jan 11 04:32:39 java.lang.IllegalStateException: MiniCluster is not yet 
> running or has already been shut down.
> Jan 11 04:32:39   at 
> org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getDispatcherGatewayFuture(MiniCluster.java:1044)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.runDispatcherCommand(MiniCluster.java:917)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniCluster.getJobStatus(MiniCluster.java:841)
> Jan 11 04:32:39   at 
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobStatus(MiniClusterJobClient.java:91)
> Jan 11 04:32:39   at 
> org.apache.flink.client.ClientHeartbeatTest.testJobRunningIfClientReportHeartbeat(ClientHeartbeatTest.java:79)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] yuchen-ecnu commented on pull request #22659: [FLINK-32186][runtime-web] Support subtask stack auto-search when red…

2023-06-02 Thread via GitHub


yuchen-ecnu commented on PR #22659:
URL: https://github.com/apache/flink/pull/22659#issuecomment-1573601846

   Hi @Myasuka, can you help me to review this PR? 
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


TanYuxin-tyx commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214225246


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClientTest.java:
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingBufferAccumulator;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingTierProducerAgent;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.TieredStorageTestUtils.generateRandomData;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** Tests for {@link TieredStorageProducerClient}. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class TieredStorageProducerClientTest {
+
+private static final int NUM_TOTAL_BUFFERS = 1000;
+
+private static final int NETWORK_BUFFER_SIZE = 1024;
+
+@Parameter public boolean isBroadcast;
+
+private NetworkBufferPool globalPool;
+
+@Parameters(name = "isBroadcast={0}")
+public static Collection parameters() {
+return Arrays.asList(false, true);
+}
+
+@BeforeEach
+void before() {
+globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, 
NETWORK_BUFFER_SIZE);
+}
+
+@AfterEach
+void after() {
+globalPool.destroy();
+}
+
+@TestTemplate
+void testWriteRecordsToEmptyStorageTiers() {
+int numSubpartitions = 10;
+int bufferSize = 1024;
+Random random = new Random();
+
+TieredStorageProducerClient tieredStorageProducerClient =
+createTieredStorageProducerClient(numSubpartitions, 
Collections.emptyList());
+assertThatThrownBy(
+() ->
+tieredStorageProducerClient.write(
+generateRandomData(bufferSize, random),
+new TieredStorageSubpartitionId(0),
+Buffer.DataType.DATA_BUFFER,
+isBroadcast))
+.isInstanceOf(RuntimeException.class)
+.hasMessageContaining("Failed to choose a storage tier");
+}
+
+@TestTemplate
+void testWriteRecords() throws IOException {
+int numSubpartitions = 10;
+int bufferSize = 1024;
+int maxNumToWriteRecordsPerSubpartition = 1000;
+Random random = new Random();
+
+TestingTierProducerAgent tierProducerAgent = new 
TestingTierProducerAgent();
+tierProducerAgent.setTryStartNewSegmentReturnValueSupplier(() -> true);
+tierProducerAgent.setTryWriteReturnValueSupplier(() -> new Boolean[] 
{false, true});
+
+TieredStorageProducerClient tieredStorageProducerClient =
+createTieredStorageProducerClient(
+numSubpartitions, 
Collections.singletonList(tierProducerAgent));
+
+int numWriteRecords = 0;
+for (int j = 0; j < numSubpartitions; j++) {
+for 

[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


TanYuxin-tyx commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214224183


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClientTest.java:
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingBufferAccumulator;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingTierProducerAgent;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.TieredStorageTestUtils.generateRandomData;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** Tests for {@link TieredStorageProducerClient}. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class TieredStorageProducerClientTest {
+
+private static final int NUM_TOTAL_BUFFERS = 1000;
+
+private static final int NETWORK_BUFFER_SIZE = 1024;
+
+@Parameter public boolean isBroadcast;
+
+private NetworkBufferPool globalPool;
+
+@Parameters(name = "isBroadcast={0}")
+public static Collection parameters() {
+return Arrays.asList(false, true);
+}
+
+@BeforeEach
+void before() {
+globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, 
NETWORK_BUFFER_SIZE);
+}
+
+@AfterEach
+void after() {
+globalPool.destroy();
+}
+
+@TestTemplate
+void testWriteRecordsToEmptyStorageTiers() {
+int numSubpartitions = 10;
+int bufferSize = 1024;
+Random random = new Random();
+
+TieredStorageProducerClient tieredStorageProducerClient =
+createTieredStorageProducerClient(numSubpartitions, 
Collections.emptyList());
+assertThatThrownBy(
+() ->
+tieredStorageProducerClient.write(
+generateRandomData(bufferSize, random),
+new TieredStorageSubpartitionId(0),
+Buffer.DataType.DATA_BUFFER,
+isBroadcast))
+.isInstanceOf(RuntimeException.class)
+.hasMessageContaining("Failed to choose a storage tier");
+}
+
+@TestTemplate
+void testWriteRecords() throws IOException {
+int numSubpartitions = 10;
+int bufferSize = 1024;
+int maxNumToWriteRecordsPerSubpartition = 1000;
+Random random = new Random();
+
+TestingTierProducerAgent tierProducerAgent = new 
TestingTierProducerAgent();
+tierProducerAgent.setTryStartNewSegmentReturnValueSupplier(() -> true);
+tierProducerAgent.setTryWriteReturnValueSupplier(() -> new Boolean[] 
{false, true});
+
+TieredStorageProducerClient tieredStorageProducerClient =
+createTieredStorageProducerClient(
+numSubpartitions, 
Collections.singletonList(tierProducerAgent));
+
+int numWriteRecords = 0;
+for (int j = 0; j < numSubpartitions; j++) {
+for 

[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


TanYuxin-tyx commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214223911


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+import java.util.function.Supplier;
+
+/** Test implementation for {@link TierProducerAgent}. */
+public class TestingTierProducerAgent implements TierProducerAgent {
+
+private Supplier tryStartNewSegmentReturnValueSupplier;
+
+private Supplier tryWriteReturnValueSupplier;
+
+private int numTotalReceivedBuffers;
+
+private int tryWriteCounter;
+
+private boolean isClosed;
+
+public void setTryStartNewSegmentReturnValueSupplier(
+Supplier tryStartNewSegmentReturnValueSupplier) {
+this.tryStartNewSegmentReturnValueSupplier = 
tryStartNewSegmentReturnValueSupplier;
+}
+
+public void setTryWriteReturnValueSupplier(Supplier 
tryWriteReturnValueSupplier) {
+this.tryWriteReturnValueSupplier = tryWriteReturnValueSupplier;
+}
+
+public int numTotalReceivedBuffers() {
+return numTotalReceivedBuffers;
+}
+
+public boolean isClosed() {
+return isClosed;
+}
+
+@Override
+public boolean tryStartNewSegment(TieredStorageSubpartitionId 
subpartitionId, int segmentId) {
+return tryStartNewSegmentReturnValueSupplier.get();
+}
+
+@Override
+public boolean tryWrite(TieredStorageSubpartitionId subpartitionId, Buffer 
finishedBuffer) {
+boolean isSuccess = 
tryWriteReturnValueSupplier.get()[(tryWriteCounter++) % 2];
+if (isSuccess) {
+numTotalReceivedBuffers++;
+}
+return isSuccess;

Review Comment:
   Fixed.



##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+import java.util.function.Supplier;
+
+/** Test implementation for {@link TierProducerAgent}. */
+public class TestingTierProducerAgent implements TierProducerAgent {

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


TanYuxin-tyx commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214223698


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+import java.util.function.Supplier;
+
+/** Test implementation for {@link TierProducerAgent}. */
+public class TestingTierProducerAgent implements TierProducerAgent {
+
+private Supplier tryStartNewSegmentReturnValueSupplier;
+
+private Supplier tryWriteReturnValueSupplier;

Review Comment:
   Fixed.



##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+import java.util.function.Supplier;
+
+/** Test implementation for {@link TierProducerAgent}. */
+public class TestingTierProducerAgent implements TierProducerAgent {
+
+private Supplier tryStartNewSegmentReturnValueSupplier;
+
+private Supplier tryWriteReturnValueSupplier;
+
+private int numTotalReceivedBuffers;
+
+private int tryWriteCounter;
+
+private boolean isClosed;
+
+public void setTryStartNewSegmentReturnValueSupplier(
+Supplier tryStartNewSegmentReturnValueSupplier) {
+this.tryStartNewSegmentReturnValueSupplier = 
tryStartNewSegmentReturnValueSupplier;
+}
+
+public void setTryWriteReturnValueSupplier(Supplier 
tryWriteReturnValueSupplier) {
+this.tryWriteReturnValueSupplier = tryWriteReturnValueSupplier;
+}
+
+public int numTotalReceivedBuffers() {
+return numTotalReceivedBuffers;
+}
+
+public boolean isClosed() {
+return isClosed;
+}
+
+@Override
+public boolean tryStartNewSegment(TieredStorageSubpartitionId 
subpartitionId, int segmentId) {
+return tryStartNewSegmentReturnValueSupplier.get();
+}
+
+@Override
+public boolean tryWrite(TieredStorageSubpartitionId subpartitionId, Buffer 
finishedBuffer) {
+boolean isSuccess = 
tryWriteReturnValueSupplier.get()[(tryWriteCounter++) % 2];
+if (isSuccess) {
+numTotalReceivedBuffers++;
+}
+return isSuccess;
+}
+
+@Override
+public void close() {
+this.isClosed = true;

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


TanYuxin-tyx commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214223224


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+import java.util.function.Supplier;
+
+/** Test implementation for {@link TierProducerAgent}. */
+public class TestingTierProducerAgent implements TierProducerAgent {
+
+private Supplier tryStartNewSegmentReturnValueSupplier;

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32247) Normal group by with time attributes after a window group by is interpreted as GlobalWindowAggregate

2023-06-02 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren updated FLINK-32247:
--
Description: 
Considering a SQL statement below:
{code:java}
CREATE TABLE source1 (
`id` BIGINT,
`item` STRING,
`price` DOUBLE,
`ts` STRING,
`rowtime` AS TO_TIMESTAMP(`ts`),
WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND) 
WITH (
'connector' = 'datagen'
);

SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM (
SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS 
`price_sum` FROM
TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' 
MINUTES))
GROUP BY `window_start`, `window_end`, `window_time`, `item`) 
GROUP BY `window_start`, `window_end`, `window_time`; /* Use time attributes 
defined in the previous window aggregation as grouping keys */{code}
which should be a group aggregation after a window aggregation, but the planner 
is interpreting the latter aggregation as a GroupWindowAggregation:
{code:java}
== Optimized Physical Plan ==
Calc(select=[window_start, window_end, window_time, EXPR$3])
+- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], 
select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS 
window_end, rowtime('w$) AS window_time])
   +- Exchange(distribution=[single])
      +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, 
slice_end('w$) AS $window_end])
         +- Calc(select=[window_start, window_end, window_time])
            +- GlobalWindowAggregate(groupBy=[item], 
window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) 
AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
               +- Exchange(distribution=[hash[item]])
                  +- LocalWindowAggregate(groupBy=[item], 
window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) 
AS $slice_end])
                     +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 5000:INTERVAL SECOND)])
                        +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS 
rowtime])
                           +- TableSourceScan(table=[[default_catalog, 
default_database, source1]], fields=[id, item, price, ts]) {code}
The trick is that the latter group aggregation uses time attributes defined in 
the previous window aggregation as grouping keys.

 

  was:
Considering a SQL statement below:
{code:java}
CREATE TABLE source1 (
`id` BIGINT,
`item` STRING,
`price` DOUBLE,
`ts` STRING,
`rowtime` AS TO_TIMESTAMP(`ts`),
WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND) 
WITH (
'connector' = 'datagen'
);

SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM (
SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS 
`price_sum` FROM
TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' 
MINUTES))
GROUP BY `window_start`, `window_end`, `window_time`, `item`) 
GROUP BY `window_start`, `window_end`, `window_time`; /* Use time attributes 
defined in the previous window aggregation as grouping keys */{code}
which should be a group aggregation after a windowed aggregation, but the 
planner is interpreting the latter aggregation as a GroupWindowAggregation:
{code:java}
== Optimized Physical Plan ==
Calc(select=[window_start, window_end, window_time, EXPR$3])
+- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], 
select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS 
window_end, rowtime('w$) AS window_time])
   +- Exchange(distribution=[single])
      +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, 
slice_end('w$) AS $window_end])
         +- Calc(select=[window_start, window_end, window_time])
            +- GlobalWindowAggregate(groupBy=[item], 
window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) 
AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
               +- Exchange(distribution=[hash[item]])
                  +- LocalWindowAggregate(groupBy=[item], 
window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) 
AS $slice_end])
                     +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 5000:INTERVAL SECOND)])
                        +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS 
rowtime])
                           +- TableSourceScan(table=[[default_catalog, 
default_database, source1]], fields=[id, item, price, ts]) {code}
The trick is that the latter group aggregation uses time attributes defined in 
the previous window aggregation as grouping keys.

 


> Normal group by with time attributes after a window group by 

[jira] [Commented] (FLINK-32247) Normal group by with time attributes after a window group by is interpreted as GlobalWindowAggregate

2023-06-02 Thread Qingsheng Ren (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728674#comment-17728674
 ] 

Qingsheng Ren commented on FLINK-32247:
---

Not sure if this is by-design or a bug in planner

> Normal group by with time attributes after a window group by is interpreted 
> as GlobalWindowAggregate
> 
>
> Key: FLINK-32247
> URL: https://issues.apache.org/jira/browse/FLINK-32247
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.2, 1.18.0, 1.17.1
>Reporter: Qingsheng Ren
>Priority: Major
>
> Considering a SQL statement below:
> {code:java}
> CREATE TABLE source1 (
> `id` BIGINT,
> `item` STRING,
> `price` DOUBLE,
> `ts` STRING,
> `rowtime` AS TO_TIMESTAMP(`ts`),
> WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND) 
> WITH (
> 'connector' = 'datagen'
> );
> SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM (
> SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) 
> AS `price_sum` FROM
> TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' 
> MINUTES))
> GROUP BY `window_start`, `window_end`, `window_time`, `item`) 
> GROUP BY `window_start`, `window_end`, `window_time`; /* Use time attributes 
> defined in the previous window aggregation as grouping keys */{code}
> which should be a group aggregation after a window aggregation, but the 
> planner is interpreting the latter aggregation as a GroupWindowAggregation:
> {code:java}
> == Optimized Physical Plan ==
> Calc(select=[window_start, window_end, window_time, EXPR$3])
> +- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 
> min])], select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, 
> end('w$) AS window_end, rowtime('w$) AS window_time])
>    +- Exchange(distribution=[single])
>       +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], 
> win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, 
> slice_end('w$) AS $window_end])
>          +- Calc(select=[window_start, window_end, window_time])
>             +- GlobalWindowAggregate(groupBy=[item], 
> window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, 
> start('w$) AS window_start, end('w$) AS window_end, rowtime('w$) AS 
> window_time])
>                +- Exchange(distribution=[hash[item]])
>                   +- LocalWindowAggregate(groupBy=[item], 
> window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, 
> slice_end('w$) AS $slice_end])
>                      +- WatermarkAssigner(rowtime=[rowtime], 
> watermark=[-(rowtime, 5000:INTERVAL SECOND)])
>                         +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS 
> rowtime])
>                            +- TableSourceScan(table=[[default_catalog, 
> default_database, source1]], fields=[id, item, price, ts]) {code}
> The trick is that the latter group aggregation uses time attributes defined 
> in the previous window aggregation as grouping keys.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-32247) Normal group by with time attributes after a window group by is interpreted as GlobalWindowAggregate

2023-06-02 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren updated FLINK-32247:
--
Description: 
Considering a SQL statement below:
{code:java}
CREATE TABLE source1 (
`id` BIGINT,
`item` STRING,
`price` DOUBLE,
`ts` STRING,
`rowtime` AS TO_TIMESTAMP(`ts`),
WATERMARK FOR rowtime AS rowtime - INTERVAL '5' SECOND) 
WITH (
'connector' = 'datagen'
);

SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM (
SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS 
`price_sum` FROM
TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' 
MINUTES))
GROUP BY `window_start`, `window_end`, `window_time`, `item`) 
GROUP BY `window_start`, `window_end`, `window_time`; /* Use time attributes 
defined in the previous window aggregation as grouping keys */{code}
which should be a group aggregation after a windowed aggregation, but the 
planner is interpreting the latter aggregation as a GroupWindowAggregation:
{code:java}
== Optimized Physical Plan ==
Calc(select=[window_start, window_end, window_time, EXPR$3])
+- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], 
select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS 
window_end, rowtime('w$) AS window_time])
   +- Exchange(distribution=[single])
      +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, 
slice_end('w$) AS $window_end])
         +- Calc(select=[window_start, window_end, window_time])
            +- GlobalWindowAggregate(groupBy=[item], 
window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) 
AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
               +- Exchange(distribution=[hash[item]])
                  +- LocalWindowAggregate(groupBy=[item], 
window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) 
AS $slice_end])
                     +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 5000:INTERVAL SECOND)])
                        +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS 
rowtime])
                           +- TableSourceScan(table=[[default_catalog, 
default_database, source1]], fields=[id, item, price, ts]) {code}
The trick is that the latter group aggregation uses time attributes defined in 
the previous window aggregation as grouping keys.

 

  was:
Considering a SQL statement below:

 
{code:java}
SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM (

SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS 
`price_sum` FROM
TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES))
GROUP BY `window_start`, `window_end`, `window_time`, `item`) 

GROUP BY `window_start`, `window_end`, `window_time`; {code}
which should be a group aggregation after a windowed aggregation, but the 
planner is interpreting the latter aggregation as a GroupWindowAggregation:
{code:java}
== Optimized Physical Plan ==
Calc(select=[window_start, window_end, window_time, EXPR$3])
+- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], 
select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS 
window_end, rowtime('w$) AS window_time])
   +- Exchange(distribution=[single])
      +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, 
slice_end('w$) AS $window_end])
         +- Calc(select=[window_start, window_end, window_time])
            +- GlobalWindowAggregate(groupBy=[item], 
window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) 
AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
               +- Exchange(distribution=[hash[item]])
                  +- LocalWindowAggregate(groupBy=[item], 
window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) 
AS $slice_end])
                     +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 5000:INTERVAL SECOND)])
                        +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS 
rowtime])
                           +- TableSourceScan(table=[[default_catalog, 
default_database, source1]], fields=[id, item, price, ts]) {code}
 


> Normal group by with time attributes after a window group by is interpreted 
> as GlobalWindowAggregate
> 
>
> Key: FLINK-32247
> URL: https://issues.apache.org/jira/browse/FLINK-32247
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.2, 1.18.0, 1.17.1
>Reporter: Qingsheng Ren
>

[jira] [Updated] (FLINK-32247) Normal group by with time attributes after a window group by is interpreted as GlobalWindowAggregate

2023-06-02 Thread Qingsheng Ren (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32247?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Qingsheng Ren updated FLINK-32247:
--
Description: 
Considering a SQL statement below:

 
{code:java}
SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM (

SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS 
`price_sum` FROM
TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES))
GROUP BY `window_start`, `window_end`, `window_time`, `item`) 

GROUP BY `window_start`, `window_end`, `window_time`; {code}
which should be a group aggregation after a windowed aggregation, but the 
planner is interpreting the latter aggregation as a GroupWindowAggregation:
{code:java}
== Optimized Physical Plan ==
Calc(select=[window_start, window_end, window_time, EXPR$3])
+- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], 
select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS 
window_end, rowtime('w$) AS window_time])
   +- Exchange(distribution=[single])
      +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, 
slice_end('w$) AS $window_end])
         +- Calc(select=[window_start, window_end, window_time])
            +- GlobalWindowAggregate(groupBy=[item], 
window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) 
AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
               +- Exchange(distribution=[hash[item]])
                  +- LocalWindowAggregate(groupBy=[item], 
window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) 
AS $slice_end])
                     +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 5000:INTERVAL SECOND)])
                        +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS 
rowtime])
                           +- TableSourceScan(table=[[default_catalog, 
default_database, source1]], fields=[id, item, price, ts]) {code}
 

  was:
Considering a SQL statement below:

 
{code:java}
SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM (SELECT 
`window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS 
`price_sum` FROM TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), 
INTERVAL '1' MINUTES)) GROUP BY `window_start`, `window_end`, 
`window_time`, `item`) GROUP BY `window_start`, `window_end`, `window_time`; 
{code}
which should be a group aggregation after a windowed aggregation, but the 
planner is interpreting the latter aggregation as a GroupWindowAggregation:
{code:java}
== Optimized Physical Plan ==
Calc(select=[window_start, window_end, window_time, EXPR$3])
+- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], 
select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS 
window_end, rowtime('w$) AS window_time])
   +- Exchange(distribution=[single])
      +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, 
slice_end('w$) AS $window_end])
         +- Calc(select=[window_start, window_end, window_time])
            +- GlobalWindowAggregate(groupBy=[item], 
window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) 
AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
               +- Exchange(distribution=[hash[item]])
                  +- LocalWindowAggregate(groupBy=[item], 
window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) 
AS $slice_end])
                     +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 5000:INTERVAL SECOND)])
                        +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS 
rowtime])
                           +- TableSourceScan(table=[[default_catalog, 
default_database, source1]], fields=[id, item, price, ts]) {code}
 


> Normal group by with time attributes after a window group by is interpreted 
> as GlobalWindowAggregate
> 
>
> Key: FLINK-32247
> URL: https://issues.apache.org/jira/browse/FLINK-32247
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.2, 1.18.0, 1.17.1
>Reporter: Qingsheng Ren
>Priority: Major
>
> Considering a SQL statement below:
>  
> {code:java}
> SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM (
> SELECT `window_start`, `window_end`, `window_time`, `item`, SUM(`price`) 
> AS `price_sum` FROM
> TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), INTERVAL '1' MINUTES))
> GROUP BY `window_start`, `window_end`, `window_time`, `item`) 
> GROUP BY `window_start`, `window_end`, `window_time`; 

[jira] [Created] (FLINK-32247) Normal group by with time attributes after a window group by is interpreted as GlobalWindowAggregate

2023-06-02 Thread Qingsheng Ren (Jira)
Qingsheng Ren created FLINK-32247:
-

 Summary: Normal group by with time attributes after a window group 
by is interpreted as GlobalWindowAggregate
 Key: FLINK-32247
 URL: https://issues.apache.org/jira/browse/FLINK-32247
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.17.1, 1.16.2, 1.18.0
Reporter: Qingsheng Ren


Considering a SQL statement below:

 
{code:java}
SELECT `window_start`, `window_end`, `window_time`, COUNT(*) FROM (SELECT 
`window_start`, `window_end`, `window_time`, `item`, SUM(`price`) AS 
`price_sum` FROM TABLE (TUMBLE(TABLE source1, DESCRIPTOR(`rowtime`), 
INTERVAL '1' MINUTES)) GROUP BY `window_start`, `window_end`, 
`window_time`, `item`) GROUP BY `window_start`, `window_end`, `window_time`; 
{code}
which should be a group aggregation after a windowed aggregation, but the 
planner is interpreting the latter aggregation as a GroupWindowAggregation:
{code:java}
== Optimized Physical Plan ==
Calc(select=[window_start, window_end, window_time, EXPR$3])
+- GlobalWindowAggregate(window=[TUMBLE(win_end=[$window_end], size=[1 min])], 
select=[COUNT(count1$0) AS EXPR$3, start('w$) AS window_start, end('w$) AS 
window_end, rowtime('w$) AS window_time])
   +- Exchange(distribution=[single])
      +- LocalWindowAggregate(window=[TUMBLE(win_start=[window_start], 
win_end=[window_end], size=[1 min])], select=[COUNT(*) AS count1$0, 
slice_end('w$) AS $window_end])
         +- Calc(select=[window_start, window_end, window_time])
            +- GlobalWindowAggregate(groupBy=[item], 
window=[TUMBLE(slice_end=[$slice_end], size=[1 min])], select=[item, start('w$) 
AS window_start, end('w$) AS window_end, rowtime('w$) AS window_time])
               +- Exchange(distribution=[hash[item]])
                  +- LocalWindowAggregate(groupBy=[item], 
window=[TUMBLE(time_col=[rowtime], size=[1 min])], select=[item, slice_end('w$) 
AS $slice_end])
                     +- WatermarkAssigner(rowtime=[rowtime], 
watermark=[-(rowtime, 5000:INTERVAL SECOND)])
                        +- Calc(select=[item, price, TO_TIMESTAMP(ts) AS 
rowtime])
                           +- TableSourceScan(table=[[default_catalog, 
default_database, source1]], fields=[id, item, price, ts]) {code}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-32246) javax.management.InstanceAlreadyExistsException

2023-06-02 Thread Aitozi (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728658#comment-17728658
 ] 

Aitozi commented on FLINK-32246:


where do you run this sql ? sql-client ? Can you provide a demo case to 
reproduce this ?

> javax.management.InstanceAlreadyExistsException
> ---
>
> Key: FLINK-32246
> URL: https://issues.apache.org/jira/browse/FLINK-32246
> Project: Flink
>  Issue Type: Bug
>  Components: Connectors / Kafka
>Affects Versions: 1.15.2
>Reporter: jeff-zou
>Priority: Major
>
> Flink SQL throws an 
> exception(javax.management.InstanceAlreadyExistsException) when trying to 
> perform multiple sink operations on the same kafka source .
>  
> sql example:
> {code:java}
> create table kafka_source() with ('connector'='kafka');
> insert into sink_table1 select * from kafka_source;
> insert into sink_table2 select * from kafka_source; {code}
> The Exception as below:
> {code:java}
> javax.management.InstanceAlreadyExistsException: 
> kafka.admin.client:type=app-info,id=*
>  
> java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
>  
> java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
>  
> java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
>  
> java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
>  
> java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
>  
> java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>  
> org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
>  
> org.apache.kafka.clients.admin.KafkaAdminClient.(KafkaAdminClient.java:500)
>  
> org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:444)
>  org.apache.kafka.clients.admin.Admin.create(Admin.java:59)
>  org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39)
>  
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getKafkaAdminClient(KafkaSourceEnumerator.java:410)
>  
> org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.start(KafkaSourceEnumerator.java:151)
>  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$1(SourceCoordinator.java:209)
>  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406)
>  
> org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
>  
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>  
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>  
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  java.base/java.lang.Thread.run(Thread.java:829) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] swuferhong commented on a diff in pull request #22684: [FLINK-32220][table-runtime] Improving the adaptive local hash agg code to avoid get value from RowData repeatedly

2023-06-02 Thread via GitHub


swuferhong commented on code in PR #22684:
URL: https://github.com/apache/flink/pull/22684#discussion_r1214139725


##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala:
##
@@ -231,25 +230,23 @@ object ProjectionCodeGenerator {
   inputTerm: String,
   targetType: LogicalType,
   index: Int): GeneratedExpression = {
-val fieldType = getFieldTypes(inputType).get(index)
-val resultTypeTerm = primitiveTypeTermForType(fieldType)
-val defaultValue = primitiveDefaultValue(fieldType)
-val readCode = rowFieldReadAccess(index.toString, inputTerm, fieldType)
-val Seq(fieldTerm, nullTerm) =
-  ctx.addReusableLocalVariables((resultTypeTerm, "field"), ("boolean", 
"isNull"))
-
-val inputCode =
-  s"""
- |$nullTerm = $inputTerm.isNullAt($index);
- |$fieldTerm = $defaultValue;
- |if (!$nullTerm) {
- |  $fieldTerm = $readCode;
- |}
-   """.stripMargin.trim
-
-val expression = GeneratedExpression(fieldTerm, nullTerm, inputCode, 
fieldType)
+val fieldExpr = getReuseFieldExprForAggFunc(ctx, inputType, inputTerm, 
index)
 // Convert the projected value type to sum agg func target type.
-ScalarOperatorGens.generateCast(ctx, expression, targetType, true)
+ScalarOperatorGens.generateCast(ctx, fieldExpr, targetType, true)

Review Comment:
   nit: Removing the IDEA warning:  Changing to 
`ScalarOperatorGens.generateCast(ctx, fieldExpr, targetType, nullOnFailure = 
true)`



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala:
##
@@ -231,25 +230,23 @@ object ProjectionCodeGenerator {
   inputTerm: String,
   targetType: LogicalType,
   index: Int): GeneratedExpression = {
-val fieldType = getFieldTypes(inputType).get(index)
-val resultTypeTerm = primitiveTypeTermForType(fieldType)
-val defaultValue = primitiveDefaultValue(fieldType)
-val readCode = rowFieldReadAccess(index.toString, inputTerm, fieldType)
-val Seq(fieldTerm, nullTerm) =
-  ctx.addReusableLocalVariables((resultTypeTerm, "field"), ("boolean", 
"isNull"))
-
-val inputCode =
-  s"""
- |$nullTerm = $inputTerm.isNullAt($index);
- |$fieldTerm = $defaultValue;
- |if (!$nullTerm) {
- |  $fieldTerm = $readCode;
- |}
-   """.stripMargin.trim
-
-val expression = GeneratedExpression(fieldTerm, nullTerm, inputCode, 
fieldType)
+val fieldExpr = getReuseFieldExprForAggFunc(ctx, inputType, inputTerm, 
index)
 // Convert the projected value type to sum agg func target type.
-ScalarOperatorGens.generateCast(ctx, expression, targetType, true)
+ScalarOperatorGens.generateCast(ctx, fieldExpr, targetType, true)
+  }
+
+  /** Get reuse field expr if it has been evaluated before for adaptive local 
hash aggregation. */
+  def getReuseFieldExprForAggFunc(
+  ctx: CodeGeneratorContext,
+  inputType: LogicalType,
+  inputTerm: String,
+  index: Int) = {
+// reuse the field access code if it has been evaluated before

Review Comment:
   ->  // Reuse the field access code if it has been evaluated before.



##
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProjectionCodeGenerator.scala:
##
@@ -231,25 +230,23 @@ object ProjectionCodeGenerator {
   inputTerm: String,
   targetType: LogicalType,
   index: Int): GeneratedExpression = {
-val fieldType = getFieldTypes(inputType).get(index)
-val resultTypeTerm = primitiveTypeTermForType(fieldType)
-val defaultValue = primitiveDefaultValue(fieldType)
-val readCode = rowFieldReadAccess(index.toString, inputTerm, fieldType)
-val Seq(fieldTerm, nullTerm) =
-  ctx.addReusableLocalVariables((resultTypeTerm, "field"), ("boolean", 
"isNull"))
-
-val inputCode =
-  s"""
- |$nullTerm = $inputTerm.isNullAt($index);
- |$fieldTerm = $defaultValue;
- |if (!$nullTerm) {
- |  $fieldTerm = $readCode;
- |}
-   """.stripMargin.trim
-
-val expression = GeneratedExpression(fieldTerm, nullTerm, inputCode, 
fieldType)
+val fieldExpr = getReuseFieldExprForAggFunc(ctx, inputType, inputTerm, 
index)
 // Convert the projected value type to sum agg func target type.
-ScalarOperatorGens.generateCast(ctx, expression, targetType, true)
+ScalarOperatorGens.generateCast(ctx, fieldExpr, targetType, true)
+  }
+
+  /** Get reuse field expr if it has been evaluated before for adaptive local 
hash aggregation. */
+  def getReuseFieldExprForAggFunc(
+  ctx: CodeGeneratorContext,
+  inputType: LogicalType,
+  inputTerm: String,
+  index: Int) = {

Review Comment:
   Add type annotation : ` index: Int): GeneratedExpression = {`



-- 
This is an automated message from the Apache Git Service.
To respond to 

[jira] [Resolved] (FLINK-31894) ExceptionHistory and REST API failure label integration

2023-06-02 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-31894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Morávek resolved FLINK-31894.
---
Fix Version/s: 1.18.0
   Resolution: Fixed

> ExceptionHistory and REST API failure label integration
> ---
>
> Key: FLINK-31894
> URL: https://issues.apache.org/jira/browse/FLINK-31894
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST
>Reporter: Panagiotis Garefalakis
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (FLINK-31894) ExceptionHistory and REST API failure label integration

2023-06-02 Thread Jira


 [ 
https://issues.apache.org/jira/browse/FLINK-31894?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Morávek reassigned FLINK-31894:
-

Assignee: Panagiotis Garefalakis

> ExceptionHistory and REST API failure label integration
> ---
>
> Key: FLINK-31894
> URL: https://issues.apache.org/jira/browse/FLINK-31894
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST
>Reporter: Panagiotis Garefalakis
>Assignee: Panagiotis Garefalakis
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-31894) ExceptionHistory and REST API failure label integration

2023-06-02 Thread Jira


[ 
https://issues.apache.org/jira/browse/FLINK-31894?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17728640#comment-17728640
 ] 

David Morávek commented on FLINK-31894:
---

master: 28c20ad70c7100ae2358fa3f8936663f30811f78

> ExceptionHistory and REST API failure label integration
> ---
>
> Key: FLINK-31894
> URL: https://issues.apache.org/jira/browse/FLINK-31894
> Project: Flink
>  Issue Type: Sub-task
>  Components: Runtime / REST
>Reporter: Panagiotis Garefalakis
>Priority: Major
>  Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] dmvk merged pull request #22643: [FLINK-31894][runtime] ExceptionHistory and REST API failure label integration

2023-06-02 Thread via GitHub


dmvk merged PR #22643:
URL: https://github.com/apache/flink/pull/22643


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] zoltar9264 commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-02 Thread via GitHub


zoltar9264 commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1214124394


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##
@@ -395,18 +394,16 @@ public void release() {
 /** Previous snapshot with uploaded sst files. */
 protected static class PreviousSnapshot {
 
-@Nullable private final Map confirmedSstFiles;
+@Nullable private final Map 
confirmedSstFiles;
 
-protected PreviousSnapshot(@Nullable Map 
confirmedSstFiles) {
+protected PreviousSnapshot(
+@Nullable Map 
confirmedSstFiles) {
 this.confirmedSstFiles = confirmedSstFiles;
 }
 
 protected Optional getUploaded(StateHandleID 
stateHandleID) {
 if (confirmedSstFiles != null && 
confirmedSstFiles.containsKey(stateHandleID)) {
-// we introduce a placeholder state handle, that is replaced 
with the
-// original from the shared state registry (created from a 
previous checkpoint)
-return Optional.of(
-new 
PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
   > maybe using `Path` or `File` instead of `String` for `localPath`.
   
   I think localPath is just a file name, `Path` is a interface and not 
implement `Serializable`, `File` does implement `Serializable`, but it will 
also become a path string when it is finally serialized into the _metadata 
file. So I think `String` is clearer and enough ?
   
   About 'only use PlaceholderStreamStateHandle while the origin handle is 
ByteStreamStateHandle':
   In fact I want to suggest developers to do this in the doc of 
`PlaceholderStreamStateHandle` and `SharedStateRegistry`. Perhaps some bugs 
could have been avoided if developers followed this advice and only performed 
replacements on `PlaceholderStreamStateHandle` . I'm not sure if this is 
over-designed, if so please correct me.  :heart:



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] flinkbot commented on pull request #22701: [FLINK-32245][table-planner] Rename NonDeterministicTests to NonDeterministicTest and fix incorrect test initialization

2023-06-02 Thread via GitHub


flinkbot commented on PR #22701:
URL: https://github.com/apache/flink/pull/22701#issuecomment-1573405644

   
   ## CI report:
   
   * 016cbcb8431da5a91e0a690c395567e50e633e33 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-32245) NonDeterministicTests #testTemporalFunctionsInBatchMode failure masked due to incorrect test initialization

2023-06-02 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32245?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32245:
---
Labels: pull-request-available  (was: )

> NonDeterministicTests #testTemporalFunctionsInBatchMode failure masked due to 
> incorrect test initialization
> ---
>
> Key: FLINK-32245
> URL: https://issues.apache.org/jira/browse/FLINK-32245
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.18.0, 1.17.1
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> The test case NonDeterministicTests #testTemporalFunctionsInBatchMode has 
> been consistently failing due to incorrect test initialization.
>  
> However, this failure has been masked because the test class name ends with 
> "Tests", causing the CI to skip the test case, which has been further 
> validated by searching through the historical logs of the CI.
> This issue needs to be addressed, and the test case should be executed to 
> ensure proper testing. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] LadyForest opened a new pull request, #22701: [FLINK-32245][table-planner] Rename NonDeterministicTests to NonDeterministicTest and fix incorrect test initialization

2023-06-02 Thread via GitHub


LadyForest opened a new pull request, #22701:
URL: https://github.com/apache/flink/pull/22701

   ## What is the purpose of the change
   
   This PR fixes the incorrect test initialization for `NonDeterministicTests` 
which uses `streamTableEnv` to test batch behavior, which fails of course but 
has not been revealed due to the JUnit cannot recognize test ends with "Tests".
   
   
   ## Brief change log
   
   - Rename `NonDeterministicTests` to `NonDeterministicTest` to make CI 
recognize the test.
   - Fix the initialization way for `ExpressionTestBase` on how to init table 
env, which should be based on  streaming or batch mode.
   
   
   ## Verifying this change
   
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't 
know)
 - The S3 file system connector: (yes / **no**/ don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


reswqa commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214057806


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+import java.util.function.Supplier;
+
+/** Test implementation for {@link TierProducerAgent}. */
+public class TestingTierProducerAgent implements TierProducerAgent {
+
+private Supplier tryStartNewSegmentReturnValueSupplier;

Review Comment:
   ```suggestion
   private Supplier tryStartNewSegmentSupplier;
   ```
   
   In general, for mock class, we named it's fields as follows:
   `MethodName+(Runnable/Supplier/Consumer/Function)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


reswqa commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214057806


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+import java.util.function.Supplier;
+
+/** Test implementation for {@link TierProducerAgent}. */
+public class TestingTierProducerAgent implements TierProducerAgent {
+
+private Supplier tryStartNewSegmentReturnValueSupplier;

Review Comment:
   ```suggestion
   private Supplier tryStartNewSegmentSupplier;
   ```
   ```suggestion
   private Supplier tryStartNewSegmentSupplier;
   ```
   
   In general, for mock class, we named it's fields as follows:
   `MethodName+(Runnable/Supplier/Consumer/Function)`



##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTierProducerAgent.java:
##
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+
+import java.util.function.Supplier;
+
+/** Test implementation for {@link TierProducerAgent}. */
+public class TestingTierProducerAgent implements TierProducerAgent {
+
+private Supplier tryStartNewSegmentReturnValueSupplier;
+
+private Supplier tryWriteReturnValueSupplier;

Review Comment:
   ```suggestion
  private BiFunction 
tryWriterFunction;  
   ```



##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClientTest.java:
##
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.TestingBufferAccumulator;
+import 

[GitHub] [flink] TanYuxin-tyx commented on pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


TanYuxin-tyx commented on PR #22652:
URL: https://github.com/apache/flink/pull/22652#issuecomment-1573354370

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


TanYuxin-tyx commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214057512


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##
@@ -80,13 +107,23 @@ public void write(
 
 if (isBroadcast && !isBroadcastOnly) {
 for (int i = 0; i < numSubpartitions; ++i) {
-bufferAccumulator.receive(record.duplicate(), subpartitionId, 
dataType);
+// As the tiered storage subpartition ID is created only for 
broadcast records,
+// which are fewer than normal records, the performance impact 
of generating new
+// TieredStorageSubpartitionId objects is expected to be 
manageable. If the
+// performance is significantly affected, this logic will be 
optimized accordingly.
+bufferAccumulator.receive(
+record.duplicate(), new 
TieredStorageSubpartitionId(i), dataType);
 }
 } else {
 bufferAccumulator.receive(record, subpartitionId, dataType);
 }
 }
 
+public void setMetricStatisticsUpdater(
+Consumer 
metricStatisticsUpdater) {
+this.metricStatisticsUpdater = metricStatisticsUpdater;

Review Comment:
   Ah. I missed this. Fixed this now.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-02 Thread via GitHub


rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1214051048


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##
@@ -395,18 +394,16 @@ public void release() {
 /** Previous snapshot with uploaded sst files. */
 protected static class PreviousSnapshot {
 
-@Nullable private final Map confirmedSstFiles;
+@Nullable private final Map 
confirmedSstFiles;
 
-protected PreviousSnapshot(@Nullable Map 
confirmedSstFiles) {
+protected PreviousSnapshot(
+@Nullable Map 
confirmedSstFiles) {
 this.confirmedSstFiles = confirmedSstFiles;
 }
 
 protected Optional getUploaded(StateHandleID 
stateHandleID) {
 if (confirmedSstFiles != null && 
confirmedSstFiles.containsKey(stateHandleID)) {
-// we introduce a placeholder state handle, that is replaced 
with the
-// original from the shared state registry (created from a 
previous checkpoint)
-return Optional.of(
-new 
PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
   Thanks @zoltar9264 ,
   
   > Do you mean change IncrementalRemoteKeyedStateHandle like this ?
   
   Yes, I mean something like that, maybe using `Path` or `File` instead of 
`String` for `localPath`.
   
   > I suggest only use PlaceholderStreamStateHandle while the origin handle is 
ByteStreamStateHandle. This pr already implemented not use 
PlaceholderStreamStateHandle calculate checkpointed size, I want keep it.
   
   Can you share the motivation?
   I think that this will just add an additional `instanceof` and increase 
complexity. It would also easier to break if we add a new type of handle that 
needs replacement. Or am I missing something?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] rkhachatryan commented on a diff in pull request #22669: [FLINK-29913] fix shared state be discarded by mistake when maxConcurrentCheckpoint>1

2023-06-02 Thread via GitHub


rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1214051048


##
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##
@@ -395,18 +394,16 @@ public void release() {
 /** Previous snapshot with uploaded sst files. */
 protected static class PreviousSnapshot {
 
-@Nullable private final Map confirmedSstFiles;
+@Nullable private final Map 
confirmedSstFiles;
 
-protected PreviousSnapshot(@Nullable Map 
confirmedSstFiles) {
+protected PreviousSnapshot(
+@Nullable Map 
confirmedSstFiles) {
 this.confirmedSstFiles = confirmedSstFiles;
 }
 
 protected Optional getUploaded(StateHandleID 
stateHandleID) {
 if (confirmedSstFiles != null && 
confirmedSstFiles.containsKey(stateHandleID)) {
-// we introduce a placeholder state handle, that is replaced 
with the
-// original from the shared state registry (created from a 
previous checkpoint)
-return Optional.of(
-new 
PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
   > Do you mean change IncrementalRemoteKeyedStateHandle like this ?
   
   Yes, I mean something like that, maybe using `Path` or `File` instead of 
`String` for `localPath`.
   
   > I suggest only use PlaceholderStreamStateHandle while the origin handle is 
ByteStreamStateHandle. This pr already implemented not use 
PlaceholderStreamStateHandle calculate checkpointed size, I want keep it.
   
   Can you share the motivation?
   I think that this will just add an additional `instanceof` and increase 
complexity. It would also easier to break if we add a new type of handle that 
needs replacement. Or am I missing something?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] reswqa commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


reswqa commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214045783


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##
@@ -80,13 +107,23 @@ public void write(
 
 if (isBroadcast && !isBroadcastOnly) {
 for (int i = 0; i < numSubpartitions; ++i) {
-bufferAccumulator.receive(record.duplicate(), subpartitionId, 
dataType);
+// As the tiered storage subpartition ID is created only for 
broadcast records,
+// which are fewer than normal records, the performance impact 
of generating new
+// TieredStorageSubpartitionId objects is expected to be 
manageable. If the
+// performance is significantly affected, this logic will be 
optimized accordingly.
+bufferAccumulator.receive(
+record.duplicate(), new 
TieredStorageSubpartitionId(i), dataType);
 }
 } else {
 bufferAccumulator.receive(record, subpartitionId, dataType);
 }
 }
 
+public void setMetricStatisticsUpdater(
+Consumer 
metricStatisticsUpdater) {
+this.metricStatisticsUpdater = metricStatisticsUpdater;

Review Comment:
   It seems that this line is not updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


TanYuxin-tyx commented on PR #22652:
URL: https://github.com/apache/flink/pull/22652#issuecomment-1573304176

   @reswqa  Thanks for helping review, I have addressed the comments. Could you 
help take a look again?
   Due to the significant code changes in the test code, I didn't use a fix-up 
mode to fix it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


TanYuxin-tyx commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214037314


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClientTest.java:
##
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link TieredStorageProducerClient}. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class TieredStorageProducerClientTest {
+
+private static final int NUM_TOTAL_BUFFERS = 1000;
+
+private static final int NETWORK_BUFFER_SIZE = 1024;
+
+private static final float NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.6f;
+
+private static final int NUM_BUFFERS_IN_A_SEGMENT = 5;
+
+@Parameter public boolean isBroadcast;
+
+private NetworkBufferPool globalPool;
+
+@Parameters(name = "isBroadcast={0}")
+public static Collection parameters() {
+return Arrays.asList(false, true);
+}
+
+@BeforeEach
+void before() {
+globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, 
NETWORK_BUFFER_SIZE);
+}
+
+@AfterEach
+void after() {
+globalPool.destroy();
+}
+
+@TestTemplate
+void testWriteRecordsToEmptyStorageTiers() throws IOException {
+int numSubpartitions = 10;
+int numBuffersInPool = 10;
+int bufferSize = 1024;
+Random random = new Random();
+
+TieredStorageMemoryManagerImpl storageMemoryManager =
+createStorageMemoryManager(numBuffersInPool);
+BufferAccumulator bufferAccumulator =
+new HashBufferAccumulator(numSubpartitions, bufferSize, 
storageMemoryManager);
+
+TieredStorageProducerClient tieredStorageProducerClient =
+new TieredStorageProducerClient(
+numSubpartitions, false, bufferAccumulator, null, 
Collections.emptyList());
+
+assertThatThrownBy(
+() ->
+tieredStorageProducerClient.write(
+generateRandomData(bufferSize, random),
+new TieredStorageSubpartitionId(0),
+Buffer.DataType.DATA_BUFFER,
+isBroadcast))
+.isInstanceOf(RuntimeException.class)
+.hasMessageContaining("Failed to choose a storage tier");
+}
+
+@TestTemplate
+void testEmptyMetricUpdater() throws IOException {
+int numSubpartitions = 10;
+int numBuffersInPool = 10;
+int bufferSize = 1024;
+Random random = new Random();
+
+TieredStorageMemoryManagerImpl storageMemoryManager =
+createStorageMemoryManager(numBuffersInPool);
+BufferAccumulator bufferAccumulator =
+new 

[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


TanYuxin-tyx commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214036888


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClientTest.java:
##
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link TieredStorageProducerClient}. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class TieredStorageProducerClientTest {
+
+private static final int NUM_TOTAL_BUFFERS = 1000;
+
+private static final int NETWORK_BUFFER_SIZE = 1024;
+
+private static final float NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.6f;
+
+private static final int NUM_BUFFERS_IN_A_SEGMENT = 5;
+
+@Parameter public boolean isBroadcast;
+
+private NetworkBufferPool globalPool;
+
+@Parameters(name = "isBroadcast={0}")
+public static Collection parameters() {
+return Arrays.asList(false, true);
+}
+
+@BeforeEach
+void before() {
+globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, 
NETWORK_BUFFER_SIZE);
+}
+
+@AfterEach
+void after() {
+globalPool.destroy();
+}
+
+@TestTemplate
+void testWriteRecordsToEmptyStorageTiers() throws IOException {
+int numSubpartitions = 10;
+int numBuffersInPool = 10;
+int bufferSize = 1024;
+Random random = new Random();
+
+TieredStorageMemoryManagerImpl storageMemoryManager =
+createStorageMemoryManager(numBuffersInPool);
+BufferAccumulator bufferAccumulator =
+new HashBufferAccumulator(numSubpartitions, bufferSize, 
storageMemoryManager);
+
+TieredStorageProducerClient tieredStorageProducerClient =
+new TieredStorageProducerClient(
+numSubpartitions, false, bufferAccumulator, null, 
Collections.emptyList());
+
+assertThatThrownBy(
+() ->
+tieredStorageProducerClient.write(
+generateRandomData(bufferSize, random),
+new TieredStorageSubpartitionId(0),
+Buffer.DataType.DATA_BUFFER,
+isBroadcast))
+.isInstanceOf(RuntimeException.class)
+.hasMessageContaining("Failed to choose a storage tier");
+}
+
+@TestTemplate
+void testEmptyMetricUpdater() throws IOException {
+int numSubpartitions = 10;
+int numBuffersInPool = 10;
+int bufferSize = 1024;
+Random random = new Random();
+
+TieredStorageMemoryManagerImpl storageMemoryManager =
+createStorageMemoryManager(numBuffersInPool);
+BufferAccumulator bufferAccumulator =
+new 

[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


TanYuxin-tyx commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214036360


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClientTest.java:
##
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link TieredStorageProducerClient}. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class TieredStorageProducerClientTest {
+
+private static final int NUM_TOTAL_BUFFERS = 1000;
+
+private static final int NETWORK_BUFFER_SIZE = 1024;
+
+private static final float NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.6f;
+
+private static final int NUM_BUFFERS_IN_A_SEGMENT = 5;
+
+@Parameter public boolean isBroadcast;
+
+private NetworkBufferPool globalPool;
+
+@Parameters(name = "isBroadcast={0}")
+public static Collection parameters() {
+return Arrays.asList(false, true);
+}
+
+@BeforeEach
+void before() {
+globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, 
NETWORK_BUFFER_SIZE);
+}
+
+@AfterEach
+void after() {
+globalPool.destroy();
+}
+
+@TestTemplate
+void testWriteRecordsToEmptyStorageTiers() throws IOException {
+int numSubpartitions = 10;
+int numBuffersInPool = 10;
+int bufferSize = 1024;
+Random random = new Random();
+
+TieredStorageMemoryManagerImpl storageMemoryManager =
+createStorageMemoryManager(numBuffersInPool);
+BufferAccumulator bufferAccumulator =
+new HashBufferAccumulator(numSubpartitions, bufferSize, 
storageMemoryManager);
+
+TieredStorageProducerClient tieredStorageProducerClient =
+new TieredStorageProducerClient(
+numSubpartitions, false, bufferAccumulator, null, 
Collections.emptyList());
+
+assertThatThrownBy(
+() ->
+tieredStorageProducerClient.write(
+generateRandomData(bufferSize, random),
+new TieredStorageSubpartitionId(0),
+Buffer.DataType.DATA_BUFFER,
+isBroadcast))
+.isInstanceOf(RuntimeException.class)
+.hasMessageContaining("Failed to choose a storage tier");

Review Comment:
   Fixed. Used the `TestingBufferAccumulator `



##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClientTest.java:
##
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright 

[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


TanYuxin-tyx commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214035752


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClientTest.java:
##
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link TieredStorageProducerClient}. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class TieredStorageProducerClientTest {
+
+private static final int NUM_TOTAL_BUFFERS = 1000;
+
+private static final int NETWORK_BUFFER_SIZE = 1024;
+
+private static final float NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.6f;
+
+private static final int NUM_BUFFERS_IN_A_SEGMENT = 5;
+
+@Parameter public boolean isBroadcast;
+
+private NetworkBufferPool globalPool;
+
+@Parameters(name = "isBroadcast={0}")
+public static Collection parameters() {
+return Arrays.asList(false, true);
+}
+
+@BeforeEach
+void before() {
+globalPool = new NetworkBufferPool(NUM_TOTAL_BUFFERS, 
NETWORK_BUFFER_SIZE);
+}
+
+@AfterEach
+void after() {
+globalPool.destroy();
+}
+
+@TestTemplate
+void testWriteRecordsToEmptyStorageTiers() throws IOException {
+int numSubpartitions = 10;
+int numBuffersInPool = 10;
+int bufferSize = 1024;
+Random random = new Random();
+
+TieredStorageMemoryManagerImpl storageMemoryManager =
+createStorageMemoryManager(numBuffersInPool);
+BufferAccumulator bufferAccumulator =
+new HashBufferAccumulator(numSubpartitions, bufferSize, 
storageMemoryManager);

Review Comment:
   Because we have mocked a `TesttingBufferAccumulatr`,  we don't need this 
anymore



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


TanYuxin-tyx commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214034785


##
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClientTest.java:
##
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link TieredStorageProducerClient}. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class TieredStorageProducerClientTest {
+
+private static final int NUM_TOTAL_BUFFERS = 1000;
+
+private static final int NETWORK_BUFFER_SIZE = 1024;
+
+private static final float NUM_BUFFERS_TRIGGER_FLUSH_RATIO = 0.6f;
+
+private static final int NUM_BUFFERS_IN_A_SEGMENT = 5;

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


TanYuxin-tyx commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214031770


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierProducerAgent.java:
##
@@ -35,9 +41,21 @@ public interface TierProducerAgent {
  */
 boolean tryStartNewSegment(TieredStorageSubpartitionId subpartitionId, int 
segmentId);
 
-/** Writes the finished {@link Buffer} to the consumer. */
-boolean write(TieredStorageSubpartitionId subpartitionId, Buffer 
finishedBuffer)
-throws IOException;
+/**
+ * Writes the finished {@link Buffer} to the consumer.
+ *
+ * Note that the tier must ensure that the buffer is written 
successfully without any
+ * exceptions, in order to guarantee that the buffer will be recycled. If 
this method throws an
+ * exception in the subsequent modifications, the caller should make sure 
that the buffer is
+ * recycled finally.
+ *
+ * @param subpartitionId the subpartition id that the buffer is writing to
+ * @param finishedBuffer the writing buffer
+ * @return return true if the buffer is written successfully, return false 
if the current
+ * segment can not store this buffer and the current segment is 
finished. When returning
+ * false, the agent should try start a new segment before writing the 
buffer.
+ */
+boolean write(TieredStorageSubpartitionId subpartitionId, Buffer 
finishedBuffer);

Review Comment:
   Renamed it.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##
@@ -100,26 +137,95 @@ public void close() {
  */
 private void writeAccumulatedBuffers(
 TieredStorageSubpartitionId subpartitionId, List 
accumulatedBuffers) {
-try {
-for (Buffer finishedBuffer : accumulatedBuffers) {
-writeAccumulatedBuffer(subpartitionId, finishedBuffer);
+Iterator bufferIterator = accumulatedBuffers.iterator();
+
+int numWriteBytes = 0;
+int numWriteBuffers = 0;
+while (bufferIterator.hasNext()) {
+Buffer buffer = bufferIterator.next();
+try {
+writeAccumulatedBuffer(subpartitionId, buffer);
+} catch (IOException ioe) {
+buffer.recycleBuffer();
+while (bufferIterator.hasNext()) {
+bufferIterator.next().recycleBuffer();
+}
+ExceptionUtils.rethrow(ioe);
 }
-} catch (IOException e) {
-ExceptionUtils.rethrow(e);
+numWriteBuffers++;
+numWriteBytes += buffer.readableBytes();
 }
+updateMetricStatistics(numWriteBuffers, numWriteBytes);
 }
 
 /**
  * Write the accumulated buffer of this subpartitionId to an appropriate 
tier. After the tier is
  * decided, the buffer will be written to the selected tier.
  *
+ * Note that the method only throws an exception when choosing a 
storage tier, so the caller
+ * should ensure that the buffer is recycled when throwing an exception.
+ *
  * @param subpartitionId the subpartition identifier
  * @param accumulatedBuffer one accumulated buffer of this subpartition
  */
 private void writeAccumulatedBuffer(
 TieredStorageSubpartitionId subpartitionId, Buffer 
accumulatedBuffer)
 throws IOException {
-// TODO, Try to write the accumulated buffer to the appropriate tier. 
After the tier is
-// decided, then write the accumulated buffer to the tier.
+Buffer compressedBuffer = compressBufferIfPossible(accumulatedBuffer);
+
+if (currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()] 
== null) {
+chooseStorageTierToStartSegment(subpartitionId);
+}
+
+boolean isSuccess =
+
currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].write(
+subpartitionId, compressedBuffer);
+if (!isSuccess) {
+chooseStorageTierToStartSegment(subpartitionId);
+isSuccess =
+
currentSubpartitionTierAgent[subpartitionId.getSubpartitionId()].write(
+subpartitionId, compressedBuffer);
+checkState(isSuccess, "Failed to write the first buffer to the new 
segment");
+}

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


TanYuxin-tyx commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214031417


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierProducerAgent.java:
##
@@ -35,9 +41,21 @@ public interface TierProducerAgent {
  */
 boolean tryStartNewSegment(TieredStorageSubpartitionId subpartitionId, int 
segmentId);
 
-/** Writes the finished {@link Buffer} to the consumer. */
-boolean write(TieredStorageSubpartitionId subpartitionId, Buffer 
finishedBuffer)
-throws IOException;
+/**
+ * Writes the finished {@link Buffer} to the consumer.
+ *
+ * Note that the tier must ensure that the buffer is written 
successfully without any
+ * exceptions, in order to guarantee that the buffer will be recycled. If 
this method throws an
+ * exception in the subsequent modifications, the caller should make sure 
that the buffer is
+ * recycled finally.

Review Comment:
   Fixed.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierProducerAgent.java:
##
@@ -21,9 +21,15 @@
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 
-import java.io.IOException;
-
-/** The producer-side agent of a Tier. */
+/**
+ * The producer-side agent of a Tier.
+ *
+ * Note that when writing a buffer to a tier, the {@link TierProducerAgent} 
should first call
+ * {@code tryStartNewSegment} to start a new segment. The agent can then 
continue writing the buffer
+ * to the tier as long as the return value of {@code write} is true. If the 
return value of {@code
+ * write} is false, it indicates that the current segment can no longer store 
the buffer, and the
+ * agent should try to start a new segment before writing the buffer.
+ */
 public interface TierProducerAgent {

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


TanYuxin-tyx commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214031240


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##
@@ -80,13 +107,23 @@ public void write(
 
 if (isBroadcast && !isBroadcastOnly) {
 for (int i = 0; i < numSubpartitions; ++i) {
-bufferAccumulator.receive(record.duplicate(), subpartitionId, 
dataType);
+// As the tiered storage subpartition ID is created only for 
broadcast records,
+// which are fewer than normal records, the performance impact 
of generating new
+// TieredStorageSubpartitionId objects is expected to be 
manageable. If the
+// performance is significantly affected, this logic will be 
optimized accordingly.
+bufferAccumulator.receive(
+record.duplicate(), new 
TieredStorageSubpartitionId(i), dataType);
 }
 } else {
 bufferAccumulator.receive(record, subpartitionId, dataType);
 }
 }
 
+public void setMetricStatisticsUpdater(
+Consumer 
metricStatisticsUpdater) {
+this.metricStatisticsUpdater = metricStatisticsUpdater;

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] TanYuxin-tyx commented on a diff in pull request #22652: [FLINK-31640][network] Write the accumulated buffers to the right storage tier

2023-06-02 Thread via GitHub


TanYuxin-tyx commented on code in PR #22652:
URL: https://github.com/apache/flink/pull/22652#discussion_r1214030893


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageProducerClient.java:
##
@@ -100,26 +137,95 @@ public void close() {
  */
 private void writeAccumulatedBuffers(
 TieredStorageSubpartitionId subpartitionId, List 
accumulatedBuffers) {
-try {
-for (Buffer finishedBuffer : accumulatedBuffers) {
-writeAccumulatedBuffer(subpartitionId, finishedBuffer);
+Iterator bufferIterator = accumulatedBuffers.iterator();
+
+int numWriteBytes = 0;
+int numWriteBuffers = 0;
+while (bufferIterator.hasNext()) {
+Buffer buffer = bufferIterator.next();

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wanglijie95 commented on pull request #22681: [FLINK-32199][runtime] Remove redundant metrics in TaskMetricStore after rescale down

2023-06-02 Thread via GitHub


wanglijie95 commented on PR #22681:
URL: https://github.com/apache/flink/pull/22681#issuecomment-1573280214

   @JunRuiLee This pr was successfully merged into master. But it had code 
conflicts when I cherry-pick it to release-1.17 and release-1.16, please solve 
the conflict and create PRs for this two branch, thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] wanglijie95 closed pull request #22681: [FLINK-32199][runtime] Remove redundant metrics in TaskMetricStore after rescale down

2023-06-02 Thread via GitHub


wanglijie95 closed pull request #22681: [FLINK-32199][runtime] Remove redundant 
metrics in TaskMetricStore after rescale down
URL: https://github.com/apache/flink/pull/22681


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink] snuyanzin commented on pull request #22672: [BP-1.17][FLINK-32023][API / DataStream] Add config execution.buffer-…

2023-06-02 Thread via GitHub


snuyanzin commented on PR #22672:
URL: https://github.com/apache/flink/pull/22672#issuecomment-1573240299

   rebase to the latest commit in branch
   the reason of failure is https://issues.apache.org/jira/browse/FLINK-32231
   which was merged recentlly to this branch


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (FLINK-32246) javax.management.InstanceAlreadyExistsException

2023-06-02 Thread jeff-zou (Jira)
jeff-zou created FLINK-32246:


 Summary: javax.management.InstanceAlreadyExistsException
 Key: FLINK-32246
 URL: https://issues.apache.org/jira/browse/FLINK-32246
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Kafka
Affects Versions: 1.15.2
Reporter: jeff-zou


Flink SQL throws an exception(javax.management.InstanceAlreadyExistsException) 
when trying to perform multiple sink operations on the same kafka source .

 

sql example:
{code:java}
create table kafka_source() with ('connector'='kafka');
insert into sink_table1 select * from kafka_source;
insert into sink_table2 select * from kafka_source; {code}
The Exception as below:
{code:java}
javax.management.InstanceAlreadyExistsException: 
kafka.admin.client:type=app-info,id=*
 
java.management/com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436)
 
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855)
 
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955)
 
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890)
 
java.management/com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320)
 
java.management/com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)
 
org.apache.kafka.clients.admin.KafkaAdminClient.(KafkaAdminClient.java:500)
 
org.apache.kafka.clients.admin.KafkaAdminClient.createInternal(KafkaAdminClient.java:444)
 org.apache.kafka.clients.admin.Admin.create(Admin.java:59)
 org.apache.kafka.clients.admin.AdminClient.create(AdminClient.java:39)
 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.getKafkaAdminClient(KafkaSourceEnumerator.java:410)
 
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator.start(KafkaSourceEnumerator.java:151)
 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$start$1(SourceCoordinator.java:209)
 
org.apache.flink.runtime.source.coordinator.SourceCoordinator.lambda$runInEventLoop$9(SourceCoordinator.java:406)
 
org.apache.flink.util.ThrowableCatchingRunnable.run(ThrowableCatchingRunnable.java:40)
 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
 java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
 java.base/java.lang.Thread.run(Thread.java:829) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [flink] Myracle commented on pull request #22672: [BP-1.17][FLINK-32023][API / DataStream] Add config execution.buffer-…

2023-06-02 Thread via GitHub


Myracle commented on PR #22672:
URL: https://github.com/apache/flink/pull/22672#issuecomment-1573230300

   @flinkbot run azure


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #604: [FLINK-32041] - Allow operator to manage leases when using watchNamespaces

2023-06-02 Thread via GitHub


gyfora commented on PR #604:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/604#issuecomment-1573210364

   @ottomata we generally try to follow a 2 month release cycle. Details can be 
found here: 
https://cwiki.apache.org/confluence/display/FLINK/Release+Schedule+and+Planning
   
   In your case I see no harm in using the helm chart from the `main` branch.  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org