[GitHub] [flink-connector-aws] reswqa closed pull request #78: [hotfix][Connectors/Kinesis] Nightly CI always failed as archunit violations

2023-06-18 Thread via GitHub


reswqa closed pull request #78: [hotfix][Connectors/Kinesis] Nightly CI always 
failed as archunit violations
URL: https://github.com/apache/flink-connector-aws/pull/78





[GitHub] [flink-connector-aws] reswqa commented on pull request #78: [hotfix][Connectors/Kinesis] Nightly CI always failed as archunit violations

2023-06-18 Thread via GitHub


reswqa commented on PR #78:
URL: 
https://github.com/apache/flink-connector-aws/pull/78#issuecomment-1596541116

   Closed per Chesnay's comment in 
[FLINK-31804](https://issues.apache.org/jira/browse/FLINK-31804): `Connectors 
should just skip the architecture tests when running against later versions.`
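   
   A minimal sketch of that skip, assuming a hypothetical `flink.version` system 
property supplied by the CI matrix (the real guard in the connector repos may 
differ):
   
   ```java
   import static org.junit.jupiter.api.Assumptions.assumeTrue;
   
   import org.junit.jupiter.api.Test;
   
   class ArchitectureTestGuard {
   
       // Hypothetical property; CI would set it per tested Flink version.
       private static final String FLINK_VERSION =
               System.getProperty("flink.version", "1.17");
   
       @Test
       void architectureRulesRunOnlyAgainstPinnedVersion() {
           // Skip (rather than fail) when testing against a later Flink version.
           assumeTrue(FLINK_VERSION.startsWith("1.17"),
                   "Skipping architecture tests against Flink " + FLINK_VERSION);
           // ... evaluate the ArchUnit rules here ...
       }
   }
   ```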





[GitHub] [flink-kubernetes-operator] gyfora commented on pull request #621: [FLINK-32334] Also check if no taskmanager are running while waiting for cluster shutdown

2023-06-18 Thread via GitHub


gyfora commented on PR #621:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/621#issuecomment-1596513964

   The PR looks good, but there are some checkstyle violations to fix before I 
can merge this.





[jira] [Closed] (FLINK-31314) Hybrid Shuffle may not release result partitions when running multiple jobs in a session cluster

2023-06-18 Thread Yuxin Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuxin Tan closed FLINK-31314.
-----------------------------
Resolution: Won't Fix

> Hybrid Shuffle may not release result partitions when running multiple jobs 
> in a session cluster
> 
>
> Key: FLINK-31314
> URL: https://issues.apache.org/jira/browse/FLINK-31314
> Project: Flink
>  Issue Type: Bug
>Reporter: Yuxin Tan
>Assignee: Yuxin Tan
>Priority: Major
>  Labels: pull-request-available
>
> When I tested jobs in a session cluster, I found that many result partitions 
> belonging to finished jobs may not be released.
> I have reproduced the issue locally by running 
> HybridShuffleITCase#testHybridFullExchanges repeatedly.
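
A minimal harness for reproducing such a flaky case locally, as a sketch (assumes 
JUnit 5 and that the named test method can be invoked directly; the real ITCase 
may require its MiniCluster setup):

{code:java}
import org.junit.jupiter.api.RepeatedTest;

class HybridShuffleLeakRepro {

    // Repeat the flaky case; the unreleased-partition leak only shows up sometimes.
    @RepeatedTest(100)
    void reproduce() throws Exception {
        new HybridShuffleITCase().testHybridFullExchanges();
    }
}
{code}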





[GitHub] [flink] TanYuxin-tyx commented on pull request #22089: [FLINK-31314][runtime] Fix Hybrid Shuffle may not release result partitions belonging to the finished jobs

2023-06-18 Thread via GitHub


TanYuxin-tyx commented on PR #22089:
URL: https://github.com/apache/flink/pull/22089#issuecomment-1596483633

   Closing it.
   





[GitHub] [flink] TanYuxin-tyx closed pull request #22089: [FLINK-31314][runtime] Fix Hybrid Shuffle may not release result partitions belonging to the finished jobs

2023-06-18 Thread via GitHub


TanYuxin-tyx closed pull request #22089: [FLINK-31314][runtime] Fix Hybrid 
Shuffle may not release result partitions belonging to the finished jobs
URL: https://github.com/apache/flink/pull/22089





[jira] [Closed] (FLINK-32332) Jar files for catalog function are not listed correctly

2023-06-18 Thread Fang Yong (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fang Yong closed FLINK-32332.
-----------------------------
Resolution: Won't Fix

> Jar files for catalog function are not listed correctly
> ---
>
> Key: FLINK-32332
> URL: https://issues.apache.org/jira/browse/FLINK-32332
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Gateway
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Priority: Major
>  Labels: pull-request-available
>
> The `SHOW JARS` statement will list all jar files in the catalog, but the jar 
> files for a catalog function will not be listed until the function is used in 
> the specific gateway session.





[jira] [Commented] (FLINK-32332) Jar files for catalog function are not listed correctly

2023-06-18 Thread Fang Yong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733997#comment-17733997
 ] 

Fang Yong commented on FLINK-32332:
-----------------------------

Thanks [~jark] for clarifying this. If so, I will close this issue, and the `show 
jars` problem can be fixed in https://issues.apache.org/jira/browse/FLINK-32309. 
cc [~fsk119]

> Jar files for catalog function are not listed correctly
> ---
>
> Key: FLINK-32332
> URL: https://issues.apache.org/jira/browse/FLINK-32332
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Gateway
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Priority: Major
>  Labels: pull-request-available
>
> The `SHOW JARS` statement will list all jar files in the catalog, but the jar 
> files for a catalog function will not be listed until the function is used in 
> the specific gateway session.
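
For context, the statements involved can be exercised from a TableEnvironment as 
follows, a sketch with a hypothetical jar path:

{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ShowJarsExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql("ADD JAR '/tmp/my-udf.jar'"); // hypothetical path
        tEnv.executeSql("SHOW JARS").print();         // should list /tmp/my-udf.jar
    }
}
{code}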





[GitHub] [flink] FangYongs closed pull request #22777: [FLINK-32332][sql-gateway] Show jars being used by function

2023-06-18 Thread via GitHub


FangYongs closed pull request #22777: [FLINK-32332][sql-gateway] Show jars 
being used by function
URL: https://github.com/apache/flink/pull/22777





[GitHub] [flink-connector-aws] Samrat002 commented on a diff in pull request #47: [FLINK-30481][FLIP-277] GlueCatalog Implementation

2023-06-18 Thread via GitHub


Samrat002 commented on code in PR #47:
URL: 
https://github.com/apache/flink-connector-aws/pull/47#discussion_r1233485463


##
flink-catalog-aws/flink-catalog-aws-glue/src/main/java/org/apache/flink/table/catalog/glue/GlueCatalogOptions.java:
##
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog.glue;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.glue.constants.GlueCatalogConstants;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** A collection of {@link ConfigOption} which is used in GlueCatalog. */
+public class GlueCatalogOptions extends CommonCatalogOptions {
+
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                    .stringType()
+                    .defaultValue(GenericInMemoryCatalog.DEFAULT_DB);
+
+    public static final ConfigOption<String> INPUT_FORMAT =
+            ConfigOptions.key(GlueCatalogConstants.TABLE_INPUT_FORMAT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> OUTPUT_FORMAT =
+            ConfigOptions.key(GlueCatalogConstants.TABLE_OUTPUT_FORMAT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_CATALOG_ENDPOINT =
+            ConfigOptions.key(AWSConfigConstants.GLUE_CATALOG_ENDPOINT)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_CATALOG_ID =
+            ConfigOptions.key(AWSConfigConstants.GLUE_CATALOG_ID).stringType().noDefaultValue();
+
+    public static final ConfigOption<String> GLUE_ACCOUNT_ID =
+            ConfigOptions.key(AWSConfigConstants.GLUE_ACCOUNT_ID).stringType().noDefaultValue();
+    public static final ConfigOption<String> CREDENTIAL_PROVIDER =
+            ConfigOptions.key(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER)
+                    .stringType()
+                    .noDefaultValue();
+
+    public static final ConfigOption<String> HTTP_CLIENT_TYPE =
+            ConfigOptions.key(AWSConfigConstants.HTTP_CLIENT_TYPE)
+                    .stringType()
+                    .defaultValue(AWSConfigConstants.CLIENT_TYPE_APACHE);
+
+    public static final ConfigOption<String> REGION =
+            ConfigOptions.key(AWSConfigConstants.AWS_REGION).stringType().noDefaultValue();
+
+    public static final ConfigOption<String> HTTP_PROTOCOL_VERSION =

Review Comment:
   I think this config is required. It will help users pass these configs from 
the SQL client and connect to different Glue Data Catalogs with different 
instances of Flink's GlueCatalog.
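   
   What this enables, roughly: registering multiple Glue catalogs against 
different endpoints from SQL. A sketch only; the `glue` catalog type and the 
option keys are assumptions based on this unmerged PR, not a released API:
   
   ```java
   import org.apache.flink.table.api.EnvironmentSettings;
   import org.apache.flink.table.api.TableEnvironment;
   
   public class TwoGlueCatalogs {
       public static void main(String[] args) {
           TableEnvironment tableEnv =
                   TableEnvironment.create(EnvironmentSettings.inStreamingMode());
           // Hypothetical DDL; the 'glue' type and keys follow the PR under review.
           tableEnv.executeSql(
                   "CREATE CATALOG glue_a WITH ('type' = 'glue', 'aws.region' = 'us-east-1')");
           tableEnv.executeSql(
                   "CREATE CATALOG glue_b WITH ('type' = 'glue', 'aws.region' = 'eu-west-1')");
       }
   }
   ```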






[GitHub] [flink] flinkbot commented on pull request #22821: [FLINK-32351][table] Introduce base interfaces for call procedure

2023-06-18 Thread via GitHub


flinkbot commented on PR #22821:
URL: https://github.com/apache/flink/pull/22821#issuecomment-1596437586

   
   ## CI report:
   
   * b4bd39b644bf46533db9be1d5ab119cdd6d6a0b2 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Updated] (FLINK-32351) Introduce base interfaces for call procedure

2023-06-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32351?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32351:
-----------------------------
Labels: pull-request-available  (was: )

> Introduce base interfaces for call procedure
> 
>
> Key: FLINK-32351
> URL: https://issues.apache.org/jira/browse/FLINK-32351
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: luoyuxia
>Assignee: luoyuxia
>Priority: Major
>  Labels: pull-request-available
>






[GitHub] [flink] luoyuxia opened a new pull request, #22821: [FLINK-32351][table] Introduce base interfaces for call procedure

2023-06-18 Thread via GitHub


luoyuxia opened a new pull request, #22821:
URL: https://github.com/apache/flink/pull/22821

   
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   Please make sure both new and modified tests in this PR follow the 
conventions defined in our code quality guide: 
https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[GitHub] [flink] flinkbot commented on pull request #22820: [FLINK-32374][table-planner] Fix the issue that ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting

2023-06-18 Thread via GitHub


flinkbot commented on PR #22820:
URL: https://github.com/apache/flink/pull/22820#issuecomment-1596418228

   
   ## CI report:
   
   * f44931f66bdfc9ec6e141f4185982980038b445c UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[GitHub] [flink] LadyForest commented on pull request #22818: [FLINK-31956][table] Extend COMPILE AND EXECUTE PLAN statement to read/write from/to Flink FileSystem

2023-06-18 Thread via GitHub


LadyForest commented on PR #22818:
URL: https://github.com/apache/flink/pull/22818#issuecomment-1596414747

   Should rebase onto master after https://github.com/apache/flink/pull/22820 is merged.





[jira] [Updated] (FLINK-32374) ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting

2023-06-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32374:
-----------------------------
Labels: pull-request-available  (was: )

> ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for 
> overwriting
> --
>
> Key: FLINK-32374
> URL: https://issues.apache.org/jira/browse/FLINK-32374
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.17.0, 1.16.1, 1.16.2, 1.18.0, 1.17.1
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> If the existing JSON plan is not truncated when overwriting, and the newly 
> generated JSON plan contents are shorter than the previous JSON plan content, 
> the plan will be invalid JSON.
> h4. How to reproduce
> {code:sql}
> Flink SQL> create table debug_sink (f0 int, f1 string) with ('connector' = 
> 'blackhole');
> [INFO] Execute statement succeed.
> Flink SQL> create table dummy_source (f0 int, f1 int, f2 string, f3 string) 
> with ('connector' = 'datagen');
> [INFO] Execute statement succeed.
> Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink 
> select if(f0 > f1, f0, f1) as f0, concat(f2, f3) as f1 from dummy_source;
> [INFO] Execute statement succeed.
> Flink SQL> set 'table.plan.force-recompile' = 'true';
> [INFO] Execute statement succeed.
> Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink 
> select * from (values (2, 'bye')) T (id, message);
> [INFO] Execute statement succeed.
> {code}
> cat -n debug.json, and check L#67
> {code:json}
>  1{
>  2  "flinkVersion" : "1.17",
>  3  "nodes" : [ {
>  4"id" : 15,
>  5"type" : "stream-exec-values_1",
>  6"tuples" : [ [ {
>  7  "kind" : "LITERAL",
>  8  "value" : "2",
>  9  "type" : "INT NOT NULL"
> 10}, {
> 11  "kind" : "LITERAL",
> 12  "value" : "bye",
> 13  "type" : "CHAR(3) NOT NULL"
> 14} ] ],
> 15"outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) 
> NOT NULL>",
> 16"description" : "Values(tuples=[[{ 2, _UTF-16LE'bye' }]])",
> 17"inputProperties" : [ ]
> 18  }, {
> 19"id" : 16,
> 20"type" : "stream-exec-sink_1",
> 21"configuration" : {
> 22  "table.exec.sink.keyed-shuffle" : "AUTO",
> 23  "table.exec.sink.not-null-enforcer" : "ERROR",
> 24  "table.exec.sink.type-length-enforcer" : "IGNORE",
> 25  "table.exec.sink.upsert-materialize" : "AUTO"
> 26},
> 27"dynamicTableSink" : {
> 28  "table" : {
> 29"identifier" : 
> "`default_catalog`.`default_database`.`debug_sink`",
> 30"resolvedTable" : {
> 31  "schema" : {
> 32"columns" : [ {
> 33  "name" : "f0",
> 34  "dataType" : "INT"
> 35}, {
> 36  "name" : "f1",
> 37  "dataType" : "VARCHAR(2147483647)"
> 38} ],
> 39"watermarkSpecs" : [ ]
> 40  },
> 41  "partitionKeys" : [ ],
> 42  "options" : {
> 43"connector" : "blackhole"
> 44  }
> 45}
> 46  }
> 47},
> 48"inputChangelogMode" : [ "INSERT" ],
> 49"inputProperties" : [ {
> 50  "requiredDistribution" : {
> 51"type" : "UNKNOWN"
> 52  },
> 53  "damBehavior" : "PIPELINED",
> 54  "priority" : 0
> 55} ],
> 56"outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) 
> NOT NULL>",
> 57"description" : 
> "Sink(table=[default_catalog.default_database.debug_sink], fields=[id, 
> message])"
> 58  } ],
> 59  "edges" : [ {
> 60"source" : 15,
> 61"target" : 16,
> 62"shuffle" : {
> 63  "type" : "FORWARD"
> 64},
> 65"shuffleMode" : "PIPELINED"
> 66  } ]
> 67} "$CONCAT$1",
> 68  "operands" : [ {
> 69"kind" : "INPUT_REF",
> 70"inputIndex" : 2,
> 71"type" : 

[GitHub] [flink] LadyForest opened a new pull request, #22820: [FLINK-32374][table-planner] Fix the issue that ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting

2023-06-18 Thread via GitHub


LadyForest opened a new pull request, #22820:
URL: https://github.com/apache/flink/pull/22820

   ## What is the purpose of the change
   
   This PR fixes the issue that the `COMPILE PLAN` statement may generate invalid 
JSON when overwriting an existing JSON plan with a shorter version.
   
   
   ## Brief change log
   
   -  Add `TRUNCATE_EXISTING` to `ExecNodeGraphInternalPlan#writeToFile`
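   
   The core idea, as an illustrative sketch (path and content made up; not the 
exact Flink code):
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.nio.file.Files;
   import java.nio.file.Paths;
   import java.nio.file.StandardOpenOption;
   
   public class TruncatingWrite {
       public static void main(String[] args) throws Exception {
           String planJson = "{ \"flinkVersion\" : \"1.17\" }"; // illustrative content
           // TRUNCATE_EXISTING makes a shorter new plan fully replace a longer
           // old one instead of leaving the old file's tail behind.
           Files.write(
                   Paths.get("/tmp/debug.json"),
                   planJson.getBytes(StandardCharsets.UTF_8),
                   StandardOpenOption.CREATE,
                   StandardOpenOption.WRITE,
                   StandardOpenOption.TRUNCATE_EXISTING);
       }
   }
   ```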
   
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as 
`CompilePlanITCase#testCompilePlanOverwrite`.
   
   - Rolling back the changes made on src and running this case reproduces the 
error; applying the changes and re-running the case verifies the fix.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): No
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: No
 - The serializers: No
 - The runtime per-record code paths (performance sensitive): No
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: No
 - The S3 file system connector: No
   
   ## Documentation
   
 - Does this pull request introduce a new feature? No
 - If yes, how is the feature documented? N.A.
   





[GitHub] [flink] ruanhang1993 commented on a diff in pull request #22721: [FLINK-27243][table] Support SHOW PARTITIONS statement for partitioned table

2023-06-18 Thread via GitHub


ruanhang1993 commented on code in PR #22721:
URL: https://github.com/apache/flink/pull/22721#discussion_r1233467048


##
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ShowPartitionsOperation.java:
##
@@ -66,10 +66,6 @@ public CatalogPartitionSpec getPartitionSpec() {
 return partitionSpec;
 }
 
-public String getDefaultPartitionName() {

Review Comment:
   Why delete this getter here?






[jira] [Commented] (FLINK-20628) Port RabbitMQ Sources to FLIP-27 API

2023-06-18 Thread RocMarshal (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733990#comment-17733990
 ] 

RocMarshal commented on FLINK-20628:


Hi [pscls|https://github.com/pscls], thank you very much for the contribution.
I noticed that this ticket has not been updated for a long time.
Would you like to continue advancing it?
After this PR is completed, FLINK-25380 will be introduced.
Looking forward to your opinion.
Thanks.

CC [~martijnvisser] [~monster#12] 

> Port RabbitMQ Sources to FLIP-27 API
> 
>
> Key: FLINK-20628
> URL: https://issues.apache.org/jira/browse/FLINK-20628
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors/ RabbitMQ
>Reporter: Jan Westphal
>Priority: Minor
>  Labels: auto-deprioritized-major, pull-request-available
> Fix For: 1.12.0
>
>
> *Structure*
> The new RabbitMQ Source will have three components:
>  * RabbitMQ enumerator that receives one RabbitMQ Channel Config.
>  * RabbitMQ splits contain the RabbitMQ Channel Config
>  * RabbitMQ Readers which subscribe to the same RabbitMQ channel and receive 
> the messages (automatically load balanced by RabbitMQ).
> *Checkpointing Enumerators*
> The enumerator only needs to checkpoint the RabbitMQ channel config since the 
> continuous discovery of new unread/unhandled messages is taken care of by the 
> subscribed RabbitMQ readers and RabbitMQ itself.
> *Checkpointing Readers*
> The new RabbitMQ Source needs to ensure that every reader can be checkpointed.
> Since RabbitMQ is non-persistent and cannot be read by offset, a combined 
> usage of checkpoints and message acknowledgments is necessary. Until a 
> received message is checkpointed by a reader, it will stay in an 
> unacknowledged state. As soon as the checkpoint is created, the messages from 
> the last checkpoint can be acknowledged as handled against RabbitMQ and thus 
> will be deleted only then. Messages need to be acknowledged one by one as 
> messages are handled by each SourceReader individually.
> When deserializing the messages we will make use of the implementation in the 
> existing RabbitMQ Source.
> *Message Delivery Guarantees* 
> Unacknowledged messages of a reader will be redelivered by RabbitMQ 
> automatically to other consumers of the same channel if the reader goes down.
>  
> This Source is going to only support at-least-once as this is the default 
> RabbitMQ behavior and thus everything else would require changes to RabbitMQ 
> itself or would impair the idea of parallelizing SourceReaders.
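
The acknowledge-on-checkpoint protocol described above, in miniature, as a 
simplified sketch (a real FLIP-27 reader would build on SourceReaderBase; 
Channel is the standard RabbitMQ Java client type):

{code:java}
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.rabbitmq.client.Channel;

class AckOnCheckpoint {
    private final List<Long> unacked = new ArrayList<>();
    private final Map<Long, List<Long>> pendingByCheckpoint = new HashMap<>();

    /** Remember the delivery tag until the message is covered by a checkpoint. */
    void onMessage(long deliveryTag) {
        unacked.add(deliveryTag);
    }

    /** Bind all tags seen since the last checkpoint to this checkpoint id. */
    void snapshotState(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, new ArrayList<>(unacked));
        unacked.clear();
    }

    /** Only after the checkpoint completes may RabbitMQ delete the messages. */
    void notifyCheckpointComplete(long checkpointId, Channel channel) throws IOException {
        List<Long> tags = pendingByCheckpoint.remove(checkpointId);
        if (tags != null) {
            for (long tag : tags) {
                channel.basicAck(tag, false); // acknowledge one by one, as described
            }
        }
    }
}
{code}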





[jira] [Updated] (FLINK-32332) Jar files for catalog function are not listed correctly

2023-06-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32332:
-----------------------------
Labels: pull-request-available  (was: )

> Jar files for catalog function are not listed correctly
> ---
>
> Key: FLINK-32332
> URL: https://issues.apache.org/jira/browse/FLINK-32332
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Gateway
>Affects Versions: 1.18.0
>Reporter: Fang Yong
>Priority: Major
>  Labels: pull-request-available
>
> The `SHOW JARS` statement will list all jar files in the catalog, but the jar 
> files for a catalog function will not be listed until the function is used in 
> the specific gateway session.





[GitHub] [flink] flinkbot commented on pull request #22819: [FLINK-32370][client] Fix warn log in result fetcher when job is finished

2023-06-18 Thread via GitHub


flinkbot commented on PR #22819:
URL: https://github.com/apache/flink/pull/22819#issuecomment-1596392637

   
   ## CI report:
   
   * 41b9ce75fe99467f82f22563d064d209daa64e46 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[jira] [Closed] (FLINK-31721) Move JobStatusHook to flink-core module

2023-06-18 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-31721?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia closed FLINK-31721.

Fix Version/s: 1.18.0
   Resolution: Fixed

master: fa94fb5a0271590c89edf6f3c0d4f274cf995e7d

> Move JobStatusHook to flink-core module
> ---
>
> Key: FLINK-31721
> URL: https://issues.apache.org/jira/browse/FLINK-31721
> Project: Flink
>  Issue Type: Sub-task
>  Components: API / Core
>Reporter: tartarus
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
>
> Flink SQL needs to use the JobStatusHook mechanism to implement atomic CTAS 
> semantics, but the Table modules can't access the flink-runtime module, so we 
> need to move JobStatusHook to the flink-core module.





[jira] [Updated] (FLINK-32370) JDBC SQL gateway e2e test is unstable

2023-06-18 Thread ASF GitHub Bot (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-32370:
-----------------------------
Labels: pull-request-available  (was: )

> JDBC SQL gateway e2e test is unstable
> -
>
> Key: FLINK-32370
> URL: https://issues.apache.org/jira/browse/FLINK-32370
> Project: Flink
>  Issue Type: Technical Debt
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.18.0
>
> Attachments: flink-vsts-sql-gateway-0-fv-az75-650.log, 
> flink-vsts-standalonesession-0-fv-az75-650.log, 
> flink-vsts-taskexecutor-0-fv-az75-650.log
>
>
> The client is failing while trying to collect data when the job already 
> finished on the cluster.





[GitHub] [flink] FangYongs opened a new pull request, #22819: [FLINK-32370][client] Fix warn log in result fetcher when job is finished

2023-06-18 Thread via GitHub


FangYongs opened a new pull request, #22819:
URL: https://github.com/apache/flink/pull/22819

   ## What is the purpose of the change
   
   This PR fixes the warn-level log emitted by the result fetcher when the job 
is already finished; the fetcher should log this at debug level instead.
   
   ## Brief change log
 - Use `ExceptionUtils.findThrowableWithMessage` to check exception in 
fetcher instead of `ExceptionUtils.findThrowable`
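   
   A sketch of the described pattern (the matched message text is an assumption 
for illustration):
   
   ```java
   import java.util.Optional;
   
   import org.apache.flink.util.ExceptionUtils;
   import org.slf4j.Logger;
   
   public class FetcherErrorHandling {
       static void handle(Throwable error, Logger log) {
           // Match by message rather than by class: the concrete exception class
           // may not be resolvable on the client side after serialization.
           Optional<Throwable> finished =
                   ExceptionUtils.findThrowableWithMessage(error, "Job has finished"); // assumed text
           if (finished.isPresent()) {
               log.debug("Job already finished; stopping result fetching.", error);
           } else {
               log.warn("Unexpected error while fetching results.", error);
           }
       }
   }
   ```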
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - Added `RestClusterClientTest.testSendCoordinationRequestException` to 
check exception message
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no) no
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no) no
 - The serializers: (yes / no / don't know) no
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know) no
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know) 
no
 - The S3 file system connector: (yes / no / don't know) no
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no) no
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   





[GitHub] [flink] fredia commented on pull request #22799: [FLINK-25872][state/changelog] Hotfix! Fix the description of Changel…

2023-06-18 Thread via GitHub


fredia commented on PR #22799:
URL: https://github.com/apache/flink/pull/22799#issuecomment-1596386751

   @reswqa Thanks for the review and suggestion, I amended the commit message 
as suggested.





[jira] [Reopened] (FLINK-32361) error after replace dependent jar file

2023-06-18 Thread Spongebob (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Spongebob reopened FLINK-32361:
-----------------------------

Hi [~Adhip], yes, I think this is a bug, so I reported this issue here.

> error after replace dependent jar file
> --
>
> Key: FLINK-32361
> URL: https://issues.apache.org/jira/browse/FLINK-32361
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.5
>Reporter: Spongebob
>Priority: Major
>
> In standalone session mode, I have one dependent jar file named 'A.jar' in the 
> folder `lib1`, so I submit my app via the command `flink run -C 
> file:///lib1/A.jar -c Application ./myApp.jar`. It runs normally.
> I also have the same jar file named 'A.jar' in the folder `lib2`, copied from 
> `lib1`. Then I delete A.jar in `lib1`, copy the same jar from `lib2` back to 
> `lib1`, and re-submit the application. Finally, I get a ClassNotFoundException 
> for a class from A.jar.





[jira] [Comment Edited] (FLINK-32361) error after replace dependent jar file

2023-06-18 Thread Spongebob (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733980#comment-17733980
 ] 

Spongebob edited comment on FLINK-32361 at 6/19/23 1:57 AM:


Hi [~mapohl], yes, I think this is a bug, so I reported this issue here.


was (Author: spongebobz):
Hi [~Adhip] yes I think this is a bug so I report this issue here.

> error after replace dependent jar file
> --
>
> Key: FLINK-32361
> URL: https://issues.apache.org/jira/browse/FLINK-32361
> Project: Flink
>  Issue Type: Bug
>  Components: API / Core
>Affects Versions: 1.14.5
>Reporter: Spongebob
>Priority: Major
>
> In standalone session mode, I have one dependent jar file named 'A.jar' in the 
> folder `lib1`, so I submit my app via the command `flink run -C 
> file:///lib1/A.jar -c Application ./myApp.jar`. It runs normally.
> I also have the same jar file named 'A.jar' in the folder `lib2`, copied from 
> `lib1`. Then I delete A.jar in `lib1`, copy the same jar from `lib2` back to 
> `lib1`, and re-submit the application. Finally, I get a ClassNotFoundException 
> for a class from A.jar.





[jira] [Assigned] (FLINK-32374) ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting

2023-06-18 Thread luoyuxia (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

luoyuxia reassigned FLINK-32374:


Assignee: Jane Chan

> ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for 
> overwriting
> --
>
> Key: FLINK-32374
> URL: https://issues.apache.org/jira/browse/FLINK-32374
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.16.0, 1.17.0, 1.16.1, 1.16.2, 1.18.0, 1.17.1
>Reporter: Jane Chan
>Assignee: Jane Chan
>Priority: Major
> Fix For: 1.18.0
>
>
> If the existing JSON plan is not truncated when overwriting, and the newly 
> generated JSON plan contents are shorter than the previous JSON plan content, 
> the plan will be invalid JSON.
> h4. How to reproduce
> {code:sql}
> Flink SQL> create table debug_sink (f0 int, f1 string) with ('connector' = 
> 'blackhole');
> [INFO] Execute statement succeed.
> Flink SQL> create table dummy_source (f0 int, f1 int, f2 string, f3 string) 
> with ('connector' = 'datagen');
> [INFO] Execute statement succeed.
> Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink 
> select if(f0 > f1, f0, f1) as f0, concat(f2, f3) as f1 from dummy_source;
> [INFO] Execute statement succeed.
> Flink SQL> set 'table.plan.force-recompile' = 'true';
> [INFO] Execute statement succeed.
> Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink 
> select * from (values (2, 'bye')) T (id, message);
> [INFO] Execute statement succeed.
> {code}
> cat -n debug.json, and check L#67
> {code:json}
>  1{
>  2  "flinkVersion" : "1.17",
>  3  "nodes" : [ {
>  4"id" : 15,
>  5"type" : "stream-exec-values_1",
>  6"tuples" : [ [ {
>  7  "kind" : "LITERAL",
>  8  "value" : "2",
>  9  "type" : "INT NOT NULL"
> 10}, {
> 11  "kind" : "LITERAL",
> 12  "value" : "bye",
> 13  "type" : "CHAR(3) NOT NULL"
> 14} ] ],
> 15"outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) 
> NOT NULL>",
> 16"description" : "Values(tuples=[[{ 2, _UTF-16LE'bye' }]])",
> 17"inputProperties" : [ ]
> 18  }, {
> 19"id" : 16,
> 20"type" : "stream-exec-sink_1",
> 21"configuration" : {
> 22  "table.exec.sink.keyed-shuffle" : "AUTO",
> 23  "table.exec.sink.not-null-enforcer" : "ERROR",
> 24  "table.exec.sink.type-length-enforcer" : "IGNORE",
> 25  "table.exec.sink.upsert-materialize" : "AUTO"
> 26},
> 27"dynamicTableSink" : {
> 28  "table" : {
> 29"identifier" : 
> "`default_catalog`.`default_database`.`debug_sink`",
> 30"resolvedTable" : {
> 31  "schema" : {
> 32"columns" : [ {
> 33  "name" : "f0",
> 34  "dataType" : "INT"
> 35}, {
> 36  "name" : "f1",
> 37  "dataType" : "VARCHAR(2147483647)"
> 38} ],
> 39"watermarkSpecs" : [ ]
> 40  },
> 41  "partitionKeys" : [ ],
> 42  "options" : {
> 43"connector" : "blackhole"
> 44  }
> 45}
> 46  }
> 47},
> 48"inputChangelogMode" : [ "INSERT" ],
> 49"inputProperties" : [ {
> 50  "requiredDistribution" : {
> 51"type" : "UNKNOWN"
> 52  },
> 53  "damBehavior" : "PIPELINED",
> 54  "priority" : 0
> 55} ],
> 56"outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) 
> NOT NULL>",
> 57"description" : 
> "Sink(table=[default_catalog.default_database.debug_sink], fields=[id, 
> message])"
> 58  } ],
> 59  "edges" : [ {
> 60"source" : 15,
> 61"target" : 16,
> 62"shuffle" : {
> 63  "type" : "FORWARD"
> 64},
> 65"shuffleMode" : "PIPELINED"
> 66  } ]
> 67} "$CONCAT$1",
> 68  "operands" : [ {
> 69"kind" : "INPUT_REF",
> 70"inputIndex" : 2,
> 71"type" : "VARCHAR(2147483647)"
> 72  }, {
> 73

[jira] [Updated] (FLINK-32374) ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting

2023-06-18 Thread Jane Chan (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-32374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jane Chan updated FLINK-32374:
-----------------------------
Description: 
If the existing JSON plan is not truncated when overwriting, and the newly 
generated JSON plan contents are shorter than the previous JSON plan content, 
the plan will be invalid JSON.
h4. How to reproduce
{code:sql}
Flink SQL> create table debug_sink (f0 int, f1 string) with ('connector' = 
'blackhole');
[INFO] Execute statement succeed.

Flink SQL> create table dummy_source (f0 int, f1 int, f2 string, f3 string) 
with ('connector' = 'datagen');
[INFO] Execute statement succeed.

Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink select 
if(f0 > f1, f0, f1) as f0, concat(f2, f3) as f1 from dummy_source;
[INFO] Execute statement succeed.

Flink SQL> set 'table.plan.force-recompile' = 'true';
[INFO] Execute statement succeed.

Flink SQL> compile plan '/foo/bar/debug.json' for insert into debug_sink select 
* from (values (2, 'bye')) T (id, message);
[INFO] Execute statement succeed.
{code}
cat -n debug.json, and check L#67
{code:json}
 1  {
 2"flinkVersion" : "1.17",
 3"nodes" : [ {
 4  "id" : 15,
 5  "type" : "stream-exec-values_1",
 6  "tuples" : [ [ {
 7"kind" : "LITERAL",
 8"value" : "2",
 9"type" : "INT NOT NULL"
10  }, {
11"kind" : "LITERAL",
12"value" : "bye",
13"type" : "CHAR(3) NOT NULL"
14  } ] ],
15  "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) NOT NULL>",
16  "description" : "Values(tuples=[[{ 2, _UTF-16LE'bye' }]])",
17  "inputProperties" : [ ]
18}, {
19  "id" : 16,
20  "type" : "stream-exec-sink_1",
21  "configuration" : {
22"table.exec.sink.keyed-shuffle" : "AUTO",
23"table.exec.sink.not-null-enforcer" : "ERROR",
24"table.exec.sink.type-length-enforcer" : "IGNORE",
25"table.exec.sink.upsert-materialize" : "AUTO"
26  },
27  "dynamicTableSink" : {
28"table" : {
29  "identifier" : 
"`default_catalog`.`default_database`.`debug_sink`",
30  "resolvedTable" : {
31"schema" : {
32  "columns" : [ {
33"name" : "f0",
34"dataType" : "INT"
35  }, {
36"name" : "f1",
37"dataType" : "VARCHAR(2147483647)"
38  } ],
39  "watermarkSpecs" : [ ]
40},
41"partitionKeys" : [ ],
42"options" : {
43  "connector" : "blackhole"
44}
45  }
46}
47  },
48  "inputChangelogMode" : [ "INSERT" ],
49  "inputProperties" : [ {
50"requiredDistribution" : {
51  "type" : "UNKNOWN"
52},
53"damBehavior" : "PIPELINED",
54"priority" : 0
55  } ],
56  "outputType" : "ROW<`id` INT NOT NULL, `message` CHAR(3) NOT NULL>",
57  "description" : 
"Sink(table=[default_catalog.default_database.debug_sink], fields=[id, 
message])"
58} ],
59"edges" : [ {
60  "source" : 15,
61  "target" : 16,
62  "shuffle" : {
63"type" : "FORWARD"
64  },
65  "shuffleMode" : "PIPELINED"
66} ]
67  } "$CONCAT$1",
68"operands" : [ {
69  "kind" : "INPUT_REF",
70  "inputIndex" : 2,
71  "type" : "VARCHAR(2147483647)"
72}, {
73  "kind" : "INPUT_REF",
74  "inputIndex" : 3,
75  "type" : "VARCHAR(2147483647)"
76} ],
77"type" : "VARCHAR(2147483647)"
78  } ],
79  "condition" : null,
80  "inputProperties" : [ {
81"requiredDistribution" : {
82  "type" : "UNKNOWN"
83},
84"damBehavior" : "PIPELINED",
85"priority" : 0
86  } ],
87  "outputType" : "ROW<`f0` INT, `f1` VARCHAR(2147483647)>",
88  "description" : "Calc(select=[IF((f0 > f1), f0, f1) AS f0, 
CONCAT(f2, f3) AS f1])"
89}, {
90  "id" : 14,
91  "type" : "stream-exec-sink_1",
92  "configuration" : {
93"table.exec.sink.keyed-shuffle" : "AUTO",
94"table.exec.sink.not-null-enforcer" : "ERROR",
95"table.exec.sink.type-length-enforcer" : "IGNORE",
96"table.exec.sink.upsert-materialize" : "AUTO"
97  },
98  "dynamicTableSink" : {
99"table" : {
   100  "identifier" : 
"`default_catalog`.`default_database`.`debug_sink`",
   101  "resolvedTable" : {
   102"schema" : {
   103  "columns" : [ {
   104  

[jira] [Created] (FLINK-32374) ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting

2023-06-18 Thread Jane Chan (Jira)
Jane Chan created FLINK-32374:
-----------------------------

 Summary: ExecNodeGraphInternalPlan#writeToFile should support 
TRUNCATE_EXISTING for overwriting
 Key: FLINK-32374
 URL: https://issues.apache.org/jira/browse/FLINK-32374
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.17.1, 1.16.2, 1.16.1, 1.17.0, 1.16.0, 1.18.0
Reporter: Jane Chan
 Fix For: 1.18.0








[jira] [Commented] (FLINK-32370) JDBC SQL gateway e2e test is unstable

2023-06-18 Thread Fang Yong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733975#comment-17733975
 ] 

Fang Yong commented on FLINK-32370:
-----------------------------

Thanks [~chesnay], I'll look at it and try to fix it :)

> JDBC SQL gateway e2e test is unstable
> -
>
> Key: FLINK-32370
> URL: https://issues.apache.org/jira/browse/FLINK-32370
> Project: Flink
>  Issue Type: Technical Debt
>Affects Versions: 1.18.0
>Reporter: Chesnay Schepler
>Priority: Major
> Fix For: 1.18.0
>
> Attachments: flink-vsts-sql-gateway-0-fv-az75-650.log, 
> flink-vsts-standalonesession-0-fv-az75-650.log, 
> flink-vsts-taskexecutor-0-fv-az75-650.log
>
>
> The client is failing while trying to collect data when the job already 
> finished on the cluster.





[jira] [Commented] (FLINK-32319) flink can't the partition of network after restart

2023-06-18 Thread wgcn (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-32319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733968#comment-17733968
 ] 

wgcn commented on FLINK-32319:
-----------------------------

It works after I increased "taskmanager.memory.network", but I'm not sure why 
this happens, as the Flink task was functioning normally when it was initially 
started. After some time, there is a chance that this issue occurs upon 
restart, which was not encountered in Flink 1.12. I have calculated the number 
of floating buffers, the buffer size, and the number of buffers for each 
channel; 600 MB should be enough. Is this issue due to a new mechanism causing 
usage problems, or is it an unexpected issue?
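
For reference, the back-of-envelope buffer calculation mentioned above, written 
out as a sketch (assumes the defaults: 2 exclusive buffers per channel, 8 
floating buffers per gate, 32 KiB segments):

{code:java}
public class NetworkBufferEstimate {
    public static void main(String[] args) {
        long channels = 128;           // e.g. one channel per upstream subtask at parallelism 128
        long gates = 1;                // input gates of the consuming task
        long segmentBytes = 32 * 1024; // 32 KiB segment size (default)
        long buffers = channels * 2 + gates * 8; // exclusive + floating (defaults)
        System.out.printf("%d buffers ~= %.1f MiB per task%n",
                buffers, buffers * segmentBytes / (1024.0 * 1024.0));
    }
}
{code}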

> flink can't the partition of network after restart
> --
>
> Key: FLINK-32319
> URL: https://issues.apache.org/jira/browse/FLINK-32319
> Project: Flink
>  Issue Type: Bug
>  Components: Runtime / Network
>Affects Versions: 1.17.1
> Environment: centos 7.
> jdk 8.
> flink1.17.1 application mode on yarn 
> flink configuration :
> ```
> $internal.application.program-args sql2
> $internal.deployment.config-dir   /data/home/flink/wgcn/flink-1.17.1/conf
> $internal.yarn.log-config-file
> /data/home/flink/wgcn/flink-1.17.1/conf/log4j.properties
> akka.ask.timeout  100s
> blob.server.port  15402
> classloader.check-leaked-classloader  false
> classloader.resolve-order parent-first
> env.java.opts.taskmanager -XX:+UseG1GC -XX:MaxGCPauseMillis=1000 
> execution.attached true
> execution.checkpointing.aligned-checkpoint-timeout10 min
> execution.checkpointing.externalized-checkpoint-retention 
> RETAIN_ON_CANCELLATION
> execution.checkpointing.interval  10 min
> execution.checkpointing.min-pause 10 min
> execution.savepoint-restore-mode  NO_CLAIM
> execution.savepoint.ignore-unclaimed-state false
> execution.shutdown-on-attached-exit   false
> execution.target  embedded
> high-availability zookeeper
> high-availability.cluster-id  application_1684133071014_7202676
> high-availability.storageDir  hdfs:///user/flink/recovery
> high-availability.zookeeper.path.root /flink
> high-availability.zookeeper.quorum x
> internal.cluster.execution-mode   NORMAL
> internal.io.tmpdirs.use-local-default true
> io.tmp.dirs   
> /data1/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data3/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data4/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data5/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data6/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data7/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data8/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data9/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data10/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data11/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676,/data12/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676
> jobmanager.execution.failover-strategyregion
> jobmanager.memory.heap.size   9261023232b
> jobmanager.memory.jvm-metaspace.size  268435456b
> jobmanager.memory.jvm-overhead.max1073741824b
> jobmanager.memory.jvm-overhead.min1073741824b
> jobmanager.memory.off-heap.size   134217728b
> jobmanager.memory.process.size10240m
> jobmanager.rpc.address
> jobmanager.rpc.port   31332
> metrics.reporter.promgateway.deleteOnShutdown true
> metrics.reporter.promgateway.factory.class
> org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory
> metrics.reporter.promgateway.hostUrl  :9091
> metrics.reporter.promgateway.interval 60 SECONDS
> metrics.reporter.promgateway.jobName  join_phase3_v7
> metrics.reporter.promgateway.randomJobNameSuffix  false
> parallelism.default   128
> pipeline.classpaths   
> pipeline.jars 
> file:/data2/nm-local-dir/usercache/flink/appcache/application_1684133071014_7202676/container_e16_1684133071014_7202676_01_02/frauddetection-0.1.jar
> rest.address  
> rest.bind-address x
> rest.bind-port 5-50500
> rest.flamegraph.enabled   true
> restart-strategy.failure-rate.delay   10 s
> restart-strategy.failure-rate.failure-rate-interval   1 min
> restart-strategy.failure-rate.max-failures-per-interval   6
> restart-strategy.type exponential-delay
> state.backend.type filesystem
> state.checkpoints.dir hdfs://xx/user/flink/checkpoints-data/wgcn
> state.checkpoints.num-retained3
> taskmanager.memory.managed.fraction   0
> 

[GitHub] [flink] snuyanzin commented on a diff in pull request #22558: [FLINK-28744][table] Upgrade Calcite to 1.31.0

2023-06-18 Thread via GitHub


snuyanzin commented on code in PR #22558:
URL: https://github.com/apache/flink/pull/22558#discussion_r1233372251


##
flink-table/flink-sql-jdbc-driver/pom.xml:
##
@@ -84,6 +84,12 @@
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+			<scope>provided</scope>
+		</dependency>

Review Comment:
   Yes, sort of.
   After a closer look I realized that before 1.31.0 `FlinkSqlParserImpl` didn't 
depend on guava. With 1.31.0, after these 2 commits
   
https://github.com/apache/calcite/commit/7c1e2746b94a50f0bb08571287acb5327fdc1b16
   
https://github.com/apache/calcite/commit/75d064f659a038371d84f9fdba7847e1631d1f1d
   it starts depending on guava and, as a result, requires the same dependency as 
the tests do.






[GitHub] [flink] zhuzhurk commented on a diff in pull request #22798: [FLINK-32288][runtime] Improve the scheduling performance of AdaptiveBatchScheduler

2023-06-18 Thread via GitHub


zhuzhurk commented on code in PR #22798:
URL: https://github.com/apache/flink/pull/22798#discussion_r122596


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java:
##
@@ -65,6 +65,18 @@ public boolean isInputConsumable(
 return true;
 }
 
+/**
+ * This method is only used to filter consumable consumed partition group 
in the
+ * onExecutionStateChange method of {@link VertexwiseSchedulingStrategy}. 
For hybrid shuffle
+ * mode, the downstream vertices will be scheduled together with their 
upstreams. Therefore,
+ * only blocking consumed partition group needs to be considered here.
+ */
+@Override
+public boolean isConsumedPartitionGroupConsumable(
+final ConsumedPartitionGroup consumedPartitionGroup) {
+return consumedPartitionGroup.areAllPartitionsFinished();

Review Comment:
   Looks to me the logic here can be 
   ```
   if (consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) {
       return false;
   } else {
       return consumedPartitionGroup.areAllPartitionsFinished();
   }
   ```
   
   It returns `false` because we only expect this method to be invoked to find 
groups that become consumable after a vertex finishes. Maybe we can name it 
`isConsumableBasedOnFinishedProducers`.






[GitHub] [flink] zhuzhurk commented on a diff in pull request #22798: [FLINK-32288][runtime] Improve the scheduling performance of AdaptiveBatchScheduler

2023-06-18 Thread via GitHub


zhuzhurk commented on code in PR #22798:
URL: https://github.com/apache/flink/pull/22798#discussion_r122596


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/DefaultInputConsumableDecider.java:
##
@@ -65,6 +65,18 @@ public boolean isInputConsumable(
 return true;
 }
 
+/**
+ * This method is only used to filter consumable consumed partition group 
in the
+ * onExecutionStateChange method of {@link VertexwiseSchedulingStrategy}. 
For hybrid shuffle
+ * mode, the downstream vertices will be scheduled together with their 
upstreams. Therefore,
+ * only blocking consumed partition group needs to be considered here.
+ */
+@Override
+public boolean isConsumedPartitionGroupConsumable(
+final ConsumedPartitionGroup consumedPartitionGroup) {
+return consumedPartitionGroup.areAllPartitionsFinished();

Review Comment:
   Looks to me the logic here should be 
   ```
   if (consumedPartitionGroup.getResultPartitionType().canBePipelinedConsumed()) {
       return false;
   } else {
       return consumedPartitionGroup.areAllPartitionsFinished();
   }
   ```
   
   It returns `false` because we only expect this method to be invoked to find 
groups that become consumable after a vertex finishes. Otherwise it could 
`return true`, because when using `DefaultInputConsumableDecider`, a 
`ConsumedPartitionGroup` is always consumable for a hybrid shuffle downstream 
task. However, `return false` helps improve the scheduling performance for 
hybrid shuffle.






[GitHub] [flink] lsyldliu commented on pull request #22734: [FLINK-32277][table-runtime] Introduce the basic operator fusion codegen framework

2023-06-18 Thread via GitHub


lsyldliu commented on PR #22734:
URL: https://github.com/apache/flink/pull/22734#issuecomment-1596184352

   > The table module is failed, could you have a check?
   
   Let me take a look





[GitHub] [flink] gaoyunhaii closed pull request #22344: [FLINK-31721][core] Move JobStatusHook to flink-core module

2023-06-18 Thread via GitHub


gaoyunhaii closed pull request #22344: [FLINK-31721][core] Move JobStatusHook 
to flink-core module
URL: https://github.com/apache/flink/pull/22344





[GitHub] [flink] zhougit86 commented on pull request #22624: [FLINK-32132][table-planner] Cast function CODEGEN does not work as e…

2023-06-18 Thread via GitHub


zhougit86 commented on PR #22624:
URL: https://github.com/apache/flink/pull/22624#issuecomment-1596157634

   > @zhougit86 Thanks for contribution. But i'm still trying to understand 
what's the problem and why your changes can fix it. Could you please provide a 
reproducible case so that I can try to reproduce it in my local and to 
understand the problem.
   
   @luoyuxia could you please help review? thx





[jira] [Commented] (FLINK-28620) SQL Client doesn't properly print values of INTERVAL type

2023-06-18 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-28620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733900#comment-17733900
 ] 

Sergey Nuyanzin commented on FLINK-28620:
-----------------------------

[~jark] it looks like this was fixed at some point; currently I cannot 
reproduce it. Should we keep the issue open?

> SQL Client doesn't properly print values of INTERVAL type
> -
>
> Key: FLINK-28620
> URL: https://issues.apache.org/jira/browse/FLINK-28620
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Client
>Reporter: Jark Wu
>Assignee: Jark Wu
>Priority: Major
> Fix For: 1.18.0
>
>
> The display of values of interval type should follow the CAST rules. However, 
> currently, SQL Client prints it using {{Period.toString()}} and 
> {{Duration.toString()}} which is not SQL standard compliant. 
> {code}
> Flink SQL> select interval '9-11' year to month;
> +--------+
> | EXPR$0 |
> +--------+
> |  P119M |
> +--------+
> 1 row in set
> Flink SQL> select cast(interval '9-11' year to month as varchar);
> +--------+
> | EXPR$0 |
> +--------+
> |  +9-11 |
> +--------+
> 1 row in set
> Flink SQL> select interval '2 1:2:3' day to second;
> +-----------+
> |    EXPR$0 |
> +-----------+
> | PT49H2M3S |
> +-----------+
> 1 row in set
> Flink SQL> select cast(interval '2 1:2:3' day to second as varchar);
> +-----------------+
> |          EXPR$0 |
> +-----------------+
> | +2 01:02:03.000 |
> +-----------------+
> 1 row in set
> {code}
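
For illustration, here is a minimal Java sketch of the SQL-standard rendering the CAST path produces; the IntervalFormat helper below is hypothetical and not part of Flink's API (the Duration part accessors require Java 9+):

{code}
import java.time.Duration;
import java.time.Period;

// Hypothetical helper: render java.time values the way CAST(... AS VARCHAR) does.
final class IntervalFormat {

    // INTERVAL YEAR TO MONTH, e.g. Period.of(9, 11, 0) -> "+9-11"
    static String yearToMonth(Period p) {
        long months = p.toTotalMonths();
        String sign = months < 0 ? "-" : "+";
        months = Math.abs(months);
        return String.format("%s%d-%02d", sign, months / 12, months % 12);
    }

    // INTERVAL DAY TO SECOND, e.g. a duration of 49h2m3s -> "+2 01:02:03.000"
    static String dayToSecond(Duration d) {
        String sign = d.isNegative() ? "-" : "+";
        Duration abs = d.abs();
        return String.format("%s%d %02d:%02d:%02d.%03d",
                sign, abs.toDays(), abs.toHoursPart(), abs.toMinutesPart(),
                abs.toSecondsPart(), abs.toMillisPart());
    }
}
{code}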



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-17224) Precision of TIME type does not work correctly

2023-06-18 Thread Sergey Nuyanzin (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-17224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17733897#comment-17733897
 ] 

Sergey Nuyanzin commented on FLINK-17224:
-

[~dwysakowicz], [~jark]
I have it supported in the PR, including json, csv, avro, and the related time and 
aggregate functions. Could you please have a look?

> Precision of TIME type does not work correctly
> ----------------------------------------------
>
> Key: FLINK-17224
> URL: https://issues.apache.org/jira/browse/FLINK-17224
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table SQL / API
>Reporter: Dawid Wysakowicz
>Assignee: Sergey Nuyanzin
>Priority: Critical
>  Labels: auto-unassigned, pull-request-available
>
> Support for precision in the TIME type does not work correctly, causing many 
> different, often cryptic, problems.
> Precision is completely ignored in {{FlinkTypeFactory:440-446}}:
> {code}
>   case TIME =>
> if (relDataType.getPrecision > 3) {
>   throw new TableException(
> s"TIME precision is not supported: ${relDataType.getPrecision}")
> }
> // blink runner support precision 3, but for consistent with flink 
> runner, we set to 0.
> new TimeType()
> {code}
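
A plausible direction for a fix, sketched below in Java purely for illustration (the real FlinkTypeFactory is Scala, and the actual change may differ), is to propagate the declared precision instead of pinning it to 0:

{code}
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.types.logical.TimeType;

// Hypothetical sketch: keep the declared precision instead of silently dropping it.
final class TimeTypeConversion {
    static TimeType toTimeType(int precision) {
        if (precision > 3) {
            throw new TableException("TIME precision is not supported: " + precision);
        }
        return new TimeType(precision); // was: new TimeType(), which defaults to precision 0
    }
}
{code}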
> Example problem:
> {code}
> @Test
> public void testTimeScalarFunction() throws Exception {
>   int nanoOfDay = 10 * 1_000_000;
>   final List<Row> sourceData = Collections.singletonList(
>   Row.of(LocalTime.ofNanoOfDay(nanoOfDay))
>   );
>   final List<Row> sinkData = Arrays.asList(
>   Row.of(nanoOfDay)
>   );
>   TestCollectionTableFactory.reset();
>   TestCollectionTableFactory.initData(sourceData);
>   tEnv().sqlUpdate("CREATE TABLE SourceTable(s TIME(2)) WITH ('connector' = 'COLLECTION')");
>   tEnv().sqlUpdate("CREATE TABLE SinkTable(s BIGINT) WITH ('connector' = 'COLLECTION')");
>   tEnv().from("SourceTable")
>   .select(call(new TimeScalarFunction(), $("s")))
>   .insertInto("SinkTable");
>   tEnv().execute("Test Job");
>   assertThat(TestCollectionTableFactory.getResult(), equalTo(sinkData));
> }
> public static class TimeScalarFunction extends ScalarFunction {
>   public Long eval(@DataTypeHint("TIME(1)") LocalTime time) {
>   return time.toNanoOfDay();
>   }
> }
> {code}
> fails with:
> {code}
> org.apache.flink.table.api.ValidationException: Invalid function call:
> org$apache$flink$table$planner$runtime$stream$table$FunctionITCase$TimeScalarFunction$a19cd231ba10cbbc0b55ebeda49e2a77(TIME(0))
>   at 
> org.apache.flink.table.types.inference.TypeInferenceUtil.createInvalidCallException(TypeInferenceUtil.java:198)
>   at 
> org.apache.flink.table.planner.functions.inference.TypeInferenceReturnInference.inferReturnType(TypeInferenceReturnInference.java:73)
>   at 
> org.apache.calcite.sql.SqlOperator.inferReturnType(SqlOperator.java:486)
>   at 
> org.apache.calcite.rex.RexBuilder.deriveReturnType(RexBuilder.java:277)
>   at org.apache.calcite.tools.RelBuilder.call(RelBuilder.java:576)
>   at org.apache.calcite.tools.RelBuilder.call(RelBuilder.java:583)
>   at 
> org.apache.flink.table.planner.expressions.converter.FunctionDefinitionConvertRule.convert(FunctionDefinitionConvertRule.java:67)
>   at 
> org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:97)
>   at 
> org.apache.flink.table.planner.expressions.converter.ExpressionConverter.visit(ExpressionConverter.java:72)
>   at 
> org.apache.flink.table.expressions.CallExpression.accept(CallExpression.java:122)
>   at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.convertExprToRexNode(QueryOperationConverter.java:681)
>   at 
> org.apache.flink.table.planner.plan.QueryOperationConverter.access$800(QueryOperationConverter.java:128)
>   at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.lambda$convertToRexNodes$2(QueryOperationConverter.java:487)
>   at 
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.flink.table.planner.plan.QueryOperationConverter$SingleRelVisitor.convertToRexNodes(QueryOperationConverter.java:488)
>   at 
> 

[GitHub] [flink] xintongsong commented on a diff in pull request #22804: [FLINK-31644][network] Implement the disk tier producer for the tiered storage

2023-06-18 Thread via GitHub


xintongsong commented on code in PR #22804:
URL: https://github.com/apache/flink/pull/22804#discussion_r1233232708


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/file/SegmentNettyPayload.java:
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.file;
+
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyPayload;
+
+import java.util.List;
+
+/**
+ * The wrapper class {@link SegmentNettyPayload} for a segment, which holds all the
+ * {@link NettyPayload} buffers and the flag that marks whether this segment needs
+ * to be finished.
+ */
+public class SegmentNettyPayload {

Review Comment:
   1. It doesn't make sense that a file-related class is named after netty. I 
see this class internally uses `NettyPayload`, which probably means we should 
make `NettyPayload` something more general.
   2. These two classes, `SegmentNettyPayload` and `SubpartitionNettyPayload`, 
only serve as the argument types for `PartitionFileWriter`. They barely make 
any sense independently. I'd suggest making them inner classes of 
`PartitionFileWriter`, as in the sketch below.
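   
   For illustration, a rough Java sketch of that shape; all names here are hypothetical and the actual refactor may look different:
   
   ```java
   import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyPayload;
   
   import java.util.List;
   
   // Hypothetical sketch: scope the write-side argument types to the writer that uses them.
   public interface PartitionFileWriter {
   
       /** All buffers of one segment, plus whether the segment is finished after this write. */
       final class SegmentBuffers {
           final int segmentId;
           final List<NettyPayload> payloads;
           final boolean segmentFinished;
   
           SegmentBuffers(int segmentId, List<NettyPayload> payloads, boolean segmentFinished) {
               this.segmentId = segmentId;
               this.payloads = payloads;
               this.segmentFinished = segmentFinished;
           }
       }
   
       /** The segments of one subpartition to be written in a single call. */
       final class SubpartitionBuffers {
           final int subpartitionId;
           final List<SegmentBuffers> segments;
   
           SubpartitionBuffers(int subpartitionId, List<SegmentBuffers> segments) {
               this.subpartitionId = subpartitionId;
               this.segments = segments;
           }
       }
   
       void write(List<SubpartitionBuffers> subpartitions);
   }
   ```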



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/file/PartitionFileWriter.java:
##
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.file;

Review Comment:
   I'd suggest `*.hybrid.tiered.file`.



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskCacheManager.java:
##
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyPayload;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.file.PartitionFileWriter;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.file.SegmentNettyPayload;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.file.SubpartitionNettyPayload;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import 

[GitHub] [flink] xintongsong commented on a diff in pull request #22733: [FLINK-31642][network] Introduce the MemoryTierConsumerAgent

2023-06-18 Thread via GitHub


xintongsong commented on code in PR #22733:
URL: https://github.com/apache/flink/pull/22733#discussion_r1233228687


##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##
@@ -43,8 +68,25 @@ public void start() {
 }
 
 public Optional<Buffer> getNextBuffer(int subpartitionId) {
-// TODO, the detailed logic will be completed when the memory tier is introduced.
-return Optional.empty();
+Optional<Buffer> buffer = Optional.empty();
+for (TierConsumerAgent tierConsumerAgent : tierConsumerAgents) {
+buffer =
+tierConsumerAgent.getNextBuffer(
+subpartitionId, subpartitionNextSegmentIds[subpartitionId]);
+if (buffer.isPresent()) {
+break;
+}
+}

Review Comment:
   It's weird that we have to loop over all tier agents to read each buffer. 
Shouldn't the storage client remember the tier that each channel is currently 
reading from? A sketch of that idea follows.
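   
   A minimal Java sketch of that suggestion, meant to live inside TieredStorageConsumerClient and reusing the fields shown in this diff; the `currentAgents` field is hypothetical:
   
   ```java
   // Hypothetical: remember the tier that served the last buffer for each
   // subpartition and try it first, rescanning only when that tier is exhausted.
   private final TierConsumerAgent[] currentAgents; // indexed by subpartition id
   
   public Optional<Buffer> getNextBuffer(int subpartitionId) {
       TierConsumerAgent cached = currentAgents[subpartitionId];
       if (cached != null) {
           Optional<Buffer> buffer =
                   cached.getNextBuffer(subpartitionId, subpartitionNextSegmentIds[subpartitionId]);
           if (buffer.isPresent()) {
               return buffer;
           }
           currentAgents[subpartitionId] = null; // tier exhausted; fall back to the scan
       }
       for (TierConsumerAgent agent : tierConsumerAgents) {
           Optional<Buffer> buffer =
                   agent.getNextBuffer(subpartitionId, subpartitionNextSegmentIds[subpartitionId]);
           if (buffer.isPresent()) {
               currentAgents[subpartitionId] = agent;
               return buffer;
           }
       }
       return Optional.empty();
   }
   ```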



##
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java:
##
@@ -43,8 +68,25 @@ public void start() {
 }
 
 public Optional<Buffer> getNextBuffer(int subpartitionId) {
-// TODO, the detailed logic will be completed when the memory tier is introduced.
-return Optional.empty();
+Optional<Buffer> buffer = Optional.empty();
+for (TierConsumerAgent tierConsumerAgent : tierConsumerAgents) {
+buffer =
+tierConsumerAgent.getNextBuffer(
+subpartitionId, subpartitionNextSegmentIds[subpartitionId]);
+if (buffer.isPresent()) {
+break;
+}
+}

Review Comment:
   Also, the argument name doesn't seem right. How could a consumer read data 
from different subpartitions?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org